博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
python celery 多work多队列
阅读量:6979 次
发布时间:2019-06-27

本文共 2568 字,大约阅读时间需要 8 分钟。

1.Celery模块调用

既然celery是一个分布式的任务调度模块,那么celery是如何和分布式挂钩呢,celery可以支持多台不通的计算机执行不同的任务或者相同的任务。

如果要说celery的分布式应用的话,就要提到celery的消息路由机制,AMQP协议。具体的可以查看AMQP的文档。简单地说就是可以有多个消息队列(Message Queue),不同的消息可以指定发送给不同的Message Queue,而这是通过Exchange来实现的。发送消息到Message Queue中时,可以指定routiing_key,Exchange通过routing_key来把消息路由(routes)到不同的Message Queue中去。

多worker,多队列,实例:

1.在服务器上编写文件tasks.py。首先定义一个Celery的对象,然后通过celeryconfig.py对celery对象进行设置。之后又分别定义了三个task,分别是taskA, taskB和add。

 

#!/usr/bin/env#-*-conding:utf-8-*-from celery import Celery,platformsplatforms.C_FORCE_ROOT = Trueapp = Celery()app.config_from_object("celeryconfig")@app.taskdef tashA(x,y):	return x*y@app.taskdef taskB(x,y,z):	return x+y+z@app.taskdef add(x,y):	return x+y

2.编写celeryconfig.py文件。

 

#!/usr/bin/env python#-*- coding:utf-8 -*-from kombu import Exchange,Queuefrom celery import platformsplatforms.C_FORCE_ROOT = TrueBROKER_URL = "redis://localhost:6379/7" CELERY_RESULT_BACKEND = "redis://localhost:6379/8"CELERY_QUEUES = (Queue("default",Exchange("default"),routing_key="default"),Queue("for_task_A",Exchange("for_task_A"),routing_key="for_task_A"),Queue("for_task_B",Exchange("for_task_B"),routing_key="for_task_B") )CELERY_ROUTES = {'tasks.taskA':{"queue":"for_task_A","routing_key":"for_task_A"},'tasks.taskB':{"queue":"for_task_B","routing_key":"for_task_B"}}

3.启动worker来指定task

celery -A tasks worker -l info -n workerA.%h -Q for_task_A

celery -A tasks worker -l info -n workerB.%h -Q for_task_B

4.传入参数

将上面两个文件导出到pycharm中:

 

编写文件传参:

 

from tasks import *re1 = taskA.delay(100, 200)re2 = taskB.delay(1,2, 3)print(re3.status)          #查看re3的状态print(re3.id)               #查看re3的id

运行之后可见:taskA,taskB都已正常执行。

5.我们可以看到add(re3)的状态是PENDING,表示没有执行,这个是因为没有celeryconfig.py文件中指定改route到哪一个Queue中,所以会被发动到默认的名字celery的Queue中,但是我们还没有启动worker执行celery中的任务。下面,我们来启动一个worker来执行celery队列中的任务。

celery -A tasks worker -l info -n worker.%h -Q celery 

这样我们再次运行pycharm就可以看见add也被运行了,并且redis数据库中也有该id了。

2.Celery与定时任务

1.在celery中执行定时任务非常简单,只需要设置celery对象中的CELERYBEAT_SCHEDULE属性即可。

下面我们接着在celeryconfig.py中添加CELERYBEAT_SCHEDULE变量:

 

CELERY_TIMEZONE = 'UTC'CELERYBEAT_SCHEDULE = {    'taskA_schedule' : {        'task':'tasks.taskA',        'schedule':20,        'args':(5,6)    },    'taskB_scheduler' : {        'task':"tasks.taskB",        "schedule":200,        "args":(10,20,30)    },    'add_schedule': {        "task":"tasks.add",        "schedule":10,        "args":(1,2)    }}

2.Celery启动定时任务

celery -A tasks worker -l info -n workerA.%h -Q for_task_A -B

 

启动完成后:

taskA每20秒执行一次taskA.delay(5, 6)

taskB每200秒执行一次taskB.delay(10, 20, 30)

Celery每10秒执行一次add.delay(1, 2)

你可能感兴趣的文章
java中的char类型
查看>>
Windows XP下,JDK环境变量配置
查看>>
RabbitMQ (四) 路由选择 (Routing)
查看>>
关于ExtJS在使用下拉列表框的二级联动获取数据
查看>>
SPRING3.X JSON 406 和 中文乱码问题
查看>>
多个class相同的input标签 获取当前值!方法!
查看>>
模板方法模式与策略模式的区别
查看>>
html5地理定位数据
查看>>
《JAVA-枚举》
查看>>
使用photoshop 10.0制作符合社保要求的照片
查看>>
Python下使用tarfile模块来实现文件归档压缩与解压
查看>>
思科交换机各类型中字母的意思?
查看>>
linux基础命令
查看>>
我的友情链接
查看>>
Nutanix CE on Lenovo W520 初探
查看>>
make执行过程
查看>>
Ansible源码解析 Inventory组概念
查看>>
数据备份学习
查看>>
替换空格
查看>>
Linux中源码包的管理
查看>>