翼度科技»论坛 编程开发 python 查看内容

多任务之协程

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
协程
  1. 协程我们是使用gevent模块实现的,而gevent 是对greenlet进行的封装,而greenlet 又是对yield进行封装。要理解gevent就要从yield开始。 要理解yield的作用我们就要先理解可迭代对象与迭代器
复制代码
一、可迭代对象与迭代器

1> 可迭代对象

迭代是访问集合元素的一种方式。迭代器是一个可以记住遍历的位置的对象。迭代器对象从集合的第一个元素开始访问,直到所有的元素被访问完结束。迭代器只能往前不会后退。


  • 可迭代对象定义:
    1. 我们把可以通过for...in...这类语句迭代读取一条数据供我们使用的对象称之为可迭代对象(Iterable)
    复制代码
  • 可迭代对象本质
    1. 向我们提供一个这样的中间“人”即迭代器帮助我们对其进行迭代遍历使用。
    2. 可迭代对象通过 __iter__ 方法向我们提供一个迭代器,我们在迭代一个可迭代对象的时候,实际上就是先获取该对象提供的一个迭代器,然后通过这个迭代器来依次获取对象中的每一个数据, 也就是说,一个具备了 __iter__ 方法的对象,就是一个可迭代对象。
    复制代码
  • 创建一个可迭代对象
    1. from collections.abc import Iterable
    2. class Mylist():
    3.         def __iter__(self):
    4.         # 暂时忽略如何构造一个迭代器对象
    5.                 pass
    6.    
    7. print(isinstance(Mylist(), Iterable))
    复制代码
2> 迭代器

迭代器是用来帮助我们记录每次迭代访问到的位置,当我们对迭代器使用next()函数的时候,迭代器会向我们返回它所记录位置的下一个位置的数据。实际上,在使用next()函数的时候,调用的就是迭代器对象的__next__方法


  • 迭代器定义:
    1. 一个实现了__iter__方法和__next__方法的对象,就是迭代器。
    复制代码
  • 迭代器本质
    1. for item in Iterable 循环的本质就是先通过iter()函数获取可迭代对象Iterable的迭代器,然后对获取到的迭代器不断调用next()方法来获取下一个值并将其赋值给item,当遇到StopIteration的异常后循环结束。
    复制代码
  • 迭代器实现
    1. from collections.abc import Iterable, Iterator
    2. # 自定义一个可迭代对象
    3. class MyList(object):
    4.     def __init__(self):
    5.         self.items = list()
    6.     def add(self, value):
    7.         self.items.append(value)
    8.     def __iter__(self):
    9.         return MyIterator(self)
    10. # 自定义一个迭代器
    11. class MyIterator(object):
    12.     def __init__(self, obj):
    13.         self.obj = obj
    14.         self.current_index = 0
    15.     # python要求迭代器本身也是可迭代的
    16.     def __iter__(self):
    17.         return self
    18.     def __next__(self):
    19.         if self.current_index < len(self.obj.items):
    20.             item = self.obj.items[self.current_index]
    21.             self.current_index += 1
    22.             return item
    23.         else:
    24.             raise StopIteration
    25. m = MyList()
    26. print("MyList实例对象是否为可迭代对象: ", isinstance(m, Iterable))
    27. print("MyList实例对象是否为迭代器: ", isinstance(m, Iterator))
    28. print("MyIterator实例对象是否为可迭代对象: ", isinstance(MyIterator(m), Iterable))
    29. print("MyIterator实例对象是否为迭代器: ", isinstance(MyIterator(m), Iterator))
    30. m.add(1)
    31. m.add(22)
    32. m.add(333)
    33. m.add(444)
    34. for i in m:
    35.     print(i)
    复制代码
  • 说明:并不是只有for循环能接收可迭代对象
    1. li = list(m)
    2. print(li)
    3. tp = tuple(m)
    4. print(tp)
    复制代码
二、生成器


  • 生成器的定义
    1. 只要在def中有yield关键字的 就称为 生成器, 生成器是一类特殊的迭代器
    复制代码
  • 生成器定义方式
    1. # 方式一
    2. G = ( x*2 for x in range(5))
    3. >>>  <generator object <genexpr> at 0x7f626c132db0>
    4. # 方式二
    5. def fib(n):
    6.     current = 0
    7.     num1, num2 = 0, 1
    8.     while current < n:
    9.         num1, num2 = num2, num1+num2
    10.         current += 1
    11.         yield num
    12.     return 'done'
    复制代码
  • 生成器总结

    • 使用了yield关键字的函数不再是函数,而是生成器。(使用了yield的函数就是生成器)
    • yield关键字有两点作用:

      • 保存当前运行状态(断点),然后暂停执行,即将生成器(函数)挂起
      • 将yield关键字后面表达式的值作为返回值返回,此时可以理解为起到了return的作用

    • 可以使用next()函数让生成器从断点处继续执行,即唤醒生成器(函数)
    • Python3中的生成器可以使用return返回最终运行的返回值(通过捕获异常对象的value属性值获取),而Python2中的生成器不允许使用return返回一个返回值(即可以使用return从生成器中退出,但return后不能有任何表达式)。

