Celery+Rabbitmq实现异步执行任务

celery是python的一个第三方库,中文为"芹菜",是一个基于生产者消费者模式的框架,主要用于异步执行任务或定时任务。本文将介绍如何使用celery实现异步执行任务的方法。

一. 安装Celery和后端代理

首先,需要安装Celery,并安装RabbitMQ或Redis作为后端代理。

# 安装Celery
pip install celery

RabbitMQ和Redis都可以作为Celery的后端代理,用于任务队列。Celery官方文档建议优先使用RabbitMQ,但也可以选择Redis。安装RabbitMQ需要依赖Erlang,具体的安装教程可以自行查找。

二. 搭建Celery任务架构

在项目中合适的位置创建一个名为celery_tasks的目录,在这个目录下编写Celery的相关代码,将Celery代码与项目业务逻辑代码分离。注意:目录名不要直接使用celery,以免与Python关键字或第三方模块冲突,导致导包错误。

celery_tasks目录下创建config.py, tasks.py, main.py三个Python文件,分别用于编写Celery的配置代码、任务函数代码和任务启动代码。

# 目录结构
- celery_tasks
    - config.py
    - main.py
    - tasks.py

三. 编写代码实现异步调用任务

config.py

from celery import Celery

创建Celery对象, 'demo'是自定义的命名

broker指定后端代理,可以使用RabbitMQ或Redis

app = Celery('demo', broker='amqp://guest@localhost:5672//')

app = Celery('demo', broker='redis://127.0.0.1:6379/15')

tasks.py

from config import app

定义任务,使用@app.task装饰器,Celery会自动识别任务

@app.task(name='celery_task1_name') def celery_task1_name(arg): print('执行任务代码', arg)

@app.task(name='celery_task2_name') def celery_task2_name(): print('将需要执行的代码导入tasks.py文件,然后在这里调用即可')

main.py

from tasks import *

设置Celery对象自动识别任务

'celery_tasks'指定tasks.py的目录,保证程序能找到tasks.py

app.autodiscover_tasks(['celery_tasks'])

四. 启动Celery任务

main.py所在目录下执行以下命令启动Celery任务。如果不在此目录,则需要在main前加上相对路径,例如:celery_tasks.main

celery -A main worker -l info

参数说明:

  • -A 指定Celery的启动入口main
  • worker 为Celery执行任务的后端工人
  • -l 指定日志级别为info

执行成功后,Celery会启动worker,从代理队列中获取任务并执行。如果任务队列为空,则一直等待直到有任务。

Windows Bug: 如果使用Celery4.0以上版本在Windows上,通过上述命令启动,在执行task.delay()时会报错:ValueError: not enough values to unpack (expected 3, got 0)。此问题在Linux上不会出现,只在Windows上存在,与“绿色线程”有关,具体可参考eventlet相关资料。

解决办法:

  1. 安装eventlet
pip install eventlet
  1. 启动worker时增加-P eventlet参数
celery -A main worker -l info -P eventlet

五. 调用Celery异步执行任务

在需要执行异步任务的地方导入任务,使用task.delay(参数)调用任务。例如,在与celery_tasks目录同级的demo目录下的demo.py文件中异步执行任务。

from celery_tasks.tasks import celery_task1_name, celery_task2_name

def demo_func(a):

调用格式:任务名.delay(参数)

celery_task1_name.delay(a)
print('celery_task1_name执行完成:{}!'.format(a))
celery_task2_name.delay()
print('celery_task2_name执行完成!')

demo_func('hello celery!')

至此,我们已经实现了Celery异步调用任务的功能,通过复制以上步骤中的代码,可以实现异步任务的demo。