python线程池 ThreadPoolExecutor 使用详解

您所在的位置:网站首页 线程池的线程如何存活 python线程池 ThreadPoolExecutor 使用详解

python线程池 ThreadPoolExecutor 使用详解

2023-12-17 02:38| 来源: 网络整理| 查看: 265

从 Python3.2 开始,标准库为我们提供了 concurrent.futures 模块,它提供了 ThreadPoolExecutor 和 ProcessPoolExecutor两个类,实现了对 threading 和 multiprocessing 的进一步抽象(这里主要关注线程池),不仅可以自动调度线程,还可以做到:

主线程可以获取某一个线程(或者任务的)的状态,以及返回值。当一个线程完成的时候,主线程能够立即知道。让多线程和多进程的编码接口一致。

下面介绍线程池的使用方法:

1. 使用 ThreadPoolExecutor 来实例化线程池对象。传入max_workers参数来设置线程池中最多能同时运行的线程数目。

from concurrent.futures import ThreadPoolExecutor import time def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 表示在这个线程池中同时运行的线程有3个线程

2. 使用 submit 函数来提交线程需要执行的任务(函数名和参数)到线程池中,并返回该任务的句柄(类似于文件、画图),注意 submit() 不是阻塞的,而是立即返回。通过 submit 函数返回的任务句柄,能够使用 done() 方法判断任务是否结束。

3. 使用 cancel() 方法可以取消提交的任务,如果任务已经在线程池中运行了,就取消不了。这个例子中,线程池的大小设置为2,任务已经在运行了,会取消失败。如果改变线程池的大小为1,那么先提交的是task1,task2还在排队等候,这是时候就可以成功取消。

4. 使用 result() 方法可以获取任务的返回值,这个方法是阻塞的。

#通过submit函数提交执行的函数到线程池中, submit 是立即返回 task1 = executor.submit(get_html, (3)) # 第一个是回调函数,第二个是传给函数的参数 task2 = executor.submit(get_html, (2)) #done方法用于判定某个任务是否完成 print(task1.done()) # cancel方法用于取消某个任务 print(task2.cancel()) # result方法可以获取task的执行结果, 这个方法是阻塞的 print(task1.result())

5. as_completed() 方法,上面虽然提供了判断任务是否结束的方法,但是不能在主线程中一直判断,有时候我们是得知某个任务结束了,就去获取结果,而不是一直判断每个任务有没有结束。这是就可以使用 as_completed() 方法一次取出所有任务的结果。as_completed() 方法是一个生成器,在没有任务完成的时候,会阻塞,在有某个任务完成的时候,会 yield这个任务,就能执行for循环下面的语句,然后继续阻塞住,循环到所有的任务结束。从结果也可以看出,先完成的任务会先通知主线程。

from concurrent.futures import ThreadPoolExecutor, as_completed import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url all_task = [executor.submit(get_html, (url)) for url in urls] for future in as_completed(all_task): data = future.result() print("in main: get page {}s success".format(data)) """ 执行结果 get page 2s finished in main: get page 2s success get page 3s finished in main: get page 3s success get page 4s finished in main: get page 4s success """

6. 使用 map 方法,无需提前使用 submit 方法,map 方法与 python 标准库中的 map 含义相同,都是将序列中的每个元素都执行同一个函数,下面的代码就是对 urls 的每个元素都执行 get_html 函数,并分配到线程池里。可以看到执行结果与上面的 as_completed 方法的结果不同,输出顺序和 urls 列表的顺序相同,就算 2s 的任务先执行完成,也会先打印出3s的任务先完成,再打印2s的任务完成。

import time from concurrent.futures import ThreadPoolExecutor def get_html(times): time.sleep(times) print("get page {} success".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) # 通过executor的 map 获取已经完成的task的值 for data in executor.map(get_html, urls): print("get {} page".format(data))

7. wait 方法可以让主线程阻塞,直到满足设定的要求。wait 方法接收3个参数,等待的任务序列、超时时间以及等待条件。等待条件 reture_when 默认为 ALL_COMPLETED,表明要等待所有的任务都结束。可以看到运行结果中,确实是所有任务都完成了,主线程才打印出 main。等待条件还可以设置为 FIRST_COMPLETED,表示第一个任务完成就停止等待。

from concurrent.futures import ThreadPoolExecutor, wait, ALL_COMPLETED, FIRST_COMPLETED import time # 参数times用来模拟网络请求的时间 def get_html(times): time.sleep(times) print("get page {}s finished".format(times)) return times executor = ThreadPoolExecutor(max_workers=2) urls = [3, 2, 4] # 并不是真的url all_task = [executor.submit(get_html, (url)) for url in urls] wait(all_task, return_when=ALL_COMPLETED) print("main") # 执行结果 # get page 2s finished # get page 3s finished # get page 4s finished # main  

8.  ThreadPoolExecutor无法调用类中的方法

原因是实例方法不能被pickle。只能调用非类中的方法,或者类中的@staticmethod方法。

https://www.zhihu.com/question/35637215?sort=created

 

9. ThreadPoolExecutor.submit()传递多个参数的方法

from concurrent.futures import ThreadPoolExecutor,as_completed def doFileParse(filepath,segment,wordslist): print(filepath) print(segment) #调用方法 args =[filepath,thu1,Words] newTask=executor.submit(lambda p: doFileParse(*p),args)

 

 

总结: future的设计理念很棒,在线程池/进程池和携程中都存在future对象,是异步编程的核心。ThreadPoolExecutor 让线程的使用更加方便,减小了线程创建/销毁的资源损耗,无需考虑线程间的复杂同步,方便主线程与子线程的交互。线程池的抽象程度很高,多线程和多进程的编码接口一致。

 

本文参考文章:

[python] ThreadPoolExecutor线程池



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3