Huey’s API

Most end-users will interact with the API using the two decorators:

The API documentation will follow the structure of the huey api.py module.

Huey types

Implementations of Huey which handle task and result persistence.

Note

See the documentation for Huey for the list of initialization parameters common to all Huey implementations.

Warning

If you have a busy application and plan to switch from one of the Redis implementations to another (e.g. switch from RedisHuey to the PriorityRedisHuey) you may want to start the new huey consumer on a different Redis database (e.g. db=15). Then let your old consumer drain any pre-existing tasks while the new consumer accepts new tasks.

class RedisHuey

Huey that utilizes redis for queue and result storage. Requires redis-py.

Commonly-used keyword arguments for storage configuration:

Parameters:
  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • host – hostname of the Redis server.

  • port – port number.

  • password – password for Redis.

  • db (int) – Redis database to use (typically 0-15, default is 0).

  • notify_result (bool) – use a blocking-pop on a result-ready key to enable low-latency result reading.

  • notify_result_ttl (int) – TTL for result-ready key to automatically expire un-awaited results.

The redis-py documentation contains the complete list of arguments supported by the Redis client.

Note

RedisHuey does not support task priorities. If you wish to use task priorities with Redis, use PriorityRedisHuey.

RedisHuey uses a Redis LIST to store the queue of pending tasks. Redis lists are a natural fit, as they offer O(1) append and pop from either end of the list. Redis also provides blocking-pop commands which allow the consumer to react to a new message as soon as it is available without resorting to polling.

For low-latency result-fetching, you can specify notify_result=True when instantiating RedisHuey (or subclasses). This works by coordinating result readiness through a blocking pop on a dedicated result list. By default this result-readiness list will expire after 86400 seconds, but this can be controlled with the notify_result_ttl parameter. All RedisHuey implementations support these options.

See also

RedisStorage

class PriorityRedisHuey

Huey that utilizes redis for queue and result storage. Requires redis-py. Accepts the same arguments as RedisHuey.

PriorityRedisHuey supports task priorities, and requires Redis 5.0 or newer.

PriorityRedisHuey uses a Redis SORTED SET to store the queue of pending tasks. Sorted sets consist of a unique value and a numeric score. In addition to being sorted by numeric score, Redis also orders the items within the set lexicographically. Huey takes advantage of these two characteristics to implement the priority queue. Redis 5.0 added a new command, ZPOPMIN, which pops the lowest-scoring item from the sorted set (and BZPOPMIN, the blocking variety).

class RedisExpireHuey

Identical to RedisHuey except for the way task result values are stored. RedisHuey keeps all task results in a Redis hash, and whenever a task result is read (via the result handle), it is also removed from the result hash. This is done to prevent the task result storage from growing without bound. Additionally, using a Redis hash for all results helps avoid cluttering up the Redis keyspace and utilizes less RAM for storing the keys themselves.

RedisExpireHuey uses a different approach: task results are stored in ordinary Redis keys with a special prefix. Result keys are then given a time-to-live, and will be expired automatically by the Redis server. This removes the necessity to remove results from the result store after they are read once.

Commonly-used keyword arguments for storage configuration:

Parameters:
  • expire_time (int) – Expire time in seconds, default is 86400 (1 day).

  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • host – hostname of the Redis server.

  • port – port number.

  • password – password for Redis.

  • db (int) – Redis database to use (typically 0-15, default is 0).

  • notify_result (bool) – use a blocking-pop on a result-ready key to enable low-latency result reading.

  • notify_result_ttl (int) – TTL for result-ready key to automatically expire un-awaited results.

class PriorityRedisExpireHuey

Combines behavior of RedisExpireHuey and PriorityRedisHuey.

class SqliteHuey

Huey that utilizes sqlite3 for queue and result storage. Only requirement is the standard library sqlite3 module.

Commonly-used keyword arguments:

Parameters:
  • filename (str) – filename for database, defaults to ‘huey.db’.

  • cache_mb (int) – megabytes of memory to allow for sqlite page-cache.

  • fsync (bool) – use durable writes. Slower but more resilient to corruption in the event of sudden power loss. Defaults to false.

SqliteHuey fully supports task priorities.

See also

SqliteStorage

class MemoryHuey

Huey that uses in-memory storage. Only should be used when testing or when using immediate mode. MemoryHuey fully supports task priorities.

class FileHuey

Huey that uses the file-system for storage. Should not be used in high-throughput, highly-concurrent environments, as the FileStorage utilizes exclusive locks around all file-system operations.

Parameters:
  • path (str) – base-path for huey data (queue tasks, schedule and results will be stored in sub-directories of this path).

  • levels (int) – number of levels in result-file directory structure to ensure the results directory does not contain an unmanageable number of files.

  • use_thread_lock (bool) – use the standard lib threading.Lock instead of a lockfile for file-system operations. This should only be enabled when using the greenlet or thread consumer worker models.

FileHuey fully supports task priorities.

class BlackHoleHuey

Huey that uses BlackHoleStorage. All storage operations are no-ops: enqueued tasks are silently discarded and the result store is always empty. Intended for testing scenarios where you want to verify that tasks are being called without actually running them or storing anything.

Huey object

class Huey(name='huey', results=True, store_none=False, utc=True, immediate=False, serializer=None, compression=False, use_zlib=False, immediate_use_memory=True, storage_kwargs)
Parameters:
  • name (str) – the name of the task queue, e.g. your application’s name.

  • results (bool) – whether to store task results.

  • store_none (bool) – whether to store None in the result store.

  • utc (bool) – use UTC internally, convert naive datetimes from local time to UTC (if local time is other than UTC).

  • immediate (bool) – useful for debugging; causes tasks to be executed synchronously in the application.

  • serializer (Serializer) – serializer implementation for tasks and result data. The default implementation uses pickle.

  • compression (bool) – compress tasks and result data.

  • use_zlib (bool) – use zlib for compression instead of gzip.

  • immediate_use_memory (bool) – automatically switch to a local in-memory storage backend whenever immediate-mode is enabled.

  • storage_kwargs – arbitrary keyword arguments that will be passed to the storage backend for additional configuration.

Huey executes tasks by exposing function decorators that cause the function call to be enqueued for execution by the consumer.

Typically your application will only need one Huey instance, but you can have as many as you like – the only caveat is that one consumer process must be executed for each Huey instance.

Example usage:

# demo.py
from huey import RedisHuey

# Create a huey instance.
huey = RedisHuey('my-app')

@huey.task()
def add_numbers(a, b):
    return a + b

@huey.periodic_task(crontab(minute='0', hour='2'))
def nightly_report():
    generate_nightly_report()

To run the consumer with 4 worker threads:

$ huey_consumer.py demo.huey -w 4

To add two numbers, the “huey” way:

>>> from demo import add_numbers
>>> res = add_numbers(1, 2)
>>> res(blocking=True)  # Blocks until result is available.
3

To test huey without using a consumer, you can use “immediate” mode. Immediate mode follows all the same code paths as Huey does when running the consumer process, but does so synchronously within the application.

>>> from demo import add_numbers, huey
>>> huey.immediate = True  # Tasks executed immediately.
>>> res = add_numbers(2, 3)
>>> res()
5
immediate

The immediate property is used to enable and disable immediate mode. When immediate mode is enabled, task-decorated functions are executed synchronously by the caller, making it very useful for development and testing. Calling a task function still returns a Result handle, but the task itself is executed immediately.

By default, when immediate mode is enabled, Huey will switch to using in-memory storage. This is to help prevent accidentally writing to a live Redis server while testing. To disable this functionality, specify immediate_use_memory=False when initializing Huey.

Enabling immediate mode:

huey = RedisHuey()

# Enable immediate mode. Tasks now executed synchronously.
# Additionally, huey will now use in-memory storage.
huey.immediate = True

# Disable immediate mode. Tasks will now be enqueued in a Redis
# queue.
huey.immediate = False

Immediate mode can also be specified when your Huey instance is created:

huey = RedisHuey(immediate=True)
create_consumer(**options)
Parameters:

