翼度科技»论坛 编程开发 python 查看内容

[python]多线程快速入门

2

主题

2

帖子

6

积分

新手上路

Rank: 1

积分
6
前言

线程是操作系统能够进行运算调度的最小单位,它被包含在进程之中,是进程中的实际运作单位。由于CPython的GIL限制,多线程实际为单线程,大多只用来处理IO密集型任务。
Python一般用标准库threading来进行多线程编程。
基本使用


  • 方式1,创建threading.Thread类的示例
  1. import threading
  2. import time
  3. def task1(counter: int):
  4.     print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
  5.     num = counter
  6.     while num > 0:
  7.         time.sleep(3)
  8.         num -= 1
  9.     print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
  10. if __name__ == "__main__":
  11.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  12.     # 创建三个线程
  13.     t1 = threading.Thread(target=task1, args=(7,))
  14.     t2 = threading.Thread(target=task1, args=(5,))
  15.     t3 = threading.Thread(target=task1, args=(3,))
  16.     # 启动线程
  17.     t1.start()
  18.     t2.start()
  19.     t3.start()
  20.     # join() 用于阻塞主线程, 等待子线程执行完毕
  21.     t1.join()
  22.     t2.join()
  23.     t3.join()
  24.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
执行输出示例
  1. main thread: MainThread, start time: 2024-10-26 12:42:37
  2. thread: Thread-1 (task1), args: 7, start time: 2024-10-26 12:42:37
  3. thread: Thread-2 (task1), args: 5, start time: 2024-10-26 12:42:37
  4. thread: Thread-3 (task1), args: 3, start time: 2024-10-26 12:42:37
  5. thread: Thread-3 (task1), args: 3, end time: 2024-10-26 12:42:46
  6. thread: Thread-2 (task1), args: 5, end time: 2024-10-26 12:42:52
  7. thread: Thread-1 (task1), args: 7, end time: 2024-10-26 12:42:58
  8. main thread: MainThread, end time: 2024-10-26 12:42:58
复制代码

  • 方式2,继承threading.Thread类,重写run()和__init__()方法
  1. import threading
  2. import time
  3. class MyThread(threading.Thread):
  4.     def __init__(self, counter: int):
  5.         super().__init__()
  6.         self.counter = counter
  7.     def run(self):
  8.         print(f"thread: {threading.current_thread().name}, args: {self.counter}, start time: {time.strftime('%F %T')}")
  9.         num = self.counter
  10.         while num > 0:
  11.             time.sleep(3)
  12.             num -= 1
  13.         print(f"thread: {threading.current_thread().name}, args: {self.counter}, end time: {time.strftime('%F %T')}")
  14. if __name__ == "__main__":
  15.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  16.     # 创建三个线程
  17.     t1 = MyThread(7)
  18.     t2 = MyThread(5)
  19.     t3 = MyThread(3)
  20.     # 启动线程
  21.     t1.start()
  22.     t2.start()
  23.     t3.start()
  24.     # join() 用于阻塞主线程, 等待子线程执行完毕
  25.     t1.join()
  26.     t2.join()
  27.     t3.join()
  28.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
继承threading.Thread类也可以写成这样,调用外部函数。
  1. import threading
  2. import time
  3. def task1(counter: int):
  4.     print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
  5.     num = counter
  6.     while num > 0:
  7.         time.sleep(3)
  8.         num -= 1
  9.     print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
  10. class MyThread(threading.Thread):
  11.     def __init__(self, target, args: tuple):
  12.         super().__init__()
  13.         self.target = target
  14.         self.args = args
  15.    
  16.     def run(self):
  17.         self.target(*self.args)
  18. if __name__ == "__main__":
  19.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  20.     # 创建三个线程
  21.     t1 = MyThread(target=task1, args=(7,))
  22.     t2 = MyThread(target=task1, args=(5,))
  23.     t3 = MyThread(target=task1, args=(3,))
  24.     # 启动线程
  25.     t1.start()
  26.     t2.start()
  27.     t3.start()
  28.     # join() 用于阻塞主线程, 等待子线程执行完毕
  29.     t1.join()
  30.     t2.join()
  31.     t3.join()
  32.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
多线程同步

