我在实际工程中的基于python协程consumer考虑到的优化点
核心要点
相信我们有很多的consumer都涉及到的是从一个消息队列消费,处理后可能还涉及到往多个IO目标(如回调一个http地址,上传到elasticsearch…)进行写的情况。对这种I/O密集的情况,下面有一些我认为的需要注意点:
- 所有IO操作要能够批量化
- 需要能处理异常信号,处理完当前已取出的元素后才推出
- 考虑到上下游挤压/性能问题,要能够自动/灵活的调整获取元素/调用下游时的并发量
- 确保coroutine的划分能合理有效的在不同IO操作时能够切换出去
详细说明
以我实际遇到的为例:consumer从放置短信状态报告的redis队列中取出状态报告(报告发出的每条短信的接收状态),处理后全部要上传到elasticsearch,同时若其中有上行短信(用户回复的短信)的记录要以http调用的方式另外发布给一个平台进行下一步的处理
全部使用I/O操作批量
redis获取的批量操作:这个可以通过下面的lua脚本做到,从而将每次获取的数量作为参数传入即可
1
2
3
4
5
6
7
8
9
10
11
12
13
14local key = KEYS[1]
local n = ARGV[1]
local rep = {}
local ele = 1
while tonumber(n) > 0 and ele do
ele = redis.call('RPOP', key)
n = n - 1
if ele then
rep[#rep+1] = ele
end
end
return rep批量上传到es:这个es是支持bulk操作的
批量发布到另一个平台:请在接入时和对方提要求必须要能够批量传入
需要能够处理异常信息
说明
对于consumer而言,相比真正的一些不可预料的异常(反正SIGKILL - 9和SIGSTOP - 19也扑捉不到)。我们实际更常遇到的是重启consumer(比如更新了代码),故我们需要能够正确的处理此类信号(如最常见的SIGTERM - 15),做到完全部已取出元素后再重启。
解决
以一个全局变量为标示,再main函数中每次要进行下一轮获取前检测全局变量,如果符合退出要求就关闭需要关闭的后退出。而设置此全局变量的方式就是通过python的signal包在初始化时注册信号处理函数,如此后若发现有等待处理信号便会进入注册的信号处理函数
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22import signal
SIGNAL_TERMINATE = 0 # 若被设置为1表示收到了系统关闭的信号量
def exit_signal_controller(signum, stack):
global SIGNAL_TERMINATE
if signum == 15: # supervisor default stop/restart command signum
SIGNAL_TERMINATE = 1
def main():
# 注册信号处理函数
signal.signal(signal.SIGTERM, exit_signal_controller)
while True:
# 获取元素
# 处理
if SIGNAL_TERMINATE:
break
# 关闭需要关闭的资源
自动/灵活的调整获取元素并发量
- 说明
- 若上游出现大批量挤压,要能够自动的加大并发量进行消费
- 而若假设这边的处理是要调用一个提供服务的下游,而下游不可避免的在某些时候会出现状况或者是调大并发量后发现扛不住,此时要能自动进行降低并发量
- 解决
- 在第1.1.1里面看到了,每次从redis获取元素的数量是个变量(假设为EACH_FETCH_AMOUNT)。但是我这里还要引入另一个变量,用于控制并发量(假设为HANDLE_AMOUNT_THRESOLD)。核心思想是获取到的总数量>=HANDLE_AMOUNT_THRESOLD后才进行处理,故实际HANDLE_AMOUNT_THRESOLD就是并发量最大值。(补充:此处实际还需要做在数量不够HANDLE_AMOUNT_THRESOLD单超过了一定时间后也进行处理的逻辑,避免延迟不可接受。)
- 这里我的解决办法其实和上面处理异常信号的思想很像,注册信号处理函数变成了subscribe一个redis channel,这样自动检测程序(检测上游是否有积压以及下游是否扛不住)在检测到问题后就publish具体希望调节到的并发量数值到程序完成自动调节。自由度很大
- 说明
确保coroutine的划分能合理有效的在不同IO操作时能够切换出去
说明:这只是一个提醒,划分coroutine注意要按是否有I/O操作来划分,以及注意可以使用一些技巧更好的编排coroutine的切换(如这里使用的event),避免空切太多
示例:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68async def push_momsg_to_xxx(session, event, mos_list):
while True:
await event.wait()
items = mos_list[:]
mos_list[:] = []
# 使用session将items通过http调用报告过去
event.clear()
async def push_to_es(session, event, redis, items_list):
while True:
await event.wait()
items = items_list[:]
items_list[:] = []
# 使用session将items上传到elasticsearch
event.clear()
async def consumer(redis, report_event, items_list, mo_event, mo_list):
last_up_time = time.time()
amount = 0
resp = await redis.client.brpop(REDIS_LIST_NAME)
script_result = await redis.client.evalsha(redis.sha_multipop, keys=[REDIS_LIST_NAME], args=[EACH_FETCH_AMOUNT])
script_result.append(resp[1])
for index, body in enumerate(script_result):
ess, mos, error = handle_item(body)
amount += 1
items_list.append(ess)
if mos: mo_list.extend(mos)
if len(items_list) >= UPLOAD_AMOUNT_THRESOLD or (time.time() - last_up_time) > UPLOAD_TIME_THRESOLD:
if len(items_list) > 0:
report_event.set()
if mo_list:
mo_event.set()
async def main():
report_event = asyncio.Event()
items_list = []
mo_event = asyncio.Event()
mo_list = []
redis = await AsyncRedis().get_ins()
async with aiohttp.ClientSession() as session:
consumer_task = asyncio.create_task(consumer(redis, report_event, items_list, mo_event, mo_list))
pusher_task = asyncio.create_task(push_to_es(session, report_event, redis, items_list))
push_to_mo_task = asyncio.create_task(push_momsg_to_xxx(session, mo_event, mo_list))
await asyncio.gather(*[consumer_task, pusher_task, push_to_mo_task])
if __name__ == '__main__':
REDIS_LIST_NAME = 'xxx'
EACH_FETCH_AMOUNT = 30
UPLOAD_AMOUNT_THRESOLD = 60
UPLOAD_TIME_THRESOLD = 5
asyncio.run(main())
具体代码
TODO