基于python异步模块asyncio的生产者

您所在的位置:网站首页 如何实现生产者消费者模型管理 基于python异步模块asyncio的生产者

基于python异步模块asyncio的生产者

2024-07-17 10:29| 来源: 网络整理| 查看: 265

 协程在python中是一个很强大的概念,被誉为轻量级线程,那么怎样用协程来实现一个生产-消费者模式呢?

这里需要用到asyncio模块自带的队列asyncio.Queue,先进先出队列(FIFO),可以设置队列最大长度来控制并发数量。

1.首先定义一个消费者协程函数(coroutine)

async def consumer(name, queue): while True: #在队列中取出任务 task = await queue.get() #模拟消费者的IO操作 await asyncio.sleep(1) queue.task_done() print(f'{name} has deal task {task}')

这里消费者代码很简单,首先在队列里取出任务,这里注意如果队列里没有任务,consumer协程就会在此阻塞,让出控制权,交由事件循环调度其他协程(等待生产者往队列里放置任务),然后是模拟进行IO消费,最后通知队列任务完成。

2.定义生产者协程函数

async def producer(name, queue): i = 0 while True: await queue.put(i) await asyncio.sleep(0.3) print(f'{name} has put task {i} into the queue') i += 1

这里的queue.put操作也是可以阻塞的,当队列里的任务达到队列最大长度时,生产者协程会在此阻塞,等待消费者协程进行消费。

3.定义协程入口函数

async def main(): queue = asyncio.Queue(10) workers = [] for i in range(3): consumer_worker = asyncio.create_task(consumer(f'consumer-{i}', queue)) workers.append(consumer_worker) producer_worker = asyncio.create_task(producer('producer', queue)) workers.append(producer_worker) await asyncio.gather(*workers)

这里首先定义一个长度为10的队列,然后创建三个消费者协程和一个生产者协程,交由事件循环对象去调度,最后等待这四个worker结束,事实上由于我们生产者和消费者里都是写的while True循环,因此这四个worker永远不会结束。

4.运行程序

if __name__ == '__main__': loop = asyncio.get_event_loop() loop.run_until_complete(main()) loop.close()

5.观察运行结果

可以看到,事件循环一旦启动,生产者不停的将任务放入队列,同时三个消费者不停的消费这些任务,同时他们不会出现重复消费等现象,因此它是“协程安全的”。

6.总结

这是一个小domo,但是在这个例子中我们可以发现,对于IO密集型任务,协程可以基本上代替线程的作用,以资源的低开销来实现生产-消费者模式。这点对于爬虫、网络通讯、数据库查询等任务有较好的适应性。

文章写的比较简单,欢迎大家和我互相交流



【本文地址】


今日新闻


推荐新闻


    CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3