翼度科技»论坛 编程开发 python 查看内容

Python并行编程2构建多线程程序(下):同步机制

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
2.2 同步机制

正如我们在上一节中所看到的,线程是并发执行的,因此也是同时运行的(但不是并行的)。这往往会产生不可预测的行为,如果不加以控制,可能会导致竞赛条件问题,尤其是在竞争访问共享资源时。为此,线程模块提供了一系列用于实现线程同步机制的类。这些类种类繁多,各有特点。在本章中,我们将看到所有这些类,每个类都将在一个简单的示例中实现,以便更好地理解它们的工作原理。
线程模块提供的同步对象有


  • RLock(可重入锁 Reentrant Lock)
  • 信号
  • 条件
  • 事件
2.2.1 锁

在线程模块提供的所有类中,Lock 是同步级别最低的一个。锁基本上有两种状态:

  • Locked
  • Unlocked
    并有两个方法:
  • acquire()
  • release()
这两个方法的功能是在锁定和解锁之间修改锁的状态。使用 Lock() 构造函数定义锁时,锁处于解锁状态。当一个线程调用 lock.acquire() 方法时,锁就会切换到锁定状态,并阻止该线程的执行,该线程将保持搁置状态。当另一个线程调用 lock.release() 方法时,锁将返回解锁状态,等待的线程将恢复执行。

如果没有很好地管理这种同步机制,它可能会导致比不使用同步机制更混乱的同步。事实上,可能有多个线程调用了 lock.acquire() 方法,它们都在等待至少另一个线程调用 lock.release(),将锁的状态从锁定变为解锁。在这种情况下,哪个等待线程将重新开始执行是不可预知的,在不同的实现中也会有所不同。
为了说明锁是如何工作的,让我们以两个线程分别执行不同的函数为例。我们将调用这两个函数 funcA() 和 funcB()。在第一章中,我们看到竞争线程共享进程内存。因此,在我们的示例中,我们只需使用一个共享变量,其中包含一个可供两个线程访问的整数。连接到 funcA() 的第一个线程会将该值增加 10,而相反,连接到 funcB() 的另一个线程会将该值减少 10。两个函数都将执行此操作 10 次。

因此,让我们编写下面的代码:
  1. import threading
  2. import time
  3. shared_data = 0
  4. def funcA():
  5. global shared_data
  6. for i in range(10):
  7.    local = shared_data
  8.    local += 10
  9.    time.sleep(1)
  10.    shared_data = local
  11.    print("Thread A wrote: %s" %shared_data)
  12. def funcB():
  13. global shared_data
  14. for i in range(10):
  15.    local = shared_data
  16.    local -= 10
  17.    time.sleep(1)
  18.    shared_data = local
  19.    print("Thread B wrote: %s" %shared_data)
  20. t1 = threading.Thread(target = funcA)
  21. t2 = threading.Thread(target = funcB)
  22. t1.start()
  23. t2.start()
  24. t1.join()
  25. t2.join()
复制代码
这应该是一个很好的线程间并发示例。我们运行代码,得到如下结果:
  1. Thread A wrote: 10
  2. Thread B wrote: -10
  3. Thread B wrote: -20
  4. Thread A wrote: 0
  5. Thread B wrote: -10
  6. Thread A wrote: 10
  7. Thread A wrote: 20
  8. Thread B wrote: 0
  9. Thread B wrote: -10
  10. Thread A wrote: 10
  11. Thread B wrote: 0
  12. Thread A wrote: 20
  13. Thread B wrote: 10
  14. Thread A wrote: 30
  15. Thread B wrote: 20
  16. Thread A wrote: 40
  17. Thread B wrote: 30
  18. Thread A wrote: 50
  19. Thread B wrote: 20
  20. Thread A wrote: 60
