0%

Django-Celery异步任务调度工具

Celery是一个异步任务的调度工具。

Django中使用Celery实现异步或定时任务

Celery 官网:http://www.celeryproject.org/

Celery 官方文档英文版http://docs.celeryproject.org/en/latest/index.html

Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/

Celery简介

Celery是一个异步任务的调度工具。

Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。

在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。

这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。

Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。

架构组成

celery00

Celery的架构由三部分组成,消息中间件(message broker),任务执行单元*(worker)和任务执行结果存储*(task result store)组成。

celery01

可以看到,Celery 主要包含以下几个模块:

  • 任务模块 Task

    包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列

  • 消息中间件 Broker

    Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。

  • 任务执行单元 Worker

    Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它

  • 任务结果存储 Backend

    Backend 用于存储任务的执行结果,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redisMongoDB 等。

安装

1
2
3
4
pip install celery
pip install django-celery-beta #任务发送
pip install django-celery-results #结果存储

配置

确保目录如下:

1
2
3
4
5
6
- proj/
- manage.py
- proj/
- __init__.py
- settings.py
- urls.py

新建proj/proj/celery.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from __future__ import absolute_import, unicode_literals
import os
from celery import Celery
from celery.schedules import crontab, timedelta

os.environ.setdefault('DJANGO_SETTINGS_MODULE', 'proj.settings')
app = Celery('proj')

app.config_from_object('django.conf:settings', namespace='CELERY')

app.autodiscover_tasks() # 自动发现任务文件每个app下的task.py

@app.task(bind=True)
def debug_task(self):
print('Request: {0!r}'.format(self.request))

编辑proj/proj/init.py文件

1
2
3
4
from __future__ import absolute_import, unicode_literals
from .celery import app as celery_app

__all__ = ('celery_app',)

编辑proj/proj/settings.py文件

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
INSTALLED_APPS = (
...,
'django_celery_beat',
'django_celery_results',
)


# Celery
CELERY_URL = 'redis://:' + configs['redis']['RedisPassword'] + '@' + configs['redis']['RedisHost'] + ':' + str(
configs['redis']['RedisPort']) + '/' + str(configs['redis']['RedisDb'])
# CELERY_URL = "redis://:{ 密码 }@{ redis地址 }:6379/4"
BROKER_URL = CELERY_URL
CELERY_BROKER_URL = CELERY_URL
# CELERY_BEAT_SCHEDULER = 'django_celery_beat.schedulers:DatabaseScheduler'
# CELERY_RESULT_BACKEND = CELERY_URL
CELERY_RESULT_BACKEND = 'django-db' # 使用django orm 作为结果存储

CELERY_ACCEPT_CONTENT = ['application/json']
CELERY_TASK_SERIALIZER = 'json'
CELERY_RESULT_SERIALIZER = 'json'
CELERY_TIMEZONE = 'Asia/Shanghai'
CELERY_ENABLE_UTC = True
CELERYD_CONCURRENCY = 10

同步数据库

1
2
3
4
5
# 1. 创建更改的文件
python manage.py makemigrations

# 2. 将生成的py文件应用到数据库
python manage.py migrate

创建超级管理员

1
2
3
4
5
6
python manage.py createsuperuser

# 按照提示输入用户名和对应的密码就好了邮箱可以留空,用户名和密码必填

# 修改 用户密码可以用:
python manage.py changepassword username

增加定时任务

编辑proj/proj/celery.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
app.conf.update(
CELERYBEAT_SCHEDULE={
'task_update_ali_ecs': {
'task': 'assets.tasks.task_update_ali_ecs',
'schedule': timedelta(minutes=60),
},
'task_update_ali_rds': {
'task': 'assets.tasks.task_update_ali_rds',
'schedule': timedelta(minutes=60),
},
'task_update_ali_slb': {
'task': 'assets.tasks.task_update_ali_slb',
'schedule': timedelta(minutes=60),
},
'task_update_tx_cns': {
'task': 'assets.tasks.task_update_tx_cns',
'schedule': timedelta(minutes=60),
},
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
)

#https://docs.celeryproject.org/en/stable/userguide/periodic-tasks.html#crontab-schedules

# schedule 可以填写timedelta表示循环时间,隔多久执行一次
# 也可以填写 crontab 表示某个时间去执行,例如示例中,表示 Executes every Monday morning at 7:30 a.m.

新增任务的代码

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
from django_celery_beat.models import PeriodicTask, ClockedSchedule

#比如创建一个发版任务
# 1.新创建时间clockschedule
clock, create = ClockedSchedule.objects.get_or_create(clocked_time=db_deploy_time)

# 2.创建任务
PeriodicTask.objects.create(
clocked=clock,
name='vrigo-deploy_' + db_data['service_list'] + '-' + str(db_deploy_time),
task='services.tasks.task_build_job',
args=json.dumps([services, int(id)]),
start_time=datetime.datetime.now(),
one_off=True,
)

services/tasks.py

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from __future__ import absolute_import, unicode_literals
from celery import shared_task
from celery.schedules import crontab
from services.api.jenkins import JenkinsApi
from services.views import task_result_to_db
from users.api.mail import send_mail

@shared_task
def task_build_job(service,id):
obj = JenkinsApi()
data = obj.build_job(service) #触发jenkins job
# id = kwargs.get("id", 10)
to_db = task_result_to_db(id, data) #将结果存入db中
return True

启动服务

1
2
3
4
5
celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler

celery worker -A virgo -l info

python3 manage.py runserver 0.0.0.0:8000

flower

安装

1
pip install flower

启动

1
celery flower -A virgo -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler