Djangoで非同期処理 Celery×RabbitMQ×flower

概要

Webアプリで重い処理を実装したい場合、非同期処理を取り入れることになるかと思います。

1つ目の非同期処理の実装はフロントエンド部分が考えられます。例えばWebページ上の何らかの実行ボタンを押した際に、同期処理の場合はバックエンドでの実行が終わるまでページが読み込み中になりますが、非同期処理を取り入れることで、バックエンドの処理を待たずにページの更新等が行えます。これはJavascript等で実装されます。

2つ目の非同期処理はバックエンド部分が考えられます。フロントエンドの実行ボタンをトリガーにバックエンドで処理が開始されますが、バックエンド側が同期処理する実装になっている場合、いくらフロント側が非同期だとしても、処理が終わるまで別のタスクを実行できません。バックエンドでも非同期処理を実装することで、重い処理を待たずに別の処理を開始することができます。

この記事で紹介するのは上でいうと2つ目、バックエンド側の非同期処理をよしなにやってくれるツールのCeleryです。

Celeryとは?RabbitMQとflowerとの関係性

色々なツール名が登場していますが、主役はCeleryです。

CeleryはPythonで非同期処理を実装するためのツールです。タスク(重い処理など)をキューで管理してスケジューリングしてくれます。ただ、Celery単体では動作せず、キューとして管理するための「ブローカー」という存在が必要です。

そのブローカーがRabbitMQです。ブローカーは何しらのプログラム(クライアントという)から非同期処理をするジョブを受け取って一時的に保持しておきます。ブローカーにジョブが格納されると、Celeryはそれを検知して実行します。(Celery側をワーカーという)

Celeryで使用できるブローカーとして、RabbitMQ以外にもRedis、一部機能が制限されますがAmazonSQSなども使用できます。本当はRedisを使用したかったのですがWindowsに非対応だったため、今回はRabbitMQを選択しています。

このような構成にすることで、重い処理がたくさん発生した場合でもすぐには実行されず、一旦ブローカーに保持されてから順々に非同期で実行されていきます。

最後にflowerについて、これはCeleryのモニタリングツールのため導入は必須ではありませんが、導入することで各タスクのステータスを確認することができ、実行に成功/失敗しているかなどを確認することができます。

環境

Windows11・Celery・RabbitMQ・flower・Django

Dockerは使用しません。

各種ツールのインストール

Celeryとflowerはpipで導入できます。最後のeventletについて、実はCeleryのバージョン4から公式にはWindowsに非対応となったため、その回避策で導入します。linux環境では必要ありません。

pip install celery
pip install flower

pip install eventlet

ブローカーであるRabbitMQはchocolateyでインストールします。chocolateyが導入されていない場合はそちらを先に済ませてください。インストーラー版もありますが、Erlangという言語の実行環境を別でインストールする必要があるため、すべてを1コマンドで導入できるchocolateyの方がおすすめです。

重要なのは、「管理者権限で起動したPowershellを使って」以下のコマンドを実行することです。

choco install rabbitmq

非同期処理のデモアプリを作成

ページにアクセスするとボタンがあり、そのボタンを押すと10秒間待機するプログラムを非同期で実行する簡単なアプリです。

プロジェクトの立ち上げ、アプリの立ち上げをします。

python -m django startproject project

cd project

python manage.py startapp app

settings.pyを以下のように変更します。CELERY_BROKER_URLでブローカーとしてRabbitMQを使用するための設定をしています。

~略~
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'app', #アプリ名を追加
]

~略~

TEMPLATES = [
    {
        "BACKEND": "django.template.backends.django.DjangoTemplates",
        "DIRS": [BASE_DIR, "templates"], #変更
        "APP_DIRS": True,
        "OPTIONS": {
            "context_processors": [
                "django.template.context_processors.debug",
                "django.template.context_processors.request",
                "django.contrib.auth.context_processors.auth",
                "django.contrib.messages.context_processors.messages",
            ],
        },
    },
]

~略~

#以下を追加
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZZER = "json"

CELERY_BROKER_URL = "amqp://localhost"
CELERY_CACHE_BACKEND = "django-cache"
CELERY_RESULT_EXTENDED = True

プロジェクト側(project>projectの中)のurls.pyに記述を追加します。

from django.contrib import admin
from django.urls import path, include

urlpatterns = [
    path("admin/", admin.site.urls),
    path("", include("app.urls")), #追加
]

続いてアプリ側(project>appの中)にurls.pyを新規作成し、以下のように実装します。

from django.urls import path
from . import views

urlpatterns = [
    path("", views.top_page, name=""),
    path("do-task", views.do_task, name="do-task"),
]

manage.pyと同じ階層に「templates」フォルダーを作成し、その中にindex.htmlを新規作成し、以下のように記述します。

