Guide
The purpose of this document is to present Huey using simple examples that cover the most common usage of the library. Detailed documentation can be found in the API documentation.
Example task() that adds two numbers:
# demo.py
from huey import SqliteHuey
huey = SqliteHuey(filename='/tmp/demo.db')
@huey.task()
def add(a, b):
    return a + b
To test, run the consumer, specifying the import path to the huey object:
huey_consumer.py demo.huey
In a Python shell, we can call our add task:
>>> from demo import add
>>> r = add(1, 2)
>>> r()
3
If you try to resolve the result (r) before the task has been executed,
then r() will return None. You can avoid this by instructing the
result to block until the task has finished and a result is ready:
>>> r = add(1, 2)
>>> r(blocking=True, timeout=5) # Wait up to 5 seconds for result.
3
You can also check to see if a result is ready using Result.is_ready():
>>> r = add(1, 2)
>>> r.is_ready()
False
>>> r.is_ready() # A few moments later.
True
>>> r()
3
What happens when we call a task function?
1. When the add() function is called, a message representing the call is placed in a queue.
2. The function returns immediately without actually running, and returns a Result handle, which can be used to retrieve the result once the task has been executed by the consumer.
3. The consumer process sees that a message has arrived, and a worker will call the add() function and place the return value into the result store.
4. We can use the Result handle to read the return value from the result store.
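The flow above can be modelled in a few lines of plain Python. This is only a toy sketch, not Huey's implementation: the names task_queue, result_store, enqueue_call and run_one_message are hypothetical, and a real consumer runs in a separate process.

```python
import queue
import uuid

task_queue = queue.Queue()   # stands in for Huey's message queue (assumption)
result_store = {}            # stands in for Huey's result store (assumption)


def enqueue_call(func, *args):
    """Record the call as a message and return an id used to look up the result."""
    task_id = uuid.uuid4().hex
    task_queue.put((task_id, func, args))
    return task_id


def run_one_message():
    """What a worker does: dequeue a message, run it, store the return value."""
    task_id, func, args = task_queue.get()
    result_store[task_id] = func(*args)


def add(a, b):
    return a + b


task_id = enqueue_call(add, 1, 2)      # returns at once; add() has not run yet
pending = result_store.get(task_id)    # nothing stored so far
run_one_message()                      # the "consumer" executes the task
value = result_store[task_id]          # the stored return value
```

Here pending is None because nothing has executed the message yet, which mirrors why r() returns None before the consumer has run the task.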
For more information, see the task() decorator documentation.
Scheduling tasks
Tasks can be scheduled to execute at a certain time, or after a delay.
In the following example, we will schedule a call to add() to run in 10
seconds, and then will block until the result becomes available:
>>> r = add.schedule((3, 4), delay=10)
>>> r(blocking=True) # Will block for ~10 seconds before returning.
7
If we wished to schedule the task to run at a particular time, we can use the
eta parameter instead. The following example will run after a 10 second
delay:
>>> import datetime
>>> eta = datetime.datetime.now() + datetime.timedelta(seconds=10)
>>> r = add.schedule((4, 5), eta=eta)
>>> r(blocking=True) # Will block for ~10 seconds.
9
What happens when we schedule a task?
1. When we call schedule(), a message is placed on the queue instructing the consumer to call the add() function in 10 seconds.
2. The function returns immediately, and returns a Result handle.
3. The consumer process sees that a message has arrived, and will notice that the message is not yet ready to be executed, but should be run in ~10s.
4. The consumer adds the message to a schedule.
5. In ~10 seconds, the scheduler will pick up the message and place it back into the queue for execution.
6. A worker will dequeue the message, execute the add() function, and place the return value in the result store.
7. The Result handle from step 2 will now be able to read the return value from the task.
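As a rough illustration of the schedule described above, the sketch below keeps not-yet-due messages in a heap ordered by their run time and moves them to a ready queue once that time has passed. The names schedule, ready_queue, add_to_schedule and promote_due are hypothetical, and times are plain numbers rather than datetimes; Huey's own scheduler may work differently.

```python
import heapq

schedule = []      # (eta, message) pairs, kept as a min-heap ordered by eta
ready_queue = []   # messages that a worker may now execute


def add_to_schedule(eta, message):
    heapq.heappush(schedule, (eta, message))


def promote_due(now):
    """Move every message whose eta has passed onto the ready queue."""
    while schedule and schedule[0][0] <= now:
        _, message = heapq.heappop(schedule)
        ready_queue.append(message)


add_to_schedule(10, 'add(3, 4)')   # scheduled at t=0 with delay=10
promote_due(now=5)                 # too early: nothing is moved
early = list(ready_queue)
promote_due(now=10)                # eta reached: the message becomes runnable
late = list(ready_queue)
```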
For more details, see the schedule() API documentation.
Periodic tasks
Huey provides crontab-like functionality that enables functions to be executed automatically on a given schedule.
In the following example, we will declare a periodic_task() that
executes every 3 minutes and prints a message on consumer process stdout:
from huey import SqliteHuey
from huey import crontab
huey = SqliteHuey(filename='/tmp/demo.db')
@huey.task()
def add(a, b):
    return a + b
@huey.periodic_task(crontab(minute='*/3'))
def every_three_minutes():
    print('This task runs every three minutes')
Once a minute, the scheduler will check to see if any of the periodic tasks should be called. If so, the task will be enqueued for execution.
Because periodic tasks are called independent of any user interaction, they do not accept any arguments.
Similarly, the return-value for periodic tasks is discarded, rather than
being put into the result store. This is because there is not an obvious
way for an application to obtain a Result handle to access the
result of a given periodic task execution.
The crontab() function accepts the following arguments, in the same
order as the standard Linux crontab format:
minute
hour
day
month
day_of_week (0=Sunday, 6=Saturday)
Acceptable inputs:
* - always true, e.g. if hour='*', then the rule matches any hour.
*/n - every n interval, e.g. minute='*/15' means every 15 minutes.
m-n - run every time m..n inclusive.
m,n - run on m and n.
Multiple rules can be expressed by separating the individual rules with a comma, for example:
# Runs every 10 minutes between 9a and 11a, and 4p-6p.
crontab(minute='*/10', hour='9-11,16-18')
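To make the rule syntax concrete, here is a minimal sketch of how such a field could be matched against a value. It is not Huey's crontab() implementation: field_matches and matches are hypothetical helpers, and the sketch assumes that */n means "the value is divisible by n".

```python
def field_matches(rule, value):
    """Return True if `value` satisfies a crontab-style field rule."""
    for part in rule.split(','):           # comma separates alternative rules
        if part == '*':                    # always true
            return True
        if part.startswith('*/'):          # every n (assumed: value divisible by n)
            if value % int(part[2:]) == 0:
                return True
        elif '-' in part:                  # inclusive range m-n
            low, high = part.split('-')
            if int(low) <= value <= int(high):
                return True
        elif int(part) == value:           # a single literal value
            return True
    return False


def matches(minute_rule, hour_rule, minute, hour):
    """Check the minute and hour fields only, as in the example above."""
    return field_matches(minute_rule, minute) and field_matches(hour_rule, hour)
```

With the rule from the example, matches('*/10', '9-11,16-18', 20, 10) is true, while 10:25 and 12:00 do not match.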
Huey also provides shortcuts for crontab.daily() and
crontab.hourly() schedules:
@huey.periodic_task(crontab.hourly())
def aggregate_stats():
    # This task runs every hour, on the hour.
    ...
Retrying tasks that fail
Sometimes we may have a task that we anticipate might fail from time to time, in which case we should retry it. Huey supports automatically retrying tasks a given number of times, optionally with a delay between attempts.
Here we’ll declare a task that fails approximately half of the time. To
configure this task to be automatically retried, use the retries parameter
of the task() decorator:
import random
@huey.task(retries=2)  # Retry the task up to 2 times.
def flaky_task():
    if random.randint(0, 1) == 0:
        raise Exception('failing!')
    return 'OK'
What happens when we call this task?
1. Message is placed on the queue and a Result handle is returned to the caller.
2. Consumer picks up the message and attempts to run the task, but the call to random.randint() happens to return 0, raising an Exception.
3. The consumer puts the error into the result store and the exception is logged. If the caller resolves the Result now, a TaskException will be raised which contains information about the exception that occurred in our task.
4. The consumer notices that the task can be retried 2 times, so it decrements the retry count and re-enqueues it for execution.
5. The consumer picks up the message again and runs the task. This time, the task succeeds! The new return value is placed into the result store (“OK”).
6. We can reset our Result handle by calling reset() and then re-resolve it. The result handle will now give us the new value, “OK”.
Should the task fail on the first invocation, it will be retried up to two times. Note that each retry happens immediately after the failed attempt returns.
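The retry bookkeeping can be sketched without a consumer. The helper below is a simplified model, not Huey's code: run_with_retries is a hypothetical name, it retries in a loop instead of re-enqueueing, and it records what each attempt would have written to the result store.

```python
def run_with_retries(func, retries):
    """Call func; on failure, retry until it succeeds or retries run out.

    Returns (outcomes, retries_left), where outcomes holds one entry per attempt.
    """
    outcomes = []
    while True:
        try:
            outcomes.append(func())
            return outcomes, retries
        except Exception as exc:
            outcomes.append(exc)         # the error result for this attempt
            if retries == 0:
                return outcomes, retries
            retries -= 1                 # decrement, then try again


attempts = iter([False, True])           # first attempt fails, second succeeds


def flaky():
    if not next(attempts):
        raise Exception('failing!')
    return 'OK'


outcomes, retries_left = run_with_retries(flaky, retries=2)
```

In this run the first outcome is the exception and the second is 'OK', with one retry left unused. A function that always fails would be attempted three times in total: the original call plus two retries.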
To specify a delay between retry attempts, we can add a retry_delay
argument. The task will be retried up to two times, with a delay of 10 seconds
between attempts:
@huey.task(retries=2, retry_delay=10)
def flaky_task():
    # ...
Note
retries and retry_delay can also be specified for periodic tasks.
It is also possible to explicitly retry a task from within the task, by raising
a RetryTask exception. When this exception is used, the task will
be retried regardless of whether it was declared with retries. Similarly,
the task’s remaining retries (if they were declared) will not be affected by
raising RetryTask. Example:
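from urllib.error import HTTPError
from urllib.request import urlopen
from huey.exceptions import RetryTask
@huey.task()
def fetch_api_data(url):
    try:
        fh = urlopen(url)
    except HTTPError:
        # Try again in 60 seconds for an HTTP error (500, etc).
        raise RetryTask(delay=60)
    ...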
Error handling
When a task raises an unhandled exception, Huey performs several actions:
The exception and traceback are logged.
An error result is stored in the result store (a dict containing the exception representation, traceback, task ID, and remaining retries).
The SIGNAL_ERROR signal is emitted.
If the task has retries remaining, it is re-enqueued (and SIGNAL_RETRYING is emitted).
If the task has an on_error handler, the error handler task is enqueued with the exception as an argument.
Reading error results
When you call Result.get() on a task that failed, a TaskException
is raised. The TaskException has a metadata attribute containing details
about the failure:
import requests
from huey.exceptions import TaskException
@huey.task()
def download(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text
result = download('https://example.com/missing')
try:
    result.get(blocking=True, timeout=10)
except TaskException as exc:
    print(exc.metadata['error'])      # "HTTPError('404 ...')"
    print(exc.metadata['traceback'])  # Full traceback string.
    print(exc.metadata['task_id'])    # UUID of the failed task.
    print(exc.metadata['retries'])    # Retries remaining (0 if exhausted).
Result handle after a retry
When a task fails, the error result is written to the result store. If the task
is retried and eventually succeeds, the success result overwrites the error.
However, the Result object caches the result locally after the
first read. To see the updated result after a retry, you must call
reset():
@huey.task(retries=2, retry_delay=10)
def flaky_download(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text
result = flaky_download('https://example.com/intermittent')
try:
    result.get(blocking=True, timeout=5)
except TaskException:
    # First attempt failed. Wait for the retry to finish.
    result.reset()  # Clear cached error so we can read the new result.
    value = result.get(blocking=True, timeout=30)
    # value now contains the successful return value (or raises again).
Immediate mode
Huey can be run in a special mode called immediate mode, which is very useful during testing and development. In immediate mode, Huey will execute task functions immediately rather than enqueueing them, while still preserving the APIs and behaviors one would expect when running a dedicated consumer process.
Immediate mode can be enabled in two ways:
huey = RedisHuey('my-app', immediate=True)
# Or at any time, via the "immediate" attribute:
huey = RedisHuey('my-app')
huey.immediate = True
To disable immediate mode:
huey.immediate = False
By default, enabling immediate mode will switch your Huey instance to using
in-memory storage. This is to prevent accidentally reading or writing to live
storage while doing development or testing. If you prefer to use immediate mode
with live storage, you can specify immediate_use_memory=False when creating
your Huey instance:
huey = RedisHuey('my-app', immediate_use_memory=False)
You can try out immediate mode quite easily in the Python shell. In the following example, everything happens within the interpreter – no separate consumer process is needed. In fact, because immediate mode switches to an in-memory storage when enabled, we don’t even have to be running a Redis server:
>>> from huey import RedisHuey
>>> huey = RedisHuey()
>>> huey.immediate = True
>>> @huey.task()
... def add(a, b):
...     return a + b
...
>>> result = add(1, 2)
>>> result()
3
>>> add.revoke(revoke_once=True) # We can revoke tasks.
>>> result = add(2, 3)
>>> result() is None
True
>>> add(3, 4)() # No longer revoked, was restored automatically.
7
What happens if we try to schedule a task for execution in the future, while using immediate mode?
>>> result = add.schedule((4, 5), delay=60)
>>> result() is None # No result.
True
As you can see, the task was not executed. So what happened to it? The answer
is that the task was added to the in-memory storage layer’s schedule. We can
check this by calling Huey.scheduled():
>>> huey.scheduled()
[__main__.add: 8873...bcbd @2019-03-27 02:50:06]
Since immediate mode is fully synchronous, there is not a separate thread monitoring the schedule. The schedule can still be read or written to, but scheduled tasks will not automatically be executed and periodic tasks will not be enqueued.
Testing Guidelines
When testing Huey task-decorated functions, a couple of guidelines will make your life easier.
Set your Huey instance to Immediate mode. Any code that calls a task will run it synchronously, so it is safe to block on results as well.
Unit test individual tasks by using .call_local() to call the underlying function.
Examples:
# Consider the following Huey instance and tasks:
import unittest
from huey import RedisHuey, crontab
from huey.exceptions import TaskException
huey = RedisHuey(...)
# Fake database to check side-effects.
database = []
@huey.task()
def add(a, b):
    return a + b
@huey.periodic_task(crontab(...))
def run_reports():
    global database
    database.append(True)  # Simulate a side-effect.
    # The return values for periodic tasks are discarded when run by the
    # Huey consumer. A return value may be helpful for testing, however, so
    # in this example we will return something to demonstrate how to test
    # our periodic task.
    return 42
class TestMyTasks(unittest.TestCase):
    def setUp(self):
        # Make tasks run synchronously.
        huey.immediate = True
    def tearDown(self):
        huey.immediate = False
    def test_task(self):
        result_handle = add(3, 4)
        self.assertEqual(result_handle.get(), 7)
        # Alternatively, you can also:
        self.assertEqual(add.call_local(3, 4), 7)
    def test_task_exceptions(self):
        # We can also test exceptions.
        result_handle = add(3, None)  # Exception logged, but not raised.
        with self.assertRaises(TaskException):
            result_handle.get()
        with self.assertRaises(TypeError):
            # Exception raised directly when using call_local().
            add.call_local(3, None)
    def test_periodic_task(self):
        # We cannot use the result-handle from a periodic task, because the
        # results are always discarded by the consumer. In this case it is
        # necessary to use `.call_local()` if we want to check the return
        # value.
        self.assertEqual(run_reports.call_local(), 42)
        # assertEqual (not assertTrue) so the length is actually compared.
        self.assertEqual(len(database), 1)
        # If our periodic task has a side-effect, however, we can call it
        # normally and check the side-effect happened. For example, if the
        # run_reports() periodic task wrote a row to a database, we could
        # do something like:
        run_reports()
        self.assertEqual(len(database), 2)
Task priority
Note
Priority support for Redis requires Redis 5.0 or newer. To use task
priorities with Redis, use the PriorityRedisHuey instead of
RedisHuey.
Task prioritization is fully supported by SqliteHuey and the
file-based FileHuey. The in-memory storage layer (used when
Immediate mode is enabled) also supports task priorities.
Huey tasks can be given a priority, allowing you to ensure that your most important tasks do not get delayed when the workers are busy.
Priorities can be assigned to a task function, in which case all invocations of the task will default to the given priority. Additionally, individual task invocations can be assigned a priority on a one-off basis.
When no priority is given, the task will default to a priority of 0.
To see how this works, let's define a task that has a priority (10):
@huey.task(priority=10)
def send_email(to, subj, body):
    return mailer.send(to, 'webmaster@myapp.com', subj, body)
When we invoke this task, it will be processed before any other pending tasks whose priority is less than 10. So we could imagine our queue looking something like this:
process_payment - priority = 50
check_spam - priority = 1
make_thumbnail - priority = 0 (default)
Invoke the send_email() task:
send_email('new_user@foo.com', 'Welcome', 'blah blah')
Now the queue of pending tasks would be:
process_payment - priority = 50
send_email - priority = 10
check_spam - priority = 1
make_thumbnail - priority = 0
We can override the default priority by passing priority= as a keyword
argument to the task function:
send_email('boss@mycompany.com', 'Important!', 'etc', priority=90)
Now the queue of pending tasks would be:
send_email (to boss) - priority = 90
process_payment - priority = 50
send_email - priority = 10
check_spam - priority = 1
make_thumbnail - priority = 0
Task priority only affects the ordering of tasks as they are pulled from the
queue of pending tasks. If there are periods of time where your workers are not
able to keep up with the influx of tasks, Huey’s priority feature can
ensure that your most important tasks do not get delayed.
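The ordering shown in the queue listings can be reproduced with a small priority queue. This is a sketch using the standard heapq module, not Huey's storage code; the enqueue and dequeue_all helpers are hypothetical, and first-in, first-out ordering among equal priorities is an assumption of the sketch.

```python
import heapq
import itertools

pending = []
counter = itertools.count()   # tie-breaker so equal priorities keep insertion order


def enqueue(name, priority=0):
    # heapq is a min-heap, so negate the priority to pop the highest first.
    heapq.heappush(pending, (-priority, next(counter), name))


def dequeue_all():
    """Pop every pending task name, highest priority first."""
    order = []
    while pending:
        order.append(heapq.heappop(pending)[2])
    return order


enqueue('make_thumbnail')                    # default priority of 0
enqueue('check_spam', priority=1)
enqueue('process_payment', priority=50)
enqueue('send_email', priority=10)           # task-level default
enqueue('send_email (to boss)', priority=90) # per-call override
order = dequeue_all()
```

The resulting order starts with the boss email, then process_payment, send_email, check_spam and finally make_thumbnail, regardless of the order in which the tasks were enqueued.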
Task-specific priority overrides can also be specified when scheduling a task to run in the future:
# Uses priority=10, since that was the default we used when
# declaring the send_email task:
send_email.schedule(('foo@bar.com', 'subj', 'msg'), delay=60)
# Override, specifying priority=50 for this task.
send_email.schedule(('bar@foo.com', 'subj', 'msg'), delay=60, priority=50)
Lastly, we can specify priority on periodic_task:
@huey.periodic_task(crontab(minute='0', hour='*/3'), priority=10)
def some_periodic_task():
    # ...
For more information:
PriorityRedisHuey - Huey implementation that adds support for task priorities with the Redis storage layer. Requires Redis 5.0 or newer.
SqliteHuey and the in-memory storage used when immediate-mode is enabled have full support for task priorities.
Canceling or pausing tasks
Huey tasks can be canceled dynamically at runtime. This applies to regular tasks, tasks scheduled to execute in the future, and periodic tasks.
Any task can be canceled (“revoked”), provided the task has not started executing yet. Similarly, a revoked task can be restored, provided it has not already been processed and discarded by the consumer.
Using the Result.revoke() and Result.restore() methods:
# Schedule a task to execute in 60 seconds.
res = add.schedule((1, 2), delay=60)
# Provided the 60s has not elapsed, the task can be canceled
# by calling the `revoke()` method on the result object.
res.revoke()
# We can check to see if the task is revoked.
res.is_revoked() # -> True
# Similarly, we can restore the task, provided the 60s has
# not elapsed (at which point it would have been read and
# discarded by the consumer).
res.restore()
To revoke all instances of a given task, use the
revoke() and restore() methods on
the task function itself:
# Prevent all instances of the add() task from running.
add.revoke()
# We can check to see that all instances of the add() task
# are revoked:
add.is_revoked() # -> True
# We can enqueue an instance of the add task, and then check
# to verify that it is revoked:
res = add(1, 2)
res.is_revoked() # -> True
# To re-enable a task, we'll use the restore() method on
# the task function:
add.restore()
# Is the add() task enabled again?
add.is_revoked() # -> False
Huey provides APIs to revoke / restore on both individual instances of a task, as well as all instances of the task. For more information, see the following API docs:
Result.revoke() and Result.restore() for revoking individual instances of a task.
Result.is_revoked() for checking the status of a task instance.
TaskWrapper.revoke() and TaskWrapper.restore() for revoking all instances of a task.
TaskWrapper.is_revoked() for checking the status of the task function itself.
Canceling from within a Task
Huey provides a special CancelExecution exception which can be
raised, either within a pre_execute() hook or within the body of
a task()-decorated function, to cancel the execution of the
task. Additionally, when raised from within a task, the CancelExecution
exception can override the task’s default retry policy, by specifying either
retry=True/False.
Example:
from huey.exceptions import CancelExecution
@huey.task(retries=2)
def load_data():
    if something_temporary_is_wrong():
        # Task will be retried, even if it has run out of retries or is a
        # task that does not specify any automatic retries.
        raise CancelExecution(retry=True)
    elif something_fatal_is_wrong():
        # Task will NOT be retried, even if it has more than one retry
        # remaining.
        raise CancelExecution(retry=False)
    elif cancel_and_maybe_retry():
        # Task will only be retried if it has one or more retries
        # remaining (this is the default).
        raise CancelExecution()
    ...
For more information, see: CancelExecution.
Canceling or pausing periodic tasks
The revoke() and restore() methods support some additional options
which may be especially useful for periodic_task().
The revoke() method accepts two optional parameters:
revoke_once - boolean flag, if set then only the next occurrence of the task will be revoked, after which it will be restored automatically.
revoke_until - datetime, which specifies the time at which the task should be automatically restored.
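The difference between the two options can be sketched as a small state machine. The Revocation class below is hypothetical and only models the behaviour described above; it is not how Huey stores revocation data.

```python
from datetime import datetime


class Revocation:
    """Toy model of a task's revoked state (illustrative only)."""

    def __init__(self):
        self.active = False
        self.once = False
        self.until = None

    def revoke(self, revoke_once=False, revoke_until=None):
        self.active, self.once, self.until = True, revoke_once, revoke_until

    def restore(self):
        self.active, self.once, self.until = False, False, None

    def should_skip(self, now):
        """Decide whether the occurrence due at `now` is skipped."""
        if not self.active:
            return False
        if self.until is not None and now >= self.until:
            self.restore()        # revoke_until has passed: restored automatically
            return False
        if self.once:
            self.restore()        # skip this occurrence only
        return True


state = Revocation()
state.revoke(revoke_once=True)
first = state.should_skip(datetime(2024, 1, 1, 9, 0))    # skipped
second = state.should_skip(datetime(2024, 1, 1, 10, 0))  # runs again
```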
For example, suppose we have a task that sends email notifications, but our mail server goes down and won’t be fixed for a while. We can revoke the task for a couple of hours, after which time it will start executing again:
@huey.periodic_task(crontab(minute='0', hour='*'))
def send_notification_emails():
    # ... code to send emails ...
Here is how we might revoke the task for the next 3 hours:
>>> import datetime
>>> now = datetime.datetime.now()
>>> eta = now + datetime.timedelta(hours=3)
>>> send_notification_emails.revoke(revoke_until=eta)
Alternatively, we could use revoke_once=True to just skip the next
execution of the task:
>>> send_notification_emails.revoke(revoke_once=True)
At any time, the task can be restored using the usual
restore() method, and its status can be checked using
the is_revoked() method.
Task expiration
Huey tasks can be configured with an expiration time. Setting an expiration time on tasks will prevent them being run after the given time has elapsed. Expiration times may be useful if your queue is busy and there may be a significant lag between the time a task is enqueued and the time the consumer starts executing it.
Expiration times can be specified as:
datetime() instances, which are treated as absolute times.
timedelta() or int, which are relative to the time at which the task is enqueued.
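The following sketch shows one way the three accepted forms could be reduced to a single absolute deadline. The helpers resolve_expiration and is_expired are hypothetical names used for illustration, and treating an int as a number of seconds is an assumption based on the examples in this section.

```python
from datetime import datetime, timedelta


def resolve_expiration(expires, enqueued_at):
    """Turn an `expires` value into an absolute datetime (or None)."""
    if expires is None:
        return None
    if isinstance(expires, datetime):
        return expires                                   # absolute time, used as-is
    if isinstance(expires, timedelta):
        return enqueued_at + expires                     # relative to enqueue time
    return enqueued_at + timedelta(seconds=expires)      # int: assumed to be seconds


def is_expired(expires, enqueued_at, now):
    """True if the task should no longer be run at `now`."""
    deadline = resolve_expiration(expires, enqueued_at)
    return deadline is not None and now > deadline


enqueued = datetime(2024, 1, 1, 12, 0, 0)
deadline = resolve_expiration(60, enqueued)
too_late = is_expired(60, enqueued, enqueued + timedelta(seconds=90))
```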
A default expire time can be provided when declaring a task:
# Task must be executed by consumer within 60s of being enqueued.
@huey.task(expires=60)
def time_sensitive_task(...):
Expiration times can be specified per-invocation, as well:
# Task must be executed by consumer within 5 minutes of being enqueued.
time_sensitive_task(report_file, expires=timedelta(seconds=300))
Expiration times can also be specified when scheduling tasks:
from datetime import datetime, timedelta
# Task scheduled to run in 1 hour, and once enqueued for execution, must be
# run within 60 seconds.
time_sensitive_task.schedule(
    args=(report_file,),
    delay=timedelta(seconds=3600),
    expires=timedelta(seconds=60))
# Example using absolute datetimes instead of relative deltas:
one_hr = datetime.now() + timedelta(seconds=3600)
time_sensitive_task.schedule(
    args=(report_file,),
    eta=one_hr,
    expires=one_hr + timedelta(seconds=60))
Task timeouts
Huey tasks can be interrupted by setting a timeout on the task, in seconds.
The actual implementation of the timeout mechanism depends on which worker type
you are using in the consumer, and is selected automatically by Huey:
Process: uses SIGALRM, most robust.
Thread: threads do NOT support a hard timeout. In order to use timeouts with threaded workers, it is necessary to do cooperative timeout-checking from within your task function, described below.
Greenlet: uses gevent.Timeout, reliable so long as the task yields to the event loop and is not held up making a blocking call.
A default timeout can be provided when declaring a task:
# Task must finish executing in 60s. If not, a `TaskTimeout` is raised and
# returned to the caller via the result handle.
@huey.task(timeout=60)
def maybe_slow(...):
Timeouts can be specified per-invocation, as well:
# Task will raise a `TaskTimeout` if it runs for more than 30s.
maybe_slow(report_file, timeout=30)
Timeouts can also be specified when scheduling tasks:
maybe_slow.schedule(
    args=(report_file,),
    timeout=120)
When a task times out, the SIGNAL_TIMEOUT signal will be
fired.
Cooperative Timeout
Threads do not support a hard timeout, so to implement timeouts when using threaded workers, or if you prefer more control within your task function, you can use the cooperative timeout APIs shown in the examples below: check_timeout() and time_remaining.
These require that the Task instance be passed as context
into the task function with context=True.
Example:
@huey.task(timeout=60, context=True)  # 60s deadline, takes Task as context.
def process_report(data, task=None):
    for batch in chunk(data, 100):
        task.check_timeout()  # Will trigger a TaskTimeout if exceeds 60s.
        service.process_data(batch)  # Some expensive/slow operation.
For budget-aware scheduling you can use the more granular time_remaining:
@huey.task(timeout=60, context=True)  # 60s deadline, takes Task as context.
def process_accounts(accounts, task=None):
    for i, account in enumerate(accounts):
        # If it looks like we might run out of time, schedule the remaining
        # accounts to run in 5 minutes.
        if task.time_remaining < 10:
            process_accounts.schedule(
                args=(accounts[i:],),
                delay=300)
            return
        service.process_single_account(account)
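The idea behind cooperative checking can be illustrated with a standalone deadline object. This is a sketch, not the Task API: the Deadline class is hypothetical, it raises the built-in TimeoutError rather than Huey's TaskTimeout, and it takes an injectable clock so the behaviour can be shown without waiting.

```python
import time


class Deadline:
    """Tracks a time budget that the running code must check for itself."""

    def __init__(self, timeout, clock=time.monotonic):
        self.clock = clock
        self.expires_at = clock() + timeout

    @property
    def time_remaining(self):
        # Seconds left in the budget, never negative.
        return max(0.0, self.expires_at - self.clock())

    def check_timeout(self):
        # Nothing interrupts the code from outside; it fails only when it asks.
        if self.clock() >= self.expires_at:
            raise TimeoutError('deadline exceeded')


fake_now = [0.0]                                    # a controllable clock for the demo
deadline = Deadline(60, clock=lambda: fake_now[0])
deadline.check_timeout()                            # within budget: no error
fake_now[0] = 55.0
remaining = deadline.time_remaining                 # 5 seconds left
```

Because the check only happens where the task calls it, a long blocking call between two checks can still overrun the deadline, which is why the examples above check once per batch or per account.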
Locking tasks
Task locking can be accomplished using the Huey.lock_task() method,
which can be used as a context-manager or decorator.
This lock prevents multiple invocations of a task from running concurrently.
If a second invocation occurs and the lock cannot be acquired, then a special
TaskLockedException is raised and the task will not be executed.
If the task is configured to be retried, then it will be retried normally.
Examples:
@huey.periodic_task(crontab(minute='*/5'))
@huey.lock_task('reports-lock')  # Goes *after* the task decorator.
def generate_report():
    # If a report takes longer than 5 minutes to generate, we do
    # not want to kick off another until the previous invocation
    # has finished.
    run_report()
@huey.periodic_task(crontab(minute='0'))
def backup():
    # Generate backup of code
    do_code_backup()
    # Generate database backup. Since this may take longer than an
    # hour, we want to ensure that it is not run concurrently.
    with huey.lock_task('db-backup'):
        do_db_backup()
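The "fail instead of wait" behaviour of the lock can be modelled with a non-blocking, named lock. The sketch below is illustrative only: held_locks, LockedError and this lock_task function are hypothetical stand-ins kept in process memory, whereas Huey keeps lock state in its storage layer and raises TaskLockedException.

```python
from contextlib import contextmanager

held_locks = set()   # names of locks currently held (in-memory stand-in)


class LockedError(Exception):
    """Stand-in for Huey's TaskLockedException."""


@contextmanager
def lock_task(name):
    if name in held_locks:
        raise LockedError(name)      # do not wait: refuse to run concurrently
    held_locks.add(name)
    try:
        yield
    finally:
        held_locks.discard(name)     # always release, even if the body raises


with lock_task('db-backup'):
    try:
        with lock_task('db-backup'):     # a second, concurrent attempt
            nested = 'ran'
    except LockedError:
        nested = 'locked'
```

The inner attempt fails immediately while the outer block holds the lock; once the outer block exits, the same name can be acquired again.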
See Huey.lock_task() for API documentation.
Rate-Limiting
Huey provides simple fixed-window rate-limiting for tasks. Rate-limiting behaves much like locking and can be used as a context-manager or a decorator.
Rate-limiting allows for granular specification of when and how a rate-limited task should be retried.
Example:
@huey.task()
@huey.rate_limit('data-sync', limit=10, per=60)
def data_sync(data):
    # synchronize `data` with another system, but limit to 10 invocations
    # in a 60s window.
    service.apply_updates(data)
In the above example, if more than 10 calls to data_sync() occur during a
60 second window, the task raises a RateLimitExceeded. This
exception, by default, will cause the task to be retried at the start of the
next rate-limit window - in the example this would be some delay between 0 and
60 seconds. This occurs even if the task has no retries, as above.
Flow:
1. data_sync() called 10 times in rapid succession, all succeed.
2. data_sync() called again 33 seconds into the rate-limit window. RateLimitExceeded is raised. Huey schedules the rate-limited 11th invocation of data_sync() to run in 27 seconds when the window resets.
3. data_sync() called again 48 seconds into the rate-limit window. RateLimitExceeded raised, 12th invocation of data_sync scheduled to run in 12 seconds when the window resets.
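The arithmetic in the flow above (27 and 12 seconds) follows from a fixed-window counter. The class below is a minimal sketch of that technique, not Huey's rate limiter: FixedWindowLimiter is a hypothetical name, times are plain seconds, and the window is assumed to start at the first call.

```python
class FixedWindowLimiter:
    """Allow at most `limit` calls per `per`-second window."""

    def __init__(self, limit, per):
        self.limit = limit
        self.per = per
        self.window_start = None
        self.count = 0

    def check(self, now):
        """Return 0 if the call is allowed, else seconds until the window resets."""
        if self.window_start is None or now - self.window_start >= self.per:
            self.window_start = now      # open a new window
            self.count = 0
        if self.count < self.limit:
            self.count += 1
            return 0
        return self.window_start + self.per - now


limiter = FixedWindowLimiter(limit=10, per=60)
first_ten = [limiter.check(now=0) for _ in range(10)]   # all allowed
eleventh = limiter.check(now=33)    # rejected: 60 - 33 seconds to wait
twelfth = limiter.check(now=48)     # rejected: 60 - 48 seconds to wait
after_reset = limiter.check(now=60) # new window: allowed again
```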
This is good for tasks that you must not lose, but which may overwhelm a service if they come in too quickly. There is a risk, however, if the tasks continually exceed the rate-limits, that they “outpace” the rate-limiting and pile up. This can be mitigated by specifying a limited number of retries on the task itself:
Example:
@huey.task(retries=2)
@huey.rate_limit('no-thundering-herd', limit=10, per=60, retry=False)
def data_sync(data):
    service.apply_updates(data)
This example will behave like the previous example, but will cap each invocation to two retries.
Equivalent example using a context-manager instead of a decorator:
@huey.task(retries=2)
def data_sync(data):
    with huey.rate_limit('data-sync', limit=10, per=60, retry=False):
        service.apply_updates(data)
See Huey.rate_limit() for API documentation.
Task pipelines
Huey supports pipelines (or chains) of one or more tasks that should be executed sequentially.
To get started, let’s review the usual way we execute tasks:
@huey.task()
def add(a, b):
    return a + b
result = add(1, 2)
An equivalent, but more verbose, way is to use the s()
method to create a Task instance and then enqueue it explicitly:
# Create a task representing the execution of add(1, 2).
task = add.s(1, 2)
# Enqueue the task instance, which returns a Result handle.
result = huey.enqueue(task)
So the following are equivalent:
result = add(1, 2)
# And:
result = huey.enqueue(add.s(1, 2))
The TaskWrapper.s() method is used to create a Task
instance (which represents the execution of the given function). The
Task is what gets serialized and sent to the consumer.
To create a pipeline, we will use the TaskWrapper.s() method to create
a Task instance. We can then chain additional tasks using the
Task.then() method:
add_task = add.s(1, 2) # Create Task to represent add(1, 2) invocation.
# Add additional tasks to pipeline by calling add_task.then().
pipeline = (add_task
            .then(add, 3)   # Call add() with previous result (1+2) and 3.
            .then(add, 4)   # Previous result ((1+2)+3) and 4.
            .then(add, 5))  # Etc.
# When a pipeline is enqueued, a ResultGroup is returned (which is
# comprised of individual Result instances).
result_group = huey.enqueue(pipeline)
# Print results of above pipeline.
print(result_group.get(blocking=True))
# [3, 6, 10, 15]
# Alternatively, we could have iterated over the result group:
for result in result_group:
    print(result.get(blocking=True))
# 3
# 6
# 10
# 15
When enqueueing a task pipeline, the return value will be a
ResultGroup, which encapsulates the Result objects for
the individual tasks. ResultGroup can be iterated over or you can
use the ResultGroup.get() method to get all the task return values as
a list.
Note that the return value from the parent task is passed to the next task in the pipeline, and so on.
If the value returned by the parent function is a tuple, then the tuple
will be used to extend the *args for the next task. Likewise, if the
parent function returns a dict, then the dict will be used to update the
**kwargs for the next task.
Example of chaining fibonacci calculations:
@huey.task()
def fib(a, b=1):
    a, b = a + b, a
    return (a, b)  # returns tuple, which is passed as *args
pipe = (fib.s(1)
        .then(fib)
        .then(fib)
        .then(fib))
results = huey.enqueue(pipe)
print(results(True)) # Resolve results, blocking until all are finished.
# [(2, 1), (3, 2), (5, 3), (8, 5)]
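The argument-passing rule can be sketched in plain Python. The run_pipeline helper below is hypothetical and runs everything synchronously; it assumes the previous result is appended after the arguments given to each step, and that a None result passes nothing forward.

```python
def run_pipeline(steps):
    """Run (func, args, kwargs) steps in order, feeding each result forward."""
    results = []
    previous = None
    for index, (func, args, kwargs) in enumerate(steps):
        args, kwargs = tuple(args), dict(kwargs)
        if index > 0:
            if isinstance(previous, tuple):
                args = args + previous        # a tuple extends *args
            elif isinstance(previous, dict):
                kwargs.update(previous)       # a dict updates **kwargs
            elif previous is not None:
                args = args + (previous,)     # any other value is one extra argument
        previous = func(*args, **kwargs)
        results.append(previous)
    return results


def add(a, b):
    return a + b


def fib(a, b=1):
    a, b = a + b, a
    return (a, b)


sums = run_pipeline([(add, (1, 2), {}), (add, (3,), {}), (add, (4,), {}), (add, (5,), {})])
fibs = run_pipeline([(fib, (1,), {})] + [(fib, (), {})] * 3)
```

Under these assumptions, sums and fibs reproduce the two result lists shown in the pipeline examples above.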
Warning
If a pipeline step returns None and Huey was initialized
with store_none=False (the default), the result for that step will
not be written to the result store. When you read the ResultGroup,
that step’s value will appear as None and be indistinguishable from
“result not ready yet.”
Pipeline execution is unaffected: the next task in the chain still runs,
but it receives no additional arguments from the None-returning step
(since None is treated as “no data to pass”).
If your pipeline steps may legitimately return None and you need to
read results from the ResultGroup, initialize Huey with
store_none=True or (preferably) have your tasks return a sentinel value
instead.
Error pipelines
Just as Task.then() chains a follow-up task on success, the
Task.error() method chains a task that runs when the parent fails.
The exception is passed as the first argument to the error handler:
@huey.task()
def download(url):
    resp = requests.get(url)
    resp.raise_for_status()
    return resp.text
@huey.task()
def on_download_error(exc, url=None):
    logger.error('Download of %s failed: %s', url, exc)
    send_alert('Download failed', str(exc))
# Build a task with an error handler.
task = download.s('https://example.com/data')
task.error(on_download_error, url='https://example.com/data')
result = huey.enqueue(task)
Error handlers can also be combined with success pipelines:
task = (download.s('https://example.com/data')
        .then(parse_response)
        .then(store_result))
task.error(on_download_error)
result = huey.enqueue(task)
If download raises an exception, on_download_error is called and the
success pipeline (parse_response, store_result) is skipped. If
download succeeds, parse_response runs and on_download_error is
never called.
Note
The error handler is attached to the first task in the pipeline. If a
later task in the pipeline fails (e.g., parse_response), the error
handler on download is not triggered. To handle errors in later stages,
attach error handlers to those tasks directly.
Groups and Chords
Pipelines execute tasks sequentially, while groups and chords allow you to run tasks in parallel, optionally collecting their results and passing them to a final callback.
group - parallel execution
group allows you to enqueue one or more tasks in parallel and
gather the results. All tasks are enqueued immediately and may execute
concurrently across workers. A ResultGroup is returned to enable
fetching the results of the tasks individually, as they become available, or
after they are all completed.
from huey import group
@huey.task()
def fetch(url):
    return requests.get(url).json()
urls = ['https://.../1/', 'https://.../2/', ...]
result_group = huey.enqueue(group([
    fetch.s(url) for url in urls]))
# Block until all tasks are done.
results = result_group.get(blocking=True)
# [{'id': 1, ...}, {'id': 2, ...}, ...]
Unlike map(), groups may contain different types of
tasks:
@huey.task()
def check_cache():
    return cache.ping()
@huey.task()
def check_db():
    try:
        return db.execute('select 1').fetchone() is not None
    except Exception:
        logger.exception('database health-check failed')
        return False
g = group([check_cache.s(), check_db.s()])
result_group = huey.enqueue(g)
cache_ok, db_ok = result_group(blocking=True)
Error handlers can be attached to every member of a group:
@huey.task()
def on_fetch_error(exc):
    logger.error('Fetch failed: %s', exc)
g = group([fetch.s(u) for u in urls]).error(on_fetch_error)
chord - map / reduce
chord is a group with a callback. The member tasks are
run in parallel and when all tasks are finished, their results are passed to
the callback task in order.
from huey import chord
@huey.task()
def fetch(url):
    return (url, requests.get(url).text)
@huey.task()
def index_pages(results):
    for url, html in results:
        search.index(url, html)
    return len(results)
# Huey will run all the fetch() tasks, then when they are all finished
# the `index_pages()` task will be enqueued with the task results.
urls = ['https://a.com', 'https://b.com', ...]
c = chord(
    [fetch.s(url) for url in urls],
    index_pages)
result = huey.enqueue(c)
pages = result(blocking=True)
print('Indexed %s pages' % pages)
What happens when a chord is enqueued?
All sub-tasks are placed on the queue. Workers begin executing them as they are dequeued.
As each sub-task completes (or fails permanently), its return value (or exception) is stored and an internal completion counter is atomically incremented.
When the last sub-task is complete, all sub-task results are collected in order. The final callback is then enqueued with the sub-task results.
The callback is executed by a worker and the final result is made available.
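The steps above can be modelled in plain Python. This is only a toy sketch and not Huey's implementation: ToyChord and run_subtask are hypothetical names, threads stand in for workers, a lock stands in for the atomic counter, and the callback is called directly where Huey would enqueue it.

```python
import threading


class ToyChord:
    """Toy stand-in for the chord bookkeeping; not Huey's implementation."""

    def __init__(self, size, callback):
        self.size = size
        self.results = [None] * size   # One slot per sub-task, in order.
        self.done = 0                  # Completion counter.
        self.callback = callback
        self.final = None
        self.lock = threading.Lock()   # Makes the increment atomic.

    def complete(self, index, value):
        """Store one outcome; whoever finishes last fires the callback."""
        with self.lock:
            self.results[index] = value
            self.done += 1
            is_last = self.done == self.size
        if is_last:
            # Huey would enqueue the callback here; the toy just calls it.
            self.final = self.callback(list(self.results))


def run_subtask(chord, index, func, arg):
    try:
        value = func(arg)
    except Exception as exc:
        value = exc   # A permanent failure is stored in place of a value.
    chord.complete(index, value)


chord = ToyChord(3, callback=sum)
workers = [
    threading.Thread(target=run_subtask, args=(chord, i, lambda n: n * 10, i))
    for i in range(3)]
for w in workers:
    w.start()
for w in workers:
    w.join()
print(chord.results, chord.final)  # [0, 10, 20] 30
```

Results are written by index rather than appended, which is why they stay in order regardless of which sub-task finishes first.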
Enqueueing a chord returns a ChordResult, which
provides access to the final callback result, the sub-task results, and any
tasks chained to the final callback.
c = chord([fetch.s(u) for u in urls], index_pages).then(send_report)
result = huey.enqueue(c)
result() # Get result of index_pages (chord callback).
result.results # ResultGroup for the sub-tasks (fetch(...)).
result.callback # Result handle for callback.
result.pipeline_results # ResultGroup for [index_pages, send_report]
Composing groups and chords
Groups and chords allow the same composition APIs as pipelines. The following are equivalent:
@huey.task()
def incr(n):
    return n + 1
@huey.task()
def total(ns):
    return sum(ns)
result = huey.enqueue(
    group([incr.s(i) for i in range(10)])
    .then(total))
result = huey.enqueue(
    chord([incr.s(i) for i in range(10)],
          total))
Using then() or error() on a chord
will attach an additional callback to the chord’s callback:
result = huey.enqueue(
    group([incr.s(i) for i in range(3)])
    .then(total)
    .then(incr)
    .error(alert_admin))
# Chord result is the output of total().
print(result(blocking=True))  # 1 + 2 + 3 == 6.
# Results of chord pipeline are 6, 7.
print(result.pipeline_results(blocking=True))  # [6, 7]
In this example, total receives the results from the incr subtasks.
Then the result of total is passed to incr again. If total
raises an exception, alert_admin is executed instead.
Execution flow:
incr(0), incr(1) and incr(2) are enqueued.
All three eventually finish and total([1, 2, 3]) is enqueued.
incr(6) is then enqueued and executed, returning 7.
Pipelines inside chords
Individual chord members can be pipelines:
@huey.task()
def incr(n):
    return n + 1
@huey.task()
def total(ns):
    return sum(ns)
result = huey.enqueue(chord(
    [incr.s(i).then(incr).then(incr) for i in range(3)],
    total))
# First runs incr(0), incr(1), incr(2).
# Then: runs incr(1), incr(2), incr(3).
# Then: runs incr(2), incr(3), incr(4).
# Then: runs total([3, 4, 5])
print(result(True))  # 12.
The chord callback is not enqueued until all sub-task pipelines have completed.
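To check the arithmetic, the same data flow can be traced with plain functions. This sketch only mirrors the values passed between steps; nothing is enqueued and no Huey APIs are involved.

```python
def incr(n):
    return n + 1


def total(ns):
    return sum(ns)


members = []
for i in range(3):
    value = i
    for _ in range(3):       # Mirrors incr.s(i).then(incr).then(incr).
        value = incr(value)
    members.append(value)    # Final value of each member pipeline.

print(members, total(members))  # [3, 4, 5] 12
```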
chords inside chords
chords can also contain chords:
@huey.task()
def fetch(url):
    return requests.get(url).json()
@huey.task()
def merge(results):
    combined = {}
    for r in results:
        combined.update(r)
    return combined
@huey.task()
def publish(merged):
    database.write(merged)
inner1 = chord([fetch.s(u) for u in us_urls], merge.s())
inner2 = chord([fetch.s(u) for u in eu_urls], merge.s())
result = huey.enqueue(
    chord([inner1, inner2], publish.s()))
In this example, two inner chords run in parallel. Each inner chord fans out
to its own set of URLs, then merges the results. When both inner merge
tasks complete, their results are collected and passed to publish.
Error handling in chords
chords are designed to always complete. When a sub-task fails and has no retries remaining, the exception is placed in the results list in place of a normal return value. The callback fires once all sub-tasks have completed; for any sub-task that failed, its entry in the results list is the exception.
This means the callback may receive a mix of normal values and exceptions:
@huey.task()
def aggregate(results):
    if any(isinstance(r, Exception) for r in results):
        logger.warning('One of the sub-tasks failed.')
    return process([r for r in results if not isinstance(r, Exception)])
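A callback may also want to decide whether enough sub-tasks succeeded before continuing. The sketch below shows one way to do that with plain functions; split_results, aggregate_body and the min_success threshold are hypothetical, and aggregate_body stands in for the body of a task decorated with @huey.task().

```python
def split_results(results):
    """Separate normal return values from exceptions, preserving order."""
    values = [r for r in results if not isinstance(r, Exception)]
    errors = [r for r in results if isinstance(r, Exception)]
    return values, errors


def aggregate_body(results, min_success=0.5):
    """Sum the successful values, or fail if too few sub-tasks succeeded."""
    values, errors = split_results(results)
    if len(values) < min_success * len(results):
        raise RuntimeError(
            '%d of %d sub-tasks failed' % (len(errors), len(results)))
    return sum(values)


print(aggregate_body([1, 2, ValueError('timeout')]))  # 3
```

Raising from the callback turns "too many failures" into an ordinary task failure of the callback itself.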
If a sub-task fails but has retries remaining, it will retry normally. The chord will not receive an Exception unless the sub-task has exhausted available retries.
@huey.task(retries=3, retry_delay=10)
def flaky_fetch(url):
    return requests.get(url).json()
c = chord(
    [flaky_fetch.s(u) for u in urls],
    aggregate.s())
If you prefer to handle errors before they reach the callback, catch exceptions inside your sub-tasks and return a sentinel value:
@huey.task()
def safe_fetch(url):
    try:
        return requests.get(url).json()
    except Exception:
        return None
@huey.task()
def aggregate(results):
    valid = [r for r in results if r is not None]
    return process(valid)
Error callbacks on members work independently of the chord. They can be
attached to each member directly, or to every member at once via group.error():
@huey.task()
def on_fetch_error(exc):
    logger.error('Fetch failed: %s', exc)
tasks = [fetch.s(u).error(on_fetch_error) for u in urls]
c = chord(tasks, aggregate.s())
# Or equivalently using group.error():
c = (group([fetch.s(u) for u in urls])
     .error(on_fetch_error)
     .then(aggregate))
Error callbacks on the chord itself apply to the callback task:
c = chord(
    [fetch.s(u) for u in urls],
    aggregate.s()
).error(alert_admin)
In this example, alert_admin runs if aggregate raises an exception,
not if a member fails.
Note
Revoking a chord member will prevent it from running, which will prevent the chord from ever completing. If you need to cancel a chord, revoke the callback task instead.
Note
Unlike regular tasks, chord sub-task results are stored unconditionally
by the chord’s internal bookkeeping. If a sub-task returns None, the
callback will correctly receive None in the results list regardless
of the store_none setting. You do not need store_none=True for
chords to work with None return values.
Dynamic fan-out
Because chord members must be known at enqueue time, dynamic fan-out (where the set of subtasks depends on a runtime value) should be done from inside a task:
@huey.task()
def discover_and_fetch(config):
    urls = compute_urls(config)
    result = huey.enqueue(chord(
        [fetch.s(u) for u in urls],
        aggregate.s()))
    return result.callback.id  # Return the callback task id for tracking.
For more information, see the following API docs:
Signals
The Consumer sends signals as it processes tasks.
The Huey.signal() method can be used to attach a callback to one or
more signals, which will be invoked synchronously by the consumer when the
signal is sent.
As a simple example, we can add a signal handler that prints the signal name and the ID of the related task.
from huey.signals import SIGNAL_ERROR
@huey.signal()
def print_signal_args(signal, task, exc=None):
    if signal == SIGNAL_ERROR:
        print('%s - %s - exception: %s' % (signal, task.id, exc))
    else:
        print('%s - %s' % (signal, task.id))
The signal() method is used to decorate the signal-handling
function. It accepts an optional list of signals. If none are provided, as in
our example, then the handler will be called for any signal.
The callback function (print_signal_args) accepts two required arguments,
which are present on every signal: signal and task. Additionally, our
handler accepts an optional third argument exc which is only included with
SIGNAL_ERROR. SIGNAL_ERROR is only sent when a task raises an uncaught
exception during execution.
Warning
Signal handlers are executed synchronously by the consumer, so it is typically a bad idea to introduce any slow operations into a signal handler.
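One way to keep a handler fast is to have it hand the event to a background thread and return immediately. The sketch below uses only the standard library and simulates the consumer by calling the handler directly. The names record_signal and slow_sink, the fake task objects and the signal strings are placeholders; in a real application record_signal would be registered with @huey.signal(), and the sketch assumes the handler and the background thread live in the same process.

```python
import queue
import threading
from types import SimpleNamespace

events = queue.Queue()
processed = []


def record_signal(signal, task, exc=None):
    """Handler body: no slow work here, just queue the event and return."""
    events.put((signal, task.id, str(exc) if exc is not None else None))


def slow_sink():
    """Background thread: the place for slow work such as a network call."""
    while True:
        item = events.get()
        if item is None:       # A None item tells the thread to stop.
            break
        processed.append(item)


sink = threading.Thread(target=slow_sink, daemon=True)
sink.start()

# Simulate the consumer sending two signals, using fake task objects.
record_signal('complete', SimpleNamespace(id='t1'))
record_signal('error', SimpleNamespace(id='t2'), ValueError('boom'))
events.put(None)
sink.join()
print(processed)  # [('complete', 't1', None), ('error', 't2', 'boom')]
```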
Huey emits the following signals:
SIGNAL_ENQUEUED - task has been placed on the queue.
SIGNAL_EXECUTING - task is about to be executed by a worker.
SIGNAL_COMPLETE - task finished successfully.
SIGNAL_ERROR - task raised an unhandled exception. Handler receives an extra exc argument containing the exception instance.
SIGNAL_CANCELED - task was canceled via CancelExecution.
SIGNAL_RETRYING - task failed but will be retried.
SIGNAL_SCHEDULED - task was added to the schedule for future execution.
SIGNAL_REVOKED - task was revoked and will not be executed.
SIGNAL_EXPIRED - task’s expiration time passed before execution.
SIGNAL_LOCKED - task could not acquire its lock.
SIGNAL_TIMEOUT - task exceeded its execution timeout.
SIGNAL_RATE_LIMITED - task was rate-limited.
SIGNAL_INTERRUPTED - consumer was shut down while the task was still executing.
All signals are available in the huey.signals module. For signal ordering,
registration details, and examples, see the Signals document and the
Huey.signal() API documentation.
Consumer shutdown and interrupted tasks
When the consumer is shut down gracefully (SIGINT / Ctrl+C), it finishes
executing any in-progress tasks before exiting. However, if the consumer is
killed with SIGTERM or crashes, tasks that are mid-execution are lost and
will not be retried automatically.
Huey tracks in-flight tasks internally. When the consumer shuts down, it calls
Huey.notify_interrupted_tasks(), which emits SIGNAL_INTERRUPTED
for each task that was still executing. You can use this signal to re-enqueue
interrupted tasks:
from huey.signals import SIGNAL_INTERRUPTED
@huey.signal(SIGNAL_INTERRUPTED)
def on_interrupted(signal, task, *args, **kwargs):
    # Re-enqueue the task so it will be executed again.
    huey.enqueue(task)
Warning
Re-enqueueing interrupted tasks means the task may run more than once. If your task is not idempotent, consider tracking execution state externally (e.g. in a database) rather than blindly re-enqueueing.
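As a rough illustration of tracking execution state externally, the sketch below records finished work in a SQLite table and skips any re-run of the same key. The table name, the run_once helper and the key format are hypothetical, and an in-memory database is used only so the example runs on its own; a real deployment would need a database shared by all workers.

```python
import sqlite3

db = sqlite3.connect(':memory:')  # Assumption: replace with a shared database.
db.execute('CREATE TABLE IF NOT EXISTS completed (key TEXT PRIMARY KEY)')


def run_once(key, func, *args):
    """Run func unless an earlier run with this key was recorded as finished."""
    row = db.execute(
        'SELECT 1 FROM completed WHERE key = ?', (key,)).fetchone()
    if row is not None:
        return False   # Already finished: the re-enqueued copy is a no-op.
    func(*args)
    with db:           # Commits the completion marker.
        db.execute('INSERT OR IGNORE INTO completed (key) VALUES (?)', (key,))
    return True


sent = []
print(run_once('welcome-email:42', sent.append, 'user-42'))  # True
print(run_once('welcome-email:42', sent.append, 'user-42'))  # False
print(sent)                                                  # ['user-42']
```

A task interrupted after doing its work but before the marker is committed would still run twice, so this narrows the window rather than closing it. Where the side effect lives in the same database, writing both in one transaction avoids that gap.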
Logging
Huey uses the standard library logging module to log information about task
execution and consumer activity. Messages are logged to the huey namespace,
with consumer-specific messages being logged to huey.consumer.
When the consumer is run, it binds a default StreamHandler() to the huey
namespace so that all messages are logged to the console. The consumer logging
can be configured using the following consumer options:
-l FILE, --logfile=FILE - log to a file.
-v, --verbose - verbose logging (includes DEBUG level)
-q, --quiet - minimal logging
-S, --simple - simple logging format (“time message”)
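Because huey.consumer is a child of the huey namespace, a handler attached to huey also receives consumer messages. The following standard-library sketch demonstrates that propagation with a hypothetical ListHandler that collects records in memory; the log messages are invented for the example and nothing here depends on Huey being installed.

```python
import logging


class ListHandler(logging.Handler):
    """Hypothetical handler that keeps records in a list for inspection."""

    def __init__(self):
        super().__init__()
        self.records = []

    def emit(self, record):
        self.records.append(record)


handler = ListHandler()
handler.setLevel(logging.WARNING)      # Only keep warnings and above.

huey_logger = logging.getLogger('huey')
huey_logger.setLevel(logging.INFO)
huey_logger.addHandler(handler)

# A record logged to the child logger propagates up to the 'huey' handler.
logging.getLogger('huey.consumer').warning('queue is backing up')
# This one is below the handler's level, so the handler ignores it.
logging.getLogger('huey').info('routine message')

print([(r.name, r.getMessage()) for r in handler.records])
# [('huey.consumer', 'queue is backing up')]
```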
If you would like to get email alerts when an error occurs, you can attach a
logging.handlers.SMTPHandler to the huey namespace at level ERROR:
from logging.handlers import SMTPHandler
import logging
mail_handler = SMTPHandler(
    mailhost=('smtp.gmail.com', 587),
    fromaddr='errors@myapp.com',
    toaddrs=['developers@myapp.com'],
    subject='Huey error log',
    credentials=('errors@myapp.com', 'secret_password'),
    secure=())
mail_handler.setLevel(logging.ERROR)
logging.getLogger('huey').addHandler(mail_handler)
Storage Options
Huey provides a number of different storage layers suitable to different types of workloads. Below I will try to sketch the differences, strengths, and weaknesses of each storage layer.
RedisHuey
Huey’s capabilities are, to a large extent, informed by the functionality available in Redis. This is the most robust option available and can handle very busy workloads. Because Redis runs as a separate server process, it is even possible to run Huey consumers on multiple machines to facilitate “scale-out” operation.
Operations are guaranteed to be atomic, following the guarantees provided by Redis. The queue is stored in a Redis list, scheduled tasks use a sorted set, and the task result-store is kept in a hash.
Tasks that return a meaningful value must be sure that the caller “resolves” those return values at some point, to ensure that the result store does not become filled with unused data (to mitigate this, you can just modify your tasks to return None if you never intend to use the result).
By default Huey performs a “blocking” pop on the queue, which reduces latency, although polling can be used instead by passing blocking=False when instantiating RedisHuey.
For low-latency result-fetching, you can specify notify_result=True when instantiating RedisHuey. This works by coordinating result readiness through a blocking pop on a dedicated result list. By default this result-readiness list will expire after 86400 seconds, but this can be controlled with the notify_result_ttl parameter. All RedisHuey implementations support these options.
Task priorities are not supported by RedisHuey.
PriorityRedisHuey
Redis storage layer that supports task priorities. In order to make this possible and efficient, PriorityRedisHuey stores the queue in a sorted set. Since sorted sets require the key to be unique, Huey will use the timestamp in microseconds to differentiate tasks enqueued with the same priority.
RedisExpireHuey
Redis storage layer that stores task results in top-level keys, in order to add an expiration time to them. Putting an expiration on task result keys can ensure that the result-store does not fill up with unresolved result values. The default expire time is 86400 seconds, although this can be controlled by setting the expire_time parameter during instantiation.
PriorityRedisExpireHuey
Combines the behaviors of PriorityRedisHuey to support task priorities, with the result-store expiration behavior of RedisExpireHuey.
SqliteHuey
Sqlite works well for many workloads (see Appropriate uses for Sqlite), and Huey’s Sqlite storage layer works well regardless of the worker-type chosen. Sqlite locks the database during writes, ensuring only a single writer can write to the database at any given time. Writes generally happen very quickly, however, so in practice this is rarely an issue. Because the database is stored in a single file, taking backups is quite simple.
SqliteHuey may be a good choice for moderate workloads where the operational complexity of running a separate server process like Redis is undesirable.
FileHuey
Stores the queue, schedule and task results in files on the filesystem. This implementation is provided mostly for testing and development. An exclusive lock is used around all file-system operations, since multiple operations (list directory, read file, unlink file, e.g.) are typically required for each storage primitive (enqueue, dequeue, store result, etc).
MemoryHuey
In-memory implementation of the storage layer used for Immediate mode.
BlackHoleHuey
All storage methods are no-ops.
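The idea behind PriorityRedisHuey, ordering by priority and breaking ties by enqueue time, can be illustrated with a small in-memory model. This is a toy sketch built on heapq, not Huey's Redis code: ToyPriorityQueue is a hypothetical name, a counter stands in for the microsecond timestamp, and the sketch assumes that larger priority values are dequeued first.

```python
import heapq
import itertools


class ToyPriorityQueue:
    """In-memory model of a priority queue with first-in tie-breaking."""

    def __init__(self):
        self._heap = []
        self._counter = itertools.count()  # Stands in for the timestamp.

    def enqueue(self, task, priority=0):
        # Negate the priority so that larger values sort first (assumption).
        # The counter keeps entries unique and preserves enqueue order
        # among tasks that share a priority.
        heapq.heappush(self._heap, (-priority, next(self._counter), task))

    def dequeue(self):
        return heapq.heappop(self._heap)[2]


q = ToyPriorityQueue()
q.enqueue('a', priority=0)
q.enqueue('b', priority=10)
q.enqueue('c', priority=10)
q.enqueue('d', priority=0)
print([q.dequeue() for _ in range(4)])  # ['b', 'c', 'a', 'd']
```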
Tips and tricks
To call a task-decorated function in its original form, you can use
call_local():
@huey.task()
def add(a, b):
    return a + b
# Call the add() function in "un-decorated" form, skipping all
# the huey stuff:
add.call_local(3, 4)  # Returns 7.
It’s also worth mentioning that Python decorators are just syntactic sugar for wrapping a function with another function. Thus, the following two examples are equivalent:
@huey.task()
def add(a, b):
    return a + b
# Equivalent to:
def _add(a, b):
    return a + b
add = huey.task()(_add)
Task functions can be applied multiple times to a list (or iterable) of
parameters using the map() method:
>>> @huey.task()
... def add(a, b):
...     return a + b
...
>>> params = [(i, i ** 2) for i in range(10)]
>>> result_group = add.map(params)
>>> result_group.get(blocking=True)
[0, 2, 6, 12, 20, 30, 42, 56, 72, 90]
Retrieving Results by Task ID
If you have a task ID but not the original Result handle (e.g.,
because you stored the ID in a database or session), you can retrieve the
result using Huey.result():
from huey.exceptions import TaskException
# In a web handler, store the task ID when enqueuing:
result = process_upload(file_id)
session['task_id'] = result.id
# Later, in a status-checking endpoint:
task_id = session['task_id']
try:
    value = huey.result(task_id, blocking=False, preserve=True)
except TaskException as exc:
    return {'status': 'error', 'detail': exc.metadata['error']}
if value is None:
    return {'status': 'pending'}
return {'status': 'complete', 'value': value}
Schedule Shorthand
The schedule() method accepts a delay (int/float or
timedelta) or an ETA (datetime):
from datetime import datetime, timedelta
# These are all equivalent:
add.schedule(args=(1, 2), delay=60)
add.schedule(args=(1, 2), delay=timedelta(seconds=60))
add.schedule(args=(1, 2), eta=datetime.now() + timedelta(seconds=60))
Iterating Results as They Complete
When working with a ResultGroup (from map()
or group), you can iterate over results as they become available
using as_completed():
result_group = huey.enqueue(group([
    fetch.s(url) for url in urls]))
for value in result_group.as_completed():
    # Process each result as soon as it's ready, rather than waiting
    # for all tasks to finish.
    process(value)
Reserved Keyword Arguments
When calling a task-decorated function or using s(), the
following keyword arguments are intercepted by Huey and will not be passed
to your task function:
eta
delay
retries
retry_delay
priority
expires
timeout
This means you cannot use these names as keyword arguments to your task function:
@huey.task()
def bad_example(priority=None):  # "priority" is reserved!
    ...
# This will set the task's priority to 10, NOT pass priority=10 to
# the function.
bad_example(priority=10)
If you need a parameter with one of these names, rename it or pass it positionally.
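To see why a reserved name never reaches the function, it can help to model the interception in plain Python. The split_call helper below is hypothetical and is not how Huey is implemented; it only illustrates that reserved keywords are pulled out of the keyword arguments, while positional arguments pass through untouched.

```python
RESERVED = ('eta', 'delay', 'retries', 'retry_delay',
            'priority', 'expires', 'timeout')


def split_call(args, kwargs):
    """Separate reserved options from the arguments the function would see."""
    options = {k: kwargs[k] for k in RESERVED if k in kwargs}
    func_kwargs = {k: v for k, v in kwargs.items() if k not in RESERVED}
    return options, args, func_kwargs


# Passed by keyword: "priority" is treated as a task option.
print(split_call((), {'priority': 10, 'name': 'x'}))
# ({'priority': 10}, (), {'name': 'x'})

# Passed positionally: the value reaches the function unchanged.
print(split_call((10,), {'name': 'x'}))
# ({}, (10,), {'name': 'x'})
```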
Reading more
That sums up the basic usage patterns of huey. Below are links for details on other aspects of the APIs:
Huey - responsible for coordinating executable tasks and queue backends
Huey.task() - decorator to indicate an executable task.
Result - handle for interacting with a task.
Huey.periodic_task() - decorator to indicate a task that executes at periodic intervals.
crontab() - define what intervals to execute a periodic command.
For information about managing shared resources like database connections, refer to the shared resources document.
Also check out the notes on running the consumer.