logo

2023515

CeleryにおけるSQLAlchemyのセッション管理

前回の記事では SQLAlchemy の Session について解説しました。今回はその応用として、Celery においてどのように Session を管理するかを考えたいと思います。

関連記事:

結論

以下のように DB 操作を行うタスクのためのクラスを作ります。

from typing import Optional
from celery import Task
from sqlalchemy.orm import Session

engine = create_engine(...)
SessionLocal = sessionmaker(..., bind=engine)

class DatabaseTask(Task):
    _db: Optional[Session] = None

    def after_return(self, *args, **kwargs):
        if self._db is not None:
            self._db.close()
            self._db = None

    @property
    def db(self):
        if self._db is None:
            self._db = SessionLocal()
        return self._db

デコレータに base=DatabaseTask bind=True と指定します。

from celery import Celery

app = Celery()

@app.task(base=DatabaseTask, bind=True)
def create(self: DatabaseTask, name: str):
    new_user = User(name=name)
    self.db.add(new_user)
    self.db.commit()

少し解説

どのようなライフサイクルになるのか

Celery の Task クラスのインスタンスは、ワーカーの起動時に1度だけ作られ、リクエストごとにインスタンス化されるわけではありません。つまり、タスクの処理を記述した関数に Celery.task() デコレータをつけることで、起動時に Task クラスが作られ、インスタンス化され、各リクエスト(ジョブ)は同じインスタンスで実行されます。

状態を保持したいケースでは便利ですが、複数のユーザーのリクエストを同じ Session で扱うのも少し怖いといった都合上、リクエストごとに再作成したほうが好ましいです。after_return() メソッドを実装して、リクエストの終了後に(成功・失敗問わず常に)Session.close() と Session インスタンスの破棄を行います。

@property で db() メソッドを実装することで、継承先の Task クラスから Session を使えるようにします。bind=True はその際に self.db と書けるように self を束縛するための設定です。

scoped_session を使うべきか

scoped_session はスレッド・ローカルなスコープでセッションを管理するための Session のレジストリ機能を提供します。

Celery のデフォルト設定では予めフォークされた子プロセスの中でリクエストが同期的に処理されていくので、上のサンプルではローカル・スコープの Session を使っています。

参考文献