如果多个线程共同对某个数据修改,则可能出现不可预料的后果,这时候就需要某些同步机制。比如如下代码,结果是随机的(个人电脑用python3.13实测结果都是0,而低版本的python3.6运行结果的确是随机的)
  1. import threading
  2. import time
  3. num = 0
  4. def task1(counter: int):
  5.     print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
  6.     global num
  7.     for _ in range(100000000):
  8.         num = num + counter
  9.         num = num - counter
  10.     print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
  11. if __name__ == "__main__":
  12.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  13.     # 创建三个线程
  14.     t1 = threading.Thread(target=task1, args=(7,))
  15.     t2 = threading.Thread(target=task1, args=(5,))
  16.     t3 = threading.Thread(target=task1, args=(3,))
  17.     t4 = threading.Thread(target=task1, args=(6,))
  18.     t5 = threading.Thread(target=task1, args=(8,))
  19.     # 启动线程
  20.     t1.start()
  21.     t2.start()
  22.     t3.start()
  23.     t4.start()
  24.     t5.start()
  25.     # join() 用于阻塞主线程, 等待子线程执行完毕
  26.     t1.join()
  27.     t2.join()
  28.     t3.join()
  29.     t4.join()
  30.     t5.join()
  31.    
  32.     print(f"num: {num}")
  33.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
Lock-锁

使用互斥锁可以在一个线程访问数据时,拒绝其它线程访问,直到解锁。threading.Thread中的Lock()和Rlock()可以提供锁功能。
  1. import threading
  2. import time
  3. num = 0
  4. mutex = threading.Lock()
  5. def task1(counter: int):
  6.     print(f"thread: {threading.current_thread().name}, args: {counter}, start time: {time.strftime('%F %T')}")
  7.     global num
  8.     mutex.acquire()
  9.     for _ in range(100000):
  10.         num = num + counter
  11.         num = num - counter
  12.     mutex.release()
  13.     print(f"thread: {threading.current_thread().name}, args: {counter}, end time: {time.strftime('%F %T')}")
  14. if __name__ == "__main__":
  15.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  16.     # 创建三个线程
  17.     t1 = threading.Thread(target=task1, args=(7,))
  18.     t2 = threading.Thread(target=task1, args=(5,))
  19.     t3 = threading.Thread(target=task1, args=(3,))
  20.     # 启动线程
  21.     t1.start()
  22.     t2.start()
  23.     t3.start()
  24.     # join() 用于阻塞主线程, 等待子线程执行完毕
  25.     t1.join()
  26.     t2.join()
  27.     t3.join()
  28.    
  29.     print(f"num: {num}")
  30.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
Semaphore-信号量

互斥锁是只允许一个线程访问共享数据,而信号量是同时允许一定数量的线程访问共享数据。比如银行有5个窗口,允许同时有5个人办理业务,后面的人只能等待,待柜台有空闲才可以进入。
  1. import threading
  2. import time
  3. from random import randint
  4. semaphore = threading.BoundedSemaphore(5)
  5. def business(name: str):
  6.     semaphore.acquire()
  7.     print(f"{time.strftime('%F %T')} {name} is handling")
  8.     time.sleep(randint(3, 10))
  9.     print(f"{time.strftime('%F %T')} {name} is done")
  10.     semaphore.release()
  11. if __name__ == "__main__":
  12.     print(f"main thread: {threading.current_thread().name}, start time: {time.strftime('%F %T')}")
  13.     threads = []
  14.     for i in range(10):
  15.         t = threading.Thread(target=business, args=(f"thread-{i}",))
  16.         threads.append(t)
  17.     for t in threads:
  18.         t.start()
  19.     for t in threads:
  20.         t.join()
  21.    
  22.     print(f"main thread: {threading.current_thread().name}, end time: {time.strftime('%F %T')}")
