Celery是一个异步任务的调度工具。
Django中使用Celery实现异步或定时任务
Celery 官网:http://www.celeryproject.org/
Celery 官方文档英文版 :http://docs.celeryproject.org/en/latest/index.html
Celery 官方文档中文版:http://docs.jinkan.org/docs/celery/
Celery简介 Celery是一个异步任务的调度工具。
Celery 是 Distributed Task Queue,分布式任务队列,分布式决定了可以有多个 worker 的存在,队列表示其是异步操作,即存在一个产生任务提出需求的工头,和一群等着被分配工作的码农。
在 Python 中定义 Celery 的时候,我们要引入 Broker,中文翻译过来就是“中间人”的意思,在这里 Broker 起到一个中间人的角色。在工头提出任务的时候,把所有的任务放到 Broker 里面,在 Broker 的另外一头,一群码农等着取出一个个任务准备着手做。
这种模式注定了整个系统会是个开环系统,工头对于码农们把任务做的怎样是不知情的。所以我们要引入 Backend 来保存每次任务的结果。这个 Backend 有点像我们的 Broker,也是存储任务的信息用的,只不过这里存的是那些任务的返回结果。我们可以选择只让错误执行的任务返回结果到 Backend,这样我们取回结果,便可以知道有多少任务执行失败了。
Celery(芹菜)是一个异步任务队列/基于分布式消息传递的作业队列。它侧重于实时操作,但对调度支持也很好。Celery用于生产系统每天处理数以百万计的任务。Celery是用Python编写的,但该协议可以在任何语言实现。它也可以与其他语言通过webhooks实现。Celery建议的消息队列是RabbitMQ,但提供有限支持Redis, Beanstalk, MongoDB, CouchDB, 和数据库(使用SQLAlchemy的或Django的 ORM) 。Celery是易于集成Django, Pylons and Flask,使用 django-celery, celery-pylons and Flask-Celery 附加包即可。
架构组成
Celery的架构由三部分组成,消息中间件 (message broker),任务执行单元*(worker)和 任务执行结果存储 *(task result store)组成。
可以看到,Celery 主要包含以下几个模块:
任务模块 Task
包含异步任务和定时任务。其中,异步任务通常在业务逻辑中被触发并发往任务队列,而定时任务由 Celery Beat 进程周期性地将任务发往任务队列 。
消息中间件 Broker
Broker,即为任务调度队列,接收任务生产者发来的消息(即任务),将任务存入队列 。Celery 本身不提供队列服务,官方推荐使用 RabbitMQ 和 Redis 等。
任务执行单元 Worker
Worker 是执行任务的处理单元,它实时监控消息队列,获取队列中调度的任务,并执行它 。
任务结果存储 Backend
Backend 用于存储任务的执行结果 ,以供查询。同消息中间件一样,存储也可使用 RabbitMQ, redis 和 MongoDB 等。
安装 1 2 3 4 pip install celery pip install django-celery-beta #任务发送 pip install django-celery-results #结果存储
配置 确保目录如下:
1 2 3 4 5 6 - proj/ - manage.py - proj/ - __init__.py - settings.py - urls.py
新建proj/proj/celery.py 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from __future__ import absolute_import, unicode_literalsimport osfrom celery import Celeryfrom celery.schedules import crontab, timedeltaos.environ.setdefault('DJANGO_SETTINGS_MODULE' , 'proj.settings' ) app = Celery('proj' ) app.config_from_object('django.conf:settings' , namespace='CELERY' ) app.autodiscover_tasks() @app.task(bind=True ) def debug_task (self ): print ('Request: {0!r}' .format (self.request))
编辑proj/proj/init .py 文件
1 2 3 4 from __future__ import absolute_import, unicode_literalsfrom .celery import app as celery_app__all__ = ('celery_app' ,)
编辑proj/proj/settings.py 文件
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 INSTALLED_APPS = ( ..., 'django_celery_beat' , 'django_celery_results' , ) CELERY_URL = 'redis://:' + configs['redis' ]['RedisPassword' ] + '@' + configs['redis' ]['RedisHost' ] + ':' + str ( configs['redis' ]['RedisPort' ]) + '/' + str (configs['redis' ]['RedisDb' ]) BROKER_URL = CELERY_URL CELERY_BROKER_URL = CELERY_URL CELERY_RESULT_BACKEND = 'django-db' CELERY_ACCEPT_CONTENT = ['application/json' ] CELERY_TASK_SERIALIZER = 'json' CELERY_RESULT_SERIALIZER = 'json' CELERY_TIMEZONE = 'Asia/Shanghai' CELERY_ENABLE_UTC = True CELERYD_CONCURRENCY = 10
同步数据库 1 2 3 4 5 # 1. 创建更改的文件 python manage.py makemigrations # 2. 将生成的py文件应用到数据库 python manage.py migrate
创建超级管理员 1 2 3 4 5 6 python manage.py createsuperuser # 按照提示输入用户名和对应的密码就好了邮箱可以留空,用户名和密码必填 # 修改 用户密码可以用: python manage.py changepassword username
增加定时任务 编辑proj/proj/celery.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 app.conf.update( CELERYBEAT_SCHEDULE={ 'task_update_ali_ecs' : { 'task' : 'assets.tasks.task_update_ali_ecs' , 'schedule' : timedelta(minutes=60 ), }, 'task_update_ali_rds' : { 'task' : 'assets.tasks.task_update_ali_rds' , 'schedule' : timedelta(minutes=60 ), }, 'task_update_ali_slb' : { 'task' : 'assets.tasks.task_update_ali_slb' , 'schedule' : timedelta(minutes=60 ), }, 'task_update_tx_cns' : { 'task' : 'assets.tasks.task_update_tx_cns' , 'schedule' : timedelta(minutes=60 ), }, 'add-every-monday-morning' : { 'task' : 'tasks.add' , 'schedule' : crontab(hour=7 , minute=30 , day_of_week=1 ), 'args' : (16 , 16 ), }, } )
新增任务的代码 1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 from django_celery_beat.models import PeriodicTask, ClockedSchedule #比如创建一个发版任务 # 1.新创建时间clockschedule clock, create = ClockedSchedule.objects.get_or_create(clocked_time=db_deploy_time) # 2.创建任务 PeriodicTask.objects.create( clocked=clock, name='vrigo-deploy_' + db_data['service_list'] + '-' + str(db_deploy_time), task='services.tasks.task_build_job', args=json.dumps([services, int(id)]), start_time=datetime.datetime.now(), one_off=True, )
services/tasks.py
1 2 3 4 5 6 7 8 9 10 11 12 13 14 from __future__ import absolute_import, unicode_literalsfrom celery import shared_taskfrom celery.schedules import crontabfrom services.api.jenkins import JenkinsApifrom services.views import task_result_to_dbfrom users.api.mail import send_mail@shared_task def task_build_job (service,id ): obj = JenkinsApi() data = obj.build_job(service) to_db = task_result_to_db(id , data) return True
启动服务 1 2 3 4 5 celery -A proj beat -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler celery worker -A virgo -l info python3 manage.py runserver 0.0.0.0:8000
flower 安装
启动 1 celery flower -A virgo -l info --scheduler django_celery_beat.schedulers:DatabaseScheduler