Recipes

This document contains practical patterns and solutions for common problems encountered when building applications with Huey. Each recipe includes a working example and discussion of the tradeoffs involved.

Key/Value Data Storage

The Huey result-store can be used directly if you need a convenient way to cache arbitrary key/value data:

@huey.task()
def calculate_something():
    # By default, the result store treats get() like a pop(), so in
    # order to preserve the data so it can be read again, we specify
    # the second argument, peek=True.
    prev_results = huey.get('calculate-something.result', peek=True)
    if prev_results is None:
        # No previous results found, start from the beginning.
        data = start_from_beginning()
    else:
        # Only calculate what has changed since last time.
        data = just_what_changed(prev_results)

    # We can store the updated data back in the result store.
    huey.put('calculate-something.result', data)
    return data

See Huey.get() and Huey.put() for additional details.

Exponential Backoff Retries

Huey supports retries and retry_delay, but does not support exponential backoff out-of-the-box. Exponential backoff is important when calling external services – without it, a fleet of workers retrying at the same interval can create a “thundering herd” that overwhelms a recovering service.

We can implement exponential backoff with a small decorator that leverages the context=True parameter to access and modify the task instance:

import functools

import requests

def exp_backoff_task(retries=10, retry_backoff=1.15):
    def deco(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            task = kwargs.pop('task')
            try:
                return fn(*args, **kwargs)
            except Exception:
                task.retry_delay *= retry_backoff
                raise
        return huey.task(retries=retries, retry_delay=1, context=True)(inner)
    return deco

@exp_backoff_task(retries=5, retry_backoff=2)
def call_external_api(endpoint, payload):
    resp = requests.post(endpoint, json=payload)
    resp.raise_for_status()
    return resp.json()

If the consumer starts executing the task at 12:00:00, the retry schedule would look like:

  • 12:00:00 first call

  • 12:00:02 retry 1 (delay=1*2=2s)

  • 12:00:06 retry 2 (delay=2*2=4s)

  • 12:00:14 retry 3 (delay=4*2=8s)

  • 12:00:30 retry 4 (delay=8*2=16s)

  • 12:01:02 retry 5 (delay=16*2=32s)

Note

The retry_delay is modified on the task instance itself, so the change persists across retries. The initial retry_delay=1 is set when the task is registered, and gets multiplied by retry_backoff on each failure.
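
To sanity-check a schedule before deploying, the arithmetic in the list above can be reproduced in plain Python. This is only a sketch of the delay calculation under the same assumptions as the decorator (initial delay of 1 second, multiplied after every failure); backoff_schedule is a hypothetical helper, not part of Huey, and it ignores the time each attempt spends running.

```python
def backoff_schedule(retries, retry_backoff, initial_delay=1):
    """Return (delay, elapsed) pairs, in seconds, for each retry.

    Mirrors the decorator: the delay is multiplied by retry_backoff
    after every failure, before the next retry is scheduled.
    Hypothetical helper for illustration; not part of Huey.
    """
    schedule = []
    delay = initial_delay
    elapsed = 0
    for _ in range(retries):
        delay *= retry_backoff
        elapsed += delay
        schedule.append((delay, elapsed))
    return schedule


for attempt, (delay, elapsed) in enumerate(backoff_schedule(5, 2), 1):
    print('retry %d: delay=%ss, %ss after the first call'
          % (attempt, delay, elapsed))
```

The last entry for retries=5, retry_backoff=2 is 62 seconds after the first call, which matches the 12:01:02 line in the schedule.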

Idempotent Tasks Using Deterministic IDs

By default, every task invocation gets a random UUID. If the same logical task is enqueued multiple times (e.g., from a webhook handler that might be called more than once), you may end up with duplicate work. You can prevent this by assigning a deterministic task ID.

The schedule() method accepts an id parameter:

@huey.task()
def sync_user(user_id):
    user = User.get(user_id)
    external_service.sync(user.to_dict())

# Every call for the same user reuses one task ID.  Huey does not
# remove duplicates from the queue, but enqueuing a second task with
# the same ID will simply overwrite the result-store entry for that ID.
sync_user.schedule(
    args=(user_id,),
    delay=5,
    id='sync-user-%s' % user_id)

Warning

Huey does not de-duplicate by ID at the queue level – both messages will exist in the queue. However, the result store will only track one result per ID. If true at-most-once delivery matters, combine a deterministic ID with the task deduplication recipe below.
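
When the identifying values are long or come from an external payload, it can help to derive the ID with a hash instead of string formatting. The following is a minimal sketch; deterministic_task_id is a hypothetical helper and 'evt_123' is a made-up webhook event identifier.

```python
import hashlib


def deterministic_task_id(prefix, *parts):
    """Build a stable task ID from the values that identify the work.

    Hypothetical helper: the same prefix and parts always give the same
    ID, in any process, so the result can be passed as schedule(id=...).
    """
    # Join with a separator that is unlikely to occur in the parts, so
    # that ('ab', 'c') and ('a', 'bc') do not produce the same ID.
    raw = '\x1f'.join(str(part) for part in parts)
    digest = hashlib.sha256(raw.encode('utf-8')).hexdigest()[:16]
    return '%s-%s' % (prefix, digest)


# The same webhook delivery maps to the same ID, however often it arrives.
first = deterministic_task_id('sync-user', 42, 'evt_123')
second = deterministic_task_id('sync-user', 42, 'evt_123')
```

The returned string would be used in place of the formatted ID, for example sync_user.schedule(args=(user_id,), delay=5, id=first).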

Progress Tracking via Key/Value Storage

Long-running tasks often need to report progress to the calling application. Huey’s key/value storage (Huey.put() / Huey.get()) is an easy way to accomplish this:

@huey.task(context=True)
def process_large_file(filepath, task=None):
    # Use a context manager so the file handle is closed promptly.
    with open(filepath) as fh:
        lines = fh.readlines()
    total = len(lines)
    results = []

    for i, line in enumerate(lines):
        results.append(transform(line))
        if i % 100 == 0:
            huey.put('progress:%s' % task.id, {
                'current': i,
                'total': total,
                'pct': int(100 * i / total),
            })

    huey.put('progress:%s' % task.id, {
        'current': total,
        'total': total,
        'pct': 100,
    })
    return results

On the application side, you can poll for progress:

result = process_large_file('/data/big.csv')

# Poll for progress.  Use peek=True to read without deleting.
progress = huey.get('progress:%s' % result.id, peek=True)
if progress:
    print('%d%% complete' % progress['pct'])

Note

By default Huey.get() is destructive (it deletes the value after reading). Pass peek=True to read the value without removing it. Remember to clean up the progress key when you are finished: huey.delete('progress:%s' % task_id).
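
A single read only gives a snapshot. To follow a task until it finishes, the read can be wrapped in a small polling loop. The sketch below is one possible shape, not a Huey API: watch_progress is a hypothetical helper that takes any zero-argument callable returning the latest progress dict (or None), which keeps it independent of the storage backend.

```python
import time


def watch_progress(read_progress, interval=0.5, timeout=60.0,
                   sleep=time.sleep):
    """Yield each new progress dict until the task reports completion.

    read_progress is any zero-argument callable returning the latest
    progress dict, or None if nothing has been written yet. The
    generator stops quietly once the timeout has elapsed.
    """
    deadline = time.monotonic() + timeout
    last = None
    while time.monotonic() < deadline:
        progress = read_progress()
        # Only report values that differ from the previous read.
        if progress is not None and progress != last:
            last = progress
            yield progress
            if progress['current'] >= progress['total']:
                return
        sleep(interval)
```

With the task above, the reader would be something like lambda: huey.get('progress:%s' % result.id, peek=True), and each yielded dict can be printed or pushed to the client.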

Custom Error Metadata

When a task fails, Huey stores an error result containing the exception representation, traceback, retry count, and task ID. You can enrich this by subclassing your Huey instance and overriding build_error_result():

from huey import RedisHuey

class MyHuey(RedisHuey):
    def build_error_result(self, task, exception):
        err = super().build_error_result(task, exception)
        # Add the task name and arguments for easier debugging.
        err['task_name'] = task.name
        err['task_args'] = task.args
        err['task_kwargs'] = task.kwargs
        return err

huey = MyHuey('my-app')

Now when you catch a TaskException, the metadata dict contains your custom fields:

from huey.exceptions import TaskException

result = failing_task('some-arg')
try:
    result.get(blocking=True, timeout=10)
except TaskException as exc:
    print(exc.metadata['task_name'])    # 'failing_task'
    print(exc.metadata['task_args'])    # ('some-arg',)
    print(exc.metadata['traceback'])    # Full traceback string.

Task Deduplication

You can use the key/value storage and a pre_execute() hook to skip a task if an identical one is already running:

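import hashlib

from huey.exceptions import CancelExecution

def dedup_key(task):
    # task.data is an (args, kwargs) tuple, and the kwargs dict cannot be
    # passed to hash(). Digest a stable text form of the arguments instead.
    payload = repr((task.args, sorted(task.kwargs.items())))
    digest = hashlib.md5(payload.encode('utf-8')).hexdigest()
    return 'dedup:%s:%s' % (task.name, digest)

@huey.pre_execute()
def deduplicate(task):
    if not huey.put_if_empty(dedup_key(task), '1'):
        raise CancelExecution('Duplicate task, skipping')

@huey.post_execute()
def clear_dedup(task, task_value, exc):
    huey.delete(dedup_key(task))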

Huey.put_if_empty() is atomic – it stores the value only if the key does not already exist, and returns False if the key was already present. The post-execute hook clears the key so that the same arguments can be processed again in the future.

Warning

The built-in hash() is salted per process for str and bytes values (hash randomization), so the same arguments produce different values in different processes. For cross-process deduplication, use a deterministic hash such as hashlib.md5.

Graceful Shutdown and Re-enqueueing Interrupted Tasks

When the consumer is shut down with SIGTERM, any tasks that are mid-execution are interrupted. By default, these tasks are lost. You can use the SIGNAL_INTERRUPTED signal to re-enqueue them:

from huey.signals import SIGNAL_INTERRUPTED

@huey.signal(SIGNAL_INTERRUPTED)
def on_interrupted(signal, task, *args, **kwargs):
    # The consumer was killed before this task finished.
    # Re-enqueue it so it will be picked up again.
    huey.enqueue(task)

This is especially important in production deployments. When using supervisord, set stopsignal=INT so that the consumer is asked to shut down gracefully, and set stopwaitsecs to give it time to finish before supervisor gives up and sends SIGKILL:

[program:my_huey]
command=/path/to/venv/bin/huey_consumer my_app.huey -w 4
stopwaitsecs=30
stopsignal=INT

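With this configuration, supervisor sends SIGINT first, waits up to 30 seconds for a graceful shutdown, and sends SIGKILL only if the consumer is still running. A process killed with SIGKILL cannot run any handlers, so interrupted tasks are not re-enqueued in that case.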

Monitoring Queue Depth

Huey provides several introspection methods that are useful for building a monitoring endpoint or health check:

def get_queue_stats():
    return {
        'pending': huey.pending_count(),
        'scheduled': huey.scheduled_count(),
        'results': huey.result_count(),
    }

If you are using a web framework, you can expose this as an endpoint:

# Flask example.
@app.route('/huey/health/')
def huey_health():
    stats = get_queue_stats()
    return jsonify(stats)

You can also inspect the actual tasks in the queue:

# List all pending tasks (deserializes each one, can be expensive).
for task in huey.pending():
    print(task.name, task.id, task.args)

# List all scheduled tasks.
for task in huey.scheduled():
    print(task.name, task.id, task.eta)

Note

Huey.pending() and Huey.scheduled() deserialize every task in the queue, which can be slow if the queue is very large. Use Huey.pending_count() and Huey.scheduled_count() when you only need the count.
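
A health check is usually more useful when it compares the counts against limits instead of only reporting them. The sketch below assumes a stats dict shaped like the one returned by get_queue_stats() above; evaluate_queue_health and its threshold values are hypothetical and should be tuned for your workload.

```python
def evaluate_queue_health(stats, max_pending=1000, max_scheduled=10000):
    """Classify queue statistics as 'ok' or 'degraded'.

    stats is a dict with 'pending' and 'scheduled' counts. The default
    thresholds are placeholder values, not recommendations.
    """
    problems = []
    if stats['pending'] > max_pending:
        problems.append('pending backlog: %d > %d'
                        % (stats['pending'], max_pending))
    if stats['scheduled'] > max_scheduled:
        problems.append('scheduled backlog: %d > %d'
                        % (stats['scheduled'], max_scheduled))
    return {
        'status': 'degraded' if problems else 'ok',
        'problems': problems,
        'stats': stats,
    }
```

A web endpoint could return this dict directly and use a non-200 status code when the status is 'degraded', so that a load balancer or monitoring probe can react to it.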

Using Signals for Task Metrics

Signals can be used to record execution metrics for every task. This recipe shows how to time task execution and record it to a metrics system:

import time
from huey.signals import SIGNAL_EXECUTING, SIGNAL_COMPLETE, SIGNAL_ERROR

_task_start_times = {}

@huey.signal(SIGNAL_EXECUTING)
def on_executing(signal, task):
    _task_start_times[task.id] = time.monotonic()

@huey.signal(SIGNAL_COMPLETE, SIGNAL_ERROR)
def on_finished(signal, task, exc=None):
    start = _task_start_times.pop(task.id, None)
    if start is not None:
        duration = time.monotonic() - start
        # Record to your metrics system (statsd, prometheus, etc).
        metrics.timing('huey.task.duration', duration, tags={
            'task': task.name,
            'status': 'error' if signal == SIGNAL_ERROR else 'ok',
        })

Warning

Signal handlers are executed synchronously by the consumer worker. Keep them fast – a slow signal handler blocks the worker from picking up the next task. If your metrics client performs network I/O, consider buffering writes or using an async client.

Also note that _task_start_times is a plain dict. With thread and greenlet workers, the dict is shared by every worker in the consumer process. With process workers, each process has its own copy, which is still correct because a given task starts and finishes in the same process.
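
One way to keep the handler fast is to buffer samples in memory and send them in batches from somewhere other than the signal handler. The following is a minimal sketch of that idea, not a Huey feature: BufferedTimings is a hypothetical class, and send_batch stands in for whatever call your metrics client provides.

```python
import threading


class BufferedTimings:
    """Collect timing samples in memory and hand them off in batches.

    Hypothetical helper: record() is cheap enough to call from a signal
    handler, while flush() is meant to run elsewhere (for example on a
    timer thread) and perform the slow network write.
    """

    def __init__(self, send_batch):
        self._send_batch = send_batch  # callable taking a list of samples
        self._lock = threading.Lock()
        self._samples = []

    def record(self, name, duration, tags=None):
        with self._lock:
            self._samples.append((name, duration, dict(tags or {})))

    def flush(self):
        # Swap the buffer under the lock, then send outside of it so that
        # record() is never blocked by network I/O.
        with self._lock:
            batch, self._samples = self._samples, []
        if batch:
            self._send_batch(batch)
        return len(batch)
```

In the handler above, metrics.timing(...) would become a call to record(), with flush() invoked periodically. As with the start-time dict, each process worker would hold its own buffer and needs its own flush.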

Signed Serializer for Untrusted Environments

By default Huey uses pickle to serialize tasks and results. If your Redis instance is shared or network-exposed, a malicious actor could inject a crafted pickle payload. The SignedSerializer adds an HMAC signature to every message, so tampered data is rejected:

from huey import RedisHuey
from huey.serializer import SignedSerializer

huey = RedisHuey(
    'my-app',
    serializer=SignedSerializer(secret='my-secret-key'))

The secret must be the same for both the application process and the consumer. If a message has been tampered with, deserialization will raise a ValueError.

Note

The signed serializer does not encrypt the data – it only detects tampering. The task arguments are still visible in Redis. If you need encryption, you can subclass Serializer and implement your own _serialize/_deserialize methods using a library like cryptography.
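
To make the distinction between signing and encrypting concrete, here is a standalone illustration of HMAC signing using only the standard library. It is a sketch of the general technique and does not reproduce Huey's actual wire format; the sign and verify functions and the "signature:payload" layout are assumptions made for the example.

```python
import hashlib
import hmac
import pickle


def sign(payload, secret):
    """Prefix payload (bytes) with a hex HMAC-SHA256 signature."""
    signature = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    return signature.encode('ascii') + b':' + payload


def verify(message, secret):
    """Return the payload if the signature matches, else raise ValueError."""
    signature, sep, payload = message.partition(b':')
    expected = hmac.new(secret, payload, hashlib.sha256).hexdigest()
    # compare_digest avoids leaking how many leading bytes matched.
    if not sep or not hmac.compare_digest(signature, expected.encode('ascii')):
        raise ValueError('signature mismatch')
    return payload


secret = b'my-secret-key'
message = sign(pickle.dumps({'user_id': 42}), secret)
payload = verify(message, secret)  # raises ValueError if tampered with
data = pickle.loads(payload)       # only unpickle after verification
```

The important ordering is that the signature is checked before the payload is unpickled. Note that the payload bytes are stored as-is, so anyone who can read the message can still see the arguments.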

Dynamic Fan-Out

Chord members must be known at enqueue time. When the set of sub-tasks depends on a runtime value (e.g., paginated API results), enqueue the chord from inside a task:

@huey.task()
def fetch_page(url):
    return requests.get(url).json()

@huey.task()
def aggregate(results):
    combined = {}
    for page_data in results:
        combined.update(page_data)
    return combined

@huey.task()
def discover_and_fetch(base_url):
    # Discover the pages to fetch at runtime.
    index = requests.get(base_url).json()
    urls = [item['url'] for item in index['items']]

    result = huey.enqueue(
        chord([fetch_page.s(u) for u in urls], aggregate.s()))

    # Optionally, store the callback task ID so the caller can track it.
    huey.put('fanout-result-id', result.callback.id)

Run Arbitrary Functions as Tasks

Instead of explicitly declaring all tasks up-front, you can write a general-purpose task that accepts a dotted import path and calls any function:

from importlib import import_module

@huey.task()
def path_task(path, *args, **kwargs):
    module_path, name = path.rsplit('.', 1)
    mod = import_module(module_path)
    return getattr(mod, name)(*args, **kwargs)

# Usage: runs myapp.utils.reindex('products') in the consumer.
path_task('myapp.utils.reindex', 'products')

Warning

This pattern is powerful but should be used with care. The function must be importable by the consumer process, and the arguments must be picklable. Avoid exposing this to untrusted input.
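
One way to reduce the risk is to resolve the dotted path against an explicit allow-list before importing anything. This is a minimal sketch under that assumption; ALLOWED_PATHS and resolve_allowed are hypothetical names, and the listed standard-library functions are placeholders for your own.

```python
from importlib import import_module

# Hypothetical allow-list: only these dotted paths may be invoked.
ALLOWED_PATHS = frozenset({
    'math.sqrt',
    'os.path.basename',
})


def resolve_allowed(path, allowed=ALLOWED_PATHS):
    """Import and return the callable at path, if it is allow-listed."""
    # Check before importing, so that a rejected path never triggers
    # module-level side effects.
    if path not in allowed:
        raise ValueError('path is not allow-listed: %r' % (path,))
    module_path, name = path.rsplit('.', 1)
    return getattr(import_module(module_path), name)
```

Inside path_task, the lookup would then become resolve_allowed(path)(*args, **kwargs), so an unexpected path fails the task instead of running arbitrary code.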

Dynamic Periodic Tasks

To create periodic tasks dynamically we need to register them so that they are added to the in-memory schedule managed by the consumer’s scheduler thread. Since this registry is in-memory, any dynamically defined tasks must be registered within the process that will ultimately schedule them: the consumer.

Warning

The following example will not work with the process worker-type option, since there is currently no way to interact with the scheduler process. When threads or greenlets are used, the worker threads share the same in-memory schedule as the scheduler thread, allowing modification to take place.

Example:

import time

from huey import crontab

def dynamic_ptask(message):
    print('dynamically-created periodic task: "%s"' % message)

@huey.task()
def schedule_message(message, cron_minutes, cron_hours='*'):
    # Create a new function that represents the application
    # of the "dynamic_ptask" with the provided message.
    def wrapper():
        dynamic_ptask(message)

    # The schedule that was specified for this task.
    schedule = crontab(cron_minutes, cron_hours)

    # Need to provide a unique name for the task. There are any number of
    # ways you can do this -- based on the arguments, etc. -- but for our
    # example we'll just use the time at which it was declared.
    task_name = 'dynamic_ptask_%s' % int(time.time())

    huey.periodic_task(schedule, name=task_name)(wrapper)

Assuming the consumer is running, we can now set up as many instances as we like of the “dynamic ptask” function:

>>> from demo import schedule_message
>>> schedule_message('I run every 5 minutes', '*/5')
<Result: task ...>
>>> schedule_message('I run between 0-15 and 30-45', '0-15,30-45')
<Result: task ...>

When the consumer executes the “schedule_message” tasks, our new periodic task will be registered and added to the schedule.

Multiple Queues

A huey application consists of a Huey instance (which is a named queue), tasks registered with that instance, and a consumer process that runs them. One instance is one queue, served by one consumer – and routing therefore happens at decoration time: whichever instance’s task() decorator you use is the queue the task runs on.

Before reaching for a second queue, consider whether task priorities solve your problem. If the goal is simply “urgent tasks should jump the line”, priorities give you that with one queue and no extra processes. Separate queues earn their keep when you want:

  • Different concurrency or worker-type per class of task – e.g. -k process for CPU-bound work and -k greenlet -w 50 for IO-bound work.

  • Isolation – a flood of cheap tasks must never be able to starve your critical tasks of workers.

  • Per-machine routing – certain tasks should only run on certain hosts.

To run multiple queues, declare multiple instances. A dedicated module keeps imports clean, and the instances can share a connection pool:

# myapp/queues.py
from huey import RedisHuey
from redis import ConnectionPool

pool = ConnectionPool(host='localhost', port=6379, max_connections=20)

emails = RedisHuey('myapp_emails', connection_pool=pool)
reports = RedisHuey('myapp_reports', connection_pool=pool)

Tasks are routed by the decorator used to declare them:

# myapp/tasks.py
from huey import crontab

from myapp.queues import emails, reports

@emails.task(retries=2)
def send_email(to, subject, body):
    ...

@reports.task()
def build_report(day):
    ...

@reports.periodic_task(crontab(minute='0', hour='3'))
def nightly_rollup():
    # Enqueued by the *reports* consumer's scheduler only.
    ...

Each queue gets its own consumer, tuned independently:

huey_consumer myapp.queues.emails  -w 8 -k greenlet
huey_consumer myapp.queues.reports -w 2 -k process

Running N consumers means supervising N processes. With systemd this is a single template unit, /etc/systemd/system/huey@.service, where %i expands to the attribute name in myapp.queues:

[Unit]
Description=huey consumer for %i

[Service]
WorkingDirectory=/srv/myapp
ExecStart=/srv/myapp/venv/bin/huey_consumer myapp.queues.%i -w 4
KillSignal=SIGINT
TimeoutStopSec=60
Restart=on-failure

[Install]
WantedBy=multi-user.target

Enable and start one unit instance per queue:

systemctl enable --now huey@emails huey@reports

(See Deploying to Production for the full discussion of supervisor configuration.)

This works with any storage backend. Since the sqlite storage namespaces all of its tables by queue name, multiple instances can even share a single database file:

emails = SqliteHuey('emails', filename='/var/lib/myapp/huey.db')
reports = SqliteHuey('reports', filename='/var/lib/myapp/huey.db')

Pitfalls to be aware of:

  • Everything is per-instance: results, schedules, locks and revocations. A Result handle can only be read through the instance that produced it, and revoking a task on one queue has no effect on another.

  • Periodic tasks belong to the instance that declared them, and are enqueued by that instance’s consumer. No coordination is needed between consumers of different queues – the -n / --no-periodic dance is only required when running multiple consumers of the same queue (Multiple Consumers).

  • The queue name is the storage namespace. Two instances with the same name on the same storage are the same queue; instances sharing a sqlite file must use distinct names. Note that the Redis storage sanitizes the name, stripping all characters besides alphanumerics and underscores – so names must remain distinct after sanitization.

  • immediate mode is a per-instance setting – in tests, remember to flip it on every instance.
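
The sanitization pitfall is easy to check for up front. The sketch below assumes the rule exactly as described in the list above (everything except alphanumerics and underscores is stripped); the regular expression is an approximation of that description, not copied from Huey's source, and both function names are hypothetical.

```python
import re
from collections import defaultdict


def sanitized(name):
    # Assumed rule, taken from the description above: keep only
    # alphanumerics and underscores. Not copied from Huey's source.
    return re.sub(r'[^A-Za-z0-9_]', '', name)


def find_name_collisions(names):
    """Return {sanitized_name: [original names]} for every clash."""
    groups = defaultdict(list)
    for name in names:
        groups[sanitized(name)].append(name)
    return {key: group for key, group in groups.items() if len(group) > 1}
```

Running find_name_collisions over the names of all declared instances at start-up (or in a test) would flag pairs such as 'my-app' and 'my.app' before they end up sharing a queue.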

Django users wanting multiple queues with the settings.HUEY-style configuration should check out the third-party django-huey package.

High-Availability Redis: Sentinel, Valkey and Cluster

RedisHuey accepts a pre-configured connection_pool, which means huey works with Redis Sentinel out-of-the-box. The pool returned by master_for() re-discovers the current master automatically after a failover:

from redis.sentinel import Sentinel
from huey import RedisHuey

sentinel = Sentinel(
    [('10.0.0.1', 26379), ('10.0.0.2', 26379), ('10.0.0.3', 26379)],
    # IMPORTANT: the socket timeout must comfortably exceed the
    # consumer's blocking read timeout (default 1s). If it does not,
    # every blocking dequeue times out at the socket level and is
    # indistinguishable from an empty queue -- the consumer silently
    # degrades to polling, with no errors logged.
    socket_timeout=5.0)

huey = RedisHuey(
    'my-app',
    connection_pool=sentinel.master_for('my-master').connection_pool)

Always use master_for(). Huey reads and writes on every code-path, so a slave_for() pool will fail with ReadOnlyError. If your deployment requires authentication, pass password= for the data nodes and sentinel_kwargs={'password': '...'} for the sentinels themselves.

Django users configure this by assigning the instance directly (djhuey accepts a Huey instance as well as a settings dict):

# settings.py
from redis.sentinel import Sentinel
from huey import RedisHuey

sentinel = Sentinel([('10.0.0.1', 26379), ...], socket_timeout=5.0)
HUEY = RedisHuey(
    'my-app',
    connection_pool=sentinel.master_for('my-master').connection_pool)

What happens during a failover. The consumer is failover-tolerant by construction: when the master goes away, workers log the dequeue error and apply backoff, the scheduler does the same, and once Sentinel promotes a new master the pool reconnects to it automatically – no restart required. There are two caveats to understand:

  • Redis replication is asynchronous, so writes that land in the failover window (enqueues, results) can be lost when the old master’s unreplicated data is discarded.

  • Dequeueing is destructive, so a task that has been handed to a worker exists only in that worker’s memory. If the worker dies mid-task, the task is gone. This is huey’s normal at-most-once behavior, not something Sentinel introduces.

High-availability Redis does not make individual tasks durable; pair it with idempotent task design and the Graceful Shutdown and Re-enqueueing Interrupted Tasks recipe.

Valkey and Redict are wire-compatible with Redis and work with RedisHuey unchanged – simply point it at your valkey host (the redis-py client speaks to either). If you prefer the official valkey-glide client, huey ships ValkeyGlideHuey in huey.contrib.valkey_glide.

Redis Cluster. Huey’s Redis usage is single-key (the one Lua script was made cluster-safe in 2.4.2), but RedisHuey constructs a standard redis-py client and connection pool, and does not support redis-py’s RedisCluster client directly. For high availability, Sentinel is the supported topology.

Using Huey with Flask

Huey is framework-agnostic and needs no extension to work with Flask. There are only two questions to answer: where to declare the Huey instance, and how to give tasks access to the application context.

Declare the instance alongside (or before) the app, and import it from your tasks module:

# app.py
from flask import Flask
from huey import RedisHuey

app = Flask(__name__)
huey = RedisHuey('my-app')

# tasks.py
from app import app, huey

@huey.task()
def send_welcome_email(user_id):
    # Tasks run in the consumer process, outside any request. If the
    # task uses Flask extensions (database, mail, etc.), give it an
    # application context:
    with app.app_context():
        user = User.query.get(user_id)
        mail.send(make_welcome_message(user))

Views simply call the task to enqueue it:

# views.py
from flask import redirect, request, url_for

from app import app
from tasks import send_welcome_email

@app.route('/signup/', methods=['POST'])
def signup():
    user = create_user(request.form)
    send_welcome_email(user.id)
    return redirect(url_for('welcome'))

If most of your tasks need the application context, wrap the boilerplate in a small decorator:

import functools

def flask_task(*task_args, **task_kwargs):
    def decorator(fn):
        @functools.wraps(fn)
        def inner(*args, **kwargs):
            with app.app_context():
                return fn(*args, **kwargs)
        return huey.task(*task_args, **task_kwargs)(inner)
    return decorator

@flask_task(retries=2)
def send_welcome_email(user_id):
    ...

The consumer is pointed at an entry module that imports the app and all of the tasks (see Understanding how tasks are imported):

# main.py
from app import app, huey
import tasks  # Import tasks so they are registered with the instance.

Start the consumer against that module:

huey_consumer main.huey -w 4

A complete, runnable application is available in examples/flask_ex.

Using Huey with FastAPI

Huey works well as the background task-queue for an async application like FastAPI: task functions themselves are regular (synchronous) functions executed by the consumer in a separate process, while the web app enqueues work and awaits results without blocking the event loop.

Declare the instance and tasks in their own module, as usual:

# tasks.py
from huey import RedisHuey

huey = RedisHuey('my-app')

@huey.task()
def generate_report(user_id):
    ...  # Heavy lifting happens in the consumer.
    return report_data

Enqueueing from an async request handler is fine as-is – it is a single, fast storage write:

# api.py
from fastapi import FastAPI
from huey.contrib.asyncio import aget_result

from tasks import generate_report, huey

app = FastAPI()

@app.post('/report/{user_id}')
async def begin_report(user_id: int):
    rh = generate_report(user_id)
    return {'task_id': rh.id}

@app.get('/report/status/{task_id}')
async def report_status(task_id: str):
    # Non-blocking, non-destructive read of the result store.
    value = huey.result(task_id, preserve=True)
    return {'ready': value is not None, 'value': value}

To hold the request open until the task finishes, await the result with the asyncio helpers – other requests continue to be served while this coroutine waits:

@app.post('/report/{user_id}/wait')
async def report_wait(user_id: int):
    rh = generate_report(user_id)
    value = await aget_result(rh)
    return {'value': value}

Notes:

  • The consumer runs separately, exactly as with any huey application: huey_consumer tasks.huey -w 4.

  • Task functions are plain def functions – huey does not execute async def tasks. IO-bound workloads can still get high concurrency in the consumer via -k greenlet.

  • If a task raised an exception, reading its result raises TaskException – handle it in the status endpoint if your tasks can fail.

  • huey.result(task_id) is destructive by default; preserve=True keeps the result so the status endpoint can be polled repeatedly.