Huey’s API

Most end-users will interact with the API using two decorators, Huey.task() and Huey.periodic_task().

The API documentation will follow the structure of the huey api.py module.

Huey types

Implementations of Huey which handle task and result persistence.

Note

See the documentation for Huey for the list of initialization parameters common to all Huey implementations.

Warning

If you have a busy application and plan to switch from one of the Redis implementations to another (e.g. switch from RedisHuey to the PriorityRedisHuey) you may want to start the new huey consumer on a different Redis database (e.g. db=15). Then let your old consumer drain any pre-existing tasks while the new consumer accepts new tasks.

class RedisHuey

Huey that utilizes redis for queue and result storage. Requires redis-py.

Commonly-used keyword arguments for storage configuration:

Parameters:
  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • host – hostname of the Redis server.

  • port – port number.

  • password – password for Redis.

  • db (int) – Redis database to use (typically 0-15, default is 0).

The redis-py documentation contains the complete list of arguments supported by the Redis client.

Note

RedisHuey does not support task priorities. If you wish to use task priorities with Redis, use PriorityRedisHuey.

RedisHuey uses a Redis LIST to store the queue of pending tasks. Redis lists are a natural fit, as they offer O(1) append and pop from either end of the list. Redis also provides blocking-pop commands which allow the consumer to react to a new message as soon as it is available without resorting to polling.

See also

RedisStorage

class PriorityRedisHuey

Huey that utilizes redis for queue and result storage. Requires redis-py. Accepts the same arguments as RedisHuey.

PriorityRedisHuey supports task priorities, and requires Redis 5.0 or newer.

PriorityRedisHuey uses a Redis SORTED SET to store the queue of pending tasks. Sorted sets consist of a unique value and a numeric score. In addition to being sorted by numeric score, Redis also orders the items within the set lexicographically. Huey takes advantage of these two characteristics to implement the priority queue. Redis 5.0 added a new command, ZPOPMIN, which pops the lowest-scoring item from the sorted set (and BZPOPMIN, the blocking variety).
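
The interplay of score and lexicographic ordering can be modelled in plain Python. The sketch below is not Huey's storage code: it uses a heap of (score, member) tuples as a stand-in for a sorted set. The negated priority used as the score and the zero-padded sequence prefix on each member are assumptions made for illustration.

```python
import heapq
import itertools

class ToySortedSetQueue:
    """In-memory stand-in for a Redis sorted set used as a priority queue."""

    def __init__(self):
        self._heap = []                    # (score, member) pairs
        self._counter = itertools.count()  # monotonically increasing sequence

    def enqueue(self, payload, priority=0):
        # Assumption: negate the priority so that the lowest score (what a
        # pop-min operation returns) corresponds to the highest priority.
        score = -priority
        # Assumption: a zero-padded sequence prefix keeps members unique and
        # makes lexicographic order match insertion order within one score.
        member = '%012d:%s' % (next(self._counter), payload)
        heapq.heappush(self._heap, (score, member))

    def pop_min(self):
        # Tuples compare by score first, then by member string, mirroring
        # "sorted by score, then lexicographically".
        score, member = heapq.heappop(self._heap)
        return member.split(':', 1)[1]

queue = ToySortedSetQueue()
queue.enqueue('low-1', priority=0)
queue.enqueue('high', priority=10)
queue.enqueue('low-2', priority=0)
order = [queue.pop_min() for _ in range(3)]
print(order)  # ['high', 'low-1', 'low-2']
```

The high-priority item is popped first, and the two items that share a priority come out in the order they were enqueued.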

class RedisExpireHuey

Identical to RedisHuey except for the way task result values are stored. RedisHuey keeps all task results in a Redis hash, and whenever a task result is read (via the result handle), it is also removed from the result hash. This is done to prevent the task result storage from growing without bound. Additionally, using a Redis hash for all results helps avoid cluttering up the Redis keyspace and utilizes less RAM for storing the keys themselves.

RedisExpireHuey uses a different approach: task results are stored in ordinary Redis keys with a special prefix. Result keys are then given a time-to-live, and will be expired automatically by the Redis server. This removes the necessity to remove results from the result store after they are read once.
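
The difference between the two result-storage strategies can be illustrated with a pair of toy classes. This is a sketch using plain dictionaries and an injectable clock, not the Redis-backed implementation. The class names and the 'result:' key prefix are hypothetical, and whether the real storage also deletes a key on read is not modelled here.

```python
import time

class HashResultStore:
    """Models the RedisHuey approach: one mapping, results removed on read."""

    def __init__(self):
        self._results = {}

    def put(self, task_id, value):
        self._results[task_id] = value

    def get(self, task_id):
        # Reading is destructive, which keeps the mapping from growing.
        return self._results.pop(task_id, None)

class ExpiringResultStore:
    """Models the RedisExpireHuey approach: one key per result, with a TTL."""

    def __init__(self, expire_time=86400, clock=time.monotonic):
        self._expire_time = expire_time
        self._clock = clock
        self._keys = {}  # key -> (value, deadline)

    def put(self, task_id, value):
        key = 'result:' + task_id  # hypothetical prefix, for illustration
        self._keys[key] = (value, self._clock() + self._expire_time)

    def get(self, task_id):
        key = 'result:' + task_id
        entry = self._keys.get(key)
        if entry is None:
            return None
        value, deadline = entry
        if self._clock() >= deadline:
            # A Redis server expires keys by itself; the toy does it lazily.
            del self._keys[key]
            return None
        return value  # The key stays in place until it expires.

hash_store = HashResultStore()
hash_store.put('t1', 3)
first_read, second_read = hash_store.get('t1'), hash_store.get('t1')

now = [0.0]  # fake clock so the example does not need to sleep
expiring = ExpiringResultStore(expire_time=60, clock=lambda: now[0])
expiring.put('t1', 3)
reads_before_expiry = (expiring.get('t1'), expiring.get('t1'))
now[0] = 61.0
read_after_expiry = expiring.get('t1')
```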

Commonly-used keyword arguments for storage configuration:

Parameters:
  • expire_time (int) – Expire time in seconds, default is 86400 (1 day).

  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • host – hostname of the Redis server.

  • port – port number.

  • password – password for Redis.

  • db (int) – Redis database to use (typically 0-15, default is 0).

class PriorityRedisExpireHuey

Combines behavior of RedisExpireHuey and PriorityRedisHuey.

class SqliteHuey

Huey that utilizes sqlite3 for queue and result storage. Only requirement is the standard library sqlite3 module.

Commonly-used keyword arguments:

Parameters:
  • filename (str) – filename for database, defaults to ‘huey.db’.

  • cache_mb (int) – megabytes of memory to allow for sqlite page-cache.

  • fsync (bool) – use durable writes. Slower but more resilient to corruption in the event of sudden power loss. Defaults to false.

SqliteHuey fully supports task priorities.

See also

SqliteStorage

class MemoryHuey

Huey that uses in-memory storage. Should only be used when testing or when using immediate mode. MemoryHuey fully supports task priorities.

class FileHuey

Huey that uses the file-system for storage. Should not be used in high-throughput, highly-concurrent environments, as the FileStorage utilizes exclusive locks around all file-system operations.

Parameters:
  • path (str) – base-path for huey data (queue tasks, schedule and results will be stored in sub-directories of this path).

  • levels (int) – number of levels in result-file directory structure to ensure the results directory does not contain an unmanageable number of files.

  • use_thread_lock (bool) – use the standard lib threading.Lock instead of a lockfile for file-system operations. This should only be enabled when using the greenlet or thread consumer worker models.

FileHuey fully supports task priorities.

Huey object

class Huey(name='huey', results=True, store_none=False, utc=True, immediate=False, serializer=None, compression=False, use_zlib=False, immediate_use_memory=True, **storage_kwargs)
Parameters:
  • name (str) – the name of the task queue, e.g. your application’s name.

  • results (bool) – whether to store task results.

  • store_none (bool) – whether to store None in the result store.

  • utc (bool) – use UTC internally, convert naive datetimes from local time to UTC (if local time is other than UTC).

  • immediate (bool) – useful for debugging; causes tasks to be executed synchronously in the application.

  • serializer (Serializer) – serializer implementation for tasks and result data. The default implementation uses pickle.

  • compression (bool) – compress tasks and result data.

  • use_zlib (bool) – use zlib for compression instead of gzip.

  • immediate_use_memory (bool) – automatically switch to a local in-memory storage backend whenever immediate-mode is enabled.

  • storage_kwargs – arbitrary keyword arguments that will be passed to the storage backend for additional configuration.

Huey executes tasks by exposing function decorators that cause the function call to be enqueued for execution by the consumer.

Typically your application will only need one Huey instance, but you can have as many as you like – the only caveat is that one consumer process must be executed for each Huey instance.

Example usage:

# demo.py
from huey import RedisHuey, crontab

# Create a huey instance.
huey = RedisHuey('my-app')

@huey.task()
def add_numbers(a, b):
    return a + b

@huey.periodic_task(crontab(minute='0', hour='2'))
def nightly_report():
    generate_nightly_report()

To run the consumer with 4 worker threads:

$ huey_consumer.py demo.huey -w 4

To add two numbers, the “huey” way:

>>> from demo import add_numbers
>>> res = add_numbers(1, 2)
>>> res(blocking=True)  # Blocks until result is available.
3

To test huey without using a consumer, you can use “immediate” mode. Immediate mode follows all the same code paths as Huey does when running the consumer process, but does so synchronously within the application.

>>> from demo import add_numbers, huey
>>> huey.immediate = True  # Tasks executed immediately.
>>> res = add_numbers(2, 3)
>>> res()
5

immediate

The immediate property is used to enable and disable immediate mode. When immediate mode is enabled, task-decorated functions are executed synchronously by the caller, making it very useful for development and testing. Calling a task function still returns a Result handle, but the task itself is executed immediately.

By default, when immediate mode is enabled, Huey will switch to using in-memory storage. This is to help prevent accidentally writing to a live Redis server while testing. To disable this functionality, specify immediate_use_memory=False when initializing Huey.

Enabling immediate mode:

huey = RedisHuey()

# Enable immediate mode. Tasks now executed synchronously.
# Additionally, huey will now use in-memory storage.
huey.immediate = True

# Disable immediate mode. Tasks will now be enqueued in a Redis
# queue.
huey.immediate = False

Immediate mode can also be specified when your Huey instance is created:

huey = RedisHuey(immediate=True)

task(retries=0, retry_delay=0, priority=None, context=False, name=None, expires=None, **kwargs)
Parameters:
  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait between retries.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • context (bool) – when the task is executed, include the Task instance as a keyword argument.

  • name (str) – name for this task. If not provided, Huey will default to using the module name plus function name.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • kwargs – arbitrary key/value arguments that are passed to the TaskWrapper instance.

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.
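
The three forms accepted by the expires parameter can be normalized to a single absolute deadline. The helper below is a hypothetical sketch of that resolution step, not Huey's own code. The function name and the explicit enqueued_at argument are assumptions.

```python
import datetime

def resolve_expires(expires, enqueued_at):
    """Return an absolute expiration datetime, or None if no expiry is set."""
    if expires is None:
        return None
    if isinstance(expires, datetime.datetime):
        return expires  # Already absolute, used as-is.
    if isinstance(expires, datetime.timedelta):
        return enqueued_at + expires  # Relative to the enqueue time.
    # Otherwise treat the value as a number of seconds.
    return enqueued_at + datetime.timedelta(seconds=expires)

enqueued_at = datetime.datetime(2024, 1, 1, 12, 0, 0)
from_seconds = resolve_expires(90, enqueued_at)
from_delta = resolve_expires(datetime.timedelta(minutes=5), enqueued_at)
from_absolute = resolve_expires(datetime.datetime(2024, 1, 2), enqueued_at)
print(from_seconds)  # 2024-01-01 12:01:30
```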

Function decorator that marks the decorated function for processing by the consumer. Calls to the decorated function will do the following:

  1. Serialize the function call into a Message suitable for storing in the queue.

  2. Enqueue the message for execution by the consumer.

  3. Return a Result handle, which can be used to check the result of the task function, revoke the task (assuming it hasn’t started yet), reschedule the task, and more.
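
The three steps above can be sketched with the standard library alone. This toy model is only meant to show the shape of the flow. The names toy_task, ToyResult, and run_consumer_once are hypothetical, and the real decorator, message format, and storage are more involved.

```python
import pickle
import queue
import uuid

task_registry = {}          # task name -> function
task_queue = queue.Queue()  # stands in for the storage backend's queue
result_store = {}           # task id -> return value

class ToyResult:
    """Minimal handle that looks up a result by task id."""

    def __init__(self, task_id):
        self.id = task_id

    def __call__(self):
        return result_store.get(self.id)

def toy_task(fn):
    task_registry[fn.__name__] = fn

    def enqueue_call(*args, **kwargs):
        task_id = str(uuid.uuid4())
        # 1. Serialize the function call into a message.
        message = pickle.dumps((task_id, fn.__name__, args, kwargs))
        # 2. Enqueue the message for the consumer.
        task_queue.put(message)
        # 3. Return a handle for checking the result later.
        return ToyResult(task_id)
    return enqueue_call

def run_consumer_once():
    """Dequeue one message, run it, and store its return value."""
    task_id, name, args, kwargs = pickle.loads(task_queue.get())
    result_store[task_id] = task_registry[name](*args, **kwargs)

@toy_task
def add(a, b):
    return a + b

res = add(1, 2)
value_before = res()   # None: nothing has consumed the message yet
run_consumer_once()
value_after = res()    # 3
```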

Note

Huey can be configured to execute the function immediately by instantiating Huey with immediate=True – this is useful for running in debug mode or when you do not wish to run the consumer.

For more information, see the immediate mode section of the guide.

The task() decorator returns a TaskWrapper, which implements special methods for enqueueing the decorated function. These methods are used to schedule() the task to run in the future, chain tasks to form a pipeline, and more.

Example:

from huey import RedisHuey

huey = RedisHuey()

@huey.task()
def add(a, b):
    return a + b

Whenever the add() function is called, the actual execution will occur when the consumer dequeues the message.

>>> res = add(1, 2)
>>> res
<Result: task 6b6f36fc-da0d-4069-b46c-c0d4ccff1df6>
>>> res()
3

To further illustrate this point:

import time

@huey.task()
def slow(n):
    time.sleep(n)
    return n

Calling the slow() task will return immediately. We can, however, block until the task finishes by waiting for the result:

>>> res = slow(10)  # Returns immediately.
>>> res(blocking=True)  # Block until task finishes, ~10s.
10

Note

The return value of any calls to the decorated function depends on whether the Huey instance is configured to store the results of tasks (results=True is the default). When the result store is disabled, calling a task-decorated function will return None instead of a result handle.

In some cases, it may be useful to receive the Task instance itself as an argument.

@huey.task(context=True)  # Include task as an argument.
def print_a_task_id(message, task=None):
    print('%s: %s' % (message, task.id))


print_a_task_id('hello')
print_a_task_id('goodbye')

This would cause the consumer to print something like:

hello: e724a743-e63e-4400-ac07-78a2fa242b41
goodbye: 606f36fc-da0d-4069-b46c-c0d4ccff1df6

Note

When using other decorators on task functions, make sure that you understand when they will be evaluated. In the following example the decorator a will be evaluated in the calling process, while b will be evaluated in the worker process.

@a
@huey.task()
@b
def task():
    pass
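
The evaluation order can be observed with a small stand-in for the task decorator. In this sketch toy_task and tracing are hypothetical helpers: toy_task only records the call instead of running it, and the "worker" is simulated by popping the recorded call and executing it.

```python
import functools

log = []
pending = []  # stands in for the task queue

def toy_task(fn):
    """Hypothetical stand-in for huey.task(): calling only enqueues."""
    @functools.wraps(fn)
    def enqueue_call(*args, **kwargs):
        pending.append((fn, args, kwargs))
    return enqueue_call

def tracing(label):
    """Build a decorator that logs each time the wrapped callable runs."""
    def decorator(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            log.append(label)
            return fn(*args, **kwargs)
        return inner
    return decorator

@tracing('a: caller')   # wraps the enqueueing wrapper
@toy_task
@tracing('b: worker')   # wraps the function body that gets enqueued
def task():
    log.append('task body')

task()                       # application side: only "a" runs
log_after_call = list(log)

fn, args, kwargs = pending.pop(0)
fn(*args, **kwargs)          # simulated worker: "b", then the body
log_after_worker = list(log)
```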

For more information, see TaskWrapper, Task, and Result.

periodic_task(validate_datetime, retries=0, retry_delay=0, priority=None, context=False, name=None, expires=None, **kwargs)
Parameters:
  • validate_datetime (function) – function which accepts a datetime instance and returns whether the task should be executed at the given time.

  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait in-between retries.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • context (bool) – when the task is executed, include the Task instance as a parameter.

  • name (str) – name for this task. If not provided, Huey will default to using the module name plus function name.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • kwargs – arbitrary key/value arguments that are passed to the TaskWrapper instance.

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.

The periodic_task() decorator marks a function for automatic execution by the consumer at a specific interval, like cron.

The validate_datetime parameter is a function which accepts a datetime object and returns a boolean indicating whether the decorated function should execute at that time. The consumer will send a datetime to the function once per minute, giving it the same granularity as cron.

For simplicity, there is a special function crontab(), which can be used to quickly specify intervals at which a function should execute. It is described below.

Here is an example of how you might use the periodic_task decorator and the crontab() helper. The following task will be executed every three hours, on the hour:

from huey import crontab
from huey import RedisHuey

huey = RedisHuey()

@huey.periodic_task(crontab(minute='0', hour='*/3'))
def update_feeds():
    for feed in my_list_of_feeds:
        fetch_feed_data(feed)
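
Because validate_datetime is just a callable that receives a datetime and returns a boolean, a hand-written function can be used where crontab() is not expressive enough. The following is a minimal sketch. The function name and the schedule it encodes are made up for illustration, and the decorator usage is shown only in a comment.

```python
import datetime

def weekday_business_hours(dt):
    """Return True on the hour, 09:00 through 17:00, Monday to Friday."""
    is_weekday = dt.weekday() < 5          # Monday is 0, Sunday is 6
    is_business_hour = 9 <= dt.hour <= 17
    return is_weekday and is_business_hour and dt.minute == 0

# The callable could then be passed to the decorator, for example:
#
#     @huey.periodic_task(weekday_business_hours)
#     def refresh_dashboard():   # hypothetical task
#         ...

monday_10_00 = datetime.datetime(2024, 1, 1, 10, 0)    # a Monday
monday_10_30 = datetime.datetime(2024, 1, 1, 10, 30)
saturday_10_00 = datetime.datetime(2024, 1, 6, 10, 0)  # a Saturday

checks = [weekday_business_hours(dt)
          for dt in (monday_10_00, monday_10_30, saturday_10_00)]
print(checks)  # [True, False, False]
```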

Note

Because functions decorated with periodic_task are meant to be executed at intervals in isolation, they should not take any required parameters nor should they be expected to return a meaningful value.

Like task(), the periodic task decorator adds helpers to the decorated function. These helpers allow you to revoke() and restore() the periodic task, enabling you to pause it or prevent its execution. For more information, see TaskWrapper.

Note

The result (return value) of a periodic task is not stored in the result store. This is primarily due to the fact that there is not an obvious way one would read such results, since the invocation of the periodic task happens inside the consumer scheduler. As such, there is no task result handle which the user could use to read the result. To store the results of periodic tasks, you will need to use your own storage or use the storage APIs directly:

@huey.periodic_task(crontab(minute='*/10'))
def my_task():
    # do some work...
    do_something()

    # Manually store some data in the result store.
    huey.put('my-task', some_data_to_store)

context_task(obj, as_argument=False, retries=0, retry_delay=0, context=False, name=None, **kwargs)
Parameters:
  • obj – object that implements the context-manager APIs.

  • as_argument (bool) – pass the context-manager object into the decorated task as the first argument.

  • retries (int) – number of times to retry the function if an unhandled exception occurs when it is executed.

  • retry_delay (int) – number of seconds to wait in-between retries.

  • context (bool) – when the task is executed, include the Task instance as a parameter.

  • name (str) – name for this task. If not provided, Huey will default to using the module name plus function name.

  • kwargs – arbitrary key/value arguments that are passed to the TaskWrapper instance.

Returns:

a TaskWrapper that wraps the decorated function and exposes a number of APIs for enqueueing the task.

This is an extended implementation of the Huey.task() decorator, which wraps the decorated task in a with obj: block. Roughly equivalent to:

db = PostgresqlDatabase(...)

@huey.task()
def without_context_task(n):
    with db:
        do_something(n)

@huey.context_task(db)
def with_context_task(n):
    return do_something(n)
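
The wrapping itself can be sketched as an ordinary decorator, independent of any task queue. The with_context helper and FakeDatabase class below are hypothetical. The sketch passes the value returned by __enter__ when as_argument is set, which is an assumption about how such an option would behave.

```python
import functools

def with_context(obj, as_argument=False):
    """Hypothetical decorator: run the function inside a ``with obj:`` block."""
    def decorator(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            with obj as ctx:
                if as_argument:
                    # Pass whatever __enter__ returned as the first argument.
                    return fn(ctx, *args, **kwargs)
                return fn(*args, **kwargs)
        return inner
    return decorator

class FakeDatabase:
    """Context manager that records when it is entered and exited."""

    def __init__(self):
        self.events = []

    def __enter__(self):
        self.events.append('enter')
        return self

    def __exit__(self, exc_type, exc, tb):
        self.events.append('exit')
        return False  # Do not swallow exceptions.

db = FakeDatabase()

@with_context(db, as_argument=True)
def do_something(conn, n):
    conn.events.append('work %d' % n)
    return n * 2

doubled = do_something(21)
print(doubled, db.events)  # 42 ['enter', 'work 21', 'exit']
```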

pre_execute(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual pre-execute function.

Decorator for registering a pre-execute hook. The callback will be executed before the execution of every task. Execution of the task can be cancelled by raising a CancelExecution exception. Uncaught exceptions will be logged but will not cause the task itself to be cancelled.

The callback function should accept a single Task instance; its return value is ignored.

Hooks are executed in the order in which they are registered.

Usage:

import datetime
from huey.exceptions import CancelExecution

@huey.pre_execute()
def my_pre_execute_hook(task):
    # weekday() returns 6 for Sunday.
    if datetime.datetime.now().weekday() == 6:
        raise CancelExecution('Sunday -- no work will be done.')

unregister_pre_execute(name_or_fn)
Parameters:

name_or_fn – the name given to the pre-execute hook, or the function object itself.

Returns:

boolean

Unregister the specified pre-execute hook.

post_execute(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual post-execute function.

Register a post-execute hook. The callback will be executed after the execution of every task. Uncaught exceptions will be logged but will have no other effect on the overall operation of the consumer.

The callback function should accept:

  • a Task instance

  • the return value from the execution of the task (which may be None)

  • any exception that was raised during the execution of the task (which will be None for tasks that executed normally).

The return value of the callback itself is ignored.

Hooks are executed in the order in which they are registered.

Usage:

@huey.post_execute()
def my_post_execute_hook(task, task_value, exc):
    do_something()

unregister_post_execute(name_or_fn)
Parameters:

name_or_fn – the name given to the post-execute hook, or the function object itself.

Returns:

boolean

Unregister the specified post-execute hook.

on_startup(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual on-startup function.

Register a startup hook. The callback will be executed whenever a worker comes online. Uncaught exceptions will be logged but will have no other effect on the overall operation of the worker.

The callback function must not accept any parameters.

This API is provided to simplify setting up shared resources that, for whatever reason, should not be created as import-time side-effects. For example, your tasks need to write data into a Postgres database. If you create the connection at import-time, before the worker processes are spawned, you’ll likely run into errors when attempting to use the connection from the child (worker) processes. To avoid this problem, you can register a startup hook which executes once when the worker starts up.

Usage:

import psycopg2

db_connection = None

@huey.on_startup()
def setup_db_connection():
    global db_connection
    db_connection = psycopg2.connect(database='my_db')

@huey.task()
def write_data(rows):
    cursor = db_connection.cursor()
    # ...

unregister_on_startup(name_or_fn)
Parameters:

name_or_fn – the name given to the on-startup hook, or the function object itself.

Returns:

boolean

Unregister the specified on-startup hook.

on_shutdown(name=None)
Parameters:

name – (optional) name for the hook.

Returns:

a decorator used to wrap the actual on-shutdown function.

Register a shutdown hook. The callback will be executed by a worker immediately before it goes offline. Uncaught exceptions will be logged but will have no other effect on the overall shutdown of the worker.

The callback function must not accept any parameters.

This API is provided to simplify cleaning-up shared resources.

unregister_on_shutdown(name_or_fn)
Parameters:

name_or_fn – the name given to the on-shutdown hook, or the function object itself.

Returns:

boolean

Unregister the specified on-shutdown hook.

signal(*signals)
Parameters:

signals – zero or more signals to handle.

Returns:

a decorator used to wrap the actual signal handler.

Attach a signal handler callback, which will be executed when the specified signals are sent by the consumer. If no signals are listed, then the handler will be bound to all signals. The list of signals and additional information can be found in the Signals documentation.

Example:

from huey.signals import SIGNAL_ERROR, SIGNAL_LOCKED

@huey.signal(SIGNAL_ERROR, SIGNAL_LOCKED)
def task_not_run_handler(signal, task, exc=None):
    # Do something in response to the "ERROR" or "LOCKED" signals.
    # Note that the "ERROR" signal includes a third parameter,
    # which is the unhandled exception that was raised by the task.
    # Since this parameter is not sent with the "LOCKED" signal, we
    # provide a default of ``exc=None``.
    pass

disconnect_signal(receiver, *signals)
Parameters:
  • receiver – the signal handling function to disconnect.

  • signals – zero or more signals to stop handling.

Disconnect the signal handler from the provided signals. If no signals are provided, then the handler is disconnected from any signals it may have been connected to.

enqueue(task)
Parameters:

task (Task) – task instance to enqueue.

Returns:

Result handle for the given task.

Enqueue the given task. When the result store is enabled (default), the return value will be a Result handle which provides access to the result of the task execution (as well as other things).

If the task specifies another task to run on completion (see Task.then()), the return value will be a ResultGroup, which encapsulates a list of individual Result instances for the given pipeline.

Note

Unless you are executing a pipeline of tasks, it should not be necessary to use the enqueue() method directly. Calling (or scheduling) a task-decorated function will automatically enqueue a task for execution.

When you create a task pipeline, however, it is necessary to enqueue the pipeline once it has been set up.

revoke(task, revoke_until=None, revoke_once=False)

See also

Use Result.revoke() instead.

revoke_by_id(task_id, revoke_until=None, revoke_once=False)
Parameters:
  • task_id (str) – task instance id.

  • revoke_until (datetime) – optional expiration date for revocation.

  • revoke_once (bool) – revoke once and then re-enable.

Revoke a Task instance using the task id.

revoke_all(task_class, revoke_until=None, revoke_once=False)

See also

Use TaskWrapper.revoke() instead.

restore(task)

See also

Use Result.restore() instead.

restore_by_id(task_id)
Parameters:

task_id (str) – task instance id.

Returns:

boolean indicating success.

Restore a Task instance using the task id. Returns boolean indicating if the revocation was successfully removed.

restore_all(task_class)

See also

Use TaskWrapper.restore() instead.

is_revoked(task, timestamp=None)
Parameters:

task – either a task instance, a task ID, a Result, or a Task class.

This method should rarely need to be called directly. Typically you should instead use the is_revoked method on the object that is being revoked, for example:

@huey.task()
def greet(name):
    return 'Hello %s' % name

r = greet.schedule(delay=60, args=('Huey',))
r.revoke()  # Revoke this task.
r.is_revoked()  # True.

greet.revoke()  # Revoke ALL invocations of this task.
greet.is_revoked()  # True.

See also

For task instances, use Result.is_revoked().

For task functions, use TaskWrapper.is_revoked().

result(task_id, blocking=False, timeout=None, backoff=1.15, max_delay=1.0, revoke_on_timeout=False, preserve=False)
Parameters:
  • task_id – the task’s unique identifier.

  • blocking (bool) – whether to block while waiting for task result

  • timeout – number of seconds to block (if blocking=True)

  • backoff – amount to backoff delay each iteration of loop

  • max_delay – maximum amount of time to wait between iterations when attempting to fetch result.

  • revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it has not started yet.

  • preserve (bool) – when set to True, this parameter ensures that the task result will be preserved after having been successfully retrieved. Ordinarily, Huey will discard results after they have been read, to prevent the result store from growing without bounds.

Attempts to retrieve the return value of a task. By default, result() will simply check for the value, returning None if it is not ready yet. If you want to wait for the result, specify blocking=True. This will loop, backing off up to the provided max_delay, until the value is ready or the timeout is reached. If the timeout is reached before the result is ready, a HueyException will be raised.
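
The blocking behaviour described above amounts to a polling loop whose sleep interval grows by the backoff factor until it reaches max_delay. The sketch below models that loop with the standard library only. The helper names, the initial delay of 0.1 seconds, and the use of TimeoutError in place of HueyException are assumptions for illustration.

```python
import itertools
import time

def backoff_delays(initial=0.1, backoff=1.15, max_delay=1.0):
    """Yield successive sleep intervals, growing by ``backoff`` up to a cap."""
    delay = initial
    while True:
        yield min(delay, max_delay)
        delay *= backoff

def wait_for(fetch, timeout=None, sleep=time.sleep, **delay_options):
    """Poll ``fetch`` until it returns a value or the timeout is exceeded."""
    waited = 0.0
    for delay in backoff_delays(**delay_options):
        value = fetch()
        if value is not None:
            return value
        if timeout is not None and waited >= timeout:
            raise TimeoutError('timed out waiting for result')
        sleep(delay)
        waited += delay

# Simulate a result that becomes available on the fourth check. The sleep
# function is replaced by a recorder so the example runs instantly.
attempts = iter([None, None, None, 'done'])
slept = []
value = wait_for(lambda: next(attempts), timeout=5, sleep=slept.append)

# A result that never arrives hits the timeout instead.
try:
    wait_for(lambda: None, timeout=0.2, sleep=lambda seconds: None)
    timed_out = False
except TimeoutError:
    timed_out = True

first_delays = list(itertools.islice(backoff_delays(), 20))
```

With the defaults shown, the interval starts small and is capped at max_delay after a few dozen checks at most, so a long-running task is polled roughly once per second.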

See also

Result - the result() method is simply a wrapper that creates a Result object and calls its get() method.

Note

If the task failed with an exception, then a TaskException that wraps the original exception will be raised.

Warning

By default the result store will delete a task’s return value after the value has been successfully read (by a successful call to the result() or Result.get() methods). If you intend to access the task result multiple times, you must specify preserve=True when calling these methods.

lock_task(lock_name)
Parameters:

lock_name (str) – Name to use for the lock.

Returns:

TaskLock instance, which can be used as a decorator or context-manager.

Utilize the Storage key/value APIs to implement simple locking.

This lock is designed to prevent multiple invocations of a task from running concurrently. It can be used either as a context-manager within the task or as a task decorator. When used as a decorator, place it directly above the function declaration.

If a second invocation occurs and the lock cannot be acquired, then a TaskLockedException is raised, which is handled by the consumer. The task will not be executed and a SIGNAL_LOCKED will be sent. If the task is configured to be retried, then it will be retried normally.

Examples:

@huey.periodic_task(crontab(minute='*/5'))
@huey.lock_task('reports-lock')  # Goes *after* the task decorator.
def generate_report():
    # If a report takes longer than 5 minutes to generate, we do
    # not want to kick off another until the previous invocation
    # has finished.
    run_report()

@huey.periodic_task(crontab(minute='0'))
def backup():
    # Generate backup of code
    do_code_backup()

    # Generate database backup. Since this may take longer than an
    # hour, we want to ensure that it is not run concurrently.
    with huey.lock_task('db-backup'):
        do_db_backup()
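
The fail-fast behaviour of such a lock can be modelled on top of any key/value mapping. The classes below are a hypothetical sketch, not the TaskLock implementation. The 'lock.' key format is invented, and a real storage backend would need an atomic "set if absent" operation rather than the check-then-set shown here.

```python
import functools

class ToyLockedError(Exception):
    """Raised when the named lock is already held."""

class ToyTaskLock:
    """Non-blocking lock usable as a context manager or as a decorator."""

    def __init__(self, storage, name):
        self._storage = storage
        self._key = 'lock.' + name  # hypothetical key format

    def __enter__(self):
        # Fail immediately instead of waiting for the lock to be released.
        if self._key in self._storage:
            raise ToyLockedError('unable to acquire lock: %s' % self._key)
        self._storage[self._key] = '1'
        return self

    def __exit__(self, exc_type, exc, tb):
        self._storage.pop(self._key, None)
        return False  # Do not swallow exceptions.

    def __call__(self, fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            with self:
                return fn(*args, **kwargs)
        return inner

storage = {}
with ToyTaskLock(storage, 'reports-lock'):
    held_inside = 'lock.reports-lock' in storage
    try:
        # A second attempt while the lock is held is rejected.
        with ToyTaskLock(storage, 'reports-lock'):
            second_acquired = True
    except ToyLockedError:
        second_acquired = False
held_after = 'lock.reports-lock' in storage

@ToyTaskLock(storage, 'db-backup')
def backup():
    return 'lock.db-backup' in storage

held_during_backup = backup()
```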

is_locked(lock_name)
Parameters:

lock_name (str) – Name of lock to check.

Returns:

boolean value indicating whether lock is held or not.

flush_locks(*names)
Parameters:

names – additional lock-names to flush.

Returns:

set of lock names that were set and subsequently cleared.

Flush any locks that may be held. Top-level tasks or functions that use the lock_task() decorator will be registered as import-time side-effects, but it is possible that locks in nested scopes (e.g. a context-manager inside a task function) will not be registered. These undiscovered locks can be flushed by passing their lock-names explicitly.

put(key, value)
Parameters:
  • key – key for data

  • value – arbitrary data to store in result store.

Store a value in the result-store under the given key.

get(key, peek=False)
Parameters:
  • key – key to read

  • peek (bool) – non-destructive read

Read a value from the result-store at the given key. By default reads are destructive. To preserve the value for additional reads, specify peek=True.

pending(limit=None)
Parameters:

limit (int) – optionally limit the number of tasks returned.

Returns:

a list of Task instances waiting to be run.

scheduled(limit=None)
Parameters:

limit (int) – optionally limit the number of tasks returned.

Returns:

a list of Task instances that are scheduled to execute at some time in the future.

all_results()
Returns:

a dict of task-id to the serialized result data for all key/value pairs in the result store.

__len__()

Return the number of items currently in the queue.

class TaskWrapper(huey, func, retries=None, retry_delay=None, context=False, name=None, task_base=None, **settings)
Parameters:
  • huey (Huey) – A huey instance.

  • func – User function.

  • retries (int) – Upon failure, number of times to retry the task.

  • retry_delay (int) – Number of seconds to wait before retrying after a failure/exception.

  • context (bool) – when the task is executed, include the Task instance as a parameter.

  • name (str) – Name for task (will be determined based on task module and function name if not provided).

  • task_base – Base-class for task, defaults to Task.

  • settings – Arbitrary settings to pass to the task class constructor.

Wrapper around a user-defined function that converts function calls into tasks executed by the consumer. The wrapper, which decorates the function, replaces the function in the scope with a TaskWrapper instance.

The wrapper class, when called, will enqueue the requested function call for execution by the consumer.

Note

You should not need to create TaskWrapper instances directly. The Huey.task() and Huey.periodic_task() decorators will create the appropriate TaskWrapper automatically.

schedule(args=None, kwargs=None, eta=None, delay=None)
Parameters:
  • args (tuple) – arguments for decorated function.

  • kwargs (dict) – keyword arguments for decorated function.

  • eta (datetime) – the time at which the function should be executed.

  • delay (int) – number of seconds to wait before executing function.

Returns:

a Result handle for the task.

Use the schedule method to schedule the execution of the queue task for a given time in the future:

import datetime

one_hour = datetime.datetime.now() + datetime.timedelta(hours=1)

# Schedule the task to be run in an hour. It will be called with
# three arguments.
res = check_feeds.schedule(args=(url1, url2, url3), eta=one_hour)

# Equivalent, but uses delay rather than eta.
res = check_feeds.schedule(args=(url1, url2, url3), delay=3600)

revoke(revoke_until=None, revoke_once=False)
Parameters:
  • revoke_until (datetime) – Automatically restore the task after the given datetime.

  • revoke_once (bool) – Revoke the next execution of the task and then automatically restore.

Revoking a task will prevent any instance of the given task from executing. When no parameters are provided the function will not execute again until TaskWrapper.restore() is called.

This function can be called multiple times, but each call will supersede any restrictions from the previous revocation.

# Skip the next execution
send_emails.revoke(revoke_once=True)

# Prevent any invocation from executing.
send_emails.revoke()

# Prevent any invocation for 24 hours.
tomorrow = datetime.datetime.now() + datetime.timedelta(days=1)
send_emails.revoke(revoke_until=tomorrow)

is_revoked(timestamp=None)
Parameters:

timestamp (datetime) – If provided, checks whether task is revoked with respect to the given timestamp.

Returns:

bool indicating whether task is revoked.

Check whether the given task is revoked.

restore()
Returns:

bool indicating whether a previous revocation rule was found and removed successfully.

Removes a previous task revocation, if one was configured.

call_local()

Call the @task-decorated function, bypassing all Huey-specific logic. In other words, call_local() provides access to the underlying user-defined function.

>>> add.call_local(1, 2)
3
s(*args, **kwargs)
Parameters:
  • args – Arguments for task function.

  • kwargs – Keyword arguments for task function.

  • priority (int) – assign priority override to task, higher numbers are processed first by the consumer when there is a backlog.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

Returns:

a Task instance representing the execution of the task function with the given arguments.

Create a Task instance representing the invocation of the task function with the given arguments and keyword-arguments.

Note

The returned task instance is not enqueued automatically.

To illustrate the distinction, when you call a task()-decorated function, behind-the-scenes, Huey is doing something like this:

@huey.task()
def add(a, b):
    return a + b

result = add(1, 2)

# Is equivalent to:
task = add.s(1, 2)
result = huey.enqueue(task)

Typically, one will only use the TaskWrapper.s() helper when creating task execution pipelines.

For example:

add_task = add.s(1, 2)  # Represent task invocation.
pipeline = (add_task
            .then(add, 3)  # Call add() with previous result and 3.
            .then(add, 4)  # etc...
            .then(add, 5))

results = huey.enqueue(pipeline)

# Print results of above pipeline.
print(results.get(blocking=True))

# [3, 6, 10, 15]
map(it)
Parameters:

it – a list, tuple or generic iterable that contains the arguments for a number of individual task executions.

Returns:

a ResultGroup encapsulating the individual Result handles for the task executions.

Note

The iterable should be a list of argument tuples which will be passed to the task function.

Example:

@huey.task()
def add(a, b):
    return a + b

rg = add.map([(i, i * i) for i in range(10)])

# Resolve all results.
rg.get(blocking=True)

# [0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
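
Each element of the iterable is unpacked into the positional arguments of one task call. As a rough plain-Python preview of that unpacking (no Huey involved; the undecorated add below is a stand-in for the task function), the expected values can be computed locally:

```python
from itertools import starmap


def add(a, b):
    # Plain function standing in for the @huey.task()-decorated add().
    return a + b


# Same argument tuples as passed to add.map() in the example above.
argument_tuples = [(i, i * i) for i in range(10)]

# starmap() unpacks each tuple into add(a, b), one call per tuple.
expected = list(starmap(add, argument_tuples))
print(expected)
```

This only mirrors how the arguments are paired up; with Huey each call is enqueued as a separate task and the values arrive through the ResultGroup.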
class Task(args=None, kwargs=None, id=None, eta=None, retries=None, retry_delay=None, expires=None, on_complete=None, on_error=None)
Parameters:
  • args (tuple) – arguments for the function call.

  • kwargs (dict) – keyword arguments for the function call.

  • id (str) – unique id, defaults to a UUID if not provided.

  • eta (datetime) – time at which task should be executed.

  • retries (int) – automatic retry attempts.

  • retry_delay (int) – seconds to wait before retrying a failed task.

  • priority (int) – priority assigned to task, higher numbers are processed first by the consumer when there is a backlog.

  • expires – set expiration time for task - if task is not run before expires, it will be discarded. The expires parameter can be either an integer (seconds), a timedelta, or a datetime. For relative expiration values, the expire time will be resolved when the task is enqueued.

  • on_complete (Task) – Task to execute upon completion of this task.

  • on_error (Task) – Task to execute upon failure / error.

The Task class represents the execution of a function. Instances of the task are serialized and enqueued for execution by the consumer, which deserializes and executes the task function.

Note

You should not need to create instances of Task directly, but instead use either the Huey.task() decorator or the TaskWrapper.s() method.

Here’s a refresher on how tasks work:

@huey.task()
def add(a, b):
    return a + b

ret = add(1, 2)
print(ret.get(blocking=True))  # "3".

# The above two lines are equivalent to:
task_instance = add.s(1, 2)  # Create a Task instance.
ret = huey.enqueue(task_instance)  # Enqueue the task.
print(ret.get(blocking=True))  # "3".
then(task, *args, **kwargs)
Parameters:
  • task (TaskWrapper) – A task()-decorated function.

  • args – Arguments to pass to the task.

  • kwargs – Keyword arguments to pass to the task.

Returns:

The parent task.

The then() method is used to create task pipelines. A pipeline works much like a Unix pipe: the return value of the parent task is passed (along with any parameters specified by args and kwargs) to the child task.

Here’s an example of chaining some addition operations:

add_task = add.s(1, 2)  # Represent task invocation.
pipeline = (add_task
            .then(add, 3)  # Call add() with previous result and 3.
            .then(add, 4)  # etc...
            .then(add, 5))

result_group = huey.enqueue(pipeline)

print(result_group.get(blocking=True))

# [3, 6, 10, 15]

If the value returned by the parent function is a tuple, then the tuple will be used to extend the *args for the child function. Likewise, if the parent function returns a dict, then the dict will be used to update the **kwargs for the child function.

Example of chaining fibonacci calculations:

@huey.task()
def fib(a, b=1):
    a, b = a + b, a
    return (a, b)  # returns tuple, which is passed as *args

pipe = (fib.s(1)
        .then(fib)
        .then(fib))
result_group = huey.enqueue(pipe)

print(result_group.get(blocking=True))
# [(2, 1), (3, 2), (5, 3)]
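
The hand-off rules described above can be modelled in a few lines of plain Python. This is only a sketch of the documented behaviour, not Huey's implementation: run_pipeline is a hypothetical helper, the functions are undecorated, and appending the parent's value after any explicitly supplied arguments is an assumption of the sketch.

```python
def run_pipeline(steps):
    """Run (func, args, kwargs) steps in order, feeding each result forward."""
    results = []
    carried = None  # return value of the previous step, if any
    for func, args, kwargs in steps:
        args, kwargs = tuple(args), dict(kwargs)
        if isinstance(carried, tuple):
            args = args + carried          # tuple result extends *args
        elif isinstance(carried, dict):
            kwargs.update(carried)         # dict result updates **kwargs
        elif carried is not None:
            args = args + (carried,)       # any other value is one extra arg
        carried = func(*args, **kwargs)
        results.append(carried)
    return results


def add(a, b):
    return a + b


def fib(a, b=1):
    a, b = a + b, a
    return (a, b)


add_results = run_pipeline([
    (add, (1, 2), {}), (add, (3,), {}), (add, (4,), {}), (add, (5,), {}),
])
fib_results = run_pipeline([(fib, (1,), {}), (fib, (), {}), (fib, (), {})])
print(add_results)
print(fib_results)
```

Both runs reproduce the values shown in the two pipeline examples above, which is a useful sanity check when designing a pipeline before enqueueing it.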
error(task, *args, **kwargs)
Parameters:
  • task (TaskWrapper) – A task()-decorated function.

  • args – Arguments to pass to the task.

  • kwargs – Keyword arguments to pass to the task.

Returns:

The parent task.

The error() method is similar to the then() method, which is used to construct a task pipeline, except the error() task will only be called in the event of an unhandled exception in the parent task.

crontab(month='*', day='*', day_of_week='*', hour='*', minute='*'[, strict=False])

Convert a “crontab”-style set of parameters into a test function that will return True when a given datetime matches the parameters set forth in the crontab.

Day-of-week uses 0=Sunday and 6=Saturday.

Acceptable inputs:

  • “*” = every distinct value

  • “*/n” = run every “n” units, e.g. hour=’*/4’ == 0, 4, 8, 12, 16, 20

  • “m-n” = run at every value from m through n

  • “m,n” = run on m and n

Parameters:

strict (bool) – cause crontab to raise a ValueError if an input does not match a supported input format.

Return type:

a test function that takes a datetime and returns a boolean
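
The input formats listed above can be illustrated with a small stand-alone sketch. It is not Huey's crontab() code: expand_field and matches are hypothetical helpers, only the hour and minute fields are handled, and treating “*/n” as "values divisible by n" is an assumption that agrees with the hour example given above.

```python
import datetime


def expand_field(spec, lo, hi):
    """Expand one crontab-style field into the set of integers it matches."""
    values = set()
    for piece in str(spec).split(','):           # "m,n": several alternatives
        if piece == '*':                         # "*": every distinct value
            values.update(range(lo, hi + 1))
        elif piece.startswith('*/'):             # "*/n": every n-th value
            step = int(piece[2:])
            values.update(v for v in range(lo, hi + 1) if v % step == 0)
        elif '-' in piece:                       # "m-n": inclusive range
            start, end = (int(p) for p in piece.split('-'))
            values.update(range(start, end + 1))
        else:                                    # a single number
            values.add(int(piece))
    return values


def matches(dt, hour='*', minute='*'):
    """Return True when dt falls on one of the selected hours and minutes."""
    return (dt.hour in expand_field(hour, 0, 23) and
            dt.minute in expand_field(minute, 0, 59))


every_four_hours = sorted(expand_field('*/4', 0, 23))
on_schedule = matches(datetime.datetime(2024, 1, 1, 8, 0), hour='*/4', minute='0')
off_schedule = matches(datetime.datetime(2024, 1, 1, 9, 0), hour='*/4', minute='0')
print(every_four_hours, on_schedule, off_schedule)
```

The real crontab() returns a single test function covering all five fields; the sketch only shows how each field string narrows the set of matching values.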

Note

It is currently not possible to run periodic tasks with an interval less than once per minute. If you need to run tasks more frequently, you can create a periodic task that runs once per minute, and from that task, schedule any number of sub-tasks to run after the desired delays.

class TaskLock(huey, name)

This class should not be instantiated directly, but is instead returned by Huey.lock_task(). This object implements a context-manager or decorator which can be used to ensure only one instance of the wrapped task is executed at a given time.

If the consumer executes a task and encounters the TaskLockedException, then the task will not be retried, an error will be logged by the consumer, and a SIGNAL_LOCKED signal will be emitted.

See Huey.lock_task() for example usage.

clear()

Helper method to manually clear the lock. This method is provided to allow the lock to be flushed in the event that the consumer process was killed while executing a task holding the lock.

Alternatively, at start-up time you can execute the consumer with the -f option, which will flush all locks before beginning to execute tasks.
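
To make the context-manager, decorator and clear() behaviour concrete, here is a toy model in plain Python. It is a sketch under assumptions, not Huey's TaskLock: SketchLock and LockedError are hypothetical names, the shared set stands in for Huey's storage, and the check-then-add step is not atomic, so the sketch is not safe across threads or processes.

```python
import functools


class LockedError(Exception):
    """Hypothetical stand-in for TaskLockedException."""


class SketchLock:
    def __init__(self, held, name):
        self._held = held    # shared set of lock names (stand-in for storage)
        self._name = name

    def __enter__(self):
        if self._name in self._held:
            raise LockedError(self._name)
        self._held.add(self._name)
        return self

    def __exit__(self, exc_type, exc, tb):
        self._held.discard(self._name)
        return False         # never swallow exceptions from the body

    def __call__(self, fn):
        # Decorator form: run the wrapped function while holding the lock.
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            with self:
                return fn(*args, **kwargs)
        return inner

    def clear(self):
        self._held.discard(self._name)


held = set()
lock = SketchLock(held, 'reports')


@lock
def build_report():
    # While the lock is held, a second acquisition of the same name fails.
    try:
        with SketchLock(held, 'reports'):
            return 'ran twice'
    except LockedError:
        return 'second run blocked'


outcome = build_report()

# Simulate a worker killed while holding the lock, then flush it manually.
held.add('reports')
lock.clear()
released = 'reports' not in held
print(outcome, released)
```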

Result

class Result(huey, task)

Although you will probably never instantiate a Result object yourself, one is returned whenever you call a task-decorated function or schedule a task for execution. The Result object talks to the result store and is responsible for fetching results from tasks.

Once the consumer finishes executing a task, the return value is placed in the result store, allowing the original caller to retrieve it.

Getting results from tasks is very simple:

>>> @huey.task()
... def add(a, b):
...     return a + b
...

>>> res = add(1, 2)
>>> res  # what is "res" ?
<Result: task 6b6f36fc-da0d-4069-b46c-c0d4ccff1df6>

>>> res()  # Fetch the result of this task.
3

What happens when data isn’t available yet? Let’s assume the next call takes about a minute to calculate:

>>> res = add(100, 200)  # Imagine this is very slow.
>>> res.get()  # Data is not ready, so None is returned.

>>> res() is None  # We can omit ".get", it works the same way.
True

>>> res(blocking=True, timeout=5)  # Block for up to 5 seconds
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/tmp/huey/src/huey/huey/queue.py", line 46, in get
    raise HueyException
huey.exceptions.HueyException

>>> res(blocking=True)  # No timeout, will block until it gets data.
300

If the task failed with an exception, then a TaskException will be raised when reading the result value:

>>> @huey.task()
... def fails():
...     raise Exception('I failed')

>>> res = fails()
>>> res()  # raises a TaskException!
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/home/charles/tmp/huey/src/huey/huey/api.py", line 684, in get
    raise TaskException(result.metadata)
huey.exceptions.TaskException: Exception('I failed',)
id

Returns the unique id of the corresponding task.

get(blocking=False, timeout=None, backoff=1.15, max_delay=1.0, revoke_on_timeout=False, preserve=False)
Parameters:
  • blocking (bool) – whether to block while waiting for task result

  • timeout – number of seconds to block (if blocking=True)

  • backoff – amount to backoff delay each iteration of loop

  • max_delay – maximum amount of time to wait between iterations when attempting to fetch result.

  • revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it has not started yet.

Attempt to retrieve the return value of a task. By default, get() will simply check for the value, returning None if it is not ready yet. If you want to wait for a value, you can specify blocking=True. This will loop, backing off up to the provided max_delay, until the value is ready or the timeout is reached. If the timeout is reached before the result is ready, a HueyException will be raised.
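
The blocking loop described above can be approximated as follows. This is a hedged sketch rather than Huey's code: poll is a hypothetical helper, the 0.1 second starting delay is an assumption, None is used as the "not ready" marker, and the built-in TimeoutError stands in for HueyException.

```python
import time


def poll(fetch, timeout=None, backoff=1.15, max_delay=1.0,
         initial_delay=0.1, sleep=time.sleep, clock=time.monotonic):
    """Call fetch() until it returns a value, sleeping longer each round."""
    start = clock()
    delay = initial_delay
    while True:
        value = fetch()
        if value is not None:
            return value
        if timeout is not None and clock() - start >= timeout:
            raise TimeoutError('result not ready before timeout')
        delay = min(delay, max_delay)   # never wait longer than max_delay
        sleep(delay)
        delay *= backoff                # grow the wait for the next round


# Demo: the "result store" is empty three times, then yields 300.
attempts = iter([None, None, None, 300])
slept = []                              # record delays instead of sleeping
value = poll(lambda: next(attempts), sleep=slept.append)
print(value, slept)
```

Passing a fake sleep function keeps the demo instantaneous; with the defaults the wait grows by 15% per round until it is capped at max_delay.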

Note

Instead of calling .get(), you can simply call the Result object directly. Both methods accept the same arguments.

__call__(**kwargs)

Identical to the get() method, provided as a shortcut.

revoke(revoke_once=True)
Parameters:

revoke_once (bool) – revoke only once.

Revoke the given task. Unless it is in the process of executing, the task will be discarded without being executed.

one_hour = datetime.datetime.now() + datetime.timedelta(hours=1)

# Run this command in an hour
res = add.schedule((1, 2), eta=one_hour)

# I changed my mind, do not run it after all.
res.revoke()
restore()

Restore the given task instance. Unless the task instance has already been dequeued and discarded, it will be restored and run as scheduled.

Warning

If the task class itself has been revoked, via a call to TaskWrapper.revoke(), then this method has no effect.

is_revoked()

Return a boolean value indicating whether this particular task instance or the task class itself has been revoked.

reschedule(eta=None, delay=None, expires=None)
Parameters:
  • eta (datetime) – execute function at the given time.

  • delay (int) – execute function after specified delay in seconds.

  • expires – set expiration time for task. If not provided, then the task’s original expire time (if any) will be used.

Returns:

Result handle for the new task.

Reschedule the given task. The original task instance will be revoked, but no checks are made to verify that it hasn’t already been executed.

If neither an eta nor a delay is specified, the task will be run as soon as it is received by a worker.

reset()

Reset the cached result and allow re-fetching a new result for the given task (i.e. after a task error and subsequent retry).

class ResultGroup

A ResultGroup will be returned when you enqueue a task pipeline or if you use the TaskWrapper.map() method. It is a simple wrapper around a number of individual Result() instances, and provides a convenience API for fetching the results in bulk.

get(**kwargs)

Calls get() on each individual Result() instance in the group and returns a list of return values. Any keyword arguments are passed along.

Serializer

class Serializer(compression=False, compression_level=6, use_zlib=False)
Parameters:
  • compression (bool) – use gzip compression

  • compression_level (int) – 0 for least, 9 for most.

  • use_zlib (bool) – use zlib for compression instead of gzip.

The Serializer class implements a simple interface that can be extended to provide your own serialization format. The default implementation uses pickle.

To override, the following methods should be implemented. Compression is handled transparently elsewhere in the API.

_serialize(data)
Parameters:

data – arbitrary Python object to serialize.

Return type:

bytes

_deserialize(data)
Parameters:

data (bytes) – serialized data.

Returns:

the deserialized object.
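
As an illustration of overriding these two methods, the sketch below swaps pickle for JSON. Several things here are assumptions rather than statements from this page: the import path huey.serializer, the public serialize() and deserialize() wrappers, and the name JSONSerializer. A minimal stand-in base class is used if Huey cannot be imported, so the sketch runs either way.

```python
import json

try:
    from huey.serializer import Serializer  # assumed import path
except ImportError:
    # Minimal stand-in so the sketch still runs without Huey installed.
    class Serializer:
        def __init__(self, compression=False, compression_level=6,
                     use_zlib=False):
            self.compression = compression

        def serialize(self, data):
            return self._serialize(data)

        def deserialize(self, data):
            return self._deserialize(data)


class JSONSerializer(Serializer):
    """Hypothetical serializer that encodes data as UTF-8 JSON bytes."""

    def _serialize(self, data):
        return json.dumps(data).encode('utf-8')

    def _deserialize(self, data):
        return json.loads(data.decode('utf-8'))


serializer = JSONSerializer()
payload = serializer.serialize({'task': 'add', 'args': [1, 2]})
restored = serializer.deserialize(payload)
print(payload, restored)
```

JSON can only encode basic types, so a serializer like this would likely need extra handling before it could carry everything Huey stores (for example datetime values); treat it as a shape for the override, not a drop-in replacement.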

Exceptions

class HueyException

General exception class.

class ConfigurationError

Raised when Huey encounters a configuration problem.

class TaskLockedException

Raised by the consumer when a task lock cannot be acquired.

class CancelExecution

Cancel the execution of a task. Can be raised either within a pre_execute() hook, or within a task()-decorated function.

When raised from within a pre_execute() hook, this exception signals to the consumer that the task shall be cancelled and not run.

When raised in the body of a task()-decorated function, this exception accepts a boolean retry parameter (default is False). If retry=False then the task will not be retried, even if it has 1 or more retries remaining. Similarly, if retry=True then the task will be retried regardless.

class RetryTask(msg=None, delay=None, eta=None)

Raised by user code from within a task() function to force a retry. When this exception is raised, the task will be retried irrespective of whether it is configured with automatic retries.

If delay or eta is specified, then any retry_delay set on the task will be overridden and the value specified will be used to determine when the task will be retried next.

class TaskException

General exception raised by Result handles when reading the result of a task that failed due to an error.

Storage

Huey comes with several built-in storage implementations:

class RedisStorage(name='huey', blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, **connection_params)
Parameters:
  • blocking (bool) – Use blocking-pop when reading from the queue (as opposed to polling). Default is true.

  • read_timeout – Timeout to use when performing a blocking pop, default is 1 second.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • client_name – name used to identify Redis clients used by Huey.

Additional keyword arguments will be passed directly to the Redis client constructor. See the redis-py documentation for the complete list of arguments supported by the Redis client.

class RedisExpireStorage(name='huey', expire_time=86400, blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, **connection_params)
Parameters:

expire_time (int) – TTL for results of individual tasks.

Subclass of RedisStorage that implements the result store APIs using normal Redis keys with a TTL, so that unread results will automatically be cleaned-up. RedisStorage uses a HASH for the result store, which has the benefit of keeping the Redis keyspace orderly, but which comes with the downside that unread task results can build up over time. This storage implementation trades keyspace sprawl for automatic clean-up.

class PriorityRedisStorage(name='huey', blocking=True, read_timeout=1, connection_pool=None, url=None, client_name=None, **connection_params)
Parameters:
  • blocking (bool) – Use blocking-zpopmin when reading from the queue (as opposed to polling). Default is true.

  • read_timeout – Timeout to use when performing a blocking pop, default is 1 second.

  • connection_pool – a redis-py ConnectionPool instance.

  • url – url for Redis connection.

  • client_name – name used to identify Redis clients used by Huey.

Redis storage that uses a different data-structure for the task queue in order to support task priorities.

Additional keyword arguments will be passed directly to the Redis client constructor. See the redis-py documentation for the complete list of arguments supported by the Redis client.

Warning

This storage engine requires Redis 5.0 or newer.

class PriorityRedisExpireStorage(name='huey', expire_time=86400, ...)
Parameters:

expire_time (int) – TTL for results of individual tasks.

Combination of PriorityRedisStorage, which supports task priorities, and RedisExpireStorage, which stores task results as top-level Redis keys in order to set a TTL so that unread results are automatically cleaned-up.

class SqliteStorage(filename='huey.db', name='huey', cache_mb=8, fsync=False, timeout=5, strict_fifo=False, **kwargs)
Parameters:
  • filename (str) – sqlite database filename.

  • cache_mb (int) – sqlite page-cache size in megabytes.

  • fsync (bool) – if enabled, all writes to the Sqlite database will be synchronized. This provides greater safety from database corruption in the event of sudden power-loss.

  • journal_mode (str) – sqlite journaling mode to use. Defaults to using write-ahead logging, which enables readers to coexist with a single writer.

  • timeout (int) – busy timeout (in seconds), amount of time to wait to acquire the write lock when another thread / connection holds it.

  • strict_fifo (bool) – ensure that the task queue behaves as a strict FIFO. By default, Sqlite may reuse rowids for deleted tasks, which can cause tasks to be run in a different order than the order in which they were enqueued.

  • kwargs – Additional keyword arguments passed to the sqlite3 connection constructor.
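
For readers who want to see what these knobs correspond to at the SQLite level, the sketch below applies comparable settings with the standard-library sqlite3 module. The mapping from each parameter to a PRAGMA is an assumption made for illustration (open_tuned is a hypothetical helper), not a description of what SqliteStorage executes.

```python
import os
import sqlite3
import tempfile


def open_tuned(filename, cache_mb=8, fsync=False, timeout=5):
    # timeout is sqlite3's busy timeout, in seconds.
    conn = sqlite3.connect(filename, timeout=timeout)
    # Write-ahead logging lets readers coexist with a single writer.
    conn.execute('PRAGMA journal_mode=wal')
    # A negative cache_size is interpreted by SQLite as a size in KiB.
    conn.execute('PRAGMA cache_size=%d' % (-1024 * cache_mb))
    # 2 = FULL (sync on every commit), 0 = OFF (leave syncing to the OS).
    conn.execute('PRAGMA synchronous=%d' % (2 if fsync else 0))
    return conn


with tempfile.TemporaryDirectory() as tmp:
    conn = open_tuned(os.path.join(tmp, 'demo.db'))
    journal_mode = conn.execute('PRAGMA journal_mode').fetchone()[0]
    cache_size = conn.execute('PRAGMA cache_size').fetchone()[0]
    synchronous = conn.execute('PRAGMA synchronous').fetchone()[0]
    conn.close()

print(journal_mode, cache_size, synchronous)
```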

class FileStorage(name, path, levels=2, use_thread_lock=False)
Parameters:
  • name (str) – (unused by the file storage API)

  • path (str) – directory path used to store task results. Will be created if it does not exist.

  • levels (int) – number of levels in cache-file directory structure to ensure a given directory does not contain an unmanageable number of files.

  • use_thread_lock (bool) – use the standard lib threading.Lock instead of a lockfile. Note: this should only be enabled when using the greenlet or thread consumer worker models.

The FileStorage implements a simple file-system storage layer. This storage class should not be used in high-throughput, highly-concurrent environments, as it utilizes exclusive locks around all file-system operations. This is done to prevent race-conditions when reading from the file-system.
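
The purpose of the levels parameter is easier to see with a small sketch of hash-based directory sharding. The hashing scheme is not specified in this section, so MD5 and one hex character per directory level are assumptions, and path_for_key is a hypothetical helper.

```python
import hashlib
import os


def path_for_key(base, key, levels=2):
    """Spread files across nested directories named after hash characters."""
    checksum = hashlib.md5(key.encode('utf-8')).hexdigest()
    # One directory per leading hex character, then the full checksum.
    return os.path.join(base, *checksum[:levels], checksum)


example = path_for_key('results', 'task-1234', levels=2)
parts = example.split(os.sep)
print(example)
```

With two levels and sixteen possible characters per level, files are spread over up to 256 leaf directories, which keeps any single directory from growing unmanageably large.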

class MemoryStorage

In-memory storage engine for use when testing or developing. Designed for use with immediate mode.

class BlackHoleStorage

Storage class that discards all data written to it, and thus always appears to be empty. Intended for testing only.

class BaseStorage(name='huey', **storage_kwargs)
enqueue(data, priority=None)
dequeue()
queue_size()
enqueued_items(limit=None)
flush_queue()
add_to_schedule(data, timestamp)
read_schedule(timestamp)
schedule_size()
scheduled_items(limit=None)
flush_schedule()
put_data(key, value)
peek_data(key)
pop_data(key)
put_if_empty(key, value)
has_data_for_key(key)
result_store_size()
result_items()
flush_results()
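
As a rough idea of the shape of a storage backend, the sketch below implements a subset of the methods listed above using in-memory structures. It deliberately does not subclass BaseStorage, and the behaviours shown (higher priority dequeued first, None from an empty queue, a MISSING sentinel for absent keys) are inferred from the method names and are assumptions, not documented guarantees.

```python
import heapq
import itertools

MISSING = object()  # hypothetical sentinel for "no data stored under key"


class DictStorage:
    """Toy storage covering part of the queue and result-store interface."""

    def __init__(self, name='huey'):
        self.name = name
        self._queue = []
        self._counter = itertools.count()
        self._results = {}

    def enqueue(self, data, priority=None):
        # Negate priority so larger numbers pop first; the counter keeps
        # first-in, first-out order among items with equal priority.
        entry = (-(priority or 0), next(self._counter), data)
        heapq.heappush(self._queue, entry)

    def dequeue(self):
        if self._queue:
            return heapq.heappop(self._queue)[2]
        return None

    def queue_size(self):
        return len(self._queue)

    def flush_queue(self):
        self._queue = []

    def put_data(self, key, value):
        self._results[key] = value

    def peek_data(self, key):
        return self._results.get(key, MISSING)

    def pop_data(self, key):
        return self._results.pop(key, MISSING)

    def put_if_empty(self, key, value):
        if key in self._results:
            return False
        self._results[key] = value
        return True

    def has_data_for_key(self, key):
        return key in self._results


storage = DictStorage()
storage.enqueue(b'low')
storage.enqueue(b'high', priority=10)
first, second = storage.dequeue(), storage.dequeue()

stored = storage.put_if_empty('lock', b'1')
stored_again = storage.put_if_empty('lock', b'2')
popped = storage.pop_data('lock')
print(first, second, stored, stored_again, popped)
```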