<!DOCTYPE html>
<html lang="ja">
<head>
    <meta charset="UTF-8">
    <title>テスト</title>
</head>
<body>
    <h1>テスト</h1>
    <p>非同期で10秒数えます。</p>
    <a href="do-task"><button>実行</button></a>
</body>
</html>

projectフォルダ内(project>projectの中)の__init__.pyに以下を実装します。

from .celery import app as celery_app

__all__ = ["celery_app"]

projectフォルダ内(project>projectの中)に「celery.py」を新規作成し、以下のように実装します。

import os
from celery import Celery

os.environ.setdefault("DJANGO_SETTINGS_MODULE", "project.settings")

app = Celery("project") #プロジェクト名を指定
app.config_from_object("django.conf:settings", namespace="CELERY")
app.autodiscover_tasks()

appフォルダ内(project>appの中)に「tasks.py」を新規作成し、以下のように実装します。

非同期処理で実行したい関数の前に@shared_task()を付与する点がポイントです。

from celery import shared_task
import time


@shared_task()
def time_count():
    time.sleep(10)
    print("Done!! By tasks.py")

views.pyを以下のように実装します。

呼び出し時に、.delay()をつけることで非同期で呼び出すことができます。引数がある場合はカッコ内に入れることで値を渡せます。

from django.shortcuts import render, redirect
import time
from app.tasks import *


def top_page(request):
    return render(request, "index.html")


def do_task(request):
    print("Before time_count function...")
    task = time_count.delay()
    print(task)
    print("After time_count function")
    return redirect("/")

実装は以上です。

アプリを実行

アプリの実行にあたって、manage.pyと同じ階層でコマンドを3つ開いてください。

1つ目はDjangoの実行用、2つ目はCeleryの実行用、3つ目はflowerの実行用です。(RabbitMQはWindowsにインストールした時点で自動的に起動しているので操作は不要です。PCを再起動しても自動的に起動します。)

Djangoの実行

python manage.py runserver

Celeryの実行(「-P eventlet」はWindowsでCeleryを動かすために付与しています。)

celery -A project worker -l info -P eventlet

flowerの実行

celery -A project flower -l info --port=5555

非同期処理ができているか確認

まずは用意したページ「http://localhost:8000」にアクセスします。

実行ボタンを押すと、Djangoを実行しているコマンド上で以下のようなログが一気に表示されます。

Before time_count function...
56de28ed-490e-4570-954b-3c4cceba3d25
After time_count function

views.pyから重い処理「time_count」を呼び出しているはずですが、処理の完了を待っていないことが確認できます。

これと同時に、Celeryを実行しているコマンド上では以下のログが確認できます。

Task app.tasks.time_count[56de28ed-490e-4570-954b-3c4cceba3d25] received

Celery側でタスクを受け取っていることが確認できます。受け取れているということはジョブの受け渡しを仲介しているRabbitMQも問題なく動作していることが分かります。

そして実行から10秒後、Celeryを実行しているコマンド上で

Done!! By tasks.py

succeeded in 10.046999999991385s: None

のように表示されます。

各ツールの管理ページ

RabbitMQとflowerには管理ページが用意されています。

RabbitMQの管理ページは「http://localhost:15672」です。デフォルトユーザーは「guest」パスワードも「guest」です。

以下のようにキューの状態などを確認できます。

flowerの管理ページは「http://localhost:5555」です。以下のようにタスクのステータスを確認することができます。

タスクのステータスを取得

実際にWebアプリで非同期処理を実装する場合、タスクをCeleryに投げっぱなしで終わらず、タスクの実行状態を逐次確認したり、タスクが終了した際にコールバックしてほしい場合もあります。

こういった場合は「django_celery_results」を使います。

実装その1

タスクIDを用いてステータスを確認します。

django_celery_resultsをインストール

pip install django_celery_results

settings.pyのINSTALLED_APPSに追記、CELERY_RESULT_BACKENDとCELERY_TASK_TRACK_STARTEDの設定を追加します。

CELERY_RESULT_BACKENDを「django-db」にすることで、そのプロジェクトで使用するDBにタスクの結果が保存されます。デフォルトの状態であればSqliteということになります。

もう一つ、CELERY_TASK_TRACK_STARTEDをTrueにすることで、タスクの実行中の状態もDBに保存されます。逆にこれを設定しないと、タスクが完了するまではタスクの状態はDBに登録されません。

INSTALLED_APPS = [
    "django.contrib.admin",
    "django.contrib.auth",
    "django.contrib.contenttypes",
    "django.contrib.sessions",
    "django.contrib.messages",
    "django.contrib.staticfiles",
    "app",
    "django_celery_results", #追記
]

~略~
#追記
CELERY_RESULT_BACKEND = "django-db"
CELERY_TASK_TRACK_STARTED = True

views.pyのdo_task関数に処理を追加します。

