翼度科技»论坛 编程开发 python 查看内容

ThreadPoolExecutor使用浅谈

3

主题

3

帖子

9

积分

新手上路

Rank: 1

积分
9
1. 基础介绍

ThreadPoolExecutor是Python标准库concurrent.futures模块中的一个类,用于实现线程池的功能。
ThreadPoolExecutor模块相比于threading等模块,通过submit方法返回的是一个Future对象,它代表了一个未来可期的结果。通过Future对象,我们可以在主线程(或主进程)中获取某个线程(或任务)的状态以及返回值,实现了多线程和多进程编码接口的一致性。
具体来说,Future对象具有以下特点:

  • 获取状态和返回值:通过result()方法可以获取一个任务的执行结果。如果任务尚未完成,调用result()方法会阻塞主线程,直到任务完成并返回结果。
  • 异步通知:当一个线程完成时,主线程可以立即得到通知。可以通过done()方法判断任务是否已完成,或使用add_done_callback()方法注册一个回调函数,在任务完成时自动调用该函数。
  • 异常处理:如果任务抛出异常,Future对象会将异常抛出到主线程。可以使用exception()方法获取异常对象。
通过返回Future对象,我们可以更方便地管理和控制线程池中的任务。可以在主线程中获取任务的状态、返回值和异常信息,避免了线程之间的显式同步和等待。
总的来说,ThreadPoolExecutor模块提供了一种高级的多线程编程接口,使得多线程编程更加简洁和易用。它实现了多线程和多进程的编码接口一致性,使得我们可以使用类似的方式处理多线程和多进程编程任务。
2. 基础使用

创建线程池对象
可以使用ThreadPoolExecutor类创建一个线程池对象。可以指定线程池的大小(即可同时运行的线程数量),也可以使用默认值(大小为系统默认的处理器数量)。
  1. from concurrent.futures import ThreadPoolExecutor
  2. import time
  3. def get_html(times):
  4.     time.sleep(times)
  5.     print("get page {} success".format(times))
  6.     return times
  7. executor = ThreadPoolExecutor(max_workers=2)    # 表示在这个线程池中同时运行的线程有3个线程
复制代码
提交任务
使用submit()方法向线程池提交任务,该方法接受一个可调用对象(函数、方法等)作为参数,并返回一个Future对象,表示异步执行的结果。
  1. def my_task(arg):
  2.     # 执行任务的代码
  3.     return result
  4. # 提交任务到线程池
  5. future = executor.submit(my_task, arg)
复制代码
获取任务结果
可以使用Future对象的result()方法来获取任务的结果。如果任务尚未完成,result()方法会阻塞当前线程,直到任务完成并返回结果。
  1. # 获取任务的结果
  2. result = future.result()
复制代码
获取一组任务结果
使用submit()方法向线程池提交一组任务,并获取返回的Future对象列表,使用as_completed()函数迭代处理Future对象列表,它会在任务完成时产生结果。可以使用next()函数或直接使用for循环来获取结果
  1. def my_task(arg):
  2.     # 执行任务的代码
  3.     return result
  4. args = [arg1, arg2, arg3, ...]
  5. futures = [executor.submit(my_task, arg) for arg in args]
  6. # 使用next()函数获取每个任务的结果
  7. for future in as_completed(futures):
  8.     result = future.result()
  9.     # 处理任务结果
  10. # 或者使用for循环获取每个任务的结果
  11. for future in as_completed(futures):
  12.     result = future.result()
  13.     # 处理任务结果
复制代码
批量提交任务
除了逐个提交任务,还可以使用map()方法批量提交任务。map()方法接受一个可调用对象和一个可迭代的参数列表,然后并行地对参数列表中的每个参数调用可调用对象,并返回一个迭代器,用于获取每个任务的结果。
  1. def my_task(arg):
  2.     # 执行任务的代码
  3.     return result
  4. args = [arg1, arg2, arg3, ...]
  5. # 批量提交任务并获取结果
  6. results = executor.map(my_task, args)
复制代码
等待任务完成
使用wait()方法等待所有已提交的任务完成。可以指定超时时间,如果超时时间到达而还有任务未完成,则不再等待并返回结果。
  1. from concurrent.futures import ALL_COMPLETED, FIRST_COMPLETED
  2. # 等待所有任务完成
  3. executor.wait(futures)
  4. # 等待任意任务完成
  5. executor.wait(futures, return_when=FIRST_COMPLETED)
  6. # 等待所有任务完成或达到超时时间(单位为秒)
  7. executor.wait(futures, timeout=10)
复制代码
wait()方法接受三个参数:

  • fs:要等待的Future对象列表。
  • timeout:可选参数,指定等待的超时时间(单位为秒)。如果超时时间到达而还有任务未完成,则不再等待并返回结果。
  • return_when:可选参数,指定返回结果的条件。默认为ALL_COMPLETED,表示等待所有任务完成;也可以指定为FIRST_COMPLETED,表示等待任意一个任务完成。
 
关闭线程池
在不再需要线程池时,应该调用shutdown()方法关闭线程池。关闭线程池后,将不再接受新的任务提交,但会等待已提交的任务完成。
  1. # 关闭线程池
  2. executor.shutdown()
复制代码
  

来源:https://www.cnblogs.com/beyond-tester/p/17804328.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具