options – keyword arguments passed to the Consumer constructor (e.g. workers, worker_type, periodic, initial_delay, etc.).

Returns:

a Consumer instance.

Create a consumer programmatically, rather than using the huey_consumer.py command-line tool. This is useful for embedding the consumer within your own application process, or for advanced testing scenarios.

consumer = huey.create_consumer(workers=4, worker_type='thread')
consumer.start()
# ... later ...
consumer.stop(graceful=True)
task(retries=0, retry_delay=0, priority=None, context=False, name=None, expires=None, timeout=None, **kwargs)
Parameters:
  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait between retries.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • context (bool) – when the task is executed, include the Task instance as a keyword argument.

  • name (str) – name for this task. If not provided, Huey will default to using the module name plus function name.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • timeout (int) – set execution timeout for task. Implementation used by the consumer is specific to the worker type and is NOT available for threads. Processes use SIGALRM, greenlet uses gevent.Timeout. When using with threads it is necessary to use cooperative timeout checking. See Cooperative Timeout for example.

  • kwargs – arbitrary key/value arguments that are passed to the TaskWrapper instance.

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.

Function decorator that marks the decorated function for processing by the consumer. Calls to the decorated function will do the following:

  1. Serialize the function call into a Message suitable for storing in the queue.

  2. Enqueue the message for execution by the consumer.

  3. Return a Result handle, which can be used to check the result of the task function, revoke the task (assuming it hasn’t started yet), reschedule the task, and more.

The task() decorator returns a TaskWrapper, which implements special methods for enqueueing the decorated function, schedule() it for the future, chain tasks to form a pipeline, and more.

Example task:

@huey.task()
def slow(n):
    time.sleep(n)
    return n

Calling the slow() task will return immediately. We can block until the task finishes by waiting for the result:

>>> res = slow(10)  # Returns immediately.

>>> res()  # Non-blocking read - returns None if not ready.

>>> res(blocking=True)  # Block until task finishes, ~10s.
10

When context=True, the Task instance is passed to your function as a task keyword argument. This gives access to the task ID, remaining retries, and cooperative timeout APIs.

When the result store is disabled (results=False), calling a task-decorated function returns None instead of a Result handle.

If you stack additional decorators on a task function, note that decorators above @huey.task() run in the calling process, while decorators below it run in the consumer worker.

For a thorough walkthrough with examples, see the Guide. For the full TaskWrapper API (scheduling, pipelines, revocation, etc.), see TaskWrapper, Task, and Result.

periodic_task(validate_datetime, retries=0, retry_delay=0, priority=None, context=False, name=None, expires=None, timeout=None, **kwargs)
Parameters:
  • validate_datetime (function) – function which accepts a datetime instance and returns whether the task should be executed at the given time.

  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait in-between retries.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • context (bool) – when the task is executed, include the Task instance as a parameter.

  • name (str) – name for this task. If not provided, Huey will default to using the module name plus function name.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • timeout (int) – set execution timeout for task. Implementation used by the consumer is specific to the worker type and is NOT available for threads. Processes use SIGALRM, greenlet uses gevent.Timeout. When using with threads it is necessary to use cooperative timeout checking. See Cooperative Timeout for example.

  • kwargs – arbitrary key/value arguments that are passed to the TaskWrapper instance.

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.

The periodic_task() decorator marks a function for automatic execution by the consumer at a specific interval, like cron. The validate_datetime parameter is typically the crontab() helper (described below). The consumer checks once per minute whether the function should run.

Example — execute every three hours, on the hour:

@huey.periodic_task(crontab(minute='0', hour='*/3'))
def update_feeds():
    for feed in my_list_of_feeds:
        fetch_feed_data(feed)

Note

Because functions decorated with periodic_task are meant to be executed at intervals in isolation, they should not take any required parameters nor should they be expected to return a meaningful value.

Like task(), the periodic task decorator adds helpers to the decorated function. These helpers allow you to revoke() and restore() the periodic task, enabling you to pause it or prevent its execution. For more information, see TaskWrapper.

Note

The result (return value) of a periodic task is not stored in the result store. This is primarily due to the fact that there is not an obvious way one would read such results, since the invocation of the periodic task happens inside the consumer scheduler. As such, there is no task result handle which the user could use to read the result. To store the results of periodic tasks, you will need to use your own storage or use the storage APIs directly:

@huey.periodic_task(crontab(minute='*/10'))
def my_task():
    # do some work...
    do_something()

    # Manually store some data in the result store.
    huey.put('my-task', some_data_to_store)

More info:

context_task(obj, as_argument=False, **kwargs)
Parameters:
  • obj – object that implements the context-manager APIs.

  • as_argument (bool) – pass the return value of the context-manager’s __enter__ method into the decorated task as the first argument.

  • kwargs – additional keyword arguments are forwarded to task() (e.g. retries, retry_delay, priority, context, name, expires, timeout).

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.

This is an extended implementation of the Huey.task() decorator, which wraps the decorated task in a with obj: block. Roughly equivalent to:

db = PostgresqlDatabase(...)

@huey.task()
def without_context_task(n):
    with db:
        do_something(n)

@huey.context_task(db)
def with_context_task(n):
    return do_something(n)

When as_argument=True, the value returned by __enter__ is passed as the first positional argument to the task function:

@huey.context_task(db, as_argument=True)
def with_ctx(conn, n):
    # conn is the value returned by db.__enter__()
    return conn.execute(n)
