Signals
The consumer will send various signals as it processes tasks. Callbacks can be registered as signal handlers, and will be called synchronously by the consumer process.
Signal Reference
The following table lists all signals, when they are emitted, and any extra
arguments passed to the handler beyond the standard (signal, task) pair.
Signal |
When emitted |
Extra arguments |
|---|---|---|
|
Task has been placed on the queue. Emitted in both the application process (when your code calls a task) and the consumer (when re-enqueueing retries, periodic tasks, or scheduled tasks). |
None |
|
Task is about to be executed by a worker. |
None |
|
Task has finished executing successfully and the result has been stored in the result-store. |
None |
|
Task raised an unhandled exception during execution. |
|
|
Task was canceled, either by a |
None |
|
Task failed but will be retried (retries remaining, or
|
None |
|
Task is not yet ready to run and has been added to the schedule for
future execution (e.g., has an |
None |
|
Task was revoked and will not be executed. No further signals are emitted for this task. |
None |
|
Task’s expiration time has passed; it will not be executed. |
None |
|
Task could not acquire its lock ( |
None |
|
Task exceeded its execution timeout. |
None |
|
Task was rate-limited by a |
None |
|
Consumer was shut down while the task was still executing (e.g., via
|
None |
Signal Ordering
Signals are emitted in a deterministic order. Understanding this order is important when writing signal handlers that depend on the state of the task or the result store.
Successful task execution:
SIGNAL_ENQUEUED– task placed on the queue (in the application process).SIGNAL_EXECUTING– worker picks up the task.SIGNAL_COMPLETE– task finished. The result is in the result store.If the task has an
on_completepipeline, the next task is enqueued (emitting anotherSIGNAL_ENQUEUED).
Task failure with retry:
SIGNAL_ENQUEUEDSIGNAL_EXECUTINGSIGNAL_ERROR– exception is passed asexc. The error result is stored at this point.SIGNAL_RETRYING– task will be retried.If
retry_delayis set:SIGNAL_SCHEDULED(task added to the schedule for later). Otherwise:SIGNAL_ENQUEUED(task re-added to the queue immediately).
Task failure without retry (retries exhausted or not configured):
SIGNAL_ENQUEUEDSIGNAL_EXECUTINGSIGNAL_ERROR
Scheduled task:
SIGNAL_ENQUEUED– task placed on the queue (application process).SIGNAL_SCHEDULED– worker sees the task is not ready to run, adds it to the schedule.When the scheduler determines the task is ready:
SIGNAL_ENQUEUED(in the consumer process).SIGNAL_EXECUTINGSIGNAL_COMPLETE(orSIGNAL_ERROR, etc.)
Revoked task:
SIGNAL_ENQUEUEDSIGNAL_REVOKED– no further signals are emitted.
Rate-limited task (with automatic retry):
SIGNAL_ENQUEUEDSIGNAL_EXECUTINGSIGNAL_RATE_LIMITEDSIGNAL_RETRYINGSIGNAL_SCHEDULED– task is scheduled for the start of the next rate-limit window.
Chord signals:
When a chord is enqueued, each sub-task emits its own SIGNAL_ENQUEUED.
As sub-tasks complete, they emit SIGNAL_COMPLETE (or SIGNAL_ERROR).
When the last sub-task finishes, the callback is enqueued
(SIGNAL_ENQUEUED), then executed (SIGNAL_EXECUTING, etc.).
Registering Signal Handlers
To register a signal handler, use the Huey.signal() method:
@huey.signal()
def all_signal_handler(signal, task, exc=None):
# This handler will be called for every signal.
print('%s - %s' % (signal, task.id))
@huey.signal(SIGNAL_ERROR, SIGNAL_LOCKED, SIGNAL_CANCELED, SIGNAL_REVOKED)
def task_not_executed_handler(signal, task, exc=None):
# This handler will be called for the 4 signals listed, which
# correspond to error conditions.
print('[%s] %s - not executed' % (signal, task.id))
@huey.signal(SIGNAL_COMPLETE)
def task_success(signal, task):
# This handler will be called for each task that completes successfully.
pass
When no signals are specified (as in all_signal_handler), the handler is
registered for all signals via an internal "any" channel.
Signal handlers can be unregistered using Huey.disconnect_signal().
# Disconnect the "task_success" signal handler.
huey.disconnect_signal(task_success)
# Disconnect the "task_not_executed_handler", but just from
# handling SIGNAL_LOCKED.
huey.disconnect_signal(task_not_executed_handler, SIGNAL_LOCKED)
Examples
We’ll use the following tasks to illustrate how signals may be sent:
@huey.task()
def add(a, b):
return a + b
@huey.task(retries=2, retry_delay=10)
def flaky_task():
if random.randint(0, 1) == 0:
raise ValueError('uh-oh')
return 'OK'
Here is a simple example of a task execution we would expect to succeed:
>>> result = add(1, 2)
>>> result.get(blocking=True)
The following signals would be fired:
SIGNAL_ENQUEUED- the task has been enqueued (happens in the application process).SIGNAL_EXECUTING- the task has been dequeued and will be executed.SIGNAL_COMPLETE- the task has finished successfully.
Here is an example of scheduling a task for execution after a short delay:
>>> result = add.schedule((2, 3), delay=10)
>>> result(True) # same as result.get(blocking=True)
The following signals would be sent:
SIGNAL_ENQUEUED- the task has been enqueued (happens in the application process).SIGNAL_SCHEDULED- the task is not yet ready to run, so it has been added to the schedule.After 10 seconds, the consumer will re-enqueue the task as it is now ready to run, sending the
SIGNAL_ENQUEUED(in the consumer process!).Then the consumer will run the task and send the
SIGNAL_EXECUTINGsignal.SIGNAL_COMPLETE.
Here is an example that may fail, in which case it will be retried automatically with a delay of 10 seconds.
>>> result = flaky_task()
>>> try:
... result.get(blocking=True)
... except TaskException:
... result.reset()
... result.get(blocking=True) # Try again if first time fails.
...
Assuming the task failed the first time and succeeded the second time, we would see the following signals being sent:
SIGNAL_ENQUEUED- task has been enqueued.SIGNAL_EXECUTING- the task is being executed.SIGNAL_ERROR- the task raised an unhandled exception.SIGNAL_RETRYING- the task will be retried.SIGNAL_SCHEDULED- the task has been added to the schedule for execution in ~10 seconds.SIGNAL_ENQUEUED- 10s have elapsed and the task is ready to run and has been re-enqueued.SIGNAL_EXECUTING- second try running task.SIGNAL_COMPLETE- task succeeded.
What happens if we revoke the add() task and then attempt to execute it:
>>> add.revoke()
>>> res = add(1, 2)
The following signal will be sent:
SIGNAL_ENQUEUED- the task has been enqueued for execution.SIGNAL_REVOKED- this is sent before the task enters the “executing” state. When a task is revoked, no other signals will be sent.
Using SIGNAL_INTERRUPTED
The correct way to shut-down the Huey consumer is to send a SIGINT signal
to the worker process (e.g. Ctrl+C) - this initiates a graceful shutdown.
Sometimes, however, you may need to shutdown the consumer using SIGTERM -
this immediately stops the consumer. Any tasks that are currently being
executed are then “lost” and will not be retried by default (see also:
Consumer shutdown).
To avoid losing these tasks, you can use a SIGNAL_INTERRUPTED handler to
re-enqueue them:
@huey.signal(SIGNAL_INTERRUPTED)
def on_interrupted(signal, task, *args, **kwargs):
# The consumer was shutdown before `task` finished executing.
# Re-enqueue it.
huey.enqueue(task)
Signal Handler Error Resilience
If a signal handler raises an exception, Huey logs the exception but continues processing. A broken signal handler will not prevent other signal handlers from running, nor will it prevent the task from being executed or its result from being stored.
@huey.signal(SIGNAL_COMPLETE)
def broken_handler(signal, task):
raise ValueError('oops')
@huey.signal(SIGNAL_COMPLETE)
def working_handler(signal, task):
# This will still be called, even if broken_handler raised.
record_completion(task.id)
Signals and Immediate Mode
Signals fire in immediate mode as well as when running the consumer. This makes it easy to test signal handlers:
huey.immediate = True
state = []
@huey.signal(SIGNAL_COMPLETE)
def on_complete(signal, task):
state.append(task.id)
result = add(1, 2) # Executes immediately, fires signals.
assert len(state) == 1
assert state[0] == result.id
Performance considerations
Signal handlers are executed synchronously by the consumer as it processes
tasks (with the exception of SIGNAL_ENQUEUED, which also runs in your
application process). It is important to use care when implementing signal
handlers, as one slow signal handler can impact the overall responsiveness of
the consumer.
For example, if you implement a signal handler that posts some data to REST API, everything might work fine until the REST API goes down or stops being responsive – which will cause the signal handler to block, which then prevents the consumer from moving on to the next task.
Another consideration is the management of shared resources that may be used by signal handlers, such as database connections or open file handles. Signal handlers are called by the consumer workers, which (depending on how you are running the consumer) may be separate processes, threads or greenlets. As a result, care should be taken to ensure proper initialization and cleanup of any resources you plan to use in signal handlers.
Lastly, take care when implementing SIGNAL_ENQUEUED handlers, as these may
run in your application-code (e.g. whenever your application enqueues a task),
or by the consumer process (e.g. when re-enqueueing a task for retry, or
when enqueueing periodic tasks, when moving a task from the schedule to the
queue, etc).