复制代码
首先,两个函数中 for 循环的迭代被分开并分别执行。在这种情况下,线程以原子方式执行每个 for 循环,而每个循环都在两个函数之间竞争。因此,在每个执行步骤中,funcA() 或 funcB() 的 for 循环中的一个会优先于另一个被执行(在 Python 中线程不能并行执行)。然而,这正是我们所期望的多线程同时运行的行为。到目前为止,一切正常。
问题出在各步骤中共享变量的值上。执行结束时的值应该是 0,而在这次执行中,它是 100(但这是一个完全随机的值,在不同的执行中都不一样)。因此,我们遇到了竞赛情况。此外,从我们读取的数据中可以看出,这种情况并非只发生过一次,而是在短短 10 个周期内频繁发生,而且只有 2 个线程。
因此,很明显,如果我们想让这个程序正常运行,就必须使用同步机制来协调两个线程访问共享变量,以避免出现竞赛条件现象。线程模块提供的 Lock 类是最简单的例子。
然后,我们在程序开头定义一个 Lock 类实例,并在两个线程中插入对 acquire() 和 release() 方法的调用,如下代码所示:
  1. def funcA():
  2.   global shared
  3.   for i in range(10):
  4.     lock.acquire()
  5.     shared += 10
  6.     print("Thread A wrote: %s" %shared)
  7.     lock.release()
  8.     time.sleep(1)
  9. def funcB():
  10.   global shared
  11.   for i in range(10):
  12.     lock.acquire()
  13.     shared -= 10
  14.     print("Thread B wrote: %s" %shared)
  15.     lock.release()
  16.     time.sleep(1)
  17.    
  18. t1 = threading.Thread(target = funcA)
  19. t2 = threading.Thread(target = funcB)
  20. t1.start()
  21. t2.start()
  22. t1.join()
  23. t2.join()
复制代码
如果这次运行代码,我们会得到截然不同的结果:
  1. Thread A wrote: 10
  2. Thread B wrote: 0
  3. Thread A wrote: 10
  4. Thread B wrote: 0
  5. Thread A wrote: 10
  6. Thread B wrote: 0
  7. Thread A wrote: 10
  8. Thread B wrote: 0
  9. Thread A wrote: 10
  10. Thread B wrote: 0
  11. Thread A wrote: 10
  12. Thread B wrote: 0
  13. Thread A wrote: 10
  14. Thread B wrote: 0
  15. Thread A wrote: 10
  16. Thread B wrote: 0
  17. Thread A wrote: 10
  18. Thread B wrote: 0
  19. Thread B wrote: -10
  20. Thread A wrote: 0
复制代码
正如我们所看到的,不再存在竞赛条件问题。每次只有一个线程以同步方式读取和修改共享变量,从而避免了错误值的产生。
2.2.2 带锁的上下文管理协议

线程模块中所有使用 acquire() 和 release() 方法的对象,如 Lock 对象,都可以通过 with 语句(见下面的注释)用于上下文管理器。
注:在 Python 中,with 语句创建了一个运行时上下文,允许在上下文管理器的控制下执行语句块。
  1. with expression:
  2. #code
复制代码
上下文管理器负责评估与 with (context) 相关代码块的表达式。因此,表达式必须返回一个实现上下文管理协议的对象,该协议主要由两个方法组成:

  • enter() 在进入上下文时调用。
  • 退出上下文时调用 exit() 方法。
    除此以外,with 语句的另一个优点是包含了 try ... finally 结构的功能。
这样,你就能得到可读性更强、更易于重用的代码。正是由于这些优点,标准库中的许多类都支持使用 with 语句来替代传统的结构体。
在锁的情况下,进入代码块时将调用 acquire() 方法,退出时将调用 release() 方法。
因此就有了这种形式:
  1. import threading
  2. import time
  3. shared_data = 0
  4. lock = threading.Lock()
  5. def funcA():
  6.   global shared_data
  7.   for i in range(10):
  8.     with lock:
  9.       local = shared_data
  10.       local += 10
  11.       time.sleep(1)
  12.       shared_data = local
  13.       print("Thread A wrote: %s" %shared_data)
  14.    
  15. def funcB():
  16.   global shared_data
  17.   for i in range(10):
  18.     with lock:
  19.       local = shared_data
  20.       local -= 10
  21.       time.sleep(1)
  22.       shared_data = local
  23.       print("Thread B wrote: %s" %shared_data)
  24. t1 = threading.Thread(target = funcA)
  25. t2 = threading.Thread(target = funcB)
  26. t1.start()
  27. t2.start()
  28. t1.join()
  29. t2.join()
