首页 技术 正文
技术 2022年11月14日
0 收藏 988 点赞 3,715 浏览 8282 个字

认识

这里有几个概念,task、worker、broker。
顾名思义,task 就是老板交给你的各种任务,worker 就是你手下干活的人员。

那什么是 Broker 呢?

老板给你下发任务时,你需要 把它记下来, 这个它 可以是你随身携带的本子,也可以是 电脑里地记事本或者excel,或者是你的 任何时间管理工具。

Broker  则是 Celery 记录task的地方。
作为一个任务管理者的你,将老板(前端程序)发给你的 安排的工作(Task) 记录到你的本子(Broker)里。接下来,你就安排你手下的IT程序猿们(Worker),都到你的本子(Broker)里来取走工作(Task)

celery 实例进阶

回到顶部

1. broker为rabbitmq

#tasks.py

celery 实例进阶

from celery import Celeryapp = Celery('tasks', broker='amqp://admin:admin@localhost:5672')@app.task
def add(x, y):
return x + y

celery 实例进阶

启动

celery -A tasks worker --loglevel=info

运行

celery 实例进阶

>>> from tasks import add
>>> add(1, 3)
4
>>> add.delay(1,3)
<AsyncResult: 07614cef-f314-4c7b-a33f-92c080cadb83>
>>>

celery 实例进阶

:delay是使用异步的方式,会压入到消息队列。否则,不会使用消息队列。

文件名为tasks.py,则其中代码app = Celery(‘tasks‘, broker=),Celery第一个参数为工程名,启动时也是celery -A tasks worker –loglevel=info

对比

:投入到指定的队列用:add.delay(1, 3, queue=’queue_add1′)

test_2.py

celery 实例进阶

from celery import Celeryapp = Celery('proj', broker='amqp://admin:admin@localhost:5672', include='test_2')@app.task
def add(x, y):
return x + y

celery 实例进阶回到顶部

2. 以python+文件名的方式启动

例1:

#test.py

celery 实例进阶

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')@app.task
def add(x, y):
print "------>"
time.sleep(5)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

启动

python test.py worker 

celery默认启动的worker数为内核个数,如果指定启动个数,用参数-c,例

python test.py worker -c 2

例2:

#test.py

celery 实例进阶

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672')@app.task
def add(x, y):
print "------>"
time.sleep(2)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

#eg.py

celery 实例进阶

from test import *
import timerev = []
for i in range(3):
rev.append(add.delay(1,3))print "len rev:", len(rev)
while 1:
tag = 1
for key in rev:
if not key.ready():
tag = 0
time.sleep(1)
print "sleep 1"
if tag:
break
print "_____________________>"

celery 实例进阶回到顶部

3. broker为redis

#test_redis.py

celery 实例进阶

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')@app.task
def add(x, y):
print "------>"
time.sleep(5)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

启动

python test_redis.py worker -c 2

测试

celery 实例进阶

from celery import group
from test_redis import *
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
g = group(add.s(2, 3)).apply_async()
for ret in g.get():
print ret
print "end-----------------------------------"

celery 实例进阶

结果

5
end-----------------------------------

回到顶部

4. 两个队列(redis)

#test_redis.py

celery 实例进阶

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis', backend='redis', broker='redis://100.69.201.116:7000')@app.task
def add(x, y):
print "------>"
time.sleep(5)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

#test_redis_2.py

celery 实例进阶

from celery import Celery
import time
#app = Celery('test_redis', backend='amqp', broker='redis://100.69.201.116:7000')
app = Celery('test_redis_2', backend='redis', broker='redis://100.69.201.116:7001')@app.task
def add_2(x, y):
print "=======>"
time.sleep(5)
print "<================="
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

测试

celery 实例进阶

