Celery のワークフロー

Canvas: Designing Workflows

Celery のワークフローは強力な機能だけど、最初は取っ掛かりにくいのが Primitives 周り。
Primitives と聞くと訳し難くくてよく分からないけど、プリミティブ型(組み込み型、基本型)と同じような Celery タスク型の種類だと自分では理解している。

タスクをワークフローのコンポーネントとして扱う時に、タスクを Signiture 型オブジェクトにする必要がある。それがドキュメントにある

@app.task()
def add(n1, n2):
    return n1 + n2


from celery import signature
s = signature('tasks.add', args=(2, 2), countdown=10)
type(s)  # celery.canvas.Signature

こういうことで、短縮すると

s = add.s(2, 2)
type(s)  # celery.canvas.Signature

こうなる。(この短縮形の場合は countdown=10 は含まれない)
また、以下の各行は同じことをしている。但し上の 3 つは Signature オブジェクトを生成してから実行している。

signature('tasks.add', args=(2, 2)).delay()
add.subtask((2, 2)).delay()
add.s(2, 2).delay()

add.deplay(2, 2)

上記例では Signature を生成する際に足し算の引数を指定しているが、引数は空でもよい。

s = add.s()
res = s.delay(2, 2)
res.get()  # 4

引数を部分的に指定した Signature を生成することもでき、これは Partial と呼ばれる。Scala 言語とかでもあるやつ。

plus1 = add.s(1)
type(plus1)  # celery.canvas.Signature
res = plus1.delay(2)
res.get()  # 3

このシグニチャを組み合わせて Primitives のワークフローを実現できる。

あるタスクの結果を、次のタスクに渡す Primitives の「Chain」は、短縮形で書く場合はパイプ(|)でシグニチャを連結する。
1 + 2 の結果の 3 を上記の Parital である plus1 の引数として渡して、結果を求める場合は次のように書く。

c = add.s(1, 2) | plus1
type(c)  # celery.canvas.chain
res = c.delay()  # <AsyncResult: e150ee02-420b-4400-8779-30d007d54a99>
res.get()  # 4

「Group」は指定された複数のタスクを並列に実行するもので、Group の結果をチェーンするのが「Chord」。ちなみに "Chord" はギターのコードなどの "コード"。

@app.task()
def xsum(*numbers):
    return sum(*numbers)


c = chord([add.s(1, 1), add.s(2, 2), add.s(3, 3)], xsum)
type(c)  # celery.canvas.chord
res = c.delay()
res.get()  # 12

この Chord の場合、最初の引数で指定されたリストにある 3 つのタスクを Group として並列に実行し、その結果の [2, 4, 6]xsum 関数のタスクの引数として渡している。

この例のタスクは簡単すぎるため、ワーカーを複数回実行するコストの方が高くつくが、それぞれのタスクが重たい処理を行う場合は結果を待ってブロックするより Chain や Chord を使った方がよい。その辺についてはタスクの Performance and Strategies で述べられている。

あと、link 引数を使ったコールバックは、前述の Chain とはちょっと異なるので注意。

res = add.apply_async((1, 2), link=plus1)
res.get()  # 3

ここでは 1 + 2 の結果の 3 が plus1 に渡って 4 の結果を返して欲しいが、link 先まで含んだ結果は返ってこない。

Last updated on May 13, 2016