Simple Multiprocessing Task Queue in Python

Python's most popular implementation, CPython, handles threading quite differently from what most people expect: because of the Global Interpreter Lock, threads do not execute Python bytecode in parallel. Hence, to utilize multiple CPUs on a modern computer, one has to start multiple processes. One way to do that is to write Python programs which accept command line parameters and to start several instances with different parameters (either from the shell or using Python's subprocess module). The operating system then distributes the instances across the CPUs.
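For example, one could start several instances of a worker script and let the OS schedule them. The script name worker.py and its command line are made up here for illustration:

import subprocess

# start three independent worker processes; the operating system
# schedules them across the available CPUs (worker.py is a
# hypothetical script taking a task id as its first argument)
procs = [subprocess.Popen(['python', 'worker.py', str(i)])
         for i in range(3)]

# wait for all the workers to finish
for p in procs:
    p.wait()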
But one can also start multiple Python processes directly from within a running Python program.

In the following tutorial, multiple simple tasks are started via a main process, which looks for tasks and, when it finds them, runs them without blocking itself (the main process has only one job: look for tasks and launch them when found). If you want to see a full-blown application that does something similar, i.e. a task queue, see Celeryproject.org.

So let's get started. The skeleton of the application called tasku.py is rather simple:

def run():

    """
    The main loop of tasku where everything happens.
    """

    while True:
        for task in check_for_tasks():
            start_task(task)
        clean_finished_tasks()


if __name__ == '__main__':
    run()

The application is quite straightforward. Jobs are sought, and when found they are launched. A second step cleans up all the finished tasks. But the above application lacks the functions check_for_tasks, start_task and clean_finished_tasks. Let's begin by defining a Task and check_for_tasks.

A Task is a Python class that does not do much: it starts, reads a message from a file, and hangs around for a period of time which is also read from that file. The fact that the task does not terminate immediately is useful, so we can see the task in the OS process tree:

class Task(object):
    """
    read a task file with the following content: Hello Parallel Tasks;5
    """
    def __init__(self, taskfile):
        self.name = taskfile
        with open(taskfile) as t:
            msg, delay = t.read().split(";")

        self.msg = msg
        self.delay = int(delay)  # time.sleep needs a number, not a string

    def run(self):
        print(self.msg)
        time.sleep(self.delay)
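A quick way to try the class on its own (assuming import time at the top of the file) is to write a task file by hand and run the task synchronously, without any multiprocessing:

# create a task file and run it in the current process,
# just to check the parsing logic
with open('hello.task', 'w') as t:
    t.write("Hello Parallel Tasks;5")

task = Task('hello.task')
task.run()  # prints the message, then sleeps for 5 seconds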

The Task class is also quite straightforward. It has a constructor and a run method. The function check_for_tasks is as follows:

def check_for_tasks():
    """
    find files with a .task ending in the current directory.
    This is a file-based no-priority queue which only finds the files
    """
    tasks = sorted(filter(lambda x: x.endswith('.task'),
                          filter(path.isfile, os.listdir('.'))))
    # this should also check for jobs that are not already running ...
    return tasks

The task files can be named, for example, do_something.task and say_hello.task. Since os.listdir returns entries in arbitrary order, the list is sorted, so the tasks are returned in alphabetical order. Hence, priority is set by the naming of the files.
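The same lookup can also be written with the glob module; a minimal equivalent sketch (note that, unlike the version above, it does not filter out directories whose names end in .task):

import glob

def check_for_tasks():
    # glob expands the pattern in the current directory;
    # sorted() makes the alphabetical (priority) order explicit
    return sorted(glob.glob('*.task'))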

Next, the function start_task is shown, and finally multiprocessing comes into the game:

def start_task(task):
    """
    open the file, read its content, then start a process with the file.
    running is a global dictionary holding the running jobs
    """
    if task not in running:

        ti = Task(task)

        p = mp.Process(target=ti.run, name=ti.name)
        p.start()
        running[ti.name] = (p, p.pid, p.is_alive())

Note that multiprocessing was imported as mp.
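The mp.Process object exposes the attributes used above: start() launches the child process, pid is set once it has started, and is_alive() reports whether it is still running. A minimal standalone sketch of these calls:

import multiprocessing as mp
import time

def nap():
    # stand-in for a task: sleep for two seconds in the child process
    time.sleep(2)

if __name__ == '__main__':
    p = mp.Process(target=nap, name='nap-task')
    p.start()
    print(p.pid)         # pid of the child process
    print(p.is_alive())  # True while the child is running
    p.join()             # wait for the child to finish
    print(p.exitcode)    # 0 after a clean exit

The last step in the main loop is clean_finished_tasks: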

def clean_finished_tasks():
    """
    Check the running dictionary for items; each task that is done
    is moved to the correct place (errors, or executed).
    errors and executed are global dictionaries.
    """
    for k, v in running.items():
        print k, v[0].is_alive()
        if not v[0].is_alive():
            if v[0].exitcode:
                errors[k] = v[0].exitcode
                running.pop(k)
            else:
                executed[k] = 'success!'
                running.pop(k)
                os.rename(k, k + '.done')
                print "successfully cleaned %s !" % k
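The cleanup relies on the exitcode attribute: it is None while the child is still running, 0 after a clean exit, and non-zero otherwise (negative if the process was killed by a signal). A small standalone sketch of the error path:

import multiprocessing as mp
import sys

def fail():
    # simulate a task that terminates with an error
    sys.exit(3)

if __name__ == '__main__':
    p = mp.Process(target=fail)
    p.start()
    p.join()             # wait for the child to finish
    print(p.is_alive())  # False
    print(p.exitcode)    # 3 -> this task would land in the errors dict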

Finally, all the building blocks are there. The full program is then:

import multiprocessing as mp
import os
from os import path
import time
import datetime

# non-persistent task state
running = {}
executed = {}
errors = {}


class Task(object):
    """
    read a task file with the following content: Hello Parallel Tasks;5
    """

    def __init__(self, taskfile):
        self.name = taskfile
        with open(taskfile) as t:
            msg, delay = t.read().split(";")

        self.msg = msg
        self.delay = int(delay)

    def run(self):
        print(self.msg)
        time.sleep(self.delay)


def check_for_tasks():
    """
    find files with a .task ending in the current directory.
    This is a file-based no-priority queue which only finds the files
    """
    tasks = sorted(filter(lambda x: x.endswith('.task'),
                          filter(path.isfile, os.listdir('.'))))
    # this should also check for jobs that are not already running ...
    return tasks


def start_task(task):
    """
    open the file, read its content, then start a process with the file.
    running is a global dictionary holding the running jobs
    """
    if task not in running:

        ti = Task(task)

        p = mp.Process(target=ti.run, name=ti.name)
        p.start()
        running[ti.name] = (p, p.pid, p.is_alive())


def clean_finished_tasks():
    """
    Check the running dictionary for items; each task that is done
    is moved to the correct place (errors, or executed).
    errors and executed are global dictionaries.
    """
    for k, v in running.items():
        print k, v[0].is_alive()
        if not v[0].is_alive():
            if v[0].exitcode:
                errors[k] = v[0].exitcode
                running.pop(k)
            else:
                executed[k] = 'success!'
                running.pop(k)
                os.rename(k, k + '.done')
                print "successfully cleaned %s !" % k


def run():
    """
    The main loop of tasku where everything happens.
    """

    while True:
        for task in check_for_tasks():
            start_task(task)

        clean_finished_tasks()
        # some information output
        print "executed, ", executed
        print "running, ", running
        print datetime.datetime.now()
        # add some delay, so we can see what is going on in the OS
        time.sleep(5)

if __name__ == '__main__':
    run()

To see what happens when this code runs, start it with python tasku.py. You should see something similar to this:

$ python tasku.py                               
executed,  {}                                                                      
running,  {}                                                                       
2015-02-25 21:32:51.766882                                                         
executed,  {}                                                                      
running,  {}                                                                       
2015-02-25 21:32:56.771922  

This is kind of boring, so let's create a task. In a different shell, issue:

$ echo "I am the first task;7" > 1.task

After at most 5 seconds you should see your task working:

executed,  {}
running,  {}
2015-02-26 05:03:27.223866
1.task True
executed,  {}
running,  {'1.task': (<Process(1.task, started)>, 6882, True)}
2015-02-26 05:03:32.232389
I am the first task
1.task True
executed,  {}
running,  {'1.task': (<Process(1.task, started)>, 6882, True)}
2015-02-26 05:03:37.237662
1.task False
successfully cleaned 1.task ! 
executed,  {'1.task': 'success!'}
running,  {}
2015-02-26 05:03:42.243716

The task has its own PID, and you can see the process tree with pstree:

$ pstree -a -p 6870
python,6870 tasku.py
  |
   `-python,6882 tasku.py

The process 6882 was launched by 6870. You can verify this by doing:

echo "I am the first task;7" > 1.task && sleep 3 && pgrep -f tasku

To make things a bit more interesting, let's see what happens when multiple tasks are placed on the queue to run immediately, but with different delay times to finish. In order to automate putting tasks on the queue I wrote a Makefile which can be downloaded here. The following assumes this Makefile was downloaded (you can find all the sources needed here: tasku.py).

make create_all_tasks
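If you do not want to use make, the same effect can be achieved by hand. The exact messages and delay values below are guesses reconstructed from the output further down, not the actual contents of the Makefile:

$ echo "I am the first task;7" > 1.task
$ echo "I am the second task;5" > 2.task
$ echo "this task will finish quickly;3" > 3.task
$ echo "this task will hang around;12" > 4.task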

The create_all_tasks target will enqueue 4 jobs with different delay times, wait a little for tasku to find and launch the tasks and then print the process tree. Here is what should be seen in both terminals:

make create_all_tasks
echo

python,26652 tasku.py
  |-python,26653 tasku.py
  |-python,26654 tasku.py
  |-python,26655 tasku.py
  `-python,26656 tasku.py
python,26653 tasku.py
python,26654 tasku.py
python,26655 tasku.py
python,26656 tasku.py

The output of the tasku.py process:

$ python tasku.py
I am the second task
this task will finish quickly
2.task True
3.task True
1.task True
4.task True
executed,  {}
running,  {'2.task': (<Process(2.task, started)>, 26653, True), '3.task': 
          (<Process(3.task, started)>, 26654, True), 
          '1.task': (<Process(1.task, started)>, 26655, True), 
          '4.task': (<Process(4.task, started)>, 26656, True)}
2015-02-26 10:48:21.235133
this task will hang around
I am the first task
2.task True
3.task True
1.task True
4.task True
executed,  {}
running,  {'2.task': (<Process(2.task, started)>, 26653, True), 
           '3.task': (<Process(3.task, started)>, 26654, True), 
           '1.task': (<Process(1.task, started)>, 26655, True), 
           '4.task': (<Process(4.task, started)>, 26656, True)}
2015-02-26 10:48:26.242138
2.task False
3.task False
1.task False
4.task True
successfully cleaned 2.task ! 
successfully cleaned 3.task ! 
successfully cleaned 1.task ! 
executed,  {'2.task': 'success!', '3.task': 'success!', '1.task': 'success!'}
running,  {'4.task': (<Process(4.task, started)>, 26656, True)}
2015-02-26 10:48:31.248629
4.task False
successfully cleaned 4.task ! 
executed,  {'2.task': 'success!', '3.task': 'success!', 
            '1.task': 'success!', '4.task': 'success!'}
running,  {}

As can be seen, the main process (pid 26652) launched the other processes, which take a certain delay time to finish. Once these processes finish, they exit and are no longer found in the process tree. This way, one can distribute the work of complicated tasks to multiple CPUs using Python. This rudimentary example only scratches the surface of multiprocessing. For example, it does not show how the processes can share data with each other, or how to pass parameters to a process when launching it. I will try to cover those in another part.