复制代码
然后,我们使用支持的上下文管理器协议重写之前的代码:
  1. Thread A wrote: 10
  2. Thread B wrote: 0
  3. Thread A wrote: 10
  4. Thread B wrote: 0
  5. Thread B wrote: -10
  6. Thread B wrote: -20
  7. Thread B wrote: -30
  8. Thread B wrote: -40
  9. Thread B wrote: -50
  10. Thread B wrote: -60
  11. Thread B wrote: -70
  12. Thread B wrote: -80
  13. Thread A wrote: -70
  14. Thread A wrote: -60
  15. Thread A wrote: -50
  16. Thread A wrote: -40
  17. Thread A wrote: -30
  18. Thread A wrote: -20
  19. Thread A wrote: -10
  20. Thread A wrote: 0
复制代码
我们可以看到,代码的可读性大大提高。如果运行这段代码,我们不会发现其行为与之前的代码有任何不同。
本章稍后将介绍的线程模块中的其他对象也支持使用 with 语句的上下文管理器协议:

  • RLock
  • 条件
  • 信号
    所有对象都与 Lock 一样,在同步机制中使用 acquire() 和 release() 方法。
2.2.3 另一种可能的锁同步解决方案

让我们继续分析前面的代码。正如我们所看到的,我们添加了一种同步机制,它似乎可以完全(或至少几乎)取消两个线程的并发行为。
我们创建的机制是最直观的,即在每个线程中成对地添加对 acquire() 和 release() 方法的调用,以便划分部分块。使用带语句的上下文管理器,一切都很清楚:我们在两个线程中都有两个对称的代码部分。
但我们并不必须以这种方式运行。你可以尝试找到更复杂的同步条件。风险自担。事实上,可以在代码中彼此不对称的位置,不对称地插入对 acquire() 和 release() 方法的调用,有时在一个线程中使用 acquire(),而在另一个线程中使用 release()。这样一来,上下文管理器可识别的代码块就丢失了,而传递的同步控制却复杂得多。在这种情况下,不仅会出现竞赛条件问题,甚至还会出现死锁。此外,如果在锁状态解锁时调用 release() 方法,会导致执行错误。不过,不要灰心,做一些测试,也许同步并发解决方案是可行的。
例如,如果我们以下面的方式修改代码:
  1. import threading
  2. import time
  3. shared = 0
  4. lock = threading.Lock()
  5. def funcA():
  6.   global shared
  7.   for i in range(10):
  8.     time.sleep(1)
  9.     shared += 10
  10.     print("Thread A wrote: %s" %shared)
  11.     lock.acquire()
  12. def funcB():
  13.   global shared
  14.   lock.acquire()
  15.   for i in range(10):
  16.     time.sleep(1)
  17.     shared -= 10
  18.     print("Thread B wrote: %s" %shared)
  19.     lock.release()
  20.    
  21. t1 = threading.Thread(target = funcA)
  22. t2 = threading.Thread(target = funcB)
  23. t1.start()
  24. t2.start()
  25. t1.join()
  26. t2.join()
复制代码
有很多改动。例如,funcA() 函数不再调用 release() 方法,而在每次迭代结束时调用 acquire() 方法。而在 funcB() 函数中,acquire() 方法则在执行开始时,即 for 循环之外调用。
运行修改后的代码,我们将得到如下结果:
  1. Thread B wrote: -10
  2. Thread A wrote: 0
  3. Thread B wrote: -10
  4. Thread A wrote: 0
  5. Thread A wrote: 10
  6. Thread B wrote: 0
  7. Thread A wrote: 10
  8. Thread B wrote: 0
  9. Thread A wrote: 10
  10. Thread B wrote: 0
  11. Thread A wrote: 10
  12. Thread B wrote: 0
  13. Thread B wrote: -10
  14. Thread A wrote: 0
  15. Thread B wrote: -10
  16. Thread A wrote: 0
  17. Thread A wrote: 10
  18. Thread B wrote: 0
  19. Thread B wrote: -10
  20. Thread A wrote: 0