复制代码
执行输出
  1. main thread: MainThread, start time: 2024-10-26 17:40:10
  2. 2024-10-26 17:40:10 thread-0 is handling
  3. 2024-10-26 17:40:10 thread-1 is handling
  4. 2024-10-26 17:40:10 thread-2 is handling
  5. 2024-10-26 17:40:10 thread-3 is handling
  6. 2024-10-26 17:40:10 thread-4 is handling
  7. 2024-10-26 17:40:15 thread-2 is done
  8. 2024-10-26 17:40:15 thread-5 is handling
  9. 2024-10-26 17:40:16 thread-0 is done
  10. 2024-10-26 17:40:16 thread-6 is handling
  11. 2024-10-26 17:40:19 thread-3 is done
  12. 2024-10-26 17:40:19 thread-4 is done
  13. 2024-10-26 17:40:19 thread-7 is handling
  14. 2024-10-26 17:40:19 thread-8 is handling
  15. 2024-10-26 17:40:20 thread-1 is done
  16. 2024-10-26 17:40:20 thread-9 is handling
  17. 2024-10-26 17:40:21 thread-6 is done
  18. 2024-10-26 17:40:23 thread-7 is done
  19. 2024-10-26 17:40:24 thread-5 is done
  20. 2024-10-26 17:40:24 thread-8 is done
  21. 2024-10-26 17:40:30 thread-9 is done
  22. main thread: MainThread, end time: 2024-10-26 17:40:30
复制代码
Condition-条件对象

Condition对象能让一个线程A停下来,等待其他线程,其他线程通知后线程A继续运行。
  1. import threading
  2. import time
  3. import random
  4. class Employee(threading.Thread):
  5.     def __init__(self, username: str, cond: threading.Condition):
  6.         self.username = username
  7.         self.cond = cond
  8.         super().__init__()
  9.     def run(self):
  10.         with self.cond:
  11.             print(f"{time.strftime('%F %T')} {self.username} 到达公司")
  12.             self.cond.wait()  # 等待通知
  13.             print(f"{time.strftime('%F %T')} {self.username} 开始工作")
  14.             time.sleep(random.randint(1, 5))
  15.             print(f"{time.strftime('%F %T')} {self.username} 工作完成")
  16. class Boss(threading.Thread):
  17.     def __init__(self, username: str, cond: threading.Condition):
  18.         self.username = username
  19.         self.cond = cond
  20.         super().__init__()
  21.     def run(self):
  22.         with self.cond:
  23.             print(f"{time.strftime('%F %T')} {self.username} 发出通知")
  24.             self.cond.notify_all()  # 通知所有线程
  25.         time.sleep(2)
  26. if __name__ == "__main__":
  27.     cond = threading.Condition()
  28.     boss = Boss("老王", cond)
  29.    
  30.     employees = []
  31.     for i in range(5):
  32.         employees.append(Employee(f"员工{i}", cond))
  33.     for employee in employees:
  34.         employee.start()
  35.     boss.start()
  36.     boss.join()
  37.     for employee in employees:
  38.         employee.join()
复制代码
执行输出
  1. 2024-10-26 21:16:20 员工0 到达公司
  2. 2024-10-26 21:16:20 员工1 到达公司
  3. 2024-10-26 21:16:20 员工2 到达公司
  4. 2024-10-26 21:16:20 员工3 到达公司
  5. 2024-10-26 21:16:20 员工4 到达公司
  6. 2024-10-26 21:16:20 老王 发出通知
  7. 2024-10-26 21:16:20 员工4 开始工作
  8. 2024-10-26 21:16:23 员工4 工作完成
  9. 2024-10-26 21:16:23 员工1 开始工作
  10. 2024-10-26 21:16:28 员工1 工作完成
  11. 2024-10-26 21:16:28 员工2 开始工作
  12. 2024-10-26 21:16:30 员工2 工作完成
  13. 2024-10-26 21:16:30 员工0 开始工作
  14. 2024-10-26 21:16:31 员工0 工作完成
  15. 2024-10-26 21:16:31 员工3 开始工作
  16. 2024-10-26 21:16:32 员工3 工作完成
复制代码
Event-事件

