Huey Extensions

The huey.contrib package contains modules that provide extra functionality beyond the core APIs.

Mini-Huey

MiniHuey provides a very lightweight huey-like API that may be useful for certain applications. The MiniHuey consumer runs inside a greenlet in your main application process. This means there is no separate consumer process to manage, nor is there any persistence for the enqueued/scheduled tasks; whenever a task is enqueued or is scheduled to run, a new greenlet is spawned to execute the task.

MiniHuey may be useful if:

  • Your application is a WSGI application.

  • Your tasks do stuff like check for spam, send email, make requests to web-based APIs, or query a database server.

  • You do not need automatic retries, persistence for your message queue, or dynamic task revocation.

  • You wish to keep things nice and simple and don’t want the overhead of additional process(es) to manage.

MiniHuey may be a bad choice if:

  • Your application is incompatible with gevent (e.g. uses asyncio).

  • Your tasks do stuff like process large files, crunch numbers, parse large XML or JSON documents, or other CPU or disk-intensive work.

  • You need a persistent store for messages and results, so the consumer can be restarted without losing any unprocessed messages.

If you are not sure, then you should probably not use MiniHuey. Use the regular Huey instead.

Usage and task declaration:

class MiniHuey([name='huey'[, interval=1[, pool_size=None]]])
Parameters:
  • name (str) – Name given to this huey instance.

  • interval (int) – How frequently to check for scheduled tasks (seconds).

  • pool_size (int) – Limit number of concurrent tasks to given size.
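
For instance, a minimal instantiation sketch using the parameters documented above (the name and pool size shown are illustrative, not defaults):

from huey.contrib.mini import MiniHuey

# Check the schedule every second; run at most 8 tasks concurrently.
huey = MiniHuey('my-app', interval=1, pool_size=8)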

task([validate_func=None])

Task decorator similar to Huey.task() or Huey.periodic_task(). For tasks that should be scheduled automatically at regular intervals, simply provide a suitable crontab() definition.

The decorated task will gain a schedule() method which can be used like the TaskWrapper.schedule() method.

Example task declarations:

from urllib.request import urlopen

from huey import crontab
from huey.contrib.mini import MiniHuey

huey = MiniHuey()

@huey.task()
def fetch_url(url):
    return urlopen(url).read()

@huey.task(crontab(minute='0', hour='4'))
def run_backup():
    pass

Example usage. Running tasks and getting results work about the same as regular Huey:

# Executes the task asynchronously in a new greenlet.
result = fetch_url('https://google.com/')

# Wait for the task to finish.
html = result.get()

Scheduling a task for execution:

# Fetch in ~30s.
result = fetch_url.schedule(('https://google.com',), delay=30)

# Wait until result is ready, takes ~30s.
html = result.get()

start()

Start the scheduler in a new greenlet. The scheduler is needed if you plan to schedule tasks for execution using the schedule() method, or if you want to run periodic tasks.

Typically this method should be called when your application starts up. For example, a WSGI application might do something like:

# Always apply gevent monkey-patch before anything else!
from gevent import monkey; monkey.patch_all()

from my_app import app  # flask/bottle/whatever WSGI app.
from my_app import mini_huey

# Start the scheduler. Returns immediately.
mini_huey.start()

# Run the WSGI server.
from gevent.pywsgi import WSGIServer
WSGIServer(('127.0.0.1', 8000), app).serve_forever()

stop()

Stop the scheduler.
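
For example, a shutdown sketch reusing the mini_huey instance from the WSGI example above; registering stop() with atexit is just one way to ensure the scheduler greenlet is stopped when the process exits:

import atexit

# Stop the MiniHuey scheduler when the application process exits.
atexit.register(mini_huey.stop)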

Note

There is no separate decorator for periodic (crontab) tasks. Just use MiniHuey.task() and pass in a validation function, which can be generated using the crontab() function.

Note

Tasks enqueued for immediate execution will be run regardless of whether the scheduler is running. You only need to start the scheduler if you plan to schedule tasks in the future or run periodic tasks.

Django

Huey comes with special integration for use with the Django framework. The integration provides:

  1. Configuration of huey via the Django settings module.

  2. Running the consumer as a Django management command.

  3. Auto-discovery of tasks.py modules to simplify task importing.

  4. Proper management of database connections.

Supported Django versions are those officially supported at https://www.djangoproject.com/download/#supported-versions

Note

For multiple-queue support, check out gaiacoop/django-huey.

Setting things up

To use huey with Django, the first step is to add an entry to your project’s settings.INSTALLED_APPS:

# settings.py
# ...
INSTALLED_APPS = (
    # ...
    'huey.contrib.djhuey',  # Add this to the list.
    # ...
)

The above is the bare minimum needed to start using huey’s Django integration. If you like, though, you can also configure both Huey and the consumer using the settings module.

Note

Huey settings are optional. If not provided, Huey will default to using Redis running on localhost:6379 (standard setup).

Configuration is kept in settings.HUEY, which can be either a dictionary or a Huey instance. Here is an example that shows all of the supported options with their default values:

# settings.py
HUEY = {
    'huey_class': 'huey.RedisHuey',  # Huey implementation to use.
    'name': settings.DATABASES['default']['NAME'],  # Use db name for huey.
    'results': True,  # Store return values of tasks.
    'store_none': False,  # If a task returns None, do not save to results.
    'immediate': settings.DEBUG,  # If DEBUG=True, run synchronously.
    'utc': True,  # Use UTC for all times internally.
    'blocking': True,  # Perform blocking pop rather than poll Redis.
    'connection': {
        'host': 'localhost',
        'port': 6379,
        'db': 0,
        'connection_pool': None,  # You should definitely use pooling!
        # ... tons of other options, see redis-py for details.

        # huey-specific connection parameters.
        'read_timeout': 1,  # If not polling (blocking pop), use timeout.
        'url': None,  # Allow Redis config via a DSN.
    },
    'consumer': {
        'workers': 1,
        'worker_type': 'thread',
        'initial_delay': 0.1,  # Smallest polling interval, same as -d.
        'backoff': 1.15,  # Exponential backoff using this rate, -b.
        'max_delay': 10.0,  # Max possible polling interval, -m.
        'scheduler_interval': 1,  # Check schedule every second, -s.
        'periodic': True,  # Enable crontab feature.
        'check_worker_health': True,  # Enable worker health checks.
        'health_check_interval': 1,  # Check worker health every second.
    },
}

The following huey_class implementations are provided out-of-the-box:

  • huey.RedisHuey - default.

  • huey.PriorityRedisHuey - uses Redis but adds support for Task priority. Requires redis server 5.0 or newer.

  • huey.RedisExpireHuey - Redis implementation that expires result keys automatically if results are not read.

  • huey.PriorityRedisExpireHuey - Redis implementation that expires result keys automatically if results are not read and supports priority.

  • huey.SqliteHuey - uses Sqlite, full support for task priorities. Accepts a filename parameter for the path to the database file.

  • huey.FileHuey - uses filesystem for storage. Accepts a path parameter for the base storage directory.
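
As a sketch, SqliteHuey might be selected via the settings dict like this. The filename path is illustrative, and this assumes that extra keys in settings.HUEY are passed through to the Huey constructor, as the full configuration example above suggests:

# settings.py
HUEY = {
    'huey_class': 'huey.SqliteHuey',
    'name': 'my-app',
    'filename': '/var/lib/my-app/huey.db',  # Illustrative path to the Sqlite database file.
}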

Alternatively, you can simply set settings.HUEY to a Huey instance and do your configuration directly. In the example below, I’ve also shown how you can create a connection pool:

# settings.py -- alternative configuration method
from huey import RedisHuey
from redis import ConnectionPool

pool = ConnectionPool(host='my.redis.host', port=6379, max_connections=20)
HUEY = RedisHuey('my-app', connection_pool=pool)

Running the Consumer

To run the consumer, use the run_huey management command. This command will automatically import any modules in your INSTALLED_APPS named tasks.py. The consumer can be configured using the Django settings module, command-line options, or both.
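
For example, given a project layout like the following (app names are purely illustrative), the tasks defined in each tasks.py are imported automatically when the consumer starts, provided the app is listed in INSTALLED_APPS:

myproject/
    settings.py      # Contains the HUEY configuration.
    blog/
        tasks.py     # Auto-discovered by run_huey.
    shop/
        tasks.py     # Auto-discovered by run_huey.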

Note

Options specified on the command line take precedence over those specified in the settings module.

To start the consumer, you simply run:

$ ./manage.py run_huey

In addition to the HUEY.consumer setting dictionary, the management command supports all the same options as the standalone consumer. These options are listed and described in the Options for the consumer section.

For quick reference, the most important command-line options are briefly listed here.

-w, --workers

Number of worker threads/processes/greenlets. Default is 1, but most applications should use at least 2.

-k, --worker-type

Worker type, must be “thread”, “process” or “greenlet”. The default is thread, which provides good all-around performance. For CPU-intensive workloads, process is likely to be more performant. The greenlet worker type is suited for IO-heavy workloads. When using greenlet you can specify tens or hundreds of workers since they are extremely lightweight compared to threads/processes. See note below on using gevent/greenlet.

-A, --disable-autoload

Disable automatic loading of tasks modules.
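
For example, the consumer might be started with four worker threads using the flags above (the values are illustrative):

$ ./manage.py run_huey -w 4 -k thread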

Note

Due to a conflict with Django’s base option list, the “verbose” option is set using -V or --huey-verbose. When enabled, huey logs at the DEBUG level.

For more information, read the Options for the consumer section.

Using gevent

When using worker type greenlet, it’s necessary to apply a monkey-patch before any libraries or system modules are imported. Gevent monkey-patches things like socket to provide non-blocking I/O, and if those modules are loaded before the patch is applied, then the resulting code will execute synchronously.

Unfortunately, because of Django’s design, the only way to reliably apply this patch is to create a custom bootstrap script that mimics the functionality of manage.py. Here is the patched manage.py code:

#!/usr/bin/env python
import os
import sys

# Apply monkey-patch if we are running the huey consumer.
if 'run_huey' in sys.argv:
    from gevent import monkey
    monkey.patch_all()

if __name__ == "__main__":
    os.environ.setdefault("DJANGO_SETTINGS_MODULE", "conf")
    from django.core.management import execute_from_command_line
    execute_from_command_line(sys.argv)

How to create tasks

The task() and periodic_task() decorators can be imported from the huey.contrib.djhuey module. Here is how you might define two tasks:

from huey import crontab
from huey.contrib.djhuey import periodic_task, task

@task()
def count_beans(number):
    print('-- counted %s beans --' % number)
    return 'Counted %s beans' % number

@periodic_task(crontab(minute='*/5'))
def every_five_mins():
    print('Every five minutes this will be printed by the consumer')
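
Calling these tasks works the same as with the standalone Huey API: invoking the decorated function enqueues the task and returns a result handle. A small sketch using the count_beans task above (the blocking get() is standard Result behavior):

result = count_beans(100)          # Enqueue the task for the consumer.
print(result.get(blocking=True))   # Blocks until 'Counted 100 beans' is available.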

The huey.contrib.djhuey module also exposes a number of additional helpers, including the database-aware task decorators described below.

Tasks that execute queries

If you plan on executing queries inside your task, it is a good idea to close the connection once your task finishes. To make this easier, huey provides special decorators to use in place of task() and periodic_task() which will automatically close the connection for you.

from huey import crontab
from huey.contrib.djhuey import db_periodic_task, db_task

@db_task()
def do_some_queries():
    # This task executes queries. Once the task finishes, the connection
    # will be closed.
    pass

@db_periodic_task(crontab(minute='*/5'))
def every_five_mins():
    # This is a periodic task that executes queries.
    pass

DEBUG and Synchronous Execution

When settings.DEBUG = True and settings.HUEY is a dict that does not explicitly specify a value for immediate, tasks will be executed synchronously, just like regular function calls. The purpose of this is to avoid running both Redis and an additional consumer process while developing or running tests. If you prefer to use a live storage engine when DEBUG is enabled, you can specify immediate_use_memory=False, which still runs Huey in immediate mode but uses the live storage API. To disable immediate mode entirely when DEBUG is set, specify immediate=False in your settings.

# settings.py
HUEY = {
    'name': 'my-app',

    # To run Huey in "immediate" mode with a live storage API, specify
    # immediate_use_memory=False.
    'immediate_use_memory': False,

    # OR:
    # To run Huey in "live" mode regardless of whether DEBUG is enabled,
    # specify immediate=False.
    'immediate': False,
}

Getting the Huey Instance

If you want to interact with Huey APIs that are not exposed through djhuey explicitly, you can get the actual Huey instance in the following way:

from huey.contrib.djhuey import HUEY as huey

# E.g., get the underlying Storage instance.
storage = huey.storage
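
For instance, a quick introspection sketch; pending() and scheduled() are standard Huey instance methods that list enqueued and scheduled task messages:

from huey.contrib.djhuey import HUEY as huey

# Inspect messages currently in the queue and tasks scheduled for later.
print(huey.pending())
print(huey.scheduled())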

Configuration Examples

This section contains example HUEY configurations.

# Redis running locally with four worker threads.
HUEY = {
    'name': 'my-app',
    'consumer': {'workers': 4, 'worker_type': 'thread'},
}

# Redis on network host with 64 worker greenlets and connection pool
# supporting up to 100 connections.
from redis import ConnectionPool

pool = ConnectionPool(
    host='192.168.1.123',
    port=6379,
    max_connections=100)

HUEY = {
    'name': 'my-app',
    'connection': {'connection_pool': pool},
    'consumer': {'workers': 64, 'worker_type': 'greenlet'},
}

It is also possible to specify the connection using a Redis URL, making it easy to configure this setting using a single environment variable:

import os

HUEY = {
    'name': 'my-app',
    'url': os.environ.get('REDIS_URL', 'redis://localhost:6379/?db=1')
}

Alternatively, you can just assign a Huey instance to the HUEY setting:

from huey import RedisHuey

HUEY = RedisHuey('my-app')

AsyncIO

While Huey does not provide first-class support for a full asyncio pipeline, in practice one of the most useful places to be "async"-friendly is when blocking while waiting for a task result to be ready. While waiting, Huey must poll the storage backend to determine whether the result is ready, which presents a natural opportunity for an asynchronous solution.

In order to simplify this, Huey provides two helpers for await-ing task results:

aget_result(result, backoff=1.15, max_delay=1.0, preserve=False)
Parameters:
  • result (Result) – a result handle returned when calling a task.

Returns:
  task return value.

AsyncIO helper for awaiting the result of a task execution.

Example:

import asyncio
import time

from huey.contrib.asyncio import aget_result

@huey.task()
def sleep(n):
    time.sleep(n)
    return n

async def main():
    # Single task, will finish in ~2 seconds (other coroutines can run
    # during this time!).
    rh = sleep(2)
    result = await aget_result(rh)

    # Awaiting multiple results. This will also finish in ~2 seconds.
    r1 = sleep(2)
    r2 = sleep(2)
    r3 = sleep(2)
    results = await asyncio.gather(
        aget_result(r1),
        aget_result(r2),
        aget_result(r3))

aget_result_group(rg, *args, **kwargs)
Parameters:
  • rg (ResultGroup) – a result-group handle for multiple tasks.

Returns:
  return values for all tasks in the result group.

AsyncIO helper for awaiting the result of multiple task executions.

Example:

import time

from huey.contrib.asyncio import aget_result_group

@huey.task()
def sleep(n):
    time.sleep(n)
    return n

async def main():
    # Spawn 3 "sleep" tasks, each sleeping for 2 seconds.
    rg = sleep.map([2, 2, 2])

    # Await the results. This will finish in ~2 seconds while also
    # allowing other coroutines to run.
    results = await aget_result_group(rg)
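
Either coroutine above can then be driven by the application's event loop in the usual way, for example:

import asyncio

asyncio.run(main())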