from celery import group
from test_redis import *
from test_redis_2 import *
ll = [(1,2), (3,4), (5,6)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
print ret
print "end redis_1 -----------------------------------"ll = [(1,2), (3,4), (5,6)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
print ":", ret
print "end redis_2 -----------------------------------"

celery 实例进阶

结果

celery 实例进阶

3
7
11
end redis_1 -----------------------------------
: 3
: 7
: 11
end redis_2 -----------------------------------

celery 实例进阶回到顶部

5. 两个队列(同一个rabbitmq)

注释:需要提前设置下队列

celery 实例进阶

##例1

#test.py

celery 实例进阶

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//')@app.task
def add(x, y):
print "------>"
time.sleep(5)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

#test_2.py

celery 实例进阶

from celery import Celery
import time
app = Celery('test_2', backend='amqp', broker='amqp://admin:admin@localhost:5672//hwzh')@app.task
def add_2(x, y):
print "=====>"
time.sleep(5)
print "<=========="
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

测试

celery 实例进阶

from celery import group
from test import *
from test_2 import *ll = [(1,2), (3,4), (7,8)]
g = group(add.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
print retll = [(1,2), (3,4), (7,8)]
g = group(add_2.s(key[0], key[1]) for key in ll).apply_async()
for ret in g.get():
print ret

celery 实例进阶

结果

celery 实例进阶

3
7
15
3
7
15

celery 实例进阶

##例2

#test.py

celery 实例进阶

from celery import Celery
import time
app = Celery('test', backend='amqp', broker='amqp://admin:admin@localhost:5672//mq4')@app.task
def add(x, y):
print "------>"
time.sleep(2)
print "<--------------"
return x + y@app.task
def sum(x, y):
print "------>"
time.sleep(2)
print "<--------------"
return x + yif __name__ == "__main__":
app.start()

celery 实例进阶

#eg2.py

celery 实例进阶

from test import *
import timerev = []
for i in range(3):
rev.append(add.delay(1,3))for i in range(3):
rev.append(sum.delay(1,3))print "len rev:", len(rev)
while 1:
tag = 1
for key in rev:
if not key.ready():
tag = 0
time.sleep(1)
print "sleep 1"
if tag:
break
print "_____________________>"

celery 实例进阶回到顶部

6. 保存结果

celery 实例进阶

from celery import Celeryapp = Celery('tasks', backend='amqp', broker='amqp://admin:admin@localhost')@app.task
def add(x, y):
return x + y

celery 实例进阶

启动

celery -A tasks_1 worker --loglevel=info

与前例不同:

– ** ———- [config]
– ** ———- .> app: tasks:0x7f8057931810
– ** ———- .> transport: amqp://admin:**@localhost:5672//
– ** ———- .> results: amqp

运行

celery 实例进阶

>>> from tasks_1 import add
>>> result = add.delay(1, 3)
>>> result.ready()
True
>>> result.get()
4

celery 实例进阶回到顶部

7. 多个队列

celery 实例进阶

from celery import Celery
from kombu import Exchange, Queue
BROKER_URL = 'amqp://admin:admin@localhost//'
app = Celery('tasks', backend='amqp',broker=BROKER_URL)
app.conf.update(
CELERY_ROUTES={
"add1":{"queue":"queue_add1"},
"add2":{"queue":"queue_add2"},
"add3":{"queue":"queue_add3"},
"add4":{"queue":"queue_add4"},
},
)
@app.task
def add1(x, y):
return x + y@app.task
def add2(x, y):
return x + y@app.task
def add3(x, y):
return x + y@app.task
def add4(x, y):
return x + y

celery 实例进阶回到顶部

8. 消息路由

文件:tasks.py

celery 实例进阶

from celery import Celery, platforms
import time
import osapp = Celery('proj', broker='amqp://admin:admin@ip:5672',
include=['tasks']
)
app.conf.update(
CELERY_ROUTES={
'tasks.fun_1': {
'queue': "q_1"
},
'tasks.fun_2': {
'queue': "q_2"
}
}
)
platforms.C_FORCE_ROOT = True @app.task
def fun_1(n):
print "(((((((((((((((func_1", n
return 1@app.task
def fun_2(n):
print n, ")))))))))))))))"
return 2if __name__ == "__main__":
app.start()

celery 实例进阶

启动

python tasks.py worker -c 2 -Q q_1
python tasks.py worker -c 2 -Q q_2

两个消息队列:q_1, q_2,调用示例

celery 实例进阶

>>> from tasks import *
>>> fun_1(1)
(((((((((((((((func_1 1
1
>>> fun_1.delay(1)
<AsyncResult: 528a2ad1-bc16-4bdc-beff-cd166fe3e885>
>>> fun_2.delay(2)
<AsyncResult: ee5881eb-b384-4a39-ba00-08aa8ee53504>

celery 实例进阶回到顶部

9. woker内启多进程

#tasks.py

celery 实例进阶

from celery import Celery
import time
import multiprocessing as mpapp = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks")def test_func(i):
print "beg...:", i
time.sleep(5)
print "....end:", i
return i * 5@app.task
def fun_1(n):
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,))
p.close()
p.join()
return 1if __name__ == "__main__":
app.start()

celery 实例进阶

说明

直接启动多进程是肯定不可以的,因为是守候进程(curr_proc.daemon=True),所以启多进程之前主动设置为非守候进程:curr_proc.daemon=False,启动了以后再设为守候进程

#tasks_callback.py

celery 实例进阶

from celery import Celery
import time
import multiprocessing as mpapp = Celery('proj', broker='amqp://admin:admin@ip:5672', include="tasks_callback")
rev = []
def test_func(i):
print "beg...:", i
time.sleep(5)
print "....end:", i
return i * 5def callback_log(rev_val):
rev.append(rev_val)@app.task
def fun_1(n):
print "before rev:", rev
curr_proc = mp.current_process()
curr_proc.daemon = False
p = mp.Pool(mp.cpu_count())
curr_proc.daemon = True
for i in range(n):
p.apply_async(test_func, args=(i,), callback=callback_log)
p.close()
p.join()
print "after rev:", rev
return 1if __name__ == "__main__":
app.start()

celery 实例进阶回到顶部

10. 常用参数配置

1. CELERYD_PREFETCH_MULTIPLIER

同时预取得消息个数,比如如果CELERYD_PREFETCH_MULTIPLIER=2,那么如果现在对于1个worker,有一个状态是STARTED, 那么可以有2个处于RECEVED状态(如果有的话),这样就避免了如果消息很多全部分下取,后起来的worker领不到消息的尴尬。

参考代码

celery 实例进阶

from celery import Celery, platforms
import time
import osapp = Celery('proj', broker='amqp://admin:admin@localhost:5672',
include=['tasks']
)
app.conf.update(
CELERYD_PREFETCH_MULTIPLIER=2,
CELERY_ROUTES={
'tasks.fun_1': {
'queue': "q_1"
},
'tasks.fun_2': {
'queue': "q_2"
}
}
)
platforms.C_FORCE_ROOT = True@app.task
def fun_1(n):
print "(((((((((((((((func_1", n
time.sleep(20)
return 1@app.task
def fun_2(n):
print n, ")))))))))))))))"
return 2

celery 实例进阶

调用

celery 实例进阶

>>> from tasks import *
>>> fun_1.delay(3)
<AsyncResult: 609f2216-6785-409e-9f6f-85ae3fcce084>
>>> fun_1.delay(3)
<AsyncResult: 0230b8bd-b237-40ef-bc73-88929f8f8290>
>>> fun_1.delay(3)
<AsyncResult: 8fce172a-93c9-41f8-8c08-377a4363389c>
>>> fun_1.delay(3)

celery 实例进阶

相关推荐
python开发_常用的python模块及安装方法
adodb:我们领导推荐的数据库连接组件bsddb3:BerkeleyDB的连接组件Cheetah-1.0:我比较喜欢这个版本的cheeta…
日期:2022-11-24 点赞:878 阅读:9,492
Educational Codeforces Round 11 C. Hard Process 二分
C. Hard Process题目连接:http://www.codeforces.com/contest/660/problem/CDes…
日期:2022-11-24 点赞:807 阅读:5,907
下载Ubuntn 17.04 内核源代码
zengkefu@server1:/usr/src$ uname -aLinux server1 4.10.0-19-generic #21…
日期:2022-11-24 点赞:569 阅读:6,740
可用Active Desktop Calendar V7.86 注册码序列号
可用Active Desktop Calendar V7.86 注册码序列号Name: www.greendown.cn Code: &nb…
日期:2022-11-24 点赞:733 阅读:6,495
Android调用系统相机、自定义相机、处理大图片
Android调用系统相机和自定义相机实例本博文主要是介绍了android上使用相机进行拍照并显示的两种方式,并且由于涉及到要把拍到的照片显…
日期:2022-11-24 点赞:512 阅读:8,132
Struts的使用
一、Struts2的获取  Struts的官方网站为:http://struts.apache.org/  下载完Struts2的jar包,…
日期:2022-11-24 点赞:671 阅读:5,295