复制代码
我们可以看到,这次一切似乎都很顺利。我们最终得到了一个 0 的最终共享值,并重新获得了两个线程之间的并发行为。事实上,两个线程之间 for 循环的迭代顺序恢复了随机和并发。通过多次运行程序,我们注意到行为依然正确,即使在这些情况下我们永远无法获得绝对的确定性。
如果我们不能确定两个线程是否交替进行,而是其中一个线程执行受阻,而另一个线程继续执行,那么我们可以(在调试阶段)在之前的代码中添加一些打印值,以确认两个线程的执行进度。在本例中,我们可以在报告各线程计数结果的字符串中添加迭代次数。这样,我们就能同时确定两个线程的执行进度:
  1. import threading
  2. import time
  3. shared = 0
  4. lock = threading.Lock()
  5. def funcA():
  6.   global shared
  7.   for i in range(10):
  8.     time.sleep(1)
  9.     shared += 10
  10.     print("Thread A wrote: %s, %i" %(shared,i))
  11.     lock.acquire()
  12. def funcB():
  13.   global shared
  14.   lock.acquire()
  15.   for i in range(10):
  16.     time.sleep(1)
  17.     shared -= 10
  18.     print("Thread B wrote: %s, %i" %(shared,i))
  19.     lock.release()
  20.    
  21. t1 = threading.Thread(target = funcA)
  22. t2 = threading.Thread(target = funcB)
  23. t1.start()
  24. t2.start()
  25. t1.join()
  26. t2.join()
复制代码
在修改代码后立即运行代码,我们会发现两个线程都在交替执行:
  1. Thread A wrote: 10, 0
  2. Thread B wrote: 0, 0
  3. Thread B wrote: -10, 1
  4. Thread A wrote: 0, 1
  5. Thread B wrote: -10, 2
  6. Thread A wrote: 0, 2
  7. Thread B wrote: -10, 3
  8. Thread A wrote: 0, 3
  9. Thread B wrote: -10, 4
  10. Thread A wrote: 0, 4
  11. Thread B wrote: -10, 5
  12. Thread A wrote: 0, 5
  13. Thread A wrote: 10, 6
  14. Thread B wrote: 0, 6
  15. Thread A wrote: 10, 7
  16. Thread B wrote: 0, 7
  17. Thread A wrote: 10, 8
  18. Thread B wrote: 0, 8
  19. Thread B wrote: -10, 9
  20. Thread A wrote: 0, 9
复制代码
2.2.4 RLock

另一个用于线程同步的类是 RLock,它是一种重入锁。该类与 Lock 类非常相似,但不同的是,它可以被同一线程多次获取。在它的内部,除了锁定-解锁状态外,还有关于所有者线程和递归级别的信息。
与锁一样,RLock 也可以通过 acquire() 方法从线程中获取。此时,RLock 变成锁定状态,调用线程成为所有者之一。同样,RLock 也可以通过调用 release() 方法来解锁。但这次与 Lock 同步机制不同的是,调用 acquire() 和 release() 方法的线程对是多个,可以嵌套在一起。调用 acquire() 方法的其他线程将被添加到所有者列表中。只有最后的 release() 方法才能解锁 RLock,并确保另一个线程可以重新启动。
举个例子,我们使用一个函数,其中有两个嵌套的 for 循环,在这两个层级中都可以访问共享变量。此外,我们还可以通过改变每个线程的执行时间来区分这三个线程,这样可以更加突出并发行为:
  1. import threading
  2. import time
  3. shared = 0
  4. rlock = threading.RLock()
  5. def func(name, t):
  6.   global shared
  7.   for i in range(3):
  8.     rlock.acquire()
  9.     local = shared
  10.     time.sleep(t)
  11.     for j in range(2):
  12.       rlock.acquire()
  13.       local += 1
  14.       time.sleep(2)
  15.       shared = local
  16.       print("Thread %s-%s wrote: %s" %(name, j, shared))
  17.       rlock.release()  
  18.     shared = local + 1
  19.     print("Thread %s wrote: %s" %(name, shared))
  20.     rlock.release()
  21. t1 = threading.Thread(target = func,args=('A',2,))
  22. t2 = threading.Thread(target = func,args=('B',10,))
  23. t3 = threading.Thread(target = func,args=('C',1,))
  24. t1.start()
  25. t2.start()
  26. t3.start()
  27. t1.join()
  28. t2.join()
  29. t3.join()
复制代码
运行前面的代码,我们将得到如下结果:
  1. Thread A-0 wrote: 1
  2. Thread A-1 wrote: 2
  3. Thread A wrote: 3
  4. Thread A-0 wrote: 4
  5. Thread A-1 wrote: 5
  6. Thread A wrote: 6
  7. Thread A-0 wrote: 7
  8. Thread A-1 wrote: 8
  9. Thread A wrote: 9
  10. Thread B-0 wrote: 10
  11. Thread B-1 wrote: 11
  12. Thread B wrote: 12
  13. Thread B-0 wrote: 13
  14. Thread B-1 wrote: 14
  15. Thread B wrote: 15
  16. Thread B-0 wrote: 16
  17. Thread B-1 wrote: 17
  18. Thread B wrote: 18
  19. Thread C-0 wrote: 19
  20. Thread C-1 wrote: 20
  21. Thread C wrote: 21
  22. Thread C-0 wrote: 22
  23. Thread C-1 wrote: 23
  24. Thread C wrote: 24
  25. Thread C-0 wrote: 25
  26. Thread C-1 wrote: 26
  27. Thread C wrote: 27
