This document describes the current stable version of Celery (3.1).

celery — Distributed processing


This module is the main entry-point for the Celery API. It exposes the objects most commonly needed for calling tasks and creating Celery applications.

Celery        Celery application instance
group         group tasks together
chain         chain tasks together
chord         chords enable callbacks for groups
signature     object describing a task invocation
current_app   proxy to the current application instance
current_task  proxy to the currently executing task

Celery application objects

New in version 2.5.

class celery.Celery(main='__main__', broker='amqp://localhost//', …)
Parameters:
  • main – Name of the main module if running as __main__. This is used as a prefix for task names.
  • broker – URL of the default broker used.
  • loader – The loader class, or the name of the loader class to use. Default is celery.loaders.app.AppLoader.
  • backend – The result store backend class, or the name of the backend class to use. Default is the value of the CELERY_RESULT_BACKEND setting.
  • amqp – AMQP object or class name.
  • events – Events object or class name.
  • log – Log object or class name.
  • control – Control object or class name.
  • set_as_current – Make this the global current app.
  • tasks – A task registry or the name of a registry class.
  • include – List of modules every worker should import.
  • fixups – List of fixup plug-ins (see e.g. celery.fixups.django).
  • autofinalize – If set to False a RuntimeError will be raised if the task registry or tasks are used before the app is finalized.
main

Name of the __main__ module. Required for standalone scripts.

If set this will be used instead of __main__ when automatically generating task names.
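
The effect of main on generated task names can be pictured with a small stand-alone sketch. The helper gen_task_name below is hypothetical and written only for illustration; it models the naming rule described above and is not Celery's own implementation.

```python
def gen_task_name(main, func_name, module_name):
    """Build a task name of the form "<module>.<function>".

    Hypothetical helper: when the function lives in a script that is run
    as __main__, the app's `main` value is used as the prefix instead.
    """
    if module_name == "__main__" and main:
        module_name = main
    return "{0}.{1}".format(module_name, func_name)


# A task defined in a script executed directly, for an app with main="tasks":
print(gen_task_name("tasks", "add", "__main__"))
# A task defined in an importable module keeps that module's name:
print(gen_task_name("tasks", "add", "proj.feeds"))
```

Under this model, a worker that imports the same code as a regular module would see the same task name as the script that defined it, which is the reason main matters for standalone scripts.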

conf

Current configuration.

user_options

Custom options for command-line programs. See Adding new command-line options.

steps

Custom bootsteps to extend and modify the worker. See Installing Bootsteps.

current_task

The instance of the task that is being executed, or None.

amqp

AMQP related functionality: amqp.

backend

Current backend instance.

loader

Current loader instance.

control

Remote control: control.

events

Consuming and sending events: events.

log

Logging: log.

tasks

Task registry.

Accessing this attribute will also finalize the app.

pool

Broker connection pool: pool. This attribute is not related to the worker's concurrency pool.

Task

Base task class for this app.

timezone

Current timezone for this app. This is a cached property taking the time zone from the CELERY_TIMEZONE setting.

close()

Close any open pool connections and do any other steps necessary to clean up after the application.

Only necessary for dynamically created apps, for which you can use the with statement instead:

with Celery(set_as_current=False) as app:
    with app.connection() as conn:
        pass
signature()

Return a new Signature bound to this app. See signature()

bugreport()

Return a string with information useful for the Celery core developers when reporting a bug.

config_from_object(obj, silent=False, force=False)

Reads configuration from object, where object is either an object or the name of a module to import.

Parameters:
  • silent – If true then import errors will be ignored.
  • force – Force reading configuration immediately. By default the configuration will be read only when required.
>>> celery.config_from_object("myapp.celeryconfig")

>>> from myapp import celeryconfig
>>> celery.config_from_object(celeryconfig)
config_from_envvar(variable_name, silent=False, force=False)

Read configuration from environment variable.

The value of the environment variable must be the name of a module to import.

>>> import os
>>> os.environ["CELERY_CONFIG_MODULE"] = "myapp.celeryconfig"
>>> celery.config_from_envvar("CELERY_CONFIG_MODULE")
autodiscover_tasks(packages, related_name="tasks")

With a list of packages, try to import modules of a specific name (by default ‘tasks’).

For example if you have an (imagined) directory tree like this:

foo/__init__.py
    tasks.py
    models.py

bar/__init__.py
    tasks.py
    models.py

baz/__init__.py
    models.py

Then calling app.autodiscover_tasks(['foo', 'bar', 'baz']) will result in the modules foo.tasks and bar.tasks being imported.

Parameters:
  • packages – List of packages to search. This argument may also be a callable, in which case the value returned is used (for lazy evaluation).
  • related_name – The name of the module to find. Defaults to “tasks”, which means it looks for “package.tasks” for every package in packages.
  • force – By default this call is lazy so that the actual autodiscovery will not happen until an application imports the default modules. Forcing will cause the autodiscovery to happen immediately.
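
The lookup described above can be approximated with the standard library alone. The sketch below is an illustration under stated assumptions, not Celery's implementation: find_related_modules is a hypothetical helper, it always runs eagerly (there is no force handling), and it skips a package whenever the import fails for any reason.

```python
import importlib
import os
import sys
import tempfile


def find_related_modules(packages, related_name="tasks"):
    """Import "<package>.<related_name>" for every package that has one.

    Hypothetical helper. Note that catching ImportError also hides import
    errors raised inside an existing tasks module.
    """
    if callable(packages):  # lazy form: call it to obtain the list
        packages = packages()
    found = []
    for package in packages:
        name = "{0}.{1}".format(package, related_name)
        try:
            importlib.import_module(name)
        except ImportError:
            continue
        found.append(name)
    return found


# Recreate the imagined foo/bar/baz directory tree in a temporary directory.
root = tempfile.mkdtemp()
layout = {"foo": ["tasks", "models"], "bar": ["tasks", "models"], "baz": ["models"]}
for package, modules in layout.items():
    os.mkdir(os.path.join(root, package))
    for module in ["__init__"] + modules:
        with open(os.path.join(root, package, module + ".py"), "w"):
            pass
sys.path.insert(0, root)
importlib.invalidate_caches()

print(find_related_modules(["foo", "bar", "baz"]))  # ['foo.tasks', 'bar.tasks']
```

The package baz is importable but has no tasks module, so it contributes nothing to the result.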
add_defaults(d)

Add default configuration from dict d.

If the argument is a callable then it will be regarded as a promise, and it won’t be loaded until the configuration is actually needed.

This method can be compared to:

>>> celery.conf.update(d)

with the difference that 1) no copy will be made and 2) the dict will not be transferred when the worker spawns child processes, so it’s important that the same configuration happens at import time when pickle restores the object on the other side.

setup_security()

Set up the message-signing serializer. This will affect all application instances (a global operation).

Disables untrusted serializers and, if configured to use the auth serializer, registers the auth serializer with the provided settings in the Kombu serializer registry.

Parameters:
  • allowed_serializers – List of serializer names, or content_types that should be exempt from being disabled.
  • key – Name of private key file to use. Defaults to the CELERY_SECURITY_KEY setting.
  • cert – Name of certificate file to use. Defaults to the CELERY_SECURITY_CERTIFICATE setting.
  • store – Directory containing certificates. Defaults to the CELERY_SECURITY_CERT_STORE setting.
  • digest – Digest algorithm used when signing messages. Default is sha1.
  • serializer – Serializer used to encode messages after they have been signed. See CELERY_TASK_SERIALIZER for the serializers supported. Default is json.
start(argv=None)

Run celery using argv.

Uses sys.argv if argv is not specified.

task(fun, …)

Decorator to create a task class out of any callable.

Examples:

@app.task
def refresh_feed(url):
    return …

With extra options set:

@app.task(exchange="feeds")
def refresh_feed(url):
    return …

App Binding

For custom apps the task decorator will return a proxy object, so that the act of creating the task is not performed until the task is used or the task registry is accessed.

If you are depending on binding to be deferred, then you must not access any attributes on the returned object until the application is fully set up (finalized).
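
The deferred behaviour can be modelled with a small proxy class. This is only a sketch of the general lazy-proxy idea; LazyTaskProxy and FakeTask are hypothetical names and do not correspond to Celery classes.

```python
class LazyTaskProxy(object):
    """Hypothetical proxy that creates the real task on first attribute access."""

    def __init__(self, factory):
        self._factory = factory
        self._task = None

    def __getattr__(self, name):
        # Only reached for attributes the proxy itself does not define,
        # that is, attributes of the underlying task.
        if self._task is None:
            self._task = self._factory()
        return getattr(self._task, name)


created = []  # records every task that actually gets built


class FakeTask(object):
    name = "tasks.refresh_feed"

    def __init__(self):
        created.append(self.name)


proxy = LazyTaskProxy(FakeTask)
print(created)     # still empty: nothing has been created yet
print(proxy.name)  # the first attribute access builds the task
print(created)
```

In this model, touching any attribute forces creation, which is why attribute access must wait until the application is fully set up when deferred binding matters.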

send_task(name[, args[, kwargs[, …]]])

Send task by name.

Parameters:
  • name – Name of task to call (e.g. “tasks.add”).
  • result_cls – Specify custom result class. Defaults to AsyncResult().

Otherwise supports the same arguments as Task.apply_async().

AsyncResult

Create new result instance. See AsyncResult.

GroupResult

Create new group result instance. See GroupResult.

worker_main(argv=None)

Run celery worker using argv.

Uses sys.argv if argv is not specified.

Worker

Worker application. See Worker.

WorkController

Embeddable worker. See WorkController.

Beat

Celerybeat scheduler application. See Beat.

connection(url=default[, ssl[, transport_options={}]])

Establish a connection to the message broker.

Parameters:
  • url – Either the URL or the hostname of the broker to use.
  • hostname – URL, Hostname/IP-address of the broker. If a URL is used, then the other arguments below will be taken from the URL instead.
  • userid – Username to authenticate as.
  • password – Password to authenticate with.
  • virtual_host – Virtual host to use (domain).
  • port – Port to connect to.
  • ssl – Defaults to the BROKER_USE_SSL setting.
  • transport – Defaults to the BROKER_TRANSPORT setting.

Returns: kombu.Connection.

connection_or_acquire(connection=None)

For use within a with-statement to get a connection from the pool if one is not already provided.

Parameters: connection – If not provided, then a connection will be acquired from the connection pool.
producer_or_acquire(producer=None)

For use within a with-statement to get a producer from the pool if one is not already provided.

Parameters: producer – If not provided, then a producer will be acquired from the producer pool.
mail_admins(subject, body, fail_silently=False)

Sends an email to the admins in the ADMINS setting.

select_queues(queues=[])

Select a subset of queues, where queues must be a list of queue names to keep.

now()

Return the current time and date as a datetime object.

set_current()

Makes this the current app for this thread.

finalize()

Finalizes the app by loading built-in tasks and evaluating pending task decorators.

on_configure()

Optional callback called the first time the configuration is required.

Pickler

Helper class used to pickle this application.

Canvas primitives

See Canvas: Designing Workflows for more about creating task workflows.

class celery.group(task1[, task2[, task3[, … taskN]]])

Creates a group of tasks to be executed in parallel.

Example:

>>> res = group([add.s(2, 2), add.s(4, 4)])()
>>> res.get()
[4, 8]

A group is lazy so you must call it to take action and evaluate the group.

Returns a group task that, when called, calls all of the tasks in the group and returns a GroupResult instance that can be used to inspect the state of the group.

class celery.chain(task1[, task2[, task3[, … taskN]]])

Chains tasks together, so that each task follows the previous one by being applied as a callback of that task.

If called with only one argument, then that argument must be an iterable of tasks to chain.

Example:

>>> res = chain(add.s(2, 2), add.s(4))()

is effectively ((2 + 2) + 4):

>>> res.get()
8

Calling a chain will return the result of the last task in the chain. You can get to the other tasks by following the result.parent attributes:

>>> res.parent.get()
4
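
The data flow of a chain can be traced with ordinary function calls. The following is a synchronous, in-process model written for illustration; run_chain is a hypothetical helper, and no broker, worker or Celery import is involved.

```python
def add(x, y):
    return x + y


def run_chain(first_call, *callbacks):
    """Run first_call, then feed each result to the next callback.

    Hypothetical model of a chain: every step is a (function, args) pair,
    and each callback receives the previous result as its first argument.
    Returns the results of all steps in order.
    """
    func, args = first_call
    results = [func(*args)]
    for callback, extra_args in callbacks:
        results.append(callback(results[-1], *extra_args))
    return results


# Mirrors chain(add.s(2, 2), add.s(4)), that is ((2 + 2) + 4).
steps = run_chain((add, (2, 2)), (add, (4,)))
print(steps[-1])  # 8, the result of the last task
print(steps[-2])  # 4, the result of its parent
```

The list returned here plays the role of walking result.parent backwards from the final result.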
class celery.chord(header[, body])

A chord consists of a header and a body. The header is a group of tasks that must complete before the callback is called. A chord is essentially a callback for a group of tasks.

Example:

>>> res = chord([add.s(2, 2), add.s(4, 4)])(sum_task.s())

is effectively Σ((2 + 2) + (4 + 4)):

>>> res.get()
12

The body is applied with the return values of all the header tasks as a list.
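
A rough in-process model of this header and body relationship is shown below. It uses a thread pool as a stand-in for workers running the header tasks in parallel; run_chord is a hypothetical helper, and the built-in sum plays the part of sum_task.

```python
from concurrent.futures import ThreadPoolExecutor


def add(x, y):
    return x + y


def run_chord(header, body):
    """Run every header call, then apply body to the list of results.

    Hypothetical model: header is a list of (function, args) pairs, and
    body is only called once all of them have finished.
    """
    with ThreadPoolExecutor() as pool:
        futures = [pool.submit(func, *args) for func, args in header]
        # Collect results in header order, waiting for each to complete.
        results = [future.result() for future in futures]
    return body(results)


# Mirrors chord([add.s(2, 2), add.s(4, 4)])(sum_task.s()).
total = run_chord([(add, (2, 2)), (add, (4, 4))], sum)
print(total)  # 12
```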

class celery.signature(task=None, args=(), kwargs={}, options={})

Describes the arguments and execution options for a single task invocation.

Used as the parts in a group or to safely pass tasks around as callbacks.

Signatures can also be created from tasks:

>>> add.subtask(args=(), kwargs={}, options={})

or the .s() shortcut:

>>> add.s(*args, **kwargs)
Parameters:
  • task – Either a task class/instance, or the name of a task.
  • args – Positional arguments to apply.
  • kwargs – Keyword arguments to apply.
  • options – Additional options to Task.apply_async().

Note that if the first argument is a dict, the other arguments will be ignored and the values in the dict will be used instead.

>>> s = signature("tasks.add", args=(2, 2))
>>> signature(s)
{"task": "tasks.add", "args": (2, 2), "kwargs": {}, "options": {}}
__call__(*args, **kwargs)

Call the task directly (in the current process).

delay(*args, **kwargs)

Shortcut to apply_async().

apply_async(args=(), kwargs={}, …)

Apply this task asynchronously.

Parameters:
  • args – Partial args to be prepended to the existing args.
  • kwargs – Partial kwargs to be merged with the existing kwargs.
  • options – Partial options to be merged with the existing options.

See apply_async().
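
The way partial arguments combine with those already stored in a signature can be illustrated in plain Python. SignatureSketch is a hypothetical stand-in, not the Celery class, and the rule that call-time values win when a key appears on both sides is an assumption of this sketch.

```python
class SignatureSketch(object):
    """Hypothetical stand-in for a signature holding args, kwargs and options."""

    def __init__(self, task, args=(), kwargs=None, options=None):
        self.task = task
        self.args = tuple(args)
        self.kwargs = dict(kwargs or {})
        self.options = dict(options or {})

    def merged(self, args=(), kwargs=None, options=None):
        """Return (args, kwargs, options) as they would look at call time."""
        return (
            tuple(args) + self.args,                # partial args are prepended
            dict(self.kwargs, **(kwargs or {})),    # assumption: new keys win
            dict(self.options, **(options or {})),  # assumption: new keys win
        )


sig = SignatureSketch("tasks.add", args=(2,), options={"queue": "default"})
args, kwargs, options = sig.merged(args=(4,), options={"countdown": 10})
print(args)     # (4, 2)
print(options)
```

Prepending is what lets a callback signature such as add.s(4) receive the previous result as its first argument.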

apply(args=(), kwargs={}, …)

Same as apply_async() but executes the task inline instead of sending a task message.

freeze(_id=None)

Finalize the signature by adding a concrete task id. The task will not be called and you should not call the signature twice after freezing it as that will result in two task messages using the same task id.

Returns: app.AsyncResult instance.
clone(args=(), kwargs={}, …)

Return a copy of this signature.

Parameters:
  • args – Partial args to be prepended to the existing args.
  • kwargs – Partial kwargs to be merged with the existing kwargs.
  • options – Partial options to be merged with the existing options.
replace(args=None, kwargs=None, options=None)

Replace the args, kwargs or options set for this signature. Each one is only replaced if the corresponding argument is not None.

link(other_signature)

Add a callback task to be applied if this task executes successfully.

Returns: other_signature (to work with reduce()).

link_error(other_signature)

Add a callback task to be applied if an error occurs while executing this task.

Returns: other_signature (to work with reduce()).
set()

Set arbitrary options (same as .options.update(…)).

This is a chaining method call (i.e. it will return self).

flatten_links()

Gives a recursive list of dependencies (unchain if you will, but with links intact).

Proxies

celery.current_app

The currently set app for this thread.

celery.current_task

The task currently being executed (only set in the worker, or when eager/apply is used).