AsyncResult(<タスクid>).statusのように記述することでタスクのステータスを取得することができきます。ここでは1秒おきにステータスを取得、それを12秒間行っています。

views.pyにsleep関数を記述してしまっているので、実行ボタンを押した後12秒間はページが読み込み中になり、こうなると非同期処理でも何でもないですが、今回はステータスが取得できることを確認したいだけなので良しとします。

from celery.result import AsyncResult #追加

~略~

def do_task(request):
    print("Before time_count function...")
    task = time_count.delay()
    print("After time_count function")
    #追加
    for _ in range(12):
        print(AsyncResult(task.id).status)
        time.sleep(1)
    return redirect("/")

ここまで実装出来たら、マイグレーションとマイグレートをします。

python manage.py makemigrations

python manage.py migrate

ではDjangoとCelery(とflower)を起動し、ページの実行ボタンを押します。すると、Djangoを起動しているコマンド上で以下のように表示されていくのが確認できます。

タスクが始まる前はPENDING、タスクが始まって10秒間(すなわち処理中)はSTARTED、タスクが終わったらSUCCESSと表示されます。

PENDING
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
STARTED
SUCCESS
SUCCESS

実装その2

TaskResultテーブルからステータスを確認します。フロントエンドからステータスチェックをしたい場合、タスクIDを使う方法よりもこちらの方が実装しやすいのではないかと思います。

まず、views.pyを変更します。(全文)

from django.shortcuts import render, redirect
from app.tasks import *
#追加
from django_celery_results.models import TaskResult
from django.http import JsonResponse


def top_page(request):
    return render(request, "index.html")

#変更
def do_task(request):
    print("Before time_count function...")
    time_count.delay()
    print("After time_count function")
    return redirect("/")

#追加
def check_task(request):
    status = TaskResult.objects.filter(task_name="app.tasks.time_count").first().status
    print(status)
    return JsonResponse({"status": status})

urls.pyに記述を追加します。

path("check-task", views.check_task, name="check-task"),

indes.htmlに記述を追加します。

<a href="check-task"><button>ステータス確認</button></a>

以上を実装できたら、Celeryを再起動します。Celeryを実行しているコマンド上で「Ctrl」+「C」で停止し、再度以下のコマンドで実行します。Celeryを再起動しないとコードの変更が反映されません。

celery -A project worker -l info -P eventlet

では、再度「http://127.0.0.1:8000/」へアクセスします。

「実行」ボタンを押してからすぐに「ステータス確認」ボタンを押すと以下のように「STARTED」ステータスを確認できます。

10秒経過し、タスクが完了すると「SUCCESS」が帰ってきました。

今回の実装ではレスポンスはJsonなので、jQuery等で非同期でステータスチェックする場合も使えます。

管理サイトからの確認

また、django_celery_resultsを使用していることで、タスクのステータスがDjangoの管理サイトからも確認できるようになりました。

その前に、Djangoのスーパーユーザーを未作成の場合は作成しておきます。

python manage.py createsuperuser

以下のように実行したタスクが確認できます。

ここまでくれば、フロント側からステータス状態をチェックしてSTARTEDだったら「実行中・・・」と表示したり、SUCCESSになったらjsonレスポンスを飛ばして「完了しました!」と表示したりすることができます。

デプロイにあたって

DjangoはuWSGIやGunicornで、RabbitMQやflowerはsystemdでデーモン化すれば良いでしょう。

なお、UbuntuでのRabbitMQのインストールは以下のように行いました。

sudo apt update -y
sudo apt install curl gnupg apt-transport-https -y

curl -1sLf "https://keys.openpgp.org/vks/v1/by-fingerprint/0A9AF2115F4687BD29803A206B73A36E6026DFCA" | sudo gpg --dearmor | sudo tee /usr/share/keyrings/com.rabbitmq.team.gpg > /dev/null

sudo tee /etc/apt/sources.list.d/rabbitmq.list <<EOF
deb [signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
deb-src [signed-by=/usr/share/keyrings/com.rabbitmq.team.gpg] https://dl.cloudsmith.io/public/rabbitmq/rabbitmq-erlang/deb/ubuntu jammy main
EOF

sudo apt update -y

ErlangとRabbitMQのインストール
sudo apt install -y erlang-base erlang-asn1 erlang-crypto erlang-eldap erlang-ftp erlang-inets erlang-mnesia erlang-os-mon erlang-parsetools erlang-public-key erlang-runtime-tools erlang-snmp erlang-ssl erlang-syntax-tools erlang-tftp erlang-tools erlang-xmerl
sudo apt install -y rabbitmq-server --fix-missing

#RabbitMQのサービスの起動と自動起動設定
sudo systemctl start rabbitmq-server
sudo systemctl enable rabbitmq-server

#ステータス確認
sudo systemctl status rabbitmq-server