复制代码
另外,在这种情况下,与锁的情况一样,同步导致了共享变量的完美管理,但却失去了线程的并发行为。
2.2.5 信号(Semaphore)

线程模块中的另一种同步机制是基于信号的同步机制。这种原始机制是计算机科学历史上最古老的同步形式,由 Edsger W. Dijkstra 于 1962 年发明。
其目的是同步管理同一进程中多个线程对共享资源的使用。为此,每个信号都与共享资源相关联,允许所有线程访问,直到其内部计数器的值为负为止。
信号是一个对象,它与锁类似,通过调用 acquire() 和 release() 方法来工作。其内部有一个计数器,每次调用 acquire() 方法都会递减一个单位,而每次调用 release() 方法都会递增一个单位。
因此,如果一个线程需要访问受 Semaphore 保护的共享资源,它首先会调用 acquire() 方法。交通灯的内部计数器会减少一个单位。如果该值等于或大于零,则线程将访问该资源,否则将被阻塞,并等待其他线程对同一资源调用 release()。只有到那时,线程才能继续执行,访问必要的资源。
因此,非常重要的一点是,每个调用 acquire() 的线程在结束对共享资源的操作后,都要调用 release() 方法,这样其他线程也能访问资源,避免出现死锁:

这种编程模型基于两类在数据流中起作用的对象。生产者通常通过从外部资源获取数据来生成数据,而消费者则使用生产者生成的数据。问题是,这两个对象各自独立工作,速度不同且可变。它们的数量也可能不同。例如,可以只有一个生产者和多个消费者,反之亦然。这种模型非常适合线程(也适合进程),因此在这些示例中引入这种模型是个好主意。
作为使用 Semaphore 同步的示例,我们将定义两个线程子类: 消费者(Consumer)和生产者(Producer)。在它们的 run() 方法中,我们将执行它们的代码。在生产者中,我们将实现一个 request() 函数,模拟从外部来源请求数据,并通过 time.sleep() 抽出一定的时间。
因此,让我们编写以下代码:
  1. from threading import Thread, Semaphore
  2. import time
  3. import random
  4. semaphore = Semaphore(1)
  5. shared = 1
  6. class Consumer(Thread):
  7.   def __init__(self):
  8.     Thread.__init__(self)
  9.     global semaphore
  10.   def run(self):
  11.     global shared
  12.     semaphore.acquire()
  13.     print("consumer has used this: %s" %shared)
  14.     shared = 0
  15.     semaphore.release()
  16. class Producer(Thread):
  17.   def __init__(self):
  18.     Thread.__init__(self)
  19.     global semaphore
  20.   def request(self):
  21.     time.sleep(1)
  22.     return random.randint(0,100)
  23.   def run(self):
  24.     global shared
  25.     semaphore.acquire()
  26.     shared = self.request()
  27.     print("producer has loaded this: %s" %shared)
  28.     semaphore.release()
  29. t1 = Producer()
  30. t2 = Consumer()
  31. t1.start()
  32. t2.start()
  33. t1.join()
  34. t2.join()
复制代码
执行刚才编写的代码,我们将得到类似下面的结果:
  1. producer has loaded this: 39
  2. consumer has used this: 39
复制代码
由于 semaphores 的同步机制也是通过 acquire() 和 release() 方法实现的,因此 semaphores 也支持上下文管理协议。因此,我们可以将前面的代码编写如下:
  1. from threading import Thread, Semaphore
  2. import time
  3. import random
  4. semaphore = Semaphore(1)
  5. shared = 1
  6. class Consumer(Thread):
  7.   def __init__(self):
  8.     Thread.__init__(self)
  9.     global semaphore
  10.   def run(self):
  11.     global shared
  12.     with semaphore:
  13.       print("consumer has used this: %s" %shared)
  14.       shared = 0
  15. class Producer(Thread):
  16.   def __init__(self):
  17.     Thread.__init__(self)
  18.     global semaphore
  19.   def request(self):
  20.     time.sleep(1)
  21.     return random.randint(0,100)
  22.   def run(self):
  23.     global shared
  24.     with semaphore:
  25.       shared = self.request()
  26.       print("producer has loaded this: %s" %shared)
  27.       
  28. t1 = Producer()
  29. t2 = Consumer()
  30. t1.start()
  31. t2.start()
  32. t1.join()
  33. t2.join()
