Djangoでceleryをつかって定期実行。

GEMINIにCeleryについて聞いてみた。
 Celery (ソフトウェア)

  • 概要: Pythonで書かれた、分散メッセージパッシングをベースとしたオープンソースの非同期タスクキュー/ジョブキューです。
  • 特徴:
    • 非同期処理: 時間のかかる処理をバックグラウンドで実行し、アプリケーションの応答性を向上させます。
    • 分散処理: 複数のワーカー(処理を実行するプロセス)を起動し、タスクを分散して処理することで、高い並行性とスケーラビリティを実現します。
    • メッセージキュー: RabbitMQやRedisなどのメッセージブローカーを利用して、タスクの管理とワーカーへの配信を行います。
    • 信頼性: タスクの実行失敗時のリトライ機能や、結果の追跡機能などを備えています。
    • 柔軟性: 多くの設定オプションや拡張機能があり、様々な要件に対応できます。
  • 用途:
    • Webアプリケーションでのメール送信、画像処理、バッチ処理など、時間のかかる処理のオフロード。
    • 機械学習モデルのトレーニングや推論処理。
    • 定期的なタスクのスケジューリング。

便利そうなので、使ってみる。
①pip installする。(requirements.txtに記述)
        ・celery==5.5.1 #本体
        ・django-celery-beat==2.7.0 #celery定期実行機能
        ・django-celery-results==2.6.0 #celecy実行結果DB保存
  
  ※Redisじゃなくても良いらしいが、今回はRedis使用。
   Redis本体はDockerで構築。Redis用のライブラリをインストール。
  ・channels-redis==4.2.1

  
②settings.py編集する。
   以下を追加。
   
  ・INSTALLED_APPS += [   'django_celery_results',  'django_celery_beat', ] 
   ライブラリインストールしたら登録。いつもの作業。

  ・CELERY_BROKER_URL = 'redis://redis:6379/15'
   Celeryのタスクメッセージを一時的に保存する場所(キュー:FIFO)。
   このキューからタスクを取り出して処理します。
   Redisは0~15のデーターベースナンバーを指定可能。(当然他で使用していたら同じ番号はダメ)
   
  ・CELERY_RESULT_BACKEND = 'django-db'  
   django-celery-results使用。celeryの実行状況の保管先。 'django-db'とは、
   Djangoで使用しているデーターベースを指定している。
   

  ・CELERY_ACCEPT_CONTENT = ['json']
  ・CELERY_TASK_SERIALIZER = 'json'
   タスクメッセージやらをjson形式で行う設定。基本jsonにしとけば良いみたい。

  ・CELERY_RESULT_EXTENDED = True
   タスクのより詳細な動作状況を結果バックエンドに保存するかどうかを設定。
   Falseにしてもdjango-celery-resultsを設定していれば基本的な情報は保管されるとの
   こと。問題なければTrueにしておけば良さそう。

③プロジェクトフォルダ内celery.py作成。__init__.pyの修正。
   例としてのプロジェクトパターンが以下の通り。
   





####################################################

Celery.pyを以下のように作成。

from __future__ import absolute_import, unicode_literals
import os
from celery import Celery

# Django settingsモジュールの指定
os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

app = Celery('config')

# Django settingsからCelery設定を読み込む
app.config_from_object('django.conf:settings', namespace='CELERY')

# 自動でtasks.pyを探す
app.autodiscover_tasks()

@app.task(bind=True)
def debug_task(self):
print(f'Request: {self.request!r}')

####################################################

  ・from __future__ import absolute_import, unicode_literalsは、
   Python 2 系から Python 3 系への移行をスムーズにするための記述。

   おまもり。

  ・以下プログラムの'config'部分はプロジェクト名。
   プロジェクトによって変更の必要あり。

   os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'config.settings')

   app = Celery('config')

   app.config_from_object('django.conf:settings', namespace='CELERY')の

   

   

####################################################

・__init__.pyの変更。

from __future__ import absolute_import, unicode_literals

# celeryを起動する
from .celery import app as celery_app

__all__ = ('celery_app',)

####################################################


④tasks.py作成。

     ・アプリフォルダ内にtasks.pyを作成。以下は3つのタスクを作成した例。

            「@shared_task」デコレーターを付与した関数を管理実行する。

from celery import shared_task



@shared_task
def task1():
print("Task 1 is running!")
"""
タスクを記述

"""


@shared_task
def task2():
print("Task 2 is running!")
"""
タスクを記述
"""


@shared_task
def task3():
print("Task 3 is running!")
"""
タスクを記述
"""

####################################################


  これでCeleryをDjangoへの実装は終了。

⑤Celeryの実行。

  ・データーベースのマイグレーションする。

   Djangoのいつものマイグレーションコマンドを実行すると、

            Celery分(django-celery-results, django-celery-beat)も

   マイグレーションしてくれるとのこと。

            個別でも出来るが今回は、いつものコマンドで一括で行う。

   python manage.py migrate


  ・Celeryを実行。

            Celeryワーカー起動コマンド

   config部分がプロジェクト名。 --loglevelはinfoを指定。

   celery -A config worker --loglevel=info
            

   

            --scheduler:使用するスケジューラを指定

   (ここではDjango連携用のデータベーススケジューラ) 

   celery -A config beat --loglevel=info --scheduler    django_celery_beat.schedulers:DatabaseScheduler


   以上でCeleryの実装終了。



Celery動作設定
Celeryの実行タイミングの設定はDjangoの管理画面で行う。





・Clocked:タスク実行日付+時刻の設定。

・Crontab:タスク実行時刻+曜日の設定。

・Interval :タスク実行の間隔の設定。

・Periodic task:実行するタスクと、Clocked、Crontab、Intervalどのイベントにするかを設定。

・solar event :指定可能な天文イベント。


Celery便利そう。



コメント