2023年5月15日
CeleryにおけるSQLAlchemyのセッション管理
前回の記事では SQLAlchemy の Session について解説しました。今回はその応用として、Celery においてどのように Session を管理するかを考えたいと思います。
関連記事:
結論
以下のように DB 操作を行うタスクのためのクラスを作ります。
from typing import Optional from celery import Task from sqlalchemy.orm import Session engine = create_engine(...) SessionLocal = sessionmaker(..., bind=engine) class DatabaseTask(Task): _db: Optional[Session] = None def after_return(self, *args, **kwargs): if self._db is not None: self._db.close() self._db = None @property def db(self): if self._db is None: self._db = SessionLocal() return self._db
デコレータに base=DatabaseTask
bind=True
と指定します。
from celery import Celery app = Celery() @app.task(base=DatabaseTask, bind=True) def create(self: DatabaseTask, name: str): new_user = User(name=name) self.db.add(new_user) self.db.commit()
少し解説
どのようなライフサイクルになるのか
Celery の Task クラスのインスタンスは、ワーカーの起動時に1度だけ作られ、リクエストごとにインスタンス化されるわけではありません。つまり、タスクの処理を記述した関数に Celery.task() デコレータをつけることで、起動時に Task クラスが作られ、インスタンス化され、各リクエスト(ジョブ)は同じインスタンスで実行されます。
状態を保持したいケースでは便利ですが、複数のユーザーのリクエストを同じ Session で扱うのも少し怖いといった都合上、リクエストごとに再作成したほうが好ましいです。after_return() メソッドを実装して、リクエストの終了後に(成功・失敗問わず常に)Session.close() と Session インスタンスの破棄を行います。
@property で db() メソッドを実装することで、継承先の Task クラスから Session を使えるようにします。bind=True
はその際に self.db
と書けるように self を束縛するための設定です。
scoped_session を使うべきか
scoped_session はスレッド・ローカルなスコープでセッションを管理するための Session のレジストリ機能を提供します。
Celery のデフォルト設定では予めフォークされた子プロセスの中でリクエストが同期的に処理されていくので、上のサンプルではローカル・スコープの Session を使っています。