翼度科技»论坛 编程开发 python 查看内容

Python并行编程2构建多线程程序(上):threading简介

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
2 构建多线程程序

在本章中,我们将介绍线程及其并发编程。在标准 Python 库中有一个threading 模块,它允许我们轻松地实现线程,并为线程的同步提供了一系列有用的工具。本章将专门介绍该模块,并逐一介绍所有提供的类及其功能。
主要内容

  • 线程
  • 锁和RLock
  • 信号
  • 条件
  • 事件
  • 线程池执行器
2.1 线程

并发编程的主角正是线程,为此,threading模块提供了Thread 类:
  1. class threading.Thread(group=None,
  2. target=None,
  3. name=None,
  4. args=(),
  5. kwargs={},
  6. *,
  7. daemon=None)
复制代码
Thread() 构造函数需要许多参数,其中最重要和最常用的是 target 和 args。要在线程中调用的函数传递给 target,而要传递给它的参数则传递给 args。我们可以通过一个实际例子立即了解这类对象的功能。在程序中,我们定义了五个线程,它们将相互竞争。所有线程都以同一个函数为目标,为方便起见,我们将其称为 function()。这个函数不会执行任务,只会占用很短的时间,只是为了模拟执行一组指令所花费的时间:
  1. import threading
  2. import time
  3. def function(i):
  4.   print ("start Thread %i\n" %i)
  5.   time.sleep(2)
  6.   print ("end Thread %i\n" %i)
  7.   return
  8. t1 = threading.Thread(target=function , args=(1,))
  9. t2 = threading.Thread(target=function , args=(2,))
  10. t3 = threading.Thread(target=function , args=(3,))
  11. t4 = threading.Thread(target=function , args=(4,))
  12. t5 = threading.Thread(target=function , args=(5,))
  13. t1.start()
  14. t2.start()
  15. t3.start()
  16. t4.start()
  17. t5.start()
  18. print("END Program")
复制代码
从代码中我们可以看到,首先定义了五个线程类实例,使用变量 t1、t2 等对应多个线程。随后,将通过调用 start() 方法启动线程的执行。运行程序后,我们将得到:
  1. start Thread 1
  2. start Thread 2
  3. start Thread 3
  4. start Thread 4
  5. start Thread 5
  6. END Program
  7. end Thread 2
  8. end Thread 1
  9. end Thread 5
  10. end Thread 3
  11. end Thread 4
复制代码
我们看到程序同时启动了所有五个线程,然后不等它们执行完毕就关闭了,导致输入新命令的提示出现。实际上,这五个线程在后台继续执行,继续在命令行上输出。
从输出中还可以看到另一个有趣的现象,那就是线程执行的关闭顺序与开始顺序不同,而且每次执行的顺序也会不同。这是线程的正常行为,因为它们是并发运行的。它们的执行持续时间和顺序很少可以预测。因此,使用本章后面将介绍的同步方法非常重要。

2.1.1 join() 方法

在前面的案例中,我们已经看到,通过在程序中启动线程,可以观察到程序在线程之前结束的行为。这个问题很容易解决,因为线程模块提供了 join() 方法。在线程上启动该方法后,程序会等待其执行结束后再关闭。因此,如果有多个线程,我们将在每个线程上调用 join() 方法。
  1. import threading
  2. import time
  3. def function(i):
  4.   print ("start Thread %i" %i)
  5.   time.sleep(2)
  6.   print ("end Thread %i" %i)
  7.   return
  8. t1 = threading.Thread(target=function , args=(1,))
  9. t2 = threading.Thread(target=function , args=(2,))
  10. t3 = threading.Thread(target=function , args=(3,))
  11. t4 = threading.Thread(target=function , args=(4,))
  12. t5 = threading.Thread(target=function , args=(5,))
  13. t1.start()
  14. t2.start()
  15. t3.start()
  16. t4.start()
  17. t5.start()
  18. t1.join()
  19. t2.join()
  20. t3.join()
  21. t4.join()
  22. t5.join()
  23. print("END Program")  
复制代码
在这种情况下,通过运行程序,我们将得到如下结果:
  1. start Thread 1
  2. start Thread 2
  3. start Thread 3
  4. start Thread 4
  5. start Thread 5
  6. end Thread 1
  7. end Thread 2
  8. end Thread 3
  9. end Thread 5
  10. end Thread 4
  11. END Program