在 Python 的 threading 模块中,Event 是一个线程同步原语,用于在多个线程之间进行简单的通信。Event 对象维护一个内部标志,线程可以使用 wait() 方法阻塞,直到另一个线程调用 set() 方法将标志设置为 True。一旦标志被设置为 True,所有等待的线程将被唤醒并继续执行。
Event 的主要方法

  • set():将事件的内部标志设置为 True,并唤醒所有等待的线程。
  • clear():将事件的内部标志设置为 False。
  • is_set():返回事件的内部标志是否为 True。
  • wait(timeout=None):如果事件的内部标志为 False,则阻塞当前线程,直到标志被设置为 True 或超时(如果指定了 timeout)。
  1. import threading
  2. import time
  3. import random
  4. class Employee(threading.Thread):
  5.     def __init__(self, username: str, cond: threading.Event):
  6.         self.username = username
  7.         self.cond = cond
  8.         super().__init__()
  9.     def run(self):
  10.         print(f"{time.strftime('%F %T')} {self.username} 到达公司")
  11.         self.cond.wait()  # 等待事件标志为True
  12.         print(f"{time.strftime('%F %T')} {self.username} 开始工作")
  13.         time.sleep(random.randint(1, 5))
  14.         print(f"{time.strftime('%F %T')} {self.username} 工作完成")
  15. class Boss(threading.Thread):
  16.     def __init__(self, username: str, cond: threading.Event):
  17.         self.username = username
  18.         self.cond = cond
  19.         super().__init__()
  20.     def run(self):
  21.         print(f"{time.strftime('%F %T')} {self.username} 发出通知")
  22.         self.cond.set()
  23. if __name__ == "__main__":
  24.     cond = threading.Event()
  25.     boss = Boss("老王", cond)
  26.    
  27.     employees = []
  28.     for i in range(5):
  29.         employees.append(Employee(f"员工{i}", cond))
  30.     for employee in employees:
  31.         employee.start()
  32.     boss.start()
  33.     boss.join()
  34.     for employee in employees:
  35.         employee.join()
复制代码
执行输出
  1. 2024-10-26 21:22:28 员工0 到达公司
  2. 2024-10-26 21:22:28 员工1 到达公司
  3. 2024-10-26 21:22:28 员工2 到达公司
  4. 2024-10-26 21:22:28 员工3 到达公司
  5. 2024-10-26 21:22:28 员工4 到达公司
  6. 2024-10-26 21:22:28 老王 发出通知
  7. 2024-10-26 21:22:28 员工0 开始工作
  8. 2024-10-26 21:22:28 员工1 开始工作
  9. 2024-10-26 21:22:28 员工3 开始工作
  10. 2024-10-26 21:22:28 员工4 开始工作
  11. 2024-10-26 21:22:28 员工2 开始工作
  12. 2024-10-26 21:22:30 员工3 工作完成
  13. 2024-10-26 21:22:31 员工4 工作完成
  14. 2024-10-26 21:22:31 员工2 工作完成
  15. 2024-10-26 21:22:32 员工0 工作完成
  16. 2024-10-26 21:22:32 员工1 工作完成
复制代码
使用队列

Python的queue模块提供同步、线程安全的队列类。以下示例为使用queue实现的生产消费者模型
  1. import threading
  2. import time
  3. import random
  4. import queue
  5. class Producer(threading.Thread):
  6.     """多线程生产者类."""
  7.     def __init__(
  8.         self, tname: str, channel: queue.Queue, done: threading.Event
  9.     ):
  10.         self.tname = tname
  11.         self.channel = channel
  12.         self.done = done
  13.         super().__init__()
  14.     def run(self) -> None:
  15.         """Method representing the thread's activity."""
  16.         while True:
  17.             if self.done.is_set():
  18.                 print(
  19.                     f"{time.strftime('%F %T')} {self.tname} 收到停止信号事件"
  20.                 )
  21.                 break
  22.             if self.channel.full():
  23.                 print(
  24.                     f"{time.strftime('%F %T')} {self.tname} report: 队列已满, 全部停止生产"
  25.                 )
  26.                 self.done.set()
  27.             else:
  28.                 num = random.randint(100, 1000)
  29.                 self.channel.put(f"{self.tname}-{num}")
  30.                 print(
  31.                     f"{time.strftime('%F %T')} {self.tname} 生成数据 {num}, queue size: {self.channel.qsize()}"
  32.                 )
  33.                 time.sleep(random.randint(1, 5))
  34. class Consumer(threading.Thread):
  35.     """多线程消费者类."""
  36.     def __init__(
  37.         self, tname: str, channel: queue.Queue, done: threading.Event
  38.     ):
  39.         self.tname = tname
  40.         self.channel = channel
  41.         self.done = done
  42.         self.counter = 0
  43.         super().__init__()
  44.     def run(self) -> None:
  45.         """Method representing the thread's activity."""
  46.         while True:
  47.             if self.done.is_set():
  48.                 print(
  49.                     f"{time.strftime('%F %T')} {self.tname} 收到停止信号事件"
  50.                 )
  51.                 break
  52.             if self.counter >= 3:
  53.                 print(
  54.                     f"{time.strftime('%F %T')} {self.tname} report: 全部停止消费"
  55.                 )
  56.                 self.done.set()
  57.                 continue
  58.             if self.channel.empty():
  59.                 print(
  60.                     f"{time.strftime('%F %T')} {self.tname} report: 队列为空, counter: {self.counter}"
  61.                 )
  62.                 self.counter += 1
  63.                 time.sleep(1)
  64.                 continue
  65.             else:
  66.                 data = self.channel.get()
  67.                 print(
  68.                     f"{time.strftime('%F %T')} {self.tname} 消费数据 {data}, queue size: {self.channel.qsize()}"
  69.                 )
  70.                 time.sleep(random.randint(1, 5))
  71.                 self.counter = 0
  72. if __name__ == "__main__":
  73.     done_p = threading.Event()
  74.     done_c = threading.Event()
  75.     channel = queue.Queue(30)
  76.     threads_producer = []
  77.     threads_consumer = []
  78.     for i in range(8):
  79.         threads_producer.append(Producer(f"producer-{i}", channel, done_p))
  80.     for i in range(6):
  81.         threads_consumer.append(Consumer(f"consumer-{i}", channel, done_c))
  82.     for t in threads_producer:
  83.         t.start()
  84.     for t in threads_consumer:
  85.         t.start()
  86.     for t in threads_producer:
  87.         t.join()
  88.     for t in threads_consumer:
  89.         t.join()
