Huey’s API¶
Most end-users will interact with the API using the two decorators:
The API documentation will follow the structure of the huey API, starting with
the highest-level interfaces (the decorators) and eventually discussing the
lowest-level interfaces, the BaseQueue
and BaseDataStore
objects.
Function decorators and helpers¶
-
class
Huey
(name[, result_store=True[, events=True[, store_none=False[, always_eager=False[, store_errors=True[, blocking=False[, **storage_kwargs]]]]]]])¶ Huey executes tasks by exposing function decorators that cause the function call to be enqueued for execution by the consumer.
Typically your application will only need one Huey instance, but you can have as many as you like – the only caveat is that one consumer process must be executed for each Huey instance.
Parameters: - name – the name of the huey instance or application.
- result_store (bool) – whether the results of tasks should be stored.
- events (bool) – whether events should be emitted by the consumer.
- store_none (bool) – Flag to indicate whether tasks that return
None
should store their results in the result store. - always_eager (bool) – Useful for testing, this will execute all tasks immediately, without enqueueing them.
- store_errors (bool) – whether task errors should be stored.
- blocking (bool) – whether the queue will block (if False, then the queue will poll).
- storage_kwargs – arbitrary kwargs to pass to the storage implementation.
Example usage:
from huey import RedisHuey, crontab huey = RedisHuey('my-app') @huey.task() def slow_function(some_arg): # ... do something ... return some_arg @huey.periodic_task(crontab(minute='0', hour='3')) def backup(): # do a backup every day at 3am return
-
task
([retries=0[, retry_delay=0[, retries_as_argument=False[, include_task=False]]]])¶ Function decorator that marks the decorated function for processing by the consumer. Calls to the decorated function will do the following:
- Serialize the function call into a message suitable for storing in the queue.
- Enqueue the message for execution by the consumer.
- If a
result_store
has been configured, return aTaskResultWrapper
instance which can retrieve the result of the function, orNone
if not using a result store.
Note
Huey can be configured to execute the function immediately by instantiating it with
always_eager = True
– this is useful for running in debug mode or when you do not wish to run the consumer.Here is how you might use the
task
decorator:# assume that we've created a huey object from huey import RedisHuey huey = RedisHuey() @huey.task() def count_some_beans(num): # do some counting! return 'Counted %s beans' % num
Now, whenever you call this function in your application, the actual processing will occur when the consumer dequeues the message and your application will continue along on its way.
With a result store:
>>> res = count_some_beans(1000000) >>> res <huey.api.TaskResultWrapper object at 0xb7471a4c> >>> res() 'Counted 1000000 beans'
Without a result store:
>>> res = count_some_beans(1000000) >>> res is None True
Parameters: - retries (int) – number of times to retry the task if an exception occurs
- retry_delay (int) – number of seconds to wait between retries
- retries_as_argument (boolean) – whether the number of retries should be passed in to the decorated function as an argument.
- include_task (boolean) – whether the task instance itself should be
passed in to the decorated function as the
task
argument.
Returns: A callable
TaskWrapper
instance.Return type: The return value of any calls to the decorated function depends on whether the
Huey
instance is configured with aresult_store
. If a result store is configured, the decorated function will return anTaskResultWrapper
object which can fetch the result of the call from the result store – otherwise it will simply returnNone
.The
task
decorator also does one other important thing – it adds a special methods onto the decorated function, which makes it possible to schedule the execution for a certain time in the future, create task pipelines, etc. For more information, see:
-
periodic_task
(validate_datetime)¶ Function decorator that marks the decorated function for processing by the consumer at a specific interval. The
periodic_task
decorator serves to mark a function as needing to be executed periodically by the consumer.Note
By default, the consumer will schedule and enqueue periodic task functions. To disable the enqueueing of periodic tasks, run the consumer with
-n
or--no-periodic
.The
validate_datetime
parameter is a function which accepts a datetime object and returns a boolean value whether or not the decorated function should execute at that time or not. The consumer will send a datetime to the function every minute, giving it the same granularity as the linux crontab, which it was designed to mimic.For simplicity, there is a special function
crontab()
, which can be used to quickly specify intervals at which a function should execute. It is described below.Here is an example of how you might use the
periodic_task
decorator and thecrontab
helper:from huey import crontab from huey import RedisHuey huey = RedisHuey() @huey.periodic_task(crontab(minute='*/5')) def every_five_minutes(): # this function gets executed every 5 minutes by the consumer print("It's been five minutes")
Note
Because functions decorated with
periodic_task
are meant to be executed at intervals in isolation, they should not take any required parameters nor should they be expected to return a meaningful value. This is the same regardless of whether or not you are using a result store.Parameters: validate_datetime – a callable which takes a datetime
and returns a boolean whether the decorated function should execute at that time or notReturns: A callable TaskWrapper
instance.Return type: PeriodicQueueTask
Like
task()
, the periodic task decorator adds helpers to the decorated function. These helpers allow you to “revoke” and “restore” the periodic task, effectively enabling you to pause it or prevent its execution. For more information, seeTaskWrapper
.Note
The result (return value) of a periodic task is not stored in the result store. This is primarily due to the fact that there is not an obvious way one would read such results, since the invocation of the periodic task happens inside the consumer scheduler. As such, there is no task result handle which the user could use to read the result. To store the results of periodic tasks, you will need to use your own storage or use the storage APIs directly:
More info:
-
enqueue
(task)¶ Enqueue the given task. When the result store is enabled (on by default), the return value will be a
TaskResultWrapper
which provides access to the result (among other things).If the task specifies another task to run on completion (see
QueueTask.then()
), then the return value will be alist
ofTaskResultWrapper
objects, one for each task in the pipeline.Note
Unless you are executing a pipeline of tasks, it should not typically be necessary to use the
Huey.enqueue()
method. Calling (or scheduling) atask
-decorated function will automatically enqueue a task for execution.When you create a task pipeline, however, it is necessary to enqueue the pipeline once it has been set up.
Parameters: task (QueueTask) – a QueueTask
instance.Returns: A TaskResultWrapper
object (if result store enabled).
-
register_pre_execute
(name, fn)¶ Register a pre-execute hook. The callback will be executed before the execution of all tasks. Execution of the task can be cancelled by raising a
CancelExecution
exception. Uncaught exceptions will be logged but will not cause the task itself to be cancelled.The callback function should accept a single task instance, the return value is ignored.
Hooks are executed in the order in which they are registered (which may be implicit if registered using the decorator).
Parameters: - name – Name for the hook.
- fn – Callback function that accepts task to be executed.
-
unregister_pre_execute
(name)¶ Unregister the specified pre-execute hook.
-
pre_execute
([name=None])¶ Decorator for registering a pre-execute hook.
Usage:
@huey.pre_execute() def my_pre_execute_hook(task): do_something()
-
register_post_execute
(name, fn)¶ Register a post-execute hook. The callback will be executed after the execution of all tasks. Uncaught exceptions will be logged but will have no other effect on the overall operation of the consumer.
The callback function should accept:
- a task instance
- the return value from the execution of the task (which may be None)
- any exception that was raised during the execution of the task (which will be None for tasks that executed normally).
The return value of the callback itself is ignored.
Hooks are executed in the order in which they are registered (which may be implicit if registered using the decorator).
Parameters: - name – Name for the hook.
- fn – Callback function that accepts task that was executed and the tasks return value (or None).
-
unregister_post_execute
(name)¶ Unregister the specified post-execute hook.
-
post_execute
([name=None])¶ Decorator for registering a post-execute hook.
Usage:
@huey.post_execute() def my_post_execute_hook(task, task_value, exc): do_something()
-
register_startup
(name, fn)¶ Register a startup hook. The callback will be executed whenever a worker comes online. Uncaught exceptions will be logged but will have no other effect on the overall operation of the worker.
The callback function must not accept any parameters.
This API is provided to simplify setting up global resources that, for whatever reason, should not be created as import-time side-effects. For example, your tasks need to write data into a Postgres database. If you create the connection at import-time, before the worker processes are spawned, you’ll likely run into errors when attempting to use the connection from the child (worker) processes. To avoid this problem, you can register a startup hook which executes once when the worker starts up.
Parameters: - name – Name for the hook.
- fn – Callback function.
-
unregister_startup
(name)¶ Unregister the specified startup hook.
-
on_startup
([name=None])¶ Decorator for registering a startup hook. See
register_startup()
for information about start hooks.Usage:
db_connection = None @huey.on_startup() def setup_db_connection(): global db_connection db_connection = psycopg2.connect(database='my_db') @huey.task() def write_data(rows): cursor = db_connection.cursor() # ...
-
revoke
(task[, revoke_until=None[, revoke_once=False]])¶ Prevent the given task instance from being executed by the consumer after it has been enqueued. To understand this method, you need to know a bit about how the consumer works. When you call a function decorated by the
Huey.task()
method, calls to that function will enqueue a message to the consumer indicating which task to execute, what the parameters are, etc. If the task is not scheduled to execute in the future, and there is a free worker available, the task starts executing immediately. Otherwise if workers are busy, it will wait in line for the next free worker.When you revoke a task, when the worker picks up the revoked task to start executing it, it will instead just throw it away and get the next available task. So, revoking a task only has affect between the time you call the task and the time the worker actually starts executing the task.
Warning
This method only revokes a given instance of a task. Therefore, this method cannot be used with periodic tasks. To revoke all instances of a given task (including periodic tasks), see the
revoke_all()
method.This function can be called multiple times, but each call will supercede any previous revoke settings.
Parameters: - revoke_until (datetime) – Prevent the execution of the task until the
given datetime. If
None
it will prevent execution indefinitely. - revoke_once (bool) – If
True
will only prevent execution the next time it would normally execute.
- revoke_until (datetime) – Prevent the execution of the task until the
given datetime. If
-
restore
(task)¶ Takes a previously revoked task instance and restores it, allowing normal execution. If the revoked task was already consumed and discarded by a worker, then restoring will have no effect.
Note
If the task class itself has been revoked, restoring a given instance will not have any effect.
-
revoke_by_id
(task_id[, revoke_until=None[, revoke_once=False]])¶ Exactly the same as
revoke()
, except it accepts a task instance ID instead of the task instance itself.
-
restore_by_id
(task_id)¶ Exactly the same as
restore()
, except it accepts a task instance ID instead of the task instance itself.
-
revoke_all
(task_class[, revoke_until=None[, revoke_once=False]])¶ Prevent any instance of the given task from being executed by the consumer.
Warning
This method affects all instances of a given task.
This function can be called multiple times, but each call will supercede any previous revoke settings.
Parameters: - revoke_until (datetime) – Prevent execution of the task until the
given datetime. If
None
it will prevent execution indefinitely. - revoke_once (bool) – If
True
will only prevent execution the next time it would normally execute.
- revoke_until (datetime) – Prevent execution of the task until the
given datetime. If
-
restore_all
(task_class)¶ Takes a previously revoked task class and restores it, allowing normal execution. Restoring a revoked task class does not have any effect on individually revoked instances of the given task.
Note
Restoring a revoked task class does not have any effect on individually revoked instances of the given task.
-
is_revoked
(task[, dt=None])¶ Returns a boolean indicating whether the given task instance/class is revoked. If the
dt
parameter is specified, then the result will indicate whether the task is revoked at that particular datetime.Note
If a task class is specified, the return value will indicate only whether all instances of that task are revoked.
If a task instance/ID is specified, the return value will indicate whether the given instance or the task class itself has been revoked.
Parameters: task – Either a task class, task instance or task ID. Returns: Boolean indicating whether the aforementioned task is revoked.
-
result
(task_id[, blocking=False[, timeout=None[, backoff=1.15[, max_delay=1.0[, revoke_on_timeout=False[, preserve=False]]]]]])¶ Attempt to retrieve the return value of a task. By default,
result()
will simply check for the value, returningNone
if it is not ready yet. If you want to wait for a value, you can specifyblocking=True
. This will loop, backing off up to the providedmax_delay
, until the value is ready or thetimeout
is reached. If thetimeout
is reached before the result is ready, aDataStoreTimeout
exception will be raised.Note
If the task failed with an exception, then a
TaskException
that wraps the original exception will be raised.Warning
By default the result store will delete a task’s return value after the value has been successfully read (by a successful call to the
result()
orTaskResultWrapper.get()
methods). If you need to use the task result multiple times, you must specifypreserve=True
when calling these methods.Parameters: - task_id – the task’s unique identifier.
- blocking (bool) – whether to block while waiting for task result
- timeout – number of seconds to block (if
blocking=True
) - backoff – amount to backoff delay each iteration of loop
- max_delay – maximum amount of time to wait between iterations when attempting to fetch result.
- revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it is has not started yet.
- preserve (bool) – see the above warning. When set to
True
, this parameter ensures that the task result should be preserved after having been successfully retrieved.
-
lock_task
(lock_name)¶ Utilize the Storage key/value APIs to implement simple locking.
This lock is designed to be used to prevent multiple invocations of a task from running concurrently. Can be used as either a context-manager or as a task decorator. If using as a decorator, place it directly above the function declaration.
If a second invocation occurs and the lock cannot be acquired, then a special exception is raised, which is handled by the consumer. The task will not be executed and an
EVENT_LOCKED
will be emitted. If the task is configured to be retried, then it will be retried normally, but the failure to acquire the lock is not considered an error.Examples:
@huey.periodic_task(crontab(minute='*/5')) @huey.lock_task('reports-lock') def generate_report(): # If a report takes longer than 5 minutes to generate, we do # not want to kick off another until the previous invocation # has finished. run_report() @huey.periodic_task(crontab(minute='0')) def backup(): # Generate backup of code do_code_backup() # Generate database backup. Since this may take longer than an # hour, we want to ensure that it is not run concurrently. with huey.lock_task('db-backup'): do_db_backup()
Parameters: lock_name (str) – Name to use for the lock. Returns: Decorator or context-manager.
-
put
(key, value)¶ Parameters: - key – key for data
- value – arbitrary data to store in result store.
Store a value in the result-store under the given key.
-
get
(key[, peek=False])¶ Parameters: - key – key to read
- peek (bool) – non-destructive read
Read a value from the result-store at the given key. By default reads are destructive, but to preserve the value you can specify
peek=True
.
-
pending
([limit=None])¶ Return all unexecuted tasks currently in the queue.
-
scheduled
([limit=None])¶ Return all unexecuted tasks currently in the schedule.
-
all_results
()¶ Return a mapping of task-id to pickled result data for all executed tasks whose return values have not been automatically removed.
-
class
TaskWrapper
(huey, func[, retries=0[, retry_delay=0[, retries_as_argument=False[, include_task=False[, name=None[, task_base=None[, **task_settings]]]]]]])¶ Parameters: - huey (Huey) – A huey instance.
- func – User function.
- retries (int) – Upon failure, number of times to retry the task.
- retry_delay (int) – Number of seconds to wait before retrying after a failure/exception.
- retries_as_argument (bool) – Pass the number of remaining retries as an argument to the user function.
- include_task (bool) – Pass the task object itself as an argument to the user function.
- name (str) – Name for task (will be determined based on task module and function name if not provided).
- task_base – Base-class for task, defaults to
QueueTask
. - task_settings – Arbitrary settings to pass to the task class constructor.
Wrapper around a user-defined function that converts function calls into tasks executed by the consumer. The wrapper, which decorates the function, replaces the function in the scope with a
TaskWrapper
instance.The wrapper class, when called, will enqueue the requested function call for execution by the consumer.
Note
You should not need to create
TaskWrapper
instances directly. Instead, use theHuey.task()
andHuey.periodic_task()
decorators.The wrapper class also has several helper methods for managing and enqueueing tasks, which are described below.
-
schedule
([args=None[, kwargs=None[, eta=None[, delay=None[, convert_utc=True]]]]])¶ Use the
schedule
method to schedule the execution of the queue task for a given time in the future:import datetime # get a datetime object representing one hour in the future in_an_hour = datetime.datetime.now() + datetime.timedelta(seconds=3600) # schedule "count_some_beans" to run in an hour count_some_beans.schedule(args=(100000,), eta=in_an_hour) # another way of doing the same thing... count_some_beans.schedule(args=(100000,), delay=(60 * 60))
Parameters: - args – arguments to call the decorated function with
- kwargs – keyword arguments to call the decorated function with
- eta (datetime) – the time at which the function should be
executed. See note below on how to correctly specify the
eta
whether the consumer is running in UTC- or localtime-mode. - delay (int) – number of seconds to wait before executing function
- convert_utc – whether the
eta
ordelay
should be converted from local time to UTC. Defaults toTrue
. See note below.
Return type: like calls to the decorated function, will return an
TaskResultWrapper
object if a result store is configured, otherwise returnsNone
Note
It can easily become confusing when/how to use the
convert_utc
parameter when scheduling tasks. Similarly, if you are using naive datetimes, whether the ETA should be based arounddatetime.utcnow()
ordatetime.now()
.If you are running the consumer in UTC-mode (the default):
- When specifying a
delay
,convert_utc=True
. - When specifying an
eta
with respect todatetime.now()
,convert_utc=True
. - When specifying an
eta
with respect todatetime.utcnow()
,convert_utc=False
.
If you are running the consumer in localtime-mode (
-o
):- When specifying a
delay
,convert_utc=False
. - When specifying an
eta
, it should always be with respect todatetime.now()
withconvert_utc=False
.
In other words, for consumers running in UTC-mode, the only time
convert_utc=False
is when you are passing aneta
that is already a naive datetime with respect toutcnow()
.Similarly for localtime-mode consumers,
convert_utc
should always beFalse
and when specifying aneta
it should be with respect todatetime.now()
.
-
call_local
()¶ Call the
@task
-decorated function without enqueueing the call. Or, in other words,call_local()
provides access to the underlying user function.>>> count_some_beans.call_local(1337) 'Counted 1337 beans'
-
revoke
([revoke_until=None[, revoke_once=False]])¶ Prevent any instance of the given task from executing. When no parameters are provided the function will not execute again until explicitly restored.
This function can be called multiple times, but each call will supercede any limitations placed on the previous revocation.
Parameters: - revoke_until (datetime) – Prevent the execution of the task until the
given datetime. If
None
it will prevent execution indefinitely. - revoke_once (bool) – If
True
will only prevent execution of the next invocation of the task.
# skip the next execution count_some_beans.revoke(revoke_once=True) # prevent any invocation from executing. count_some_beans.revoke() # prevent any invocation for 24 hours. count_some_beans.revoke(datetime.datetime.now() + datetime.timedelta(days=1))
- revoke_until (datetime) – Prevent the execution of the task until the
given datetime. If
-
is_revoked
([dt=None])¶ Check whether the given task is revoked. If
dt
is specified, it will check if the task is revoked with respect to the given datetime.Parameters: dt (datetime) – If provided, checks whether task is revoked at the given datetime
-
restore
()¶ Clears any revoked status and allows the task to run normally.
-
s
([*args[, **kwargs]])¶ Create a task instance representing the invocation of the user function with the given arguments and keyword-arguments. The resulting task instance is not enqueued automatically.
To illustrate the distinction, when you call a
task()
-decorated function, behind-the-scenes, Huey is doing something like this:@huey.task() def add(a, b): return a + b result = add(1, 2) # Is equivalent to: task = add.s(1, 2) result = huey.enqueue(task)
Parameters: - args – Arguments for user-defined function.
- kwargs – Keyword arguments for user-defined function.
Returns: a
QueueTask
instance representing the execution of the user-defined function with the given arguments.Typically, one will use the
TaskWrapper.s()
helper when creating task execution pipelines.For example:
add_task = add.s(1, 2) # Represent task invocation. pipeline = (add_task .then(add, 3) # Call add() with previous result and 3. .then(add, 4) # etc... .then(add, 5)) results = huey.enqueue(pipeline) # Print results of above pipeline. print([result.get(blocking=True) for result in results]) # [3, 6, 10, 15]
-
task_class
¶ Store a reference to the task class for the decorated function.
>>> count_some_beans.task_class tasks.queuecmd_count_beans
-
class
QueueTask
([data=None[, task_id=None[, execute_time=None[, retries=None[, retry_delay=None[, on_complete=None]]]]]])¶ The
QueueTask
class represents the execution of a function. Instances of the class are serialized and enqueued for execution by the consumer, which deserializes them and executes the function.Note
You should not need to create instances of
QueueTask
directly, but instead use either theHuey.task()
decorator or theTaskWrapper.s()
method.Parameters: - data – Data specific to this execution of the task. For
task()
-decorated functions, this will be a tuple of the(args, kwargs)
the function was invoked with. - task_id (str) – The task’s ID, defaults to a UUID if not provided.
- execute_time (datetime) – Time at which task should be executed.
- retries (int) – Number of times to retry task upon failure/exception.
- retry_delay (int) – Number of seconds to wait before retrying a failed task.
- on_complete (QueueTask) – Task to execute upon completion of this task.
Here’s a refresher on how tasks work:
@huey.task() def add(a, b): return a + b ret = add(1, 2) print(ret.get(blocking=True)) # "3". # The above two lines are equivalent to: task_instance = add.s(1, 2) # Create a QueueTask instance. ret = huey.enqueue(task_instance) # Enqueue the queue task. print(ret.get(blocking=True)) # "3".
-
then
(task[, *args[, **kwargs]])¶ Parameters: - task (TaskWrapper) – A
task()
-decorated function. - args – Arguments to pass to the task.
- kwargs – Keyword arguments to pass to the task.
Returns: The parent task.
The
then()
method is used to create task pipelines. A pipeline is a lot like a unix pipe, such that the return value from the parent task is then passed (along with any parameters specified byargs
andkwargs
) to the child task.Here’s an example of chaining some addition operations:
add_task = add.s(1, 2) # Represent task invocation. pipeline = (add_task .then(add, 3) # Call add() with previous result and 3. .then(add, 4) # etc... .then(add, 5)) results = huey.enqueue(pipeline) # Print results of above pipeline. print([result.get(blocking=True) for result in results]) # [3, 6, 10, 15]
If the value returned by the parent function is a
tuple
, then the tuple will be used to update the*args
for the child function. Likewise, if the parent function returns adict
, then the dict will be used to update the**kwargs
for the child function.Example of chaining fibonacci calculations:
@huey.task() def fib(a, b=1): a, b = a + b, a return (a, b) # returns tuple, which is passed as *args pipe = (fib.s(1) .then(fib) .then(fib)) results = huey.enqueue(pipe) print([result.get(blocking=True) for result in results]) # [(2, 1), (3, 2), (5, 3)]
- task (TaskWrapper) – A
- data – Data specific to this execution of the task. For
-
crontab
(month='*', day='*', day_of_week='*', hour='*', minute='*')¶ Convert a “crontab”-style set of parameters into a test function that will return
True
when a givendatetime
matches the parameters set forth in the crontab.Day-of-week uses 0=Sunday and 6=Saturday.
Acceptable inputs:
- “*” = every distinct value
- “*/n” = run every “n” times, i.e. hours=’*/4’ == 0, 4, 8, 12, 16, 20
- “m-n” = run every time m..n
- “m,n” = run on m and n
Return type: a test function that takes a datetime
and returns a booleanNote
It is currently not possible to run periodic tasks with an interval less than once per minute. If you need to run tasks more frequently, you can create a periodic task that runs once per minute, and from that task, schedule any number of sub-tasks to run after the desired delays.
TaskResultWrapper¶
-
class
TaskResultWrapper
(huey, task)¶ Although you will probably never instantiate an
TaskResultWrapper
object yourself, they are returned by any calls totask()
decorated functions (provided that huey is configured with a result store). TheTaskResultWrapper
talks to the result store and is responsible for fetching results from tasks.Once the consumer finishes executing a task, the return value is placed in the result store, allowing the producer to retrieve it.
Note
By default, the data is removed from the result store after being read, but this behavior can be disabled.
Getting results from tasks is very simple:
>>> from main import count_some_beans >>> res = count_some_beans(100) >>> res # what is "res" ? <huey.queue.TaskResultWrapper object at 0xb7471a4c> >>> res() # Fetch the result of this task. 'Counted 100 beans'
What happens when data isn’t available yet? Let’s assume the next call takes about a minute to calculate:
>>> res = count_some_beans(10000000) # let's pretend this is slow >>> res.get() # Data is not ready, so None is returned. >>> res() is None # We can omit ".get", it works the same way. True >>> res(blocking=True, timeout=5) # Block for up to 5 seconds Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/charles/tmp/huey/src/huey/huey/queue.py", line 46, in get raise DataStoreTimeout huey.exceptions.DataStoreTimeout >>> res(blocking=True) # No timeout, will block until it gets data. 'Counted 10000000 beans'
If the task failed with an exception, then a
TaskException
will be raised when reading the result value:>>> @huey.task() ... def fails(): ... raise Exception('I failed') >>> res = fails() >>> res() # raises a TaskException! Traceback (most recent call last): File "<stdin>", line 1, in <module> File "/home/charles/tmp/huey/src/huey/huey/api.py", line 684, in get raise TaskException(result.metadata) huey.exceptions.TaskException: Exception('I failed',)
-
get
([blocking=False[, timeout=None[, backoff=1.15[, max_delay=1.0[, revoke_on_timeout=False[, preserve=False]]]]]])¶ Attempt to retrieve the return value of a task. By default,
get()
will simply check for the value, returningNone
if it is not ready yet. If you want to wait for a value, you can specifyblocking=True
. This will loop, backing off up to the providedmax_delay
, until the value is ready or thetimeout
is reached. If thetimeout
is reached before the result is ready, aDataStoreTimeout
exception will be raised.Warning
By default the result store will delete a task’s return value after the value has been successfully read (by a successful call to the
result()
orTaskResultWrapper.get()
methods). If you need to use the task result multiple times, you must specifypreserve=True
when calling these methods.Note
Instead of calling
.get()
, you can simply call theTaskResultWrapper
object directly. Both methods accept the same parameters.Parameters: - blocking (bool) – whether to block while waiting for task result
- timeout – number of seconds to block (if
blocking=True
) - backoff – amount to backoff delay each iteration of loop
- max_delay – maximum amount of time to wait between iterations when attempting to fetch result.
- revoke_on_timeout (bool) – if a timeout occurs, revoke the task, thereby preventing it from running if it is has not started yet.
- preserve (bool) – see the above warning. When set to
True
, this parameter ensures that the task result should be preserved after having been successfully retrieved.
-
revoke
()¶ Revoke the given task. Unless it is in the process of executing, it will be revoked and the task will not run.
in_an_hour = datetime.datetime.now() + datetime.timedelta(seconds=3600) # run this command in an hour res = count_some_beans.schedule(args=(100000,), eta=in_an_hour) # oh shoot, I changed my mind, do not run it after all res.revoke()
-
restore
()¶ Restore the given task instance. Unless the task instance has already been dequeued and discarded, it will be restored and run as scheduled.
Warning
If the task class itself has been revoked, then this method has no effect.
-
is_revoked
()¶ Return a boolean value indicating whether this particular task instance or the task class itself has been revoked.
See also:
Huey.is_revoked()
.
-
reschedule
([eta=None[, delay=None[, convert+utc=True]]])¶ Reschedule the given task. The original task instance will be revoked, but no checks are made to verify that it hasn’t already been executed.
If neither an
eta
nor adelay
is specified, the task will be run as soon as it is received by a worker.Parameters: - eta (datetime) – the time at which the function should be
executed. See note below on how to correctly specify the
eta
whether the consumer is running in UTC- or - delay (int) – number of seconds to wait before executing function
- convert_utc – whether the
eta
ordelay
should be converted from local time to UTC. Defaults toTrue
. See the note in theschedule()
method ofHuey.task()
for more information.
Return type: TaskResultWrapper
object for the new task.- eta (datetime) – the time at which the function should be
executed. See note below on how to correctly specify the
-
reset
()¶ Reset the cached result and allow re-fetching a new result for the given task (i.e. after a task error and subsequent retry).
-