三、协程


  • 协程的定义
    1. 协程(微线程、纤程)是python个中另外一种实现多任务的方式,只不过比线程更小占用更小执行单元(理解为需要的资源);协程是运行在某个单一线程下的,即先有线程才有协程。
    复制代码
  • 协程与线程区别
    1. 在实现多任务时, 线程切换从系统层面远不止保存和恢复 CPU上下文这么简单。 操作系统为了程序运行的高效性每个线程都有自己缓存Cache等等数据,操作系统还会帮你做这些数据的恢复操作。 所以线程的切换非常耗性能。但是协程的切换只是单纯的操作CPU的上下文,所以一秒钟切换个上百万次系统都抗的住。
    复制代码
  • 协程模块gevent

    • 通过yield实现协程的工作过程
      1. import time
      2. def work1():
      3.     while True:
      4.         print("----work1---")
      5.         yield
      6.         time.sleep(0.5)
      7. def work2():
      8.     while True:
      9.         print("----work2---")
      10.         yield
      11.         time.sleep(0.5)
      12. def main():
      13.     w1 = work1()
      14.     w2 = work2()
      15.     while True:
      16.         next(w1)
      17.         next(w2)
      18. if __name__ == "__main__":
      19.     main()
      复制代码
    • 通过greenlet实现协程
      1. 为了更好使用协程来完成多任务,python中的greenlet模块对其封装,从而使得切换任务变的更加简单
      复制代码
      1. # pip install greenlet
      2. from greenlet import greenlet
      3. import time
      4. def test1():
      5.     while True:
      6.         print("---A--")
      7.         gr2.switch()
      8.         time.sleep(0.5)
      9. def test2():
      10.     while True:
      11.         print("---B--")
      12.         gr1.switch()
      13.         time.sleep(0.5)
      14. gr1 = greenlet(test1)
      15. gr2 = greenlet(test2)
      16. # 切换到gr1中运行
      17. gr1.switch()
      复制代码
    • 通过gevent实现协程
      1.   greenlet已经实现了协程,但是这个还的人工切换,那么python还有一个比greenlet更强大的并且能够自动切换任务的模块gevent.
      2.   其原理是当一个greenlet遇到IO(指的是input output 输入输出,比如网络、文件操作等)操作时,比如访问网络,就自动切换到其他的greenlet,等到IO操作完成,再在适当的时候切换回来继续执行。
      3.   由于IO操作非常耗时,经常使程序处于等待状态,有了gevent为我们自动切换协程,就
      4. 保证总有greenlet在运行,而不是等待IO
      复制代码

      • gevent的使用
        1. # pip3 install gevent
        2. import gevent
        3. def f(n):
        4.     for i in range(n):
        5.         print(gevent.getcurrent(), i)
        6. g1 = gevent.spawn(f, 5)
        7. g2 = gevent.spawn(f, 5)
        8. g3 = gevent.spawn(f, 5)
        9. g1.join()
        10. g2.join()
        11. g3.join()
        12. # 运行结果
        13. # 可以看到,3个greenlet是依次运行而不是交替运行
        14. <Greenlet at 0x10e49f550: f(5)> 0
        15. <Greenlet at 0x10e49f550: f(5)> 1
        16. <Greenlet at 0x10e49f550: f(5)> 2
        17. <Greenlet at 0x10e49f550: f(5)> 3
        18. <Greenlet at 0x10e49f550: f(5)> 4
        19. <Greenlet at 0x10e49f910: f(5)> 0
        20. <Greenlet at 0x10e49f910: f(5)> 1
        21. <Greenlet at 0x10e49f910: f(5)> 2
        22. <Greenlet at 0x10e49f910: f(5)> 3
        23. <Greenlet at 0x10e49f910: f(5)> 4
        24. <Greenlet at 0x10e49f4b0: f(5)> 0
        25. <Greenlet at 0x10e49f4b0: f(5)> 1
        26. <Greenlet at 0x10e49f4b0: f(5)> 2
        27. <Greenlet at 0x10e49f4b0: f(5)> 3
        28. <Greenlet at 0x10e49f4b0: f(5)> 4
        29.   
        复制代码
      • gevent切换执行
        1. import gevent
        2. # 将程序中用到的耗时操作的代码,换为gevent中自己实现的模块
        3. monkey.patch_all()
        4. def f(n):
        5.     for i in range(n):
        6.         print(gevent.getcurrent(), i)
        7.         #用来模拟一个耗时操作,注意不是time模块中的sleep
        8.         # gevent.sleep(1)
        9.         time.sleep(1)
        10. g1 = gevent.spawn(f, 5)
        11. g2 = gevent.spawn(f, 5)
        12. g3 = gevent.spawn(f, 5)
        13. g1.join()
        14. g2.join()
        15. g3.join()
        16. # 运行结果
        17. <Greenlet at 0x7fa70ffa1c30: f(5)> 0
        18. <Greenlet at 0x7fa70ffa1870: f(5)> 0
        19. <Greenlet at 0x7fa70ffa1eb0: f(5)> 0
        20. <Greenlet at 0x7fa70ffa1c30: f(5)> 1
        21. <Greenlet at 0x7fa70ffa1870: f(5)> 1
        22. <Greenlet at 0x7fa70ffa1eb0: f(5)> 1
        23. <Greenlet at 0x7fa70ffa1c30: f(5)> 2
        24. <Greenlet at 0x7fa70ffa1870: f(5)> 2
        25. <Greenlet at 0x7fa70ffa1eb0: f(5)> 2
        26. <Greenlet at 0x7fa70ffa1c30: f(5)> 3
        27. <Greenlet at 0x7fa70ffa1870: f(5)> 3
        28. <Greenlet at 0x7fa70ffa1eb0: f(5)> 3
        29. <Greenlet at 0x7fa70ffa1c30: f(5)> 4
        30. <Greenlet at 0x7fa70ffa1870: f(5)> 4
        31. <Greenlet at 0x7fa70ffa1eb0: f(5)> 4
        复制代码
      • 使用协程实现TCP通信 为多个用户服务
        1. # 【本机环境运行】
        2. import socket
        3. import gevent
        4. from gevent import monkey
        5. monkey.patch_all()
        6. def handle_request(client_socket):
        7.     while True:
        8.         data = client_socket.recv(1024).decode("gbk")
        9.         print(data)
        10. def main():
        11.     tcp_socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        12.     tcp_socket.bind(("127.0.0.1", 9000))
        13.     tcp_socket.listen(128)
        14.     while True:
        15.         # 接收新的请求,未收到连接请求会导致程序堵塞
        16.         tcp_server_socket, client_addr = tcp_socket.accept()
        17.         # 将处理请求的任务交给协程处理
        18.         gevent.spawn(handle_request, tcp_server_socket)
        19.   
        20. if __name__ == "__main__":
        21.     main()
        复制代码
      • 使用队列和协程实现生产者消费者模型
        1. from gevent import queue, monkey
        2. import gevent
        3. import time
        4. import random
        5. monkey.patch_all()
        6. def producer(q):
        7.     for value in ['A', 'B', 'C', 'D', 'E', 'F']:
        8.         print(f'【Producer】 put {value} to queue...')
        9.         q.put(value)
        10.         time.sleep(random.random())
        11.     # 发送结束信号
        12.     q.put(None)
        13. def consumer(q):
        14.     while True:
        15.         # 如果队列中无数据,consumer会一直执行(协程只有在耗时操作时才切换任务),
        16.         # 可能会导致producer无法往队列中添加数据,我们可以手动添加一个延时
        17.         if not q.empty():
        18.             data = q.get()
        19.             # 接收到结束信号退出程序
        20.             if data is None:
        21.                 return
        22.             print(f"【Consumer】 get {data} from queue")
        23.             time.sleep(random.random())
        24.         else:
        25.             time.sleep(0.5)
        26. q = queue.Queue()
        27.   if __name__ == "__main__":
        28.     gevent.joinall([
        29.         gevent.spawn(producer, q),
        30.         gevent.spawn(consumer, q)
        31.   ])
        复制代码