pre_execute(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual pre-execute function.

Decorator for registering a pre-execute hook. The callback will be executed before the execution of every task. Execution of the task can be cancelled by raising a CancelExecution exception. Uncaught exceptions will be logged but will not cause the task itself to be cancelled.

The callback function should accept a single task instance, the return value is ignored.

Hooks are executed in the order in which they are registered.

Usage:

@huey.pre_execute()
def my_pre_execute_hook(task):
    if datetime.datetime.now().weekday() == 6:
        raise CancelExecution('Sunday -- no work will be done.')
unregister_pre_execute(name_or_fn)
Parameters:

name_or_fn – the name given to the pre-execute hook, or the function object itself.

Returns:

boolean

Unregister the specified pre-execute hook.

post_execute(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual post-execute function.

Register a post-execute hook. The callback will be executed after the execution of every task. Uncaught exceptions will be logged but will have no other effect on the overall operation of the consumer.

The callback function should accept:

  • a Task instance

  • the return value from the execution of the task (which may be None)

  • any exception that was raised during the execution of the task (which will be None for tasks that executed normally).

The return value of the callback itself is ignored.

Hooks are executed in the order in which they are registered.

Usage:

@huey.post_execute()
def my_post_execute_hook(task, task_value, exc):
    do_something()
unregister_post_execute(name_or_fn)
Parameters:

name_or_fn – the name given to the post-execute hook, or the function object itself.

Returns:

boolean

Unregister the specified post-execute hook.

on_startup(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual on-startup function.

Register a startup hook. The callback will be executed whenever a worker comes online. Uncaught exceptions will be logged but will have no other effect on the overall operation of the worker.

The callback function must not accept any parameters.

This API is provided to simplify setting up shared resources that, for whatever reason, should not be created as import-time side-effects. For example, your tasks need to write data into a Postgres database. If you create the connection at import-time, before the worker processes are spawned, you’ll likely run into errors when attempting to use the connection from the child (worker) processes. To avoid this problem, you can register a startup hook which executes once when the worker starts up.

Usage:

db_connection = None

@huey.on_startup()
def setup_db_connection():
    global db_connection
    db_connection = psycopg2.connect(database='my_db')

@huey.task()
def write_data(rows):
    cursor = db_connection.cursor()
    # ...
unregister_on_startup(name_or_fn)
Parameters:

name_or_fn – the name given to the on-startup hook, or the function object itself.

Returns:

boolean

Unregister the specified on-startup hook.

on_shutdown(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual on-shutdown function.

Register a shutdown hook. The callback will be executed by a worker immediately before it goes offline. Uncaught exceptions will be logged but will have no other effect on the overall shutdown of the worker.

The callback function must not accept any parameters.

This API is provided to simplify cleaning-up shared resources.

unregister_on_shutdown(name_or_fn)
Parameters:

name_or_fn – the name given to the on-shutdown hook, or the function object itself.

Returns:

boolean

Unregister the specified on-shutdown hook.

signal(*signals)
Parameters:

signals – zero or more signals to handle.

Returns:

a decorator used to wrap the actual signal handler.

Attach a signal handler callback, which will be executed when the specified signals are sent by the consumer. If no signals are listed, then the handler will be bound to all signals. The list of signals and additional information can be found in the Signals documentation.

Example:

from huey.signals import SIGNAL_ERROR, SIGNAL_LOCKED

@huey.signal(SIGNAL_ERROR, SIGNAL_LOCKED)
def task_not_run_handler(signal, task, exc=None):
    # Do something in response to the "ERROR" or "LOCKED" signals.
    # Note that the "ERROR" signal includes a third parameter,
    # which is the unhandled exception that was raised by the task.
    # Since this parameter is not sent with the "LOCKED" signal, we
    # provide a default of ``exc=None``.
    pass
disconnect_signal(receiver, *signals)
Parameters:
  • receiver – the signal handling function to disconnect.

  • signals – zero or more signals to stop handling.

Disconnect the signal handler from the provided signals. If no signals are provided, then the handler is disconnected from any signals it may have been connected to.

notify_interrupted_tasks()

Emit SIGNAL_INTERRUPTED for every task that is currently being executed (i.e. in-flight). This is called automatically by the consumer during shutdown. Application code should not normally need to call this method directly, but it is available for custom consumer implementations or advanced testing.

Tasks are tracked from the moment execution begins until execution completes (or fails). If the consumer is killed mid-task, those tasks remain in the _tasks_in_flight set, and this method emits SIGNAL_INTERRUPTED for each one.

enqueue(task)
Parameters:

task – a Task instance, a group, or a chord.

Returns:

depends on the input type (see below).

Enqueue the given task, group, or chord for execution. When the result store is enabled (default), the return value depends on what was enqueued:

Note

Calling (or scheduling) a task-decorated function will automatically enqueue a task for execution. You only need to call enqueue() directly when working with pipelines, groups, or chords.

revoke(task, revoke_until=None, revoke_once=False)

See also

Use Result.revoke() instead.

revoke_by_id(task_id, revoke_until=None, revoke_once=False)
Parameters:
  • task_id (str) – task instance id.

  • revoke_until (datetime) – optional expiration date for revocation.

  • revoke_once (bool) – revoke once and then re-enable.

Revoke a Task instance using the task id.

revoke_all(task_class, revoke_until=None, revoke_once=False)

See also

Use TaskWrapper.revoke() instead.

restore(task)

See also

Use Result.restore() instead.

restore_by_id(task_id)
Parameters:

task_id (str) – task instance id.

Returns:

boolean indicating success.

Restore a Task instance using the task id. Returns boolean indicating if the revocation was successfully removed.

restore_all(task_class)

See also

Use TaskWrapper.restore() instead.

is_revoked(task, timestamp=None)
Parameters:

task – either a task instance, a task ID, a Result, or a Task class.

This method should rarely need to be called directly. Typically you should rather use the is_revoked method on the object that is being revoked, for example:

@huey.task()
def greet(name):
    return 'Hello %s' % name

r = greet.schedule(delay=60, args=('Huey',))
r.revoke()  # Revoke this task.
r.is_revoked()  # True.

greet.revoke()  # Revoke ALL invocations of this task.
greet.is_revoked()  # True.

See also

For task instances, use Result.is_revoked().

For task functions, use TaskWrapper.is_revoked().

result(task_id, blocking=False, timeout=None, backoff=1.15, max_delay=1.0, revoke_on_timeout=False, preserve=False)
Parameters:
  • task_id – the task’s unique identifier.

  • blocking (bool) – whether to block while waiting for task result

  • timeout – number of seconds to block (if blocking=True)

  • backoff – amount to backoff delay each iteration of loop

  • max_delay – maximum amount of time to wait between iterations when attempting to fetch result.

  • revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it is has not started yet.

  • preserve (bool) – when set to True, this parameter ensures that the task result will be preserved after having been successfully retrieved. Ordinarily, Huey will discard results after they have been read, to prevent the result store from growing without bounds.

Attempts to retrieve the return value of a task. By default, result() will simply check for the value, returning None if it is not ready yet. If you want to wait for the result, specify blocking=True. This will loop, backing off up to the provided max_delay, until the value is ready or the timeout is reached. If the timeout is reached before the result is ready, a ResultTimeout will be raised.

See also

Result - the result() method is simply a wrapper that creates a Result object and calls its get() method.

Note

If the task failed with an exception, then a TaskException that wraps the original exception will be raised.

Warning

By default the result store will delete a task’s return value after the value has been successfully read (by a successful call to the result() or Result.get() methods). If you intend to access the task result multiple times, you must specify preserve=True when calling these methods.

lock_task(lock_name)
Parameters:

lock_name (str) – Name to use for the lock.

Returns:

TaskLock instance, which can be used as a decorator or context-manager.

Utilize the Storage key/value APIs to implement simple locking.

This lock is designed to be used to prevent multiple invocations of a task from running concurrently. Can be used as either a context-manager within the task, or as a task decorator. If using as a decorator, place it directly above the function declaration.

If a second invocation occurs and the lock cannot be acquired, then a TaskLockedException is raised, which is handled by the consumer. The task will not be executed and a SIGNAL_LOCKED will be sent. If the task is configured to be retried, then it will be retried normally.

Examples:

@huey.periodic_task(crontab(minute='*/5'))
@huey.lock_task('reports-lock')  # Goes *after* the task decorator.
def generate_report():
    # If a report takes longer than 5 minutes to generate, we do
    # not want to kick off another until the previous invocation
    # has finished.
    run_report()

@huey.periodic_task(crontab(minute='0'))
def backup():
    # Generate backup of code
    do_code_backup()

    # Generate database backup. Since this may take longer than an
    # hour, we want to ensure that it is not run concurrently.
    with huey.lock_task('db-backup'):
        do_db_backup()
is_locked(lock_name)
Parameters:

lock_name (str) – Name of lock to check.

Returns:

boolean value indicating whether lock is held or not.

flush_locks(*names)
Parameters:

names – optional additional lock-names to flush.

Returns:

set of lock names that were held and subsequently cleared.

Flush any locks that may be held. Locks created by using lock_task() as a decorator are automatically discovered (registered at import time). Locks created inside a function body using the context-manager form are not automatically discovered, and their names must be passed explicitly.

When names are provided, both the discovered locks and the explicitly named locks are flushed.

# Decorator form — automatically discovered:
@huey.periodic_task(crontab(minute='*/5'))
@huey.lock_task('reports-lock')
def generate_report():
    run_report()

# Context-manager form — NOT automatically discovered:
@huey.task()
def backup():
    with huey.lock_task('db-backup'):
        do_db_backup()

# Flush discovered locks only:
huey.flush_locks()  # Clears 'reports-lock' if held.

# Flush discovered locks AND the context-manager lock:
huey.flush_locks('db-backup')  # Clears both.
rate_limit(name, limit, per, retry=True)
Parameters:
  • name (str) – Name to use for the rate-limiter.

  • limit (int) – Maximum invocations in the given window.

  • per (int) – Size of rate-limiting window in seconds.

  • retry (bool) – Automatically reschedule rate-limited tasks to run at beginning of next rate-limit window.

Returns:

RateLimit instance, which can be used as a decorator or context-manager.

Simple fixed-window rate-limiting for tasks.

When a task fails due to being rate-limited, it will be automatically rescheduled to run at the start of the next window when retry=True or the task itself has one or more retries.

Tasks that fail due to rate-limiting will emit a SIGNAL_RATE_LIMITED.

By default any task that is rate-limited will be retried at the start of the next rate-limiting window. For instance say we have a task that is limited to 10 calls / 60 seconds. The 11th call in a given 60s period will be retried at the start of the next window (~1 to 60 seconds later, depending on timing).

This is good for tasks that must not be lost. There is a risk if the volume of tasks continually exceeds the rate-limit that the automatic retries will grow continuously. To mitigate this you can specify the number of retries in the task decorator and disable automatic retries on the rate limiter. See examples below.

Example of limiting to 10 calls per minute. Tasks that are limited will automatically be retried at the start of the next rate-limit window.

@huey.task()
@huey.rate_limit('data_sync', limit=10, per=60)
def data_sync(data):
    service.apply_updates(data)

In the above example, if a task fails due to being rate-limited, it will be rescheduled for the start of the next window. If it fails again due to rate-limiting, it will again be rescheduled, etc.

Example of limiting to 10 calls per minute AND only allowing 2 retries. This mitigates unbounded growth for rate-limits that are continuously being hit:

@huey.task(retries=2)
@huey.rate_limit('data_sync', limit=10, per=60)
def data_sync(data):
    service.apply_updates(data)

In the above example, if a task fails due to being rate-limited, it will be rescheduled for the start of the next window up to 2 times.

Equivalent example using a context-manager instead of a decorator:

@huey.task(retries=2)
def data_sync(data):
    with huey.rate_limit('data_sync', limit=10, per=60):
        service.apply_updates(data)

By default, rate-limited tasks are scheduled to be retried at the beginning of the next rate-limit window. If a task specifies an explicit retry_delay, however, that value will be used instead:

@huey.task(retries=2, retry_delay=120)
@huey.rate_limit('data_sync', limit=10, per=60)
def data_sync(data):
    service.apply_updates(data)

In the above code, if the task fails due to being rate-limited, it will be retried up to 2 times after a delay of 120s between retries.

put(key, value)
Parameters:
  • key – key for data

  • value – arbitrary data to store in result store.

Store a value in the result-store under the given key.

get(key, peek=False)
Parameters:
  • key – key to read

  • peek (bool) – non-destructive read

Read a value from the result-store at the given key. By default reads are destructive. To preserve the value for additional reads, specify peek=True.

delete(key)
Parameters:

key – key to delete from the result store.

Returns:

boolean indicating whether the key existed and was deleted.

Remove a value from the result-store at the given key. Useful for cleaning up manually-stored data created with put().

put_if_empty(key, value)
Parameters:
  • key – key to store data under.

  • value – arbitrary data to store.

Returns:

boolean indicating whether the value was stored. Returns False if the key already exists.

Atomically store a value only if the key does not already exist. This is the primitive used internally by TaskLock to implement exclusive locking. It can also be used to build custom coordination patterns like task deduplication.

pending(limit=None)
Parameters:

limit (int) – optionally limit the number of tasks returned.

Returns:

a list of Task instances waiting to be run.

Note

This method must deserialize every task in the queue, which can be slow if the queue is large. Use pending_count() when you only need the count.

pending_count()
Returns:

the number of tasks currently in the queue, without deserializing them.

scheduled(limit=None)
Parameters:

limit (int) – optionally limit the number of tasks returned.

Returns:

a list of Task instances that are scheduled to execute at some time in the future.

Note

This method deserializes every task on the schedule. Use scheduled_count() when you only need the count.

scheduled_count()
Returns:

the number of tasks currently in the schedule, without deserializing them.

all_results()
Returns:

a dict of task-id to the serialized result data for all key/value pairs in the result store.

result_count()
Returns:

the number of key/value pairs in the result store.

flush()

Remove all data from the queue, schedule, and result store. This is a destructive operation. Primarily useful for testing.

__len__()

Return the number of items currently in the queue.

class TaskWrapper(huey, func, retries=None, retry_delay=None, context=False, name=None, task_base=None, **settings)
Parameters:
  • huey (Huey) – A huey instance.

  • func – User function.

  • retries (int) – Upon failure, number of times to retry the task.

  • retry_delay (int) – Number of seconds to wait before retrying after a failure/exception.

  • context (bool) – when the task is executed, include the Task instance as a parameter.

  • name (str) – Name for task (will be determined based on task module and function name if not provided).

  • task_base – Base-class for task, defaults to Task.

  • settings – Arbitrary settings to pass to the task class constructor.

Wrapper around a user-defined function that converts function calls into tasks executed by the consumer. The wrapper, which decorates the function, replaces the function in the scope with a TaskWrapper instance.

The wrapper class, when called, will enqueue the requested function call for execution by the consumer.

Note

You should not need to create TaskWrapper instances directly. The Huey.task() and Huey.periodic_task() decorators will create the appropriate TaskWrapper automatically.

schedule(args=None, kwargs=None, eta=None, delay=None, priority=None, retries=None, retry_delay=None, expires=None, timeout=None, id=None)
Parameters:
  • args (tuple) – arguments for decorated function.

  • kwargs (dict) – keyword arguments for decorated function.

  • eta (datetime) – the time at which the function should be executed.

  • delay (int) – number of seconds to wait before executing function.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait between retries.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • timeout (int) – set execution timeout for task. Implementation used by the consumer is specific to the worker type and is NOT available for threads. Processes use SIGALRM, greenlet uses gevent.Timeout. When using with threads it is necessary to use cooperative timeout checking. See Cooperative Timeout for example.

  • id – assign the given id to the Task being scheduled.

Returns:

a Result handle for the task.

Use the schedule method to schedule the execution of the queue task for a given time in the future:

import datetime

one_hour = datetime.datetime.now() + datetime.timedelta(hours=1)

# Schedule the task to be run in an hour. It will be called with
# three arguments.
res = check_feeds.schedule(args=(url1, url2, url3), eta=one_hour)

# Equivalent, but uses delay rather than eta.
res = check_feeds.schedule(args=(url1, url2, url3), delay=3600)
revoke(revoke_until=None, revoke_once=False)
Parameters:
  • revoke_until (datetime) – Automatically restore the task after the given datetime.

  • revoke_once (bool) – Revoke the next execution of the task and then automatically restore.

Revoking a task will prevent any instance of the given task from executing. When no parameters are provided the function will not execute again until TaskWrapper.restore() is called.

This function can be called multiple times, but each call will supercede any restrictions from the previous revocation.

# Skip the next execution
send_emails.revoke(revoke_once=True)

# Prevent any invocation from executing.
send_emails.revoke()

# Prevent any invocation for 24 hours.
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
send_emails.revoke(revoke_until=tomorrow)
is_revoked(timestamp=None)
Parameters:

timestamp (datetime) – If provided, checks whether task is revoked with respect to the given timestamp.

Returns:

bool indicating whether task is revoked.

Check whether the given task is revoked.

restore()
Returns:

bool indicating whether a previous revocation rule was found and removed successfully.

Removes a previous task revocation, if one was configured.

call_local()

Call the @task-decorated function, bypassing all Huey-specific logic. In other words, call_local() provides access to the underlying user-defined function.

>>> add.call_local(1, 2)
3
s(*args, **kwargs)
Parameters:
  • args – Arguments for task function.

  • kwargs – Keyword arguments for task function.

  • priority (int) – assign priority override to task, higher numbers are processed first by the consumer when there is a backlog.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • timeout (int) – set execution timeout for task. Implementation used by the consumer is specific to the worker type and is NOT available for threads. Processes use SIGALRM, greenlet uses gevent.Timeout. When using with threads it is necessary to use cooperative timeout checking. See Cooperative Timeout for example.

Returns:

a Task instance representing the execution of the task function with the given arguments.

Create a Task instance representing the invocation of the task function with the given arguments and keyword-arguments.

Note

The returned task instance is not enqueued automatically.

Warning

The following keyword argument names are reserved and will be intercepted by Huey rather than passed to your task function: eta, delay, retries, retry_delay, priority, expires, and timeout. If your task function has a parameter with one of these names, rename the parameter or pass the value positionally.

To illustrate the distinction, when you call a task()-decorated function, behind-the-scenes, Huey is doing something like this:

@huey.task()
def add(a, b):
    return a + b

result = add(1, 2)

# Is equivalent to:
task = add.s(1, 2)
result = huey.enqueue(task)

Typically, one will only use the TaskWrapper.s() helper when creating task execution pipelines.

For example:

add_task = add.s(1, 2)  # Represent task invocation.
pipeline = (add_task
            .then(add, 3)  # Call add() with previous result and 3.
            .then(add, 4)  # etc...
            .then(add, 5))

results = huey.enqueue(pipeline)

# Print results of above pipeline.
print(results.get(blocking=True))

# [3, 6, 10, 15]
map(it)
Parameters:

it – a list, tuple or generic iterable that contains the arguments for a number of individual task executions.

Returns:

a ResultGroup encapsulating the individual Result handlers for the task executions.

Note

The iterable should be a list of argument tuples which will be passed to the task function.

Example:

@huey.task()
def add(a, b):
    return a + b

rg = add.map([(i, i * i) for i in range(10)])

# Resolve all results.
rg.get(blocking=True)

# [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
unregister()
Returns:

boolean indicating whether the task was found and removed from the registry.

Remove this task from Huey’s internal task registry. After unregistering, the consumer will no longer be able to deserialize or execute instances of this task. If the task is a periodic task, it will also be removed from the periodic task schedule.

This is primarily useful when working with dynamically-created periodic tasks that need to be removed at runtime.

class Task(args=None, kwargs=None, id=None, eta=None, retries=None, retry_delay=None, priority=None, expires=None, timeout=None, on_complete=None, on_error=None)
Parameters:
  • args (tuple) – arguments for the function call.

  • kwargs (dict) – keyword arguments for the function call.

  • id (str) – unique id, defaults to a UUID if not provided.

  • eta (datetime) – time at which task should be executed.

  • retries (int) – automatic retry attempts.

  • retry_delay (int) – seconds to wait before retrying a failed task.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • timeout (int) – set execution timeout for task. Implementation used by the consumer is specific to the worker type and is NOT available for threads. Processes use SIGALRM, greenlet uses gevent.Timeout. When using with threads it is necessary to use cooperative timeout checking. See Cooperative Timeout for example.

  • on_complete (Task) – Task to execute upon completion of this task.

  • on_error (Task) – Task to execute upon failure / error.

The Task class represents the execution of a function. Instances of the task are serialized and enqueued for execution by the consumer, which deserializes and executes the task function.

Note

You should not need to create instances of Task directly, but instead use either the Huey.task() decorator or the TaskWrapper.s() method.

Here’s a refresher on how tasks work:

@huey.task()
def add(a, b):
    return a + b

ret = add(1, 2)
print(ret.get(blocking=True))  # "3".

# The above two lines are equivalent to:
task_instance = add.s(1, 2)  # Create a Task instance.
ret = huey.enqueue(task_instance)  # Enqueue the queue task.
print(ret.get(blocking=True))  # "3".
is_timed_out

If task was configured with a timeout, returns True if the task has exceeded the timeout. The task timer starts when the task begins to execute (after any pre-execute hooks are fired).

This is a property and the return value is calculated dynamically.

time_remaining

If task was configured with a timeout, returns the amount of time remaining. For tasks that do not specify a timeout, this property returns float('inf') so it is always safe to compare.

This is a property and the return value is calculated dynamically.

check_timeout()

If task was configured with a timeout, this method provides a single hook for cooperatively checking whether the task has exceeded its available time, and if so, raises a TaskTimeout. This exception ensures that the SIGNAL_TIMEOUT fires for the task.

This method is the only mechanism for enforcing a timeout when using thread workers.

See Cooperative Timeout for discussion.

then(task, *args, **kwargs)
Parameters:
  • task (TaskWrapper) – A task()-decorated function.

  • args – Arguments to pass to the task.

  • kwargs – Keyword arguments to pass to the task.

Returns:

The parent task.

The then() method is used to create task pipelines. A pipeline is a lot like a unix pipe, such that the return value from the parent task is then passed (along with any parameters specified by args and kwargs) to the child task.

Here’s an example of chaining some addition operations:

add_task = add.s(1, 2)  # Represent task invocation.
pipeline = (add_task
            .then(add, 3)  # Call add() with previous result and 3.
            .then(add, 4)  # etc...
            .then(add, 5))

result_group = huey.enqueue(pipeline)

print(result_group.get(blocking=True))

# [3, 6, 10, 15]

If the value returned by the parent function is a tuple, then the tuple will be used to update the *args for the child function. Likewise, if the parent function returns a dict, then the dict will be used to update the **kwargs for the child function.

Example of chaining fibonacci calculations:

@huey.task()
def fib(a, b=1):
    a, b = a + b, a
    return (a, b)  # returns tuple, which is passed as *args

pipe = (fib.s(1)
        .then(fib)
        .then(fib))
result_group = huey.enqueue(pipe)

print(result_group.get(blocking=True))
# [(2, 1), (3, 2), (5, 3)]
error(task, *args, **kwargs)
Parameters:
  • task (TaskWrapper) – A task()-decorated function.

  • args – Arguments to pass to the task.

  • kwargs – Keyword arguments to pass to the task.

Returns:

The parent task.

The error() method is similar to the then() method, which is used to construct a task pipeline, except the error() task will only be called in the event of an unhandled exception in the parent task.

crontab(minute='*', hour='*', day='*', month='*', day_of_week='*'[, strict=False])

Convert a “crontab”-style set of parameters into a test function that will return True when a given datetime matches the parameters set forth in the crontab.

The argument order matches the standard Linux crontab format: minute, hour, day, month, day-of-week.

Day-of-week uses 0=Sunday and 6=Saturday.

Acceptable inputs:

  • * = every distinct value

  • */n = run every “n” times, i.e. hours=`*/4` == 0, 4, 8, 12, 16, 20

  • m-n = run every time m..n

  • m,n = run on m and n

Parameters:

strict (bool) – cause crontab to raise a ValueError if an input does not match a supported input format.

Return type:

a test function that takes a datetime and returns a boolean

Note

It is currently not possible to run periodic tasks with an interval less than once per minute. If you need to run tasks more frequently, you can create a periodic task that runs once per minute, and from that task, schedule any number of sub-tasks to run after the desired delays.

crontab.hourly()

Convenience function for a task that should run every hour, on the hour.

crontab.daily()

Convenience function for a task that should run daily, at midnight.

class TaskLock(huey, name)

This class should not be instantiated directly, but is instead returned by Huey.lock_task(). This object implements a context-manager or decorator which can be used to ensure only one instance of the wrapped task is executed at a given time.

If the consumer executes a task and encounters the TaskLockedException, then the task will not be executed, an error will be logged by the consumer, and a SIGNAL_LOCKED signal will be emitted. If the task is configured with retries, it will be retried normally (the lock is released, so the retry has a chance to acquire it).

See Huey.lock_task() for example usage.

clear()

Helper method to manually clear the lock. This method is provided to allow the lock to be flushed in the event that the consumer process was killed while executing a task holding the lock.

Alternatively, at start-up time you can execute the consumer with the -f method which will flush all locks before beginning to execute tasks.

acquire()

Acquire the lock, raising a TaskLockedException if the lock could not be acquired. Otherwise, returns True.

release()

Identical to clear().

locked()
Returns:

boolean whether lock is currently being held.

class RateLimit(huey, name, limit, per, retry=True)

This class should not be instantiated directly, but is instead returned by Huey.rate_limit(). This object implements a context-manager or decorator which can be used to enforce a simple fixed-window rate-limit of the wrapped block.

When a task fails due to being rate-limited, it will be automatically rescheduled to run at the start of the next window when retry=True or the task itself has one or more retries.

Interaction between task retries and the retry flag on the rate-limit:

  • retry=True, task has no retries: task will be retried at the start of the next window (or task.retry_delay if specified).

  • retry=True, task has 1 or more retries: same as above, except task’s own retry count is preserved.

  • retry=False, task has no retries: error is the final state, task is not rescheduled.

  • retry=False, task has 1 or more retries: task retries decrement normally and the task’s retry_delay is honored.

Rate-limited tasks emit a SIGNAL_RATE_LIMITED.

Parameters:
  • name (str) – Name to use for the rate-limiter.

  • limit (int) – Maximum invocations in the given window.

  • per (int) – Size of rate-limiting window in seconds.

  • retry (bool) – Automatically reschedule rate-limited tasks to run at beginning of next rate-limit window.

Returns:

RateLimit instance, which can be used as a decorator or context-manager.

reset()

Reset the rate-limiter, clearing the current window and counter. The next call to a rate-limited task will start a fresh window.

current_usage()
Returns:

the number of invocations in the current rate-limit window. Returns 0 if no invocations have occurred in this window.

acquire()

Check whether the rate-limit has been exceeded. If so, raises RateLimitExceeded. Otherwise, increments the counter and returns normally. This is called automatically when the rate-limiter is used as a decorator or context-manager.

class group(tasks)

A group is a lightweight wrapper around any number or types of tasks that may be executed in parallel. When a group is enqueued, Huey returns a ResultGroup for reading the results.

Parameters:

tasks (list) – any number of Task instances.

Example:

@huey.task()
def health_check(service):
    return service.name, perform_health_check(service)

g = group([
    health_check.s(api_service),
    health_check.s(cache_service),
    health_check.s(db_service),
    health_check.s(proxy_service),
])
result_group = huey.enqueue(g)

for name, is_healthy in result_group(blocking=True):
    print('%s healthy? %s' % (name, is_healthy))
then(task, *args, **kwargs)
Parameters:
  • task – A task()-decorated function (TaskWrapper), or a Task instance to run with the group results.

  • args – Arguments to pass to the callback task.

  • kwargs – Keyword arguments to pass to the callback task.

Returns:

A chord instance.

Convert a group into a chord by attaching a callback. When all members have finished executing, the callback will be enqueued with the member results (as a list, in order).

result = huey.enqueue(
    group([fetch.s(u) for u in urls])
    .then(aggregate))
error(task, *args, **kwargs)
Parameters:
  • task – A task()-decorated function (TaskWrapper), or a Task instance to run with the group results.

  • args – Arguments to pass to the error handler.

  • kwargs – Keyword arguments to pass to the error handler.

Returns:

the group instance.

Attach an error handler to every member of the group. If any member raises an exception, the error handler will be enqueued with the exception as an argument. Supports chaining:

result = huey.enqueue(
    group([fetch.s(u) for u in urls])
    .error(on_fetch_error)
    .then(aggregate))
class chord(tasks, callback)

A chord consists of a group of tasks that execute in parallel and a callback task. When all sub-tasks have completed (or permanently failed), their results are passed in order as a list to the callback task.

Parameters:
  • tasks (list) – a list of Task instances (including pipelines built with Task.then()), chord instances (for nesting), in any combination.

  • callback – a task()-decorated function (TaskWrapper), or a Task instance. The callback receives the list of sub-task results as its first argument.

@huey.task()
def fetch(url):
    return requests.get(url).json()

@huey.task()
def aggregate(results):
    return {r['id']: r for r in results}

c = chord(
    [fetch.s(u) for u in urls],
    aggregate.s())
result = huey.enqueue(c)

# result is a ChordResult.
data = result(blocking=True)  # Blocks until callback finishes.

Enqueueing a chord returns a ChordResult.

Note

The callback always receives the member results as a list, even if there is only one member. Results are ordered by the position of the member in the original list, not by the order in which members finished executing.

Sub-task errors:

Chords are designed to always complete. If a sub-task fails and exhausts all its retries, the exception is used as the task result. Tasks with retries will retry normally, only after retries are exhausted is the exception used.

The callback may therefore receive a mix of normal return values and Exception objects. See Groups and Chords in the guide for patterns on handling mixed results.

Note

Revoking a chord member prevents it from running and will stall the chord. If you need to cancel a chord, revoke the callback task instead.

Pipeline members:

When a chord member is a pipeline (e.g. fetch.s(u).then(parse)), Huey walks the on_complete chain and uses the end of the pipeline for the sub-task. This ensures the chord callback is not triggered until all pipeline members have fully finished.

Error callbacks placed on individual tasks within a pipeline member operate independently of the chord.

c = chord(
    [download.s(u).then(parse).then(validate) for u in urls],
    aggregate.s())
then(task, *args, **kwargs)
Parameters:
  • task – A task()-decorated function (TaskWrapper), or a Task instance.

  • args – Arguments to pass to the task.

  • kwargs – Keyword arguments to pass to the task.

Returns:

the chord instance.

Chain an additional task after the chord’s callback. The callback’s return value is passed to the chained task, exactly like Task.then(). Multiple calls to then() extend the chain.

When tasks are chained to the chord callback, the ChordResult returned by Huey.enqueue() will have a non-None pipeline_results attribute containing a ResultGroup for the full callback pipeline.

result = huey.enqueue(
    chord([fetch.s(u) for u in urls], aggregate.s())
    .then(generate_report)
    .then(send_email))

# result.pipeline_results is a ResultGroup with three entries:
# [aggregate result, generate_report result, send_email result]

Delegates to the callback’s Task.then().

error(task, *args, **kwargs)
Parameters:
  • task – A task()-decorated function (TaskWrapper), or a Task instance.

  • args – Arguments to pass to the error handler.

  • kwargs – Keyword arguments to pass to the error handler.

Returns:

the chord instance.

Attach an error handler to the chord’s callback. If the callback raises an exception, this task will be enqueued with the exception as an argument.

result = huey.enqueue(
    chord([fetch.s(u) for u in urls], aggregate.s())
    .error(alert_admin))

This handles errors in the callback, not in the sub-tasks. To handle errors in individual members, use Task.error() on each member or group.error() before constructing the chord.

Delegates to the callback’s Task.error().

Nested chords:

Chord members can themselves be chords. The inner chord’s callback pipeline serves as the synchronization point for the outer chord.

inner1 = chord([a.s(), b.s()], merge.s())
inner2 = chord([c.s(), d.s()], merge.s())

result = huey.enqueue(chord([inner1, inner2], final.s()))

When the inner chords complete and their callbacks finish, the outer chord’s callback (final) receives the two inner callback results as a list. See Groups and Chords in the guide for a detailed example.

class ChordResult(results, callback_result, pipeline=None)

Returned by Huey.enqueue() when enqueueing a chord. Provides access to the individual member results, the callback result, and the results of any tasks chained after the callback.

results

A ResultGroup containing the Result handles for each chord member.

callback

A Result handle for the callback task.

pipeline_results

A ResultGroup containing Result handles for the callback and every task chained after it via chord.then(). The first entry is the callback itself, followed by each chained task in order.

None if no tasks were chained after the callback.

result = huey.enqueue(
    chord([a.s(), b.s()], combine.s())
    .then(report)
    .then(archive))

result.pipeline_results[0]()  # combine result
result.pipeline_results[1]()  # report result
result.pipeline_results[2]()  # archive result
get(*args, **kwargs)

Shortcut for self.callback.get(*args, **kwargs). Blocks until the callback finishes and returns its result.

__call__(*args, **kwargs)

Identical to get().

reset()

Reset the callback result and allow re-fetching a new result for the given task (i.e. after a task error and subsequent retry).

Result

class Result(huey, task)

Although you will probably never instantiate an Result object yourself, they are returned whenever you execute a task-decorated function, or schedule a task for execution. The Result object talks to the result store and is responsible for fetching results from tasks.

Once the consumer finishes executing a task, the return value is placed in the result store, allowing the original caller to retrieve it.

Getting results from tasks is very simple:

>>> @huey.task()
... def add(a, b):
...     return a + b
...

>>> res = add(1, 2)
>>> res  # what is "res" ?
<Result: task 6b6f36fc-da0d-4069-b46c-c0d4ccff1df6>

>>> res()  # Fetch the result of this task.
3

What happens when data isn’t available yet? Let’s assume the next call takes about a minute to calculate:

>>> res = add(100, 200)  # Imagine this is very slow.
>>> res.get()  # Data is not ready, so None is returned.

>>> res() is None  # We can omit ".get", it works the same way.
True

>>> res(blocking=True, timeout=5)  # Block for up to 5 seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/tmp/huey/src/huey/huey/queue.py", line 46, in get
    raise ResultTimeout
huey.exceptions.ResultTimeout

>>> res(blocking=True)  # No timeout, will block until it gets data.
300

If the task failed with an exception, then a TaskException will be raised when reading the result value:

>>> @huey.task()
... def fails():
...     raise Exception('I failed')

>>> res = fails()
>>> res()  # raises a TaskException!
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/tmp/huey/src/huey/huey/api.py", line 684, in get
    raise TaskException(result.metadata)
huey.exceptions.TaskException: Exception('I failed',)
id

Returns the unique id of the corresponding task.

is_ready()
Returns:

whether the task result value is available.

Return type:

bool

get(blocking=False, timeout=None, backoff=1.15, max_delay=1.0, revoke_on_timeout=False, preserve=False)
Parameters:
  • blocking (bool) – whether to block while waiting for task result

  • timeout – number of seconds to block (if blocking=True)

  • backoff – amount to backoff delay each iteration of loop

  • max_delay – maximum amount of time to wait between iterations when attempting to fetch result.

  • revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it is has not started yet.

  • preserve (bool) – when set to True, this parameter ensures that the task result will be preserved after having been successfully retrieved. Ordinarily, Huey will discard results after they have been read, to prevent the result store from growing without bounds.

Raises:

ResultTimeout if blocking and timeout specified without result becoming ready yet.

Attempt to retrieve the return value of a task. By default, get() will simply check for the value, returning None if it is not ready yet. If you want to wait for a value, you can specify blocking=True. This will loop, backing off up to the provided max_delay, until the value is ready or the timeout is reached. If the timeout is reached before the result is ready, a ResultTimeout exception will be raised.

Note

Instead of calling .get(), you can simply call the Result object directly. Both methods accept the same arguments.

__call__(**kwargs)

Identical to the get() method, provided as a shortcut.

revoke(revoke_once=True)
Parameters:

revoke_once (bool) – revoke only once.

Revoke the given task. Unless it is in the process of executing, the task will be discarded without being executed.

one_hour = datetime.datetime.now() + datetime.timedelta(hours=1)

# Run this command in an hour
res = add.schedule((1, 2), eta=one_hour)

# I changed my mind, do not run it after all.
res.revoke()
restore()

Restore the given task instance. Unless the task instance has already been dequeued and discarded, it will be restored and run as scheduled.

Warning

If the task class itself has been revoked, via a call to TaskWrapper.revoke(), then this method has no effect.

is_revoked()

Return a boolean value indicating whether this particular task instance or the task class itself has been revoked.

reschedule(eta=None, delay=None, expires=None, priority=None, preserve_pipeline=True)
Parameters:
  • eta (datetime) – execute function at the given time.

  • delay (int) – execute function after specified delay in seconds.

  • expires – set expiration time for task. If not provided, then the task’s original expire time (if any) will be used.

  • priority – override the task’s priority. If not provided, the original priority is preserved.

  • preserve_pipeline (bool) – when True (default), the rescheduled task retains any on_complete and on_error chains from the original task. Set to False to discard them.

Returns:

Result handle for the new task.

Reschedule the given task. The original task instance will be revoked, but no checks are made to verify that it hasn’t already been executed.

If neither an eta nor a delay is specified, the task will be run as soon as it is received by a worker.

reset()

Reset the cached result and allow re-fetching a new result for the given task. This is essential when a task fails and is retried: the first call to get() caches the error locally, and subsequent calls return the cached error even after the retry succeeds. Calling reset() clears the local cache so the next get() reads from the result store again.

result = flaky_task()
try:
    result.get(blocking=True, timeout=5)
except TaskException:
    result.reset()  # Clear cached error.
    # Now we can read the result from the successful retry:
    value = result.get(blocking=True, timeout=30)
class ResultGroup

A ResultGroup will be returned when you enqueue a task pipeline or if you use the TaskWrapper.map() method. It is a simple wrapper around a number of individual Result() instances, and provides a convenience API for fetching the results in bulk.

get(**kwargs)

Call get() on each individual Result() instance in the group and returns a list of return values. Any keyword arguments are passed along.

as_completed(backoff=1.15, max_delay=1.0)
Parameters:
  • backoff (float) – factor to increase delay between polls.

  • max_delay (float) – maximum seconds between polls.

Returns:

a generator that yields individual result values as they become available.

Iterate over results in the order they become ready, rather than the order they were enqueued. This is useful when you want to begin processing results as soon as possible without waiting for all tasks to finish.

result_group = huey.enqueue(group([
    fetch.s(url) for url in urls]))

for value in result_group.as_completed():
    process(value)

The generator polls each result handle in a round-robin fashion with exponential backoff, yielding values as they appear.

__getitem__(idx)

Resolves the result at the given index, blocking until it is ready.

__iter__()

Iterate over the individual Result instances in the group.

__len__()

Return the number of results in the group.

Serializer

class Serializer(compression=False, compression_level=6, use_zlib=False)
Parameters:
  • compression (bool) – use gzip compression

  • compression_level (int) – 0 for least, 9 for most.

  • use_zlib (bool) – use zlib for compression instead of gzip.

The Serializer class implements a simple interface that can be extended to provide your own serialization format. The default implementation uses pickle.

To override, the following methods should be implemented. Compression is handled transparently elsewhere in the API.

_serialize(data)
Parameters:

data – arbitrary Python object to serialize.

Rtype bytes:

_deserialize(data)
Parameters:

data (bytes) – serialized data.

Returns:

the deserialized object.

class SignedSerializer(secret, salt='huey', **kwargs)
Parameters:
  • secret (str) – secret key used to generate HMAC signatures.

  • salt (str) – salt combined with the secret (default 'huey').

  • kwargs – additional keyword arguments passed to the base Serializer (e.g. compression, use_zlib).

A subclass of Serializer that adds an HMAC-SHA1 signature to every serialized message. When deserializing, the signature is verified and a ValueError is raised if the message has been tampered with.

This is useful when the storage backend (e.g. Redis) is shared or network-exposed and you want to prevent malicious injection of crafted pickle payloads.

from huey import RedisHuey
from huey.serializer import SignedSerializer

huey = RedisHuey(
    'my-app',
    serializer=SignedSerializer(secret='my-secret-key'))

Note

The signed serializer detects tampering but does not encrypt the data. Task arguments remain visible in the storage backend.

Both the application and the consumer must use the same secret and salt.

Exceptions

class HueyException

General exception class.

class ConfigurationError

Raised when Huey encounters a configuration problem.

class TaskLockedException

Raised by the consumer when a task lock cannot be acquired.

class ResultTimeout

Raised when attempting to block on a call to Result.get() (for instance) and the timeout is exceeded without the result being ready.

class TaskTimeout

Raised when a task exceeds its specified execution time.

class RateLimitExceeded(key, delay, retry=True)

Raised when a rate-limit has been exceeded.

class CancelExecution(retry=None)

Cancel the execution of a task. Can be raised either within a pre_execute() hook, or within a task()-decorated function.

When raised from within a pre_execute() hook, this exception signals to the consumer that the task shall be cancelled and not run.

When raised in the body of a task()-decorated function, the retry parameter controls whether the task is retried:

  • retry=None (default): the task’s normal retry policy applies. If the task has retries remaining, they are decremented and the task is re-enqueued. If no retries remain, the task is not retried.

  • retry=True: the task will be retried regardless of whether it was declared with retries. If the task has no retries, Huey sets the retry count to 1 so at least one retry occurs.

  • retry=False: the task will not be retried, even if it has retries remaining.

class RetryTask(msg=None, eta=None, delay=None)

Raised by user code from within a task() function to force a retry. When this exception is raised, the task will be retried irrespective of whether it is configured with automatic retries.

If delay or eta is specified, then any retry_delay set on the task will be overridden and the value specified will be used to determine when the task will be retried next.

@huey.task()
def fetch_api_data(url):
    try:
        fh = urlopen(url)
    except HTTPError:
        # Try again in 60 seconds for an HTTP error (500, etc).
        raise RetryTask(delay=60)
    ...
class TaskException

General exception raised by Result handles when reading the result of a task that failed due to an error.

Storage

Huey comes with several built-in storage implementations:

class RedisStorage(name='huey', blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, notify_result=False, notify_result_ttl=86400, **connection_params)
Parameters:
  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • read_timeout – Timeout to use when performing a blocking pop, default is 1 second.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • client_name – name used to identify Redis clients used by Huey.

  • notify_result (bool) – use a blocking-pop on a result-ready key to enable low-latency result reading.

  • notify_result_ttl (int) – TTL for result-ready key to automatically expire un-awaited results.

Additional keyword arguments will be passed directly to the Redis client constructor. See the redis-py documentation for the complete list of arguments supported by the Redis client.

class RedisExpireStorage(name='huey', expire_time=86400, blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, notify_result=False, notify_result_ttl=86400, **connection_params)
Parameters:

expire_time (int) – TTL for results of individual tasks.

Subclass of RedisStorage that implements the result store APIs using normal Redis keys with a TTL, so that unread results will automatically be cleaned-up. RedisStorage uses a HASH for the result store, which has the benefit of keeping the Redis keyspace orderly, but which comes with the downside that unread task results can build up over time. This storage implementation trades keyspace sprawl for automatic clean-up.

class PriorityRedisStorage(name='huey', blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, notify_result=False, notify_result_ttl=86400, **connection_params)
Parameters:
  • blocking (bool) – Use blocking-zpopmin when reading from the queue (as opposed to polling). Default is true.

  • read_timeout – Timeout to use when performing a blocking pop, default is 1 second.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • client_name – name used to identify Redis clients used by Huey.

  • notify_result (bool) – use a blocking-pop on a result-ready key to enable low-latency result reading.

  • notify_result_ttl (int) – TTL for result-ready key to automatically expire un-awaited results.

Redis storage that uses a different data-structure for the task queue in order to support task priorities.

Additional keyword arguments will be passed directly to the Redis client constructor. See the redis-py documentation for the complete list of arguments supported by the Redis client.

Warning

This storage engine requires Redis 5.0 or newer.

class PriorityRedisExpireStorage(name='huey', expire_time=86400, ...)
Parameters:

expire_time (int) – TTL for results of individual tasks.

Combination of PriorityRedisStorage, which supports task priorities, and RedisExpireStorage, which stores task results as top-level Redis keys in order set a TTL so that unread results are automatically cleaned-up.

class SqliteStorage(filename='huey.db', name='huey', cache_mb=8, fsync=False, timeout=5, strict_fifo=False, **kwargs)
Parameters:
  • filename (str) – sqlite database filename.

  • cache_mb (int) – sqlite page-cache size in megabytes.

  • fsync (bool) – if enabled, all writes to the Sqlite database will be synchonized. This provides greater safety from database corruption in the event of sudden power-loss.

  • journal_mode (str) – sqlite journaling mode to use. Defaults to using write-ahead logging, which enables readers to coexist with a single writer.

  • timeout (int) – busy timeout (in seconds), amount of time to wait to acquire the write lock when another thread / connection holds it.

  • strict_fifo (bool) – ensure that the task queue behaves as a strict FIFO. By default, Sqlite may reuse rowids for deleted tasks, which can cause tasks to be run in a different order than the order in which they were enqueued.

  • kwargs – Additional keyword arguments passed to the sqlite3 connection constructor.

class FileStorage(name, path, levels=2, use_thread_lock=False)
Parameters:
  • name (str) – (unused by the file storage API)

  • path (str) – directory path used to store task results. Will be created if it does not exist.

  • levels (int) – number of levels in cache-file directory structure to ensure a given directory does not contain an unmanageable number of files.

  • use_thread_lock (bool) – use the standard lib threading.Lock instead of a lockfile. Note: this should only be enabled when using the greenlet or thread consumer worker models.

The FileStorage implements a simple file-system storage layer. This storage class should not be used in high-throughput, highly-concurrent environments, as it utilizes exclusive locks around all file-system operations. This is done to prevent race-conditions when reading from the file-system.

class MemoryStorage

In-memory storage engine for use when testing or developing. Designed for use with immediate mode.

class BlackHoleStorage

Storage class that discards all data written to it, and thus always appears to be empty. Intended for testing only.

class huey.storage.BaseStorage(name='huey', **storage_kwargs)

Base storage-layer interface. Subclasses should implement all methods.

add_to_schedule(data, ts)

Add the given task data to the schedule, to be executed at the given timestamp.

Parameters:
  • data (bytes) – Task data.

  • ts (datetime) – Timestamp at which task should be executed.

Returns:

No return value.

close()

Close or release any objects/handles used by storage layer.

Returns:

(optional) boolean indicating success

delete_counter(key)

Delete the counter at the given key.

delete_data(key)

Delete the value at the given key, if it exists.

Parameters:

key (bytes) – Key to delete.

Returns:

boolean success or failure.

dequeue()

Atomically remove data from the queue. If no data is available, no data is returned.

Returns:

Opaque binary task data or None if queue is empty.

enqueue(data, priority=None)

Given an opaque chunk of data, add it to the queue.

Parameters:
  • data (bytes) – Task data.

  • priority (float) – Priority, higher priorities processed first. Defaults to 0.

Returns:

No return value.

Some storage may not implement support for priority. In that case, the storage may raise a NotImplementedError for non-None priority values.

enqueued_items(limit=None)

Non-destructively read the given number of tasks from the queue. If no limit is specified, all tasks will be read.

Parameters:

limit (int) – Restrict the number of tasks returned.

Returns:

A list containing opaque binary task data.

flush_all()

Remove all persistent or semi-persistent data.

Returns:

No return value.

flush_counters()

Clear all counters.

Returns:

No return value.

flush_queue()

Remove all data from the queue.

Returns:

No return value.

flush_results()

Delete all key/value pairs from the data-store.

Returns:

No return value.

flush_schedule()

Delete all scheduled tasks.

Returns:

No return value.

has_data_for_key(key)

Return whether there is data for the given key.

Returns:

Boolean value.

incr(key, amount=1)

Atomically increment a counter, returning the new value. If the key does not exist, it is assumed to be 0.

peek_data(key)

Non-destructively read the value at the given key, if it exists.

Parameters:

key (bytes) – Key to read.

Returns:

Associated value, if key exists, or EmptyData.

pop_data(key)

Destructively read the value at the given key, if it exists.

Parameters:

key (bytes) – Key to read.

Returns:

Associated value, if key exists, or EmptyData.

put_data(key, value, is_result=False)

Store an arbitrary key/value pair, overwrites any existing value.

Parameters:
  • key (bytes) – lookup key

  • value (bytes) – value

  • is_result (bool) – indicate if we are storing a (volatile) task result versus metadata like a task revocation key or lock.

Returns:

No return value.

put_if_empty(key, value)

Atomically write data only if the key is not already set.

Parameters:
  • key (bytes) – Key to check/set.

  • value (bytes) – Arbitrary data.

Returns:

Boolean whether key/value was set.

queue_size()

Return the length of the queue.

Returns:

Number of tasks.

read_schedule(ts)

Read all tasks from the schedule that should be executed at or before the given timestamp. Once read, the tasks are removed from the schedule.

Parameters:

ts (datetime) – Timestamp

Returns:

List containing task data for tasks which should be executed at or before the given timestamp.

result_items()

Non-destructively read all the key/value pairs from the data-store.

Returns:

Dictionary mapping all key/value pairs in the data-store.

result_store_size()
Returns:

Number of key/value pairs in the result store.

schedule_size()
Returns:

The number of tasks currently in the schedule.

scheduled_items(limit=None)

Non-destructively read the given number of tasks from the schedule.

Parameters:

limit (int) – Restrict the number of tasks returned.

Returns:

List of tasks that are in schedule, in order from soonest to latest.

wait_result(key, timeout=None, backoff=1.15, max_delay=1.0)

Block until a result is available for the given key, or until the timeout expires. Returns True if the result is available, or False if the timeout expired.

The default implementation polls with exponential backoff, but Redis subclasses provide option to override with BLPOP for lower latency result notification (specify notify_result=True).