复制代码
我们可以看到,现在程序会等待所有线程执行完毕后才关闭。
但让我们进一步思考一下。作为一个简单的同步示例,我们可以在程序的某一时刻调用 join() 方法。例如,我们只想执行主程序的部分代码,然后等待线程开始执行。然后重新启动、执行其他操作或启动其他线程。
让我们通过修改前面的示例代码来看一个简单的例子:
  1. import threading
  2. import time
  3. def function(i):
  4.   print ("start Thread %i" %i)
  5.   time.sleep(2)
  6.   print ("end Thread %i" %i)
  7.   return
  8. t1 = threading.Thread(target=function , args=(1,))
  9. t2 = threading.Thread(target=function , args=(2,))
  10. t3 = threading.Thread(target=function , args=(3,))
  11. t4 = threading.Thread(target=function , args=(4,))
  12. t5 = threading.Thread(target=function , args=(5,))
  13. t1.start()
  14. t2.start()
  15. t1.join()
  16. t2.join()
  17. print("First set of threads done")
  18. print("The program can execute other code here")
  19. t3.start()
  20. t4.start()
  21. t5.start()
  22. t3.join()
  23. t4.join()
  24. t5.join()
  25. print("Second set of threads done")
  26. print("END Program")
复制代码
运行新修改的代码,我们将得到如下结果:
  1. start Thread 1
  2. start Thread 2
  3. end Thread 1
  4. end Thread 2
  5. First set of threads done
  6. The program can execute other code here
  7. start Thread 3
  8. start Thread 4
  9. start Thread 5
  10. end Thread 4
  11. end Thread 3
  12. end Thread 5
  13. Second set of threads done
  14. END Program
复制代码
我们可以看到,可以使用 join() 方法在程序中添加一个等待线程开始执行的点,然后用下面几行代码重新开始。

2.1.2 常见的线程同步模式

在前面使用多个线程的示例中,你可以看到相同的代码行被多次使用。每次我们都为每个线程定义了 start() 和 join() 方法的调用,写下了无数行类似的代码。随着线程数量的增加,情况会越来越糟。不过,还有其他更方便的代码编写方法。
  1. import threading
  2. import time
  3. def function(i):
  4.   print ("start Thread %i" %i)
  5.   time.sleep(2)
  6.   print ("end Thread %i" %i)
  7.   return
  8. n_threads = 5
  9. threads = [ ]
  10. for i in range(n_threads):
  11.   t = threading.Thread(target=function , args=(i,))
  12.   threads.append(t)
  13.   t.start()
  14. for i in range(n_threads):
  15.   threads[i].join()
复制代码
这种形式的代码更易读,也更简单。通过使用一个 for 循环来遍历所需的线程数量,我们避免了单独定义每个线程(t1、t2、t3......),以及为每个线程调用 start() 和 join() 方法。
我们将得到与前面示例相同的结果:
  1. start Thread 0
  2. start Thread 1
  3. start Thread 2
  4. start Thread 3
  5. start Thread 4
  6. end Thread 0
  7. end Thread 1
  8. end Thread 4
  9. end Thread 3
  10. end Thread 2
复制代码
参考资料

2.1.2 concurrent.futures模块和 ThreadPoolExecutor

除了线程模块外,标准库中还有另一个模块可以为我们提供线程方面的有用工具,我们将在下一章看到它,同样适用于进程。该模块名为 concurrent.futures,是异步执行可调用程序的高级接口。
其中的 ThreadPoolExecutor 类对于同时管理多个线程非常有用。事实上,当我们的程序中有许多线程需要管理时,最有效的方法就是创建一个 ThreadPoolExecutor。
举个例子,我们想同时启动四个线程,它们都与一个或多个函数相关联。与其编写四个 Thread 类实例的定义,然后四次调用 start() 方法和四次调用 join() 方法,不如使用 ThreadPoolExecutor 来得简单。
下面的代码就是一个例子:
  1. import concurrent.futures
  2. import time
  3.   
  4. def thread(num,t):
  5.   print("Thread %s started" %num)
  6.   time.sleep(t)
  7.   print("Thread %s ended" %num)
  8.   
  9. with concurrent.futures.ThreadPoolExecutor(max_workers=4) as t:
  10.    t.submit(thread(1,10))
  11.    t.submit(thread(2,1))
  12.    t.submit(thread(3,10))
  13.    t.submit(thread(4,4))
  14. print("Program ended")
复制代码
运行这段代码,我们会得到类似下面的结果。
  1. Thread 1 started
  2. Thread 1 ended
  3. Thread 2 started
  4. Thread 2 ended
  5. Thread 3 started
  6. Thread 3 ended
  7. Thread 4 started
  8. Thread 4 ended
  9. Program ended