四、进程、线程、协程区别


  • 进程是资源分配的单位
  • 线程是操作系统调度的单位
  • 进程切换需要的资源很最大,效率很低
  • 线程切换需要的资源一般,效率一般(当然了在不考虑GIL的情况下)
  • 协程切换任务资源很小,效率高
  • 多进程、多线程根据cpu核数不一样可能是并行的,但是协程是在一个线程中 所以是并发
五、同步和异步

1> 基本概念


  • 同步
    1.         同步调用即当我们提交一个任务后,就在原地等待,等拿到第一个任务的结果之后再继续下一行代码,效率比较低;同步调用解决方案:多线程/多进程
    复制代码
  • 异步
    1.         异步即当我们提交一个任务之后,不必等待上一个任务的结果,直接进行下一个任务,监听这些任务的完成状态,当有一个任务完成之后,再继续完成这个任务的后续操作
    复制代码
  • 同步异步代码实现与分析
    1. from gevent import monkey
    2. import gevent
    3. import time
    4. monkey.patch_all()
    5. def task(task_no, delay_time):
    6.     print(f"【任务-{task_no}】 开始!")
    7.     time.sleep(delay_time)
    8.     print(f"【任务-{task_no}】 结束!")
    9. # 发布3个任务, 3个任务分别耗时10, 6, 12
    10. if __name__ == '__main__':
    11.     # 同步执行方式
    12.     start_time = time.time()
    13.     task("同步1", 5)
    14.     task("同步2", 3)
    15.     task("同步3", 6)
    16.     end_time = time.time()
    17.     print(f"同步执行方式耗时 {end_time - start_time}")
    18.     # 异步执行方式
    19.     start_time = time.time()
    20.     gevent.joinall([
    21.         gevent.spawn(task, "异步1", 5),
    22.         gevent.spawn(task, "异步2", 3),
    23.         gevent.spawn(task, "异步3", 6),
    24.     ])
    25.     end_time = time.time()
    26.     print(f"异步执行方式耗时 {end_time - start_time}")
    复制代码

来源:https://www.cnblogs.com/cs-songbai/p/18096611
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具