复制代码
执行后会得到相同的结果。
  1. producer has loaded this: 4
  2. consumer has used this: 4
复制代码
在这种情况下,我们有两个线程,即生产者和消费者,它们只运行一次;也就是说,它们产生一个值,然后被消耗掉。但是,如果我们让生产者线程产生更多的数值,让消费者线程消耗同样多的数值,会发生什么情况呢?
我们重写代码,让每个线程执行上述操作五次:
  1. from threading import Thread, Semaphore
  2. import time
  3. import random
  4. semaphore = Semaphore(1)
  5. shared = 1
  6. count = 5
  7. class consumer(Thread):
  8.   def __init__(self, count):
  9.     Thread.__init__(self)
  10.     global semaphore
  11.     self.count = count
  12.   def run(self):
  13.     global shared
  14.     for i in range(self.count):
  15.       semaphore.acquire()
  16.       print("consumer has used this: %s" %shared)
  17.       shared = 0
  18.       semaphore.release()
  19. class producer(Thread):
  20.   def __init__(self, count):
  21.     Thread.__init__(self)
  22.     self.count = count
  23.     global semaphore
  24.   def request(self):
  25.     time.sleep(1)
  26.     return random.randint(0,100)
  27.   def run(self):
  28.     global shared
  29.     for i in range(self.count):
  30.       semaphore.acquire()
  31.       shared = self.request()
  32.       print("producer has loaded this: %s" %shared)
  33.       semaphore.release()
  34. t1 = producer(count)
  35. t2 = consumer(count)
  36. t1.start()
  37. t2.start()
  38. t1.join()
  39. t2.join()
复制代码
运行代码后,我们得到如下结果:
  1. producer has loaded this: 29
  2. producer has loaded this: 70
  3. producer has loaded this: 10
  4. producer has loaded this: 1
  5. producer has loaded this: 32
  6. consumer has used this: 32
  7. consumer has used this: 0
  8. consumer has used this: 0
  9. consumer has used this: 0
  10. consumer has used this: 0
复制代码
这肯定不是我们想要的结果。事实上,生产者线程继续产生覆盖共享资源的值,但消费者线程被阻塞,直到生产者完成工作(五个周期)后才开始。消费者线程在生产者的五个周期结束后开始工作,只消耗最后产生的值,而丢失前面的四个值。
此外,在这种情况下,就像锁的例子一样,用信号传递器对线程代码块进行原子管理不再适合我们的需要。我们所说的原子性是指精确性:
因此,让我们分解一下两个线程之间的 acquire() 和 release() 调用机制。首先必须访问共享资源的生产者线程会在覆盖数据前调用 acquire() 方法。而消费者线程在消耗完共享资源后,将调用 release() 方法,释放资源:
  1. from threading import Thread, Semaphore
  2. import time
  3. import random
  4. semaphore = Semaphore(1)
  5. shared = 1
  6. count = 5
  7. def request():
  8.   time.sleep(1)
  9.   return random.randint(0,100)
  10. class consumer(Thread):
  11.   def __init__(self, count):
  12.     Thread.__init__(self)
  13.     global semaphore
  14.     self.count = count
  15.   def run(self):
  16.     global shared
  17.     for i in range(self.count):
  18.       semaphore.acquire()
  19.       print("consumer has used this: %s" %shared)
  20.       shared = 0
  21. class producer(Thread):
  22.   def __init__(self, count):
  23.     Thread.__init__(self)
  24.     self.count = count
  25.     global semaphore
  26.   def run(self):
  27.     global shared
  28.     for i in range(self.count):
  29.       shared = request()
  30.       print("producer has loaded this: %s" %shared)
  31.       semaphore.release()
  32. t1 = producer(count)
  33. t2 = consumer(count)
  34. t1.start()
  35. t2.start()
  36. t1.join()
  37. t2.join()