复制代码
线程池

在面向对象编程中,创建和销毁对象是很费时间的,因为创建一个对象要获取内存资源或其他更多资源。在多线程程序中,生成一个新线程之后销毁,然后再创建一个,这种方式就很低效。池化多线程,也就是线程池就为此而生。
将任务添加到线程池中,线程池会自动指定一个空闲的线程去执行任务,当超过最大线程数时,任务需要等待有新的空闲线程才会被执行。Python一般可以使用multiprocessing模块中的Pool来创建线程池。
  1. import time
  2. from multiprocessing.dummy import Pool as ThreadPool
  3. def foo(n):
  4.     time.sleep(2)
  5. if __name__ == "__main__":
  6.     start = time.time()
  7.     for n in range(5):
  8.         foo(n)
  9.     print("single thread time: ", time.time() - start)
  10.     start = time.time()
  11.     t_pool = ThreadPool(processes=5)  # 创建线程池, 指定池中的线程数为5(默认为CPU数)
  12.     rst = t_pool.map(foo, range(5))  # 使用map为每个元素应用到foo函数
  13.     t_pool.close()  # 阻止任何新的任务提交到线程池
  14.     t_pool.join()  # 等待所有已提交的任务完成
  15.     print("thread pool time: ", time.time() - start)
复制代码
线程池执行器

python的内置模块concurrent.futures提供了ThreadPoolExecutor类。这个类结合了线程和队列的优势,可以用来平行执行任务。
  1. import time
  2. from random import randint
  3. from concurrent.futures import ThreadPoolExecutor
  4. def foo() -> None:
  5.     time.sleep(2)
  6.     return randint(1,100)
  7. if __name__ == "__main__":
  8.     start = time.time()
  9.     futures = []
  10.     with ThreadPoolExecutor(max_workers=5) as executor:
  11.         for n in range(10):
  12.             futures.append(executor.submit(foo))  # Fan out
  13.             
  14.     for future in futures:  # Fan in
  15.         print(future.result())
  16.     print("thread pool executor time: ", time.time() - start)
复制代码
执行输出
  1. 44
  2. 19
  3. 86
  4. 48
  5. 35
  6. 74
  7. 59
  8. 99
  9. 58
  10. 53
  11. thread pool executor time:  4.001955032348633
复制代码
ThreadPoolExecutor类的最大优点在于:如果调用者通过submit方法把某项任务交给它执行,那么会获得一个与该任务相对应的Future实例,当调用者在这个实例上通过result方法获取执行结果时,ThreadPoolExecutor会把它在执行任务的过程中所遇到的异常自动抛给调用者。而ThreadPoolExecutor类的缺点是IO并行能力不高,即便把max_worker设为100,也无法高效处理任务。更高需求的IO任务可以考虑换异步协程方案。
参考


  • 郑征《Python自动化运维快速入门》清华大学出版社
  • Brett Slatkin《Effective Python》(2nd) 机械工业出版社

来源:https://www.cnblogs.com/XY-Heruo/p/18514316
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具