Celery ワークフロー Chord の例外処理

Chords - Error handling

Celery ワークフローの Chord のタスク関数内で例外処理についての話。

c = chord([add.s(4, 4), raising_task.s(), add.s(8, 8)])
result = c()
result.get()

上記のコードでは AsyncResult.get() を呼べば、そこで ChordError が送出される。なので result.get() を try 〜 except で囲めば例外処理はできる。
しかし、ある Celery タスクから Chord を実行する場合、AsyncResult.get()Avoid launching synchronous subtasks で言うところの同期処理に該当してしまうので避けなければいけない。

このような場合は CELERY_CHORD_PROPAGATES 設定を False にする。そうすると、以下のコードにおいて fire タスクを実行しても同期処理は発生せず、callback に渡される引数に add() から送出された例外オブジェクトがセットされるので、この関数内で引数が例外かどうかを判定して処理ができる。

@app.task()
def add(a, b):
    if a < 0 or b < 0:
        raise ValueError()

    return a + b

@app.task()
def callback(results):
    print(results)  # [3, ValueError()]

@app.task()
def fire():
    header = [add.s(1, 2), add.s(-1, -1)]
    chord(header)(callback)
Last updated on July 8, 2016