复制代码
执行结果如下:
  1. consumer has used this: 1
  2. producer has loaded this: 33
  3. consumer has used this: 33
  4. producer has loaded this: 40
  5. consumer has used this: 40
  6. producer has loaded this: 99
  7. consumer has used this: 99
  8. producer has loaded this: 25
  9. consumer has used this: 25
  10. producer has loaded this: 20
复制代码
从结果可以看出,同步还不够完美。我们看到消费者线程和生产者线程之间的活动交替进行。但第一个访问共享资源的是消费者,因此第一个消耗的值是起始默认值 1。而生产者在程序结束时产生的值不会被消耗。要解决这个问题并正确调整执行顺序,只需将 Semaphore 的初始内部值设置为 0,而不是 1。

参考资料

2.2.6 条件(Condition)

除了 Semaphores 之外,Condition 类也可用于线程同步。Condition 类有一个内部锁,通过 acquire() 和 release() 可以实现锁定和解锁状态。除此之外,它还有其他相关方法。wait() 方法会释放锁,但会阻塞线程,直到另一个线程调用 notify() 和 notify_all() 方法。
如果有条件变量,notify()方法只会唤醒其中一个等待条件变量的线程。而 notify_all() 方法会唤醒所有等待的线程。
让我们回到之前使用 Semaphore 的代码,这次使用 Condition 作为线程同步系统。然后,我们修改之前编写的代码,最后得到如下代码:
  1. from threading import Thread, Condition
  2. import time
  3. import random
  4. condition = Condition()
  5. shared = 1
  6. count = 5
  7. class Consumer(Thread):
  8.   def __init__(self, count):
  9.     Thread.__init__(self)
  10.     global condition
  11.     self.count = count
  12.   
  13.   def run(self):
  14.     global shared
  15.     for i in range(self.count):
  16.       condition.acquire()
  17.       if shared == 0:
  18.         condition.wait()
  19.       print("consumer has used this: %s" %shared)
  20.       shared = 0
  21.       condition.notify()
  22.       condition.release()
  23. class Producer(Thread):
  24.   def __init__(self, count):
  25.     Thread.__init__(self)
  26.     self.count = count
  27.     global condition
  28.   def request(self):
  29.     time.sleep(1)
  30.     return random.randint(0,100)
  31.   def run(self):
  32.     global shared
  33.     for i in range(self.count):
  34.       condition.acquire()
  35.       shared = self.request()
  36.       print("producer has loaded this: %s" %shared)
  37.       condition.wait()
  38.       if shared == 0:
  39.         condition.notify()
  40.       condition.release()
  41. t1 = Producer(count)
  42. t2 = Consumer(count)
  43. t1.start()
  44. t2.start()
  45. t1.join()
  46. t2.join()
复制代码
如果我们运行这段代码,就会得到类似下面的结果:
  1. producer has loaded this: 59
  2. consumer has used this: 59
  3. producer has loaded this: 76
  4. consumer has used this: 76
  5. producer has loaded this: 29
  6. consumer has used this: 29
  7. producer has loaded this: 15
  8. consumer has used this: 15
  9. producer has loaded this: 76
  10. consumer has used this: 76
复制代码

2.2.7 事件(Event)

除了 Semaphore 和 Condition 之外,还有另一种同步机制。事件(Event)的使用在概念上是最简单的。所有这些都可以理解为线程之间的基本通信机制,其中一个线程向另一个等待事件发生的线程发出信号。
事件对象管理一个内部布尔标志。还有两个方法可以确定其值。set() 方法将标志值设置为 True,而 clear() 方法则将标志值设置为 False。False 是事件对象创建时的默认值。还有第三个 wait() 方法会阻塞线程,直到标志变为 True。
换句话说,一个线程在执行过程中,通过调用 wait() 方法冻结,等待事件发生后才能继续执行。在另一个线程中,当该事件发生时,会调用 set() 方法解锁前一个线程,该线程执行其操作,然后调用 clear() 重置一切。

