Celery ワークフロー Chain の例外処理

先日の Chord での例外処理に続き、今度は Chain の例外処理について。

@app.task()
def add(a, b):
    if a == b:
        raise RuntimeError("a == b")
    return a + b


@app.task()
def log_error(task_id):
    logger.info("log_error(%s)", task_id)  # [INFO] log_error(b3df340c-e683-4269-8e41-9ca2440342e3)
    result = app.AsyncResult(task_id)
    e = result.result
    logger.info(type(e))  # [INFO] <class 'RuntimeError'>
    logger.info(str(e))  # [INFO] a == b


@app.task()
def fire():
    c = add.s(1, 2) | add.s(3) | add.s(4)
    result = c.apply_async(link_error=log_error.s())

上のように Chain 実行時に link_error 引数でエラー処理関数のサブタスクを指定すると、例外発生時にその関数がタスク ID と共に呼ばれる。ここでは二番目の add.s(3) で発生するが、そうすると当然ながら三番目の add.s(4) が実行されることはない。

(このコードでは Chain を実行する fire() も Celery タスクなので足し算の結果を受け取るには同期処理 result.get() が必要になってしまう良くないものだけど、非同期に例外を処理するサンプルなので無視)

Last updated on July 11, 2016