Asynchronous serverless task queue with timed leasing of tasks

Overview


python-task-queue

This package provides a client and system for generating, uploading, leasing, and executing dependency-free tasks, either in the cloud using AWS SQS or locally on a single machine or cluster with a common filesystem using file-based queues. Notably, the file queue requires no setup or queue service and can be used in a distributed fashion on a network filesystem.

Installation

pip install numpy # make sure you do this first on a separate line
pip install task-queue

The task queue uses your CloudVolume secrets located in $HOME/.cloudvolume/secrets/. When using AWS SQS as your queue backend, you must provide $HOME/.cloudvolume/secrets/aws-secret.json. See the CloudVolume repo for additional instructions.
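
For reference, aws-secret.json is a small JSON file containing your AWS credentials. The layout below is a minimal sketch (the key names are an assumption; consult the CloudVolume repo for the authoritative format):

{
  "AWS_ACCESS_KEY_ID": "...",
  "AWS_SECRET_ACCESS_KEY": "..."
}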

Usage

As of version 2.7.0, there are two ways to create a queueable task. The new way is simpler and probably preferred.

macOS Only: Note that proxy servers are disabled for parallel operation because libdispatch is not fork-safe.

New School: Queueable Functions

Designate a function as queueable using the @queueable decorator. Variable positional arguments (*args) and variable keyword arguments (**kwargs) are not yet supported. If a function is not marked with the decorator, it cannot be executed via the queue.

from taskqueue import queueable

@queueable
def print_task(txt):
  print(str(txt))

You then create queueable instantiations of these functions by using the standard library's functools.partial to create a concrete binding.

from functools import partial
bound_fn = partial(print_task, txt="hello world")

Old School: RegisteredTask Subclasses

Define a class that inherits from taskqueue.RegisteredTask and implements the execute method. RegisteredTasks contain logic that will render their attributes into a JSON payload and can be reconstituted into a live class on the other side of a task queue.

Tasks can be loaded into queues locally or in the cloud and executed later. Here's an example implementation of a trivial PrintTask. The attributes of your container class should be simple values that can be easily encoded into JSON such as ints, floats, strings, and numpy arrays. Let the execute method download and manipulate heavier data. If you're feeling curious, you can see what JSON a task will turn into by calling task.payload().

from taskqueue import RegisteredTask

class PrintTask(RegisteredTask):
  def __init__(self, txt=''):
    super(PrintTask, self).__init__(txt)
    # attributes passed to super().__init__ are automatically assigned
    # use this space to perform additional processing such as:
    self.txt = str(txt)

  def execute(self):
    if self.txt:
      print(str(self) + ": " + self.txt)
    else:
      print(self)
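
For example, you can inspect the JSON a task will serialize to before enqueuing it (a minimal sketch; the exact payload fields depend on the library version):

task = PrintTask(txt="hello world")
print(task.payload()) # JSON-serializable description used to enqueue this task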

Local Usage

For small jobs, you might want to use one or more processes to execute the tasks.

from functools import partial
from taskqueue import LocalTaskQueue
from mylibrary import PrintTask, print_task # mylibrary is wherever you defined PrintTask and print_task

tq = LocalTaskQueue(parallel=5) # use 5 processes


tasks = ( PrintTask(i) for i in range(2000) ) # OLD SCHOOL
tasks = ( partial(print_task, i) for i in range(2000) ) # NEW SCHOOL

tq.insert_all(tasks) # performs on-line execution (naming is historical)

# alternative serial model
tq.insert(tasks)
tq.execute()

# delete tasks
tq.delete(tasks)
tq.purge() # delete all tasks

This will load the queue with 2000 print tasks and then execute them across five processes.

Cloud and Cluster Usage

  1. Set up an SQS queue and acquire an aws-secret.json that is compatible with CloudVolume. Generate the tasks and insert them into the cloud queue.

  2. Alternatively, set up a file-based queue, which has the same time-based leasing property as an SQS queue.

IMPORTANT: You must import the tasks that will be executed, otherwise the code that executes them will not be loaded.

# import gevent.monkey
# gevent.monkey.patch_all(thread=False)
from taskqueue import TaskQueue
from mylibrary import PrintTask, print_task # mylibrary is wherever you defined PrintTask and print_task

# region is SQS specific, green means cooperative threading
tq = TaskQueue('sqs://queue-name', region_name="us-east-1", green=False)
tq = TaskQueue('fq:///path/to/queue/directory/') # file queue ('fq')

# insert accepts any iterable
tq.insert(( PrintTask(i) for i in range(1000) )) # OLD SCHOOL
tq.insert(( partial(print_task, i) for i in range(1000) )) # NEW SCHOOL
tq.enqueued # approximate number of tasks in the queue

# FileQueue Only:
tq.inserted #  total number of tasks inserted
tq.completed # number of tasks completed, requires tally=True with poll
tq.rezero() # reset statistics like inserted and completed
tq.release_all() # set all tasks to available

This inserts 1000 PrintTask JSON descriptions into your SQS queue.

Somewhere else, you'll do the following (probably across multiple workers):

from taskqueue import TaskQueue
import MY_MODULE # MY_MODULE contains the definitions of your RegisteredTasks / queueable functions

tq = TaskQueue('sqs://queue-name')
tq.poll(
  lease_seconds=int(LEASE_SECONDS),
  verbose=True, # print out a success message
  tally=True, # count number of tasks completed (fq only!)
)

Poll will periodically check the queue for a new task. If a task is found, it will execute it immediately, delete the task from the queue, and request another. If no task is found, a random exponential backoff of up to 120 seconds is applied to prevent workers from hammering the queue. If a task fails to complete, it will eventually recirculate within the queue, ensuring that all tasks will eventually complete provided they are not fundamentally flawed in some way.
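
To scale consumption, you can simply run poll in several worker processes, each with its own TaskQueue connection. A minimal sketch (the queue path, module name, and lease duration are placeholders):

from concurrent.futures import ProcessPoolExecutor
from taskqueue import TaskQueue
import MY_MODULE # task definitions must be importable in every worker process

def work(_):
  tq = TaskQueue('fq:///path/to/queue/directory/')
  tq.poll(lease_seconds=600, verbose=True, tally=True)

if __name__ == '__main__':
  with ProcessPoolExecutor(max_workers=4) as pool:
    pool.map(work, range(4))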

Local Container Testing

If there is an AWS-compatible queue running on a local cluster, e.g. alpine-sqs, the underlying connection client needs additional parameters. These can be passed into the TaskQueue constructor.

The following code on a worker will work in local and production contexts:

queue = os.environ['SQS_QUEUE']  # for local, set to "default"
region_name = os.environ.get('SQS_REGION_NAME')  # set only for prod
endpoint_url = os.environ.get('SQS_ENDPOINT_URL')  # set only for local
tqueue = taskqueue.TaskQueue(f'sqs://{queue}',
                             region_name=region_name,
                             endpoint_url=endpoint_url)

Example docker-compose.yml for local testing:

version: "3.7"

services:
  worker:
    image: yourlab/yourworker:v1
    environment:
      - SQS_QUEUE=default
      - SQS_ENDPOINT_URL=http://local_sqs:9324
    depends_on:
      - local_sqs

  local_sqs:
    image: roribio16/alpine-sqs

Example docker-compose.yml for production:

version: "3.7"

services:
  worker:
    image: yourlab/yourworker:v1
    environment:
      - SQS_QUEUE=my-aws-queue
      - SQS_REGION_NAME=us-west-1

Notes on File Queue

# FileQueue Specific Features

tq.inserted # number of inserted tasks
tq.completed # number of completed tasks (counts rerun tasks too)
tq.rezero() # sets tq.inserted and tq.completed to zero.
tq.release_all() # sets all tasks to available

FileQueue (fq://) is designed to simulate the timed task leasing feature from SQS and exploits a common filesystem to avoid requiring an additional queue server. You can read in detail about its design on the wiki.

There are a few things FileQueue can do that SQS can't, as well as some quirks you should be aware of. For one, FileQueue can track the number of task completions (tq.completed, tq.poll(..., tally=True)), but it does so by appending a byte to a file called completions for each completion; the size of the file in bytes is the number of completions. This design avoids problems with locking and race conditions. FileQueue also tracks insertions (tq.inserted) in a more conventional way in an insertions file. Also unlike SQS, FileQueue allows listing all tasks at once.
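
For example, since one byte is appended to the completions file per completed task, you can read the tally directly from the filesystem (a sketch; the queue path is a placeholder):

import os

queue_path = '/path/to/queue/directory'
# the size of the completions file in bytes equals the completion count
completed = os.path.getsize(os.path.join(queue_path, 'completions'))
print(completed)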

FileQueue also allows releasing all current tasks from their leases, something that is impossible in SQS. Sometimes a few tasks die immediately after leasing on a long lease, and you later figure out how to fix them. Instead of starting over or waiting possibly hours for the leases to expire, you can make the whole queue available again with tq.release_all().

As FileQueue is based on the filesystem, it can be managed somewhat via the command line. To delete a queue, just rm -r $QUEUE_PATH. To reset a counter: rm $QUEUE_PATH/completions (e.g.). If you are brave, you could even use the mv command to reassign a task's availability.

We also discovered that FileQueues are amenable to fixing problems on the fly. In one case, we generated a set of tasks that took 4.5 hours of computation time and then decided to run those tasks on a different cluster. Each of the 500k tasks contained a path to the old storage cluster. Using find, xargs, and sed we were able to fix them efficiently.
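
The same kind of on-the-fly repair can be sketched in Python; the path prefixes below are hypothetical and the layout of one JSON file per task under the queue/ subdirectory is assumed from FileQueue's on-disk format:

import glob

old_prefix = 'gs://old-cluster/dataset/' # hypothetical
new_prefix = 'gs://new-cluster/dataset/' # hypothetical

for filename in glob.glob('/path/to/queue/directory/queue/*.json'):
  with open(filename, 'rt') as f:
    text = f.read()
  if old_prefix in text:
    with open(filename, 'wt') as f:
      f.write(text.replace(old_prefix, new_prefix))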

Bundled ptq CLI Tool

As of 2.5.0, we bundle a command line tool, ptq, to make it easier to manage running FileQueues.

ptq status fq://./my-queue # prints vital statistics
ptq release fq://./my-queue # releases all tasks from their lease
ptq rezero fq://./my-queue # resets statistics to zero
ptq cp fq://./my-queue sqs://my-cloud-queue # copy a queue (no copies of sqs)
ptq mv sqs://my-cloud-queue fq://./my-queue # move a queue (all supported)

Motivation

Distributed dependency-free task execution engines (such as Igneous) often make use of cloud-based queues like Amazon Simple Queue Service (SQS). In the connectomics field we process petascale images, which requires generating hundreds of thousands or millions of cloud tasks per run. In one case, we were processing serial blocks of a large image where each block depended on the previous block's completion. Each block's run required the generation and upload of millions of tasks and the use of thousands of workers. The workers would rapidly drain the task queue, and it was important to ensure that it could be fed fast enough to prevent starvation of this enormous cluster.

There are a few strategies for accomplishing this. One way might be to use a fully featured DAG-supporting engine that could generate the next task on demand. However, we were experienced with SQS and had designed our architecture around it. Furthermore, it was, in our experience, robust to thousands of machines hammering it. This does not discount that there could be better methods out there, but this was convenient for us.

There are two major ways to populate an SQS queue at scale: use a task-generating task so that a single processor can enlist hundreds or thousands of others, or make the task-generating client fast and memory efficient enough that a handful of cores and multiprocessing suffice. Keeping things simple and local allows for greater operational flexibility, and the addition of a drop-in multiprocessing execution engine allows cloud services to be omitted for small jobs. Importantly, improved small-scale performance doesn't preclude the later development of meta-generation facilities.

By default, Python task queue libraries are single threaded and blocking, resulting in upload rates of at most tens of tasks per second. It is possible to do much better by using threads, multiple processes, and batched requests. TaskQueue has achieved upload rates of over 3,000 tasks per second on a single core and around 10,000 per second multicore on a single machine. This is sufficient to keep our cluster fed and gives programmers the flexibility to populate queues from their local machines using simple scripts.

How to Achieve High Performance

Attaining the quoted upload rates is simple but takes a few tricks to tune the queue. By default, TaskQueue will upload hundreds of tasks per second using its threading model. We'll show via progressive examples how to tune your upload script to get many thousands of tasks per second with near zero latency and memory usage. Note that the examples below use sqs://, but apply to fq:// as well. These examples also use the old school style of task instantiation, but you can substitute the new style without consequence.

# Listing 1: 10s per second, high memory usage, non-zero latency

tasks = [ PrintTask(i) for i in range(1000000) ]
tq = TaskQueue('sqs://queue-name')
for task in tasks:
  tq.insert(task)

This first example shows how you might use the queue in the most naive fashion. The tasks list takes a long time to compute, uses a lot of memory, and then inserts a single task at a time, failing to exploit the threading model in TaskQueue. Note that this behavior has changed from previous versions, in which the endorsed "with" statement form made this pattern faster, though it was still problematic.

# Listing 2: 100-1000s per second, high memory usage, non-zero latency

tasks = [ PrintTask(i) for i in range(1000000) ]
tq = TaskQueue('sqs://queue-name')
tq.insert(tasks)

The listing above allows you to use ordinary iterative programming techniques to achieve an upload rate of hundreds per second without much configuration, a marked improvement over using boto directly. However, the initial generation of a list of tasks uses a lot of memory and introduces a delay while the list is generated.

This form also takes advantage of SQS batch upload, which allows submitting 10 tasks at once. As the cost of submitting a task lies mainly in HTTP/1.1 TCP/IP connection overhead, batching 10 requests yields nearly a 10x improvement in performance. However, in this case we've again created all the tasks up front in order to batch them correctly, which results in the same memory and latency issues as in Listing 1.

# Listing 3: 100-1000s per second, low memory usage, near-zero latency

tasks = ( PrintTask(i) for i in range(1000000) )
tq = TaskQueue('sqs://queue-name')
tq.insert(tasks, total=1000000) # total necessary for progress bars to work

In Listing 3, we've started using generators instead of lists. Generators are essentially lazy lists that compute the next element on demand. Defining a generator is fast and takes constant time, so we are able to begin producing new elements nearly instantly. The elements are produced on demand and consumed instantly, resulting in a small constant memory overhead typically measured in kilobytes to megabytes.

As generators do not support the len operator, we manually pass in the number of items to display a progress bar.

# Listing 4: 100s-1000s per second, low memory usage, near-zero latency

import gevent.monkey
gevent.monkey.patch_all()
from taskqueue import TaskQueue

tasks = ( PrintTask(i) for i in range(1000000) )
tq = TaskQueue('sqs://queue-name', green=True)
tq.insert(tasks, total=1000000) # total helps the progress bar

In Listing 4, we use the green=True argument to enable cooperative threads. Under the hood, TaskQueue relies on Python kernel threads to achieve concurrent IO. However, on systems with multiple cores, especially those in a virtualized or NUMA context, the OS will tend to distribute the threads fairly evenly between cores, leading to high context-switching overhead. Ironically, a more powerful multicore system can lead to lower performance. To remedy this issue, we introduce a user-space cooperative threading model (green threads) using gevent (which, depending on your system, uses either libev or libuv for its event loop).

This can result in a substantial performance increase on some systems. Typically a single core will be fully utilized with extremely low overhead. However, using cooperative threading with networked IO in Python requires monkey patching the standard library (!!). Refusing to patch the standard library will result in single-threaded performance. Thus, using green threads can introduce problems into larger applications (we've seen problems with multiprocessing and ipython). However, the task upload script can often be isolated from the rest of the system, which allows monkey patching to be performed safely. To give users more control over when they wish to accept the risk of monkey patching, it is not performed automatically, and a warning will appear with instructions for amending your program.

# Listing 5: 1000s-10000 per second, low memory usage, near zero latency, efficient multiprocessing

import gevent.monkey
gevent.monkey.patch_all()
from taskqueue import TaskQueue
from concurrent.futures import ProcessPoolExecutor

def upload(args):
  start, end = args
  tasks = ( PrintTask(i) for i in range(start, end) )
  tq = TaskQueue('sqs://queue-name', green=True)
  tq.insert(tasks, total=(end - start))

task_ranges = [ (0, 250000), (250000, 500000), (500000, 750000), (750000, 1000000) ]
with ProcessPoolExecutor(max_workers=4) as pool:
  pool.map(upload, task_ranges)

In Listing 5, we finally move to multiprocessing to attain the highest speeds. There are three critical pieces of this construction to note.

First, we do not use the usual multiprocessing package and instead use concurrent.futures.ProcessPoolExecutor. If a child process dies under multiprocessing, the parent process will simply hang (this is by design, unfortunately). With the alternative package, at least an exception will be thrown.

Second, we pass parameters for task generation to the child processes, not the tasks themselves. It is not possible to pass generators from parent to child processes in CPython [1]. It is also inefficient to pass tasks directly, as that requires first generating them (as in Listing 1) and then invisibly pickling and unpickling them as they are passed to the child processes. Therefore, we pass only a few small picklable objects that are used to construct a task generator on the other side.

Third, as described in the narrative for Listing 4, green threads have less context-switching overhead than the ordinary multithreaded TaskQueue, so each core runs efficiently and independently of the others. At this point, your main bottlenecks will probably be OS or network card related (let us know if they aren't!). Multiprocessing does scale task production, but sub-linearly in the number of processes. The task upload rate per process falls with each additional core, but each core still adds throughput up to some inflection point.

# Listing 6: Exchanging Generators for Iterators

import gevent.monkey
gevent.monkey.patch_all()
from taskqueue import TaskQueue
from concurrent.futures import ProcessPoolExecutor

class PrintTaskIterator(object):
  def __init__(self, start, end):
    self.start = start
    self.end = end
  def __len__(self):
    return self.end - self.start
  def __iter__(self):
    for i in range(self.start, self.end):
      yield PrintTask(i)

def upload(tsks):
  tq = TaskQueue('sqs://queue-name', green=True)
  tq.insert(tsks)

tasks = [ PrintTaskIterator(0, 100), PrintTaskIterator(100, 200) ]
with ProcessPoolExecutor(max_workers=2) as execute:
  execute.map(upload, tasks)

If you insist on passing something generator-like to your subprocesses, you can use iterators instead. The construction above allows us to specify the generator's parameters up front, pass only a few primitives through the pickling process, and transparently call the generator on the other side. We can even support the len() function, which is not available for generators.

# Listing 7: Easy Multiprocessing

import gevent.monkey
gevent.monkey.patch_all(thread=False)
import copy
from taskqueue import TaskQueue

class PrintTaskIterator(object):
  def __init__(self, start, end):
    self.start = start
    self.end = end
  def __getitem__(self, slc):
    itr = copy.deepcopy(self)
    itr.start = self.start + slc.start
    itr.end = self.start + slc.stop
    return itr
  def __len__(self):
    return self.end - self.start
  def __iter__(self):
    for i in range(self.start, self.end):
      yield PrintTask(i)

tq = TaskQueue('sqs://queue-name', green=True)
tq.insert(PrintTaskIterator(0,200), parallel=2)

If you design your iterators such that the slice operator works, TaskQueue can automatically resection the iterator so it can be fed to multiple processes. Notably, we don't return PrintTaskIterator(self.start+slc.start, self.start+slc.stop) because it triggers endless recursion during pickling. However, the runtime copy implementation above sidesteps this issue. Internally, PrintTaskIterator(0,200) will be turned into [ PrintTaskIterator(0,100), PrintTaskIterator(100,200) ]. We also track exceptions raised by child processes in a queue. gevent.monkey.patch_all(thread=False) was necessary to avoid multiprocess hanging.

[1] You can't pass generators in CPython but you can pass iterators. You can pass generators if you use Pypy or Stackless Python.

--
Made with <3.

Comments
  • Dict args with integer keys get renamed as strings

    Hey @william-silversmith :) It's 10pm on a work night, which means it's time for a weird bug(feature?) report from Jordan!

    I've put together a minimum viable example to illustrate this, I promise I can do this math in my head usually :)

    Check out the code below. In short, I have a dictionary with a key of 1 (int) and a value of some number (here, 100), which I decrement by 1, re-insert into the queue, and then process the next job:

    Min-reproducible:

    from functools import partial
    from taskqueue import queueable, TaskQueue
    
    QUEUE_URI = "fq://demoqueue"
    Q = TaskQueue(QUEUE_URI)
    
    
    @queueable
    def subtract_one(my_data: dict[int, int]):
        if my_data[1] > 0:
            new_data = my_data.copy()
            new_data[1] -= 1
            Q.insert([partial(subtract_one, new_data)])
        else:
            print("Done")
    
    
    if __name__ == "__main__":
        subtract_one({1: 100})
        Q.poll()
    

    So on the first execution, my payload should be {1: 100}, then {1: 99}, etc.

    Error

    but when I run this, I get the following error:

    Traceback (most recent call last):
      File "mvp-integer-keys.py", line 25, in <module>
        Q.poll()
      File ".../python3.9/site-packages/taskqueue/taskqueue.py", line 375, in poll
        task.execute(*execute_args, **execute_kwargs)
      File ".../python3.9/site-packages/taskqueue/queueablefns.py", line 78, in execute
        self(*args, **kwargs)
      File ".../python3.9/site-packages/taskqueue/queueablefns.py", line 87, in __call__
        return self.tofunc()()
      File "mvp-integer-keys.py", line 14, in subtract_one
        if my_data[1] > 0:
    KeyError: 1
    

    Checking the queue files, the my_data arg in subtract_one gets populated as {"1": 99} instead:

    From <ts>-<uuid>.json in demoqueue/queue/:

    {"payload": [["__main__", "subtract_one"], [{"1": 99}], {}, -1], "queueName": "demoqueue", "id": "fcb21030-f8f8-4721-a9df-3001cb5c2c79"}
    

    Funky! I'm guessing this is because orjson needs string keys? But it's opaque to the end-user here (I'm still not totally sure that that's what's going wrong!)

    Fix

    If I just assume that my keys are all str, this same code runs perfectly:

    ...
    
    @queueable
    def subtract_one(my_data: dict[str, int]):
        if my_data["1"] > 0:                                 # πŸ‘€
            new_data = my_data.copy()
            new_data["1"] -= 1                               # πŸ‘€
            Q.insert([partial(subtract_one, new_data)])
        else:
            print("Done")
    
    
    if __name__ == "__main__":
        subtract_one({"1": 100})                             # πŸ‘€
        Q.poll()
    

    So not a huge deal! But thought I'd report it in case others encounter the same thing!

    documentation 
    opened by j6k4m8 5
  • Documentation - specify that worker must import function in the same way as when it was inserted

    It may be helpful to someone in the future to specify that functions must be imported in the same way for the worker as for insertion (readme or a special error message).

    I had a function from a module of mine that I copied into a notebook to troubleshoot and edit. I added the queueable decorator right there in the notebook and used that function for my task insertion. I copied that edited function back into my module, and imported from there for the worker.

    I was getting the error 'not registered as a queueable function' and could not figure out what the issue was for a while. I had to import the function from my module for the insertion as well and then it worked.

    documentation 
    opened by emjoyce 4
  • exception error

    version 0.4 python 3.6

    >>> from taskqueue import TaskQueue
    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/opt/anaconda3/lib/python3.6/site-packages/taskqueue.py", line 99
        except BaseException, e:
                            ^
    SyntaxError: invalid syntax
    
    opened by jingpengw 4
  • Allow taskqueue instantiation with any boto3 client keywords

    • Translate legacy keyword 'region' to 'region_name'

    I needed this for local testing using a dockerized sqs queue, where the endpoint_url to boto3.client is required and credentials are not necessary. https://hub.docker.com/r/roribio16/alpine-sqs/

    opened by chrisroat 3
  • How to multiprocessing the poll() operation?

    Hi, @william-silversmith

    As for the tq.poll() operation of TaskQueue on the consumer side, is it possible to use multiprocessing? So far I have just started multiple programs to achieve that. How about using ProcessPoolExecutor or something else?

    Many thanks!

    feature 
    opened by JackieZhai 2
  • Queueable Functions

    In discussion with @fcollman, we noted that the RegisteredTask paradigm requires a lot of repeated typing of function signatures. It might be possible to create a @queuable decorator for ordinary functions that would remove a lot of boilerplate. I think the way it might look would be something like this:

    from functools import partial 
    
    @queueable
    def mesh_task(cloudpath, bbox, etc):
         ...
    
    tq = TaskQueue(...)
    for _ in range(10):
       tq.insert(partial(mesh_task, 'gs://...', Bbox(...), ...))
    
    tq.poll()
       ---> curried_mesh_task = deserialize(['mesh_task', [ ... ])
       ---> curried_mesh_task()
    

    The idea is that insert would accept fully curried functions using the partial functool which would allow for the inspection and extraction of their parameters and the reapplication of those parameters when executing. We could offer our own function for performing the same concept, but it would basically replicate this functionality.

    Additionally, the @queuable decorator would register the function in a table, just like RegisteredTask to prevent arbitrary execution of functions.

    feature 
    opened by william-silversmith 1
  • Queue Transfer

    It would be cool to have a way to copy or move a queue from one location / protocol to another.

    For example, maybe you are experimenting with fq because it's easier to manage and then want to transfer the results to SQS directly instead of rerunning, because the process took forever to compute the right tasks. This is the case for me right now with computing tasks from all the billions of labels in a dataset.

    feature 
    opened by william-silversmith 1
  • feat: Batched Uploads and Green Threads

    This contains a few features.

    1. Send messages to AWS SQS using the send_message_batch (via tq.insert_all) which allows sending 10 messages at a time. This results in a 10x speedup.
    2. Add GreenTaskQueue as an option. Users need to manually monkeypatch their application to make it work. This can be useful in NUMA or vCPU environments.
    3. Added delay_seconds to insert and insert_all which allows you to delay a message's visibility in the queue.
    feature refactor 
    opened by william-silversmith 1
  • task construction error

    I am running a chunkflow task locally, but I get a task construction error. I am using the tested chunkflow branch, so the error might come from taskqueue?

    I tried pip install --upgrade task-queue, but it still has this error. I double checked the version; it is the latest release, 0.7.

    raised __init__() got an unexpected keyword argument 'output_shape'
    Traceback (most recent call last):
      File "task_execution.py", line 71, in execute
        task = tq.lease(tag=tag, seconds=int(LEASE_SECONDS))
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/taskqueue.py", line 164, in lease
        return totask(task)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/taskqueue.py", line 22, in totask
        taskobj = payloadBase64Decode(task['payloadBase64'])
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/registered_task.py", line 27, in payloadBase64Decode
        return deserialize(decoded_string)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/registered_task.py", line 23, in deserialize
        return target_class(**params)
    TypeError: __init__() got an unexpected keyword argument 'output_shape'

    undefined task
    on host seungworkstation20
    Traceback (most recent call last):
      File "task_execution.py", line 94, in <module>
        command()
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/click/core.py", line 722, in __call__
        return self.main(*args, **kwargs)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/click/core.py", line 697, in main
        rv = self.invoke(ctx)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/click/core.py", line 895, in invoke
        return ctx.invoke(self.callback, **ctx.params)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/click/core.py", line 535, in invoke
        return callback(*args, **kwargs)
      File "task_execution.py", line 34, in command
        execute(tag, queue, server, qurl, loop)
      File "task_execution.py", line 71, in execute
        task = tq.lease(tag=tag, seconds=int(LEASE_SECONDS))
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/taskqueue.py", line 164, in lease
        return totask(task)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/taskqueue.py", line 22, in totask
        taskobj = payloadBase64Decode(task['payloadBase64'])
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/registered_task.py", line 27, in payloadBase64Decode
        return deserialize(decoded_string)
      File "/usr/people/jingpeng/workspace/igneous/jwu/lib/python3.5/site-packages/taskqueue/registered_task.py", line 23, in deserialize
        return target_class(**params)
    TypeError: __init__() got an unexpected keyword argument 'output_shape'
    
    opened by jingpengw 1
  • MultiProcessing of Tasks

    It's becoming more useful over time to have support for multi-process uploads. However, that requires a bit of a different architecture. For instance, we'll need a single multiprocess queue that feeds individual threaded queues.

    feature 
    opened by william-silversmith 1
  • fix: progress bar overestimate

    fix: progress bar overestimate

    We now measure the number of items processed rather than incrementing by batch_size each time, which overestimated the final batch.

    fix: no_proxy for MacOS

    Grand Central Dispatch (libdispatch) is not fork-safe and is called by urllib if proxying is possible. Therefore, we disable proxies to avoid this issue.

    https://bugs.python.org/issue6894

    bug 
    opened by william-silversmith 0
  • Add "stasis" to Tasks

    Sometimes particular tasks are extremely memory hungry out of proportion with others and need special attention. We should have a "stasis" flag that causes these tasks to be bypassed until released.

    feature 
    opened by william-silversmith 0
  • FileQueue: More Precise Task Completions

    If we moved completed items into a "completed" directory, we could get a precise count of completed tasks instead of a count of executed tasks.

    feature 
    opened by william-silversmith 0
  • Task Dependencies

    It would be useful to support task chaining. This allows processors to guarantee locality when acquiring the next task.

    The simplest version of this would be to insert a list of tasks. A more sophisticated version would allow for the specification of a tree that would process breadth-first.

    There are drawbacks to using this kind of processing mode. One of the big ones is that tasks with dependencies won't benefit from automatic parallelism and retries of individual tasks. Instead, the whole unit must complete or be retried.

    feature 
    opened by william-silversmith 0
  • risk of getting empty messages

    sqs can return empty messages if the message number is low (<1000); maybe this explains why the last few hundred tasks take a longer time.

    this package sets the wait time to 0 and could potentially get empty messages. https://github.com/seung-lab/python-task-queue/blob/master/taskqueue/aws_queue_api.py#L60

    here is the documentation of the long polling: https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/sqs-long-polling.html https://docs.aws.amazon.com/AWSSimpleQueueService/latest/SQSDeveloperGuide/working-with-messages.html#setting-up-long-polling

    as suggested, simply setting the wait time to 20 seconds should fix this.

    opened by jingpengw 1