在生产者(Producer)和消费者(Consumer)两个线程的示例中,我们可以用基于事件的同步机制取代之前的机制。
让我们对前面示例的代码作如下修改:
  1. from threading import Thread, Event
  2. import time
  3. import random
  4. event = Event()
  5. shared = 1
  6. count = 5
  7. class Consumer(Thread):
  8.   def __init__(self, count):
  9.     Thread.__init__(self)
  10.     global event
  11.     self.count = count
  12.   
  13.   def run(self):
  14.     global shared
  15.     for i in range(self.count):
  16.       event.wait()
  17.       print("consumer has used this: %s" %shared)
  18.       shared = 0
  19.       event.clear()
  20. class Producer(Thread):
  21.   def __init__(self, count):
  22.     Thread.__init__(self)
  23.     self.count = count
  24.     global event
  25.   def request(self):
  26.     time.sleep(1)
  27.     return random.randint(0,100)
  28.   def run(self):
  29.     global shared
  30.     for i in range(self.count):
  31.       shared = self.request()
  32.       print("producer has loaded this: %s" %shared)
  33.       event.set()
  34. t1 = Producer(count)
  35. t2 = Consumer(count)
  36. t1.start()
  37. t2.start()
  38. t1.join()
  39. t2.join()
复制代码
运行代码后,我们会得到类似下面的结果:
  1. producer has loaded this: 11
  2. consumer has used this: 11
  3. producer has loaded this: 84
  4. consumer has used this: 84
  5. producer has loaded this: 78
  6. consumer has used this: 78
  7. producer has loaded this: 27
  8. consumer has used this: 27
  9. producer has loaded this: 16
  10. consumer has used this: 16
复制代码
我们还可以看到,在这种情况下,两个线程之间的同步是完美的。

2.2.8 队列(Queue)

我们继续开发前面的示例。到目前为止,我们只使用了一个生产者线程和一个消费者线程。但如果增加数量会怎样呢?

在这种情况下,队列(Queue)可以帮助我们。
  1. from threading import Thread
  2. from queue import Queue
  3. import time
  4. import random
  5. queue = Queue()
  6. shared = 1
  7. count = 5
  8. class Consumer(Thread):
  9.   def __init__(self, count):
  10.     Thread.__init__(self)
  11.     self.count = count
  12.   
  13.   def run(self):
  14.     global queue
  15.     for i in range(self.count):
  16.       local = queue.get()
  17.       print("consumer has used this: %s" %local)
  18.       queue.task_done()
  19. class Producer(Thread):
  20.   def __init__(self, count):
  21.     Thread.__init__(self)
  22.     self.count = count
  23.   def request(self):
  24.     time.sleep(1)
  25.     return random.randint(0,100)
  26.   def run(self):
  27.     global queue
  28.     for i in range(self.count):
  29.       local = self.request()
  30.       queue.put(local)
  31.       print("producer has loaded this: %s" %local)
  32. t1 = Producer(count)
  33. t2 = Producer(count)
  34. t3 = Consumer(count)
  35. t4 = Consumer(count)
  36. t1.start()
  37. t2.start()
  38. t3.start()
  39. t4.start()
  40. t1.join()
  41. t2.join()
  42. t3.join()
  43. t4.join()
复制代码
运行代码将得到以下结果:
  1. producer has loaded this: 45
  2. consumer has used this: 45
  3. producer has loaded this: 6
  4. consumer has used this: 6
  5. producer has loaded this: 90
  6. consumer has used this: 90
  7. producer has loaded this: 49
  8. consumer has used this: 49
  9. producer has loaded this: 38
  10. consumer has used this: 38
  11. producer has loaded this: 40
  12. consumer has used this: 40
  13. producer has loaded this: 60
  14. consumer has used this: 60
  15. producer has loaded this: 75
  16. consumer has used this: 75
  17. producer has loaded this: 72
  18. consumer has used this: 72
  19. consumer has used this: 18
  20. producer has loaded this: 18
复制代码

2.3 结论

在本章中,我们全面介绍了线程模块提供的所有工具。我们看到了如何通过调用函数、使用子类或将线程插入 ThreadPoolExecutor 等不同方式在程序中定义一组线程。我们还研究了各种可能的线程同步机制,以及它们之间的区别。无论如何,我们已经看到了线程行为的不可预测性,以及遇到竞赛条件问题有多么容易。在下一章中,我们将转向真正的并行编程,在 Python 中,并行编程完全用进程来表达。我们将讨论标准 Python 库提供的多进程模块。
需要记住的要点

  • 同步方法: 当您想使用多个线程共享的内存时,这些方法是必不可少的。
  • 过度同步: 这会抑制线程的并发行为,使它们串行执行。

来源:https://www.cnblogs.com/testing-/p/18549243
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具