复制代码
线程是连续执行的,而执行显然是分开的。从这个极其简洁的结果来看,ThreadPoolExecutor 内部提供了某种同步系统。即使它内部不应该有任何同步系统。如果我们不使用 ThreadPoolExecutor,而是像下面这样写之前的代码:
  1. import threading
  2. import time
  3.   
  4. def thread(num,t):
  5.   print("Thread %s started" %num)
  6.   time.sleep(t)
  7.   print("Thread %s ended" %num)
  8.   
  9. t1 = threading.Thread(target=thread, args=(1,10,))
  10. t2 = threading.Thread(target=thread, args=(2,1,))
  11. t3 = threading.Thread(target=thread, args=(3,10,))
  12. t4 = threading.Thread(target=thread, args=(4,4,))
  13. t1.start()
  14. t2.start()
  15. t3.start()
  16. t4.start()
  17. t1.join()
  18. t2.join()
  19. t3.join()
  20. t4.join()
  21. print("Program ended")
复制代码
运行它,我们会发现完全不同的行为:
  1. Thread 1 started
  2. Thread 2 started
  3. Thread 3 started
  4. Thread 4 started
  5. Thread 2 ended
  6. Thread 4 ended
  7. Thread 1 ended
  8. Thread 3 ended
  9. Program ended
复制代码
在这里,线程是同时并发启动的。因此,每个运行线程的执行时间取决于其持续时间。
2.1.3 线程竞争

下面的代码是一个简单明了的示例,可以说明两个相互竞争的线程是如何工作的(并发编程)。每个线程都被分配了一个不同的函数 addA() 和 addB(),它们模拟循环执行的某种操作,每次迭代分别需要时间 A 和时间 B。这两个线程将一起启动,由于在 Python 中它们不能并行执行,只能一次执行一个,因此在程序执行过程中,它们将竞争以尽快结束它们的循环(COUNT 等于 5 次迭代)。为了监控两个线程的执行顺序,每个周期的两个函数都会将对应于两个线程的字母 A 和 B 添加到字符串中:
  1. import threading
  2. import time
  3. sequence = ""
  4. COUNT = 5
  5. timeA = 5
  6. timeB = 10
  7. def addA():
  8.   global sequence
  9.   for i in range(COUNT):
  10.     time.sleep(timeA)
  11.     sequence = "%sA" %sequence
  12.     print("Sequence: %s" %sequence)
  13. def addB():
  14.   global sequence
  15.   for i in range(COUNT):
  16.     time.sleep(timeB)
  17.     sequence = "%sB" %sequence
  18.     print("Sequence: %s" %sequence)
  19. # the Main program
  20. t1 = threading.Thread(target = addA)
  21. t2 = threading.Thread(target = addB)
  22. t1.start()
  23. t2.start()
  24. t1.join()
  25. t2.join()
复制代码
在程序运行过程中,您可以看到执行顺序。因此,如果运行你刚才编写的代码,就会得到类似下面的序列:
  1. Sequence: A
  2. Sequence: AB
  3. Sequence: ABA
  4. Sequence: ABAA
  5. Sequence: ABAAB
  6. Sequence: ABAABA
  7. Sequence: ABAABAA
  8. Sequence: ABAABAAB
  9. Sequence: ABAABAABB
  10. Sequence: ABAABAABBB
复制代码
从结果中我们可以看到,两个线程以任意方式交替执行。你会发现,序列会随着执行的不同而变化。你还可以通过改变 timeA 和 timeB 变量的值来改变每个线程的执行时间。这将反过来影响相互竞争的两个线程的执行顺序。
2.1.4 使用Thread 子类

在前面的示例中,我们通过 Thread() 构造函数定义了一个线程,函数名称作为参数通过 target 参数传递给该构造函数:
  1. t = threading.Thread(target = function_name)
复制代码
在这种情况下,我们将函数中定义的代码所定义的线程分成了两个不同的实体。
另一种构思代码的方式是将新线程定义为线程子类,它有自己的方法,因此可以在其内部执行一些代码,而无需调用更多外部函数。这样,线程就是真正的对象,与面向对象编程一致。
  1. from threading import Thread
  2. import time
  3. sequence = ""
  4. COUNT = 5
  5. timeA = 1
  6. timeB = 2
  7. class ThreadA(Thread):
  8.   def __init__(self):
  9.     Thread.__init__(self)
  10.    
  11.   def run(self):
  12.     global sequence
  13.     for i in range(COUNT):
  14.       time.sleep(timeA)
  15.       sequence = "%sA" %sequence
  16.       print("Sequence: %s" %sequence)
  17. class ThreadB(Thread):
  18.   def __init__(self):
  19.     Thread.__init__(self)
  20.    
  21.   def run(self):
  22.     global sequence
  23.     for i in range(COUNT):
  24.       time.sleep(timeB)
  25.       sequence = "%sB" %sequence
  26.       print("Sequence: %s" %sequence)
  27. # the Main program
  28. t1 = ThreadA()
  29. t2 = ThreadB()
  30. t1.start()
  31. t2.start()
  32. t1.join()
  33. t2.join()
复制代码
来源:https://www.cnblogs.com/testing-/p/18548555
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具