解开尘封千年的秘密 发表于 2023-6-19 19:27:56

Python 标准类库-并发执行之multiprocessing-基于进程的并行

实践环境

Python3.6
介绍

multiprocessing是一个支持使用类似于线程模块的API派生进程的包。该包同时提供本地和远程并发,通过使用子进程而不是线程,有效地避开了全局解释器锁。因此,multiprocessing模块允许程序员充分利用给定机器上的多个处理器。它同时在Unix和Windows上运行。
该模块还引入了在线程模块中没有类似程序的API。这方面的一个主要例子是Pool对象,它提供了一种方便的方法,可以在多个输入值的情况下,为进程之间分配输入数据(数据并行),实现并行执行函数。以下示例演示了在模块中定义此类函数,以便子进程能够成功导入该模块的常见做法。这个使用Pool实现数据并行的基本示例
from multiprocessing import Pool

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(5) as p:
      print(p.map(f, ))控制台输出:
Process类

在multiprocessing中,进程是通过创建一个Process类并调用其start()方法来派生的。Process遵循threading.Thread的API。multiprocess程序的一个微小的例子:
from multiprocessing import Process

def f(name):
    print('hello', name) # 输出:hello shouke

if __name__ == '__main__':
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()下面是一个扩展示例,显示所涉及的各个进程ID:
from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('shouke',))
    p.start()
    p.join()控制台输出:
main line
module name: __main__
parent process: 13080
process id: 20044
function f
module name: __mp_main__
parent process: 20044
process id: 28952
hello shouke上下文和启动方法

根据平台的不同,multiprocessing支持三种启动进程的方式。这些启动方法是

[*]spawn
父进程启动一个新的python解释器进程。子进程将只继承那些运行进程对象run()方法所需的资源。特别是,来自父进程的不必要的文件描述符和句柄将不会被继承。与使用fork或forkserver相比,使用此方法启动进程相当慢。可在Unix和Windows上使用。Windows上默认使用该启动方法。
[*]fork
父进程使用os.fork()来fork Python解释器。子进程在开始时实际上与父进程相同。父进程的所有资源都由子进程继承。请注意,安全地fork多线程进程是有问题的。仅在Unix上可用。Unix上默认会用该方法。
[*]forkserver
当程序启动并选择forkserver启动方法时,服务器进程就会启动。从那时起,每当需要新进程时,父进程都会连接到服务器,并请求它fork一个新进程。fork服务器进程是单线程的,因此使用os.fork()是安全的。不会继承不必要的资源。在支持通过Unix管道传递文件描述符的Unix平台上可用。
To select a start method you use the set_start_method() in the if __name__ == '__main__' clause of the main module. For example
在3.4版本中进行了更改:在所有unix平台上添加了spawn,并为一些unix平台添加了forkserver。子进程不再继承Windows上的所有父级可继承句柄。
在Unix上,使用spawn或forkserver启动方法还将启动一个信号量跟踪器进程,该进程跟踪程序进程创建的未链接的命名信号量。当所有进程都退出时,信号量跟踪器将取消任何剩余信号量的链接。通常应该没有剩余信号量,但如果一个进程被信号杀死,可能会有一些“泄露”的信号量。(取消命名信号量的链接是一个严重的问题,因为系统只允许有限的数量,并且在下次重新启动之前不会自动取消链接。)
要选择启动方法,请在主模块的 if __name__ == '__main__'子句中使用set_start_method()。例如
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get()) # 输出 hello
    p.join()set_start_method() 在一个程序中只能用一次
或者,也可以使用get_context()来获取上下文对象。上下文对象与multiprocessing模块具有相同的API,并允许在同一程序中使用多个启动方法。
import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()请注意,与一个上下文相关的对象可能与不同上下文的进程不兼容。特别是,使用fork上下文创建的锁不能传递给使用spawn或forkserver启动方法启动的进程。
想要使用特定启动方法的库可能应该使用get_context()来避免干扰库用户的选择
在进程之间交换对象

multiprocessing支持进程之间的两种通信信道
进程同步

multiprocessing包含来自threading中所有同步原语的等效项。例如,可以使用锁来确保一次只有一个进程打印到标准输出:
from multiprocessing import Process, Queue

def f(q):
    q.put()

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints ""
    p.join()进程之间共享状态

如上所述,在进行并发编程时,通常最好尽可能避免使用共享状态。当使用多个进程时尤其如此。
但是,如果您确实需要使用一些共享数据,那么multiprocessing提供了几种方法
共享内存

可以使用multiprocessing.Value或multiprocessing.Array将数据存储在共享内存映射中。例如,以下代码
from multiprocessing import Process, Queue

q = Queue()

def f():
    global q
    q.put()

if __name__ == '__main__':
    p = Process(target=f)
    p.start()
    print(q.get())    # 取不到值
    p.join()创建num和arr时使用的'd'和'i'参数是数组模块使用的类型代码:'d'表示双精度浮点,'i'表示有符号整数。这些共享对象将是进程和线程安全的。
为了在使用共享内存时获得更大的灵活性,可以使用multiprocessing.sharedtypes模块,该模块支持创建从共享内存分配的任意ctypes对象。
服务器进程(Server Process)

Manager()返回的管理器对象控制一个服务器进程,该进程可保存Python对象,并允许其他进程使用代理操作它们。
管理器对象返回的管理器支持类型 list, dict, multiprocessing.managers.Namespace, multiprocessing.Lock, multiprocessing.RLock, multiprocessing.Semaphore, multiprocessing.BoundedSemaphore, multiprocessing.Condition, multiprocessing.Event, multiprocessing.Barrier, multiprocessing.Queue, multiprocessing.Value 和multiprocessing.Array。例如
from multiprocessing import Process, Queue

class TestClass:
    def __init__(self, q):
      self.q = q

    def f(self):
      self.q.put()

if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    p = Process(target=obj.f)
    p.start()
    print(q.get())    # prints ""
    p.join()服务器进程管理器比使用共享内存对象更灵活,因为它们可以支持任意对象类型。此外,单个管理器可以由不同计算机上的进程通过网络共享。然而,它们比使用共享内存要慢。
使用进程池

Pool类代表一个工作进程池。它具有允许以几种不同方式将任务转移给工作进程的方法。
例如:
from multiprocessing import Process, Queue

q = Queue()

class TestClass:
    def f(self, q):
      q.put()


if __name__ == '__main__':
    q = Queue()
    obj = TestClass()
    p = Process(target=obj.f, args=(q,))
    p.start()
    print(q.get())    # prints ""
    p.join()输出:
import threading
import time

from multiprocessing import Process, Queue

class TestClass:
    def __init__(self, q):
      self.q = q
      self.task_done = False

    def f1(self):
      i = 0
      while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
      self.task_done = True

    def f2(self):
      
      # while死循环了
      while not self.q.empty() or not self.task_done: # self.task_done永远为True
            try:
                print(self.q.get_nowait())
            except Exception:
                pass

    def run(self):
      thread = threading.Thread(target=self.f1,
                                  name="f1")
      thread.start()

      p = Process(target=self.f2)
      p.start()


if __name__ == '__main__':
    q = Queue()
    obj = TestClass(q)
    obj.run()请注意,池的方法只能由创建池的进程使用。
此程序包中的功能要求 __main__模块可由子级导入。这意味着一些示例,如multiprocessing.pool.pool示例将无法在交互式解释器中工作。例如
import threading
import time

from multiprocessing import Process, Queue, active_children, Value

class TestClass:
    def __init__(self, q, task_done):
      self.q = q
      self.task_done = task_done

    def f1(self):
      i = 0
      while i < 5:
            self.q.put('hello')
            time.sleep(0.3)
            i += 1
      self.task_done.value = 1

    def f2(self):
      item = ''
      while not self.q.empty() or self.task_done.value == 0:
            try:
                item = self.q.get_nowait()
                print(item)
            except Exception:
                pass

    def run(self):
      thread = threading.Thread(target=self.f1,
                                  name="f1")
      thread.start()

      p = Process(target=self.f2)
      p.start()


if __name__ == '__main__':
    q = Queue()
    task_done = Value('h', 0)
    obj = TestClass(q, task_done)
    obj.run()(如果你尝试这样做,它实际上会以半随机的方式输出三个交错的完整traceback,然后你可能不得不以某种方式停止主进程。)
API参考

multiprocessing包大部分复制线程模块的API。
multiprocessing.Process 和exception

Process

from multiprocessing import Process, Pipe

def f(conn):
    conn.send()
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints ""
    p.join()Process对象表示在立进程中运行的活动。Process类具有threading.Thread的所有方法的等价项。
构造函数应始终使用关键字参数调用。

[*]group 应始终为None,它的存在只是为了与threading.Thread.target兼容。
[*]target 供run()方法调用的可调用对象。默认为None,表示不调用任何内容。
[*]name 是进程名称。
[*]args 是target调用的参数元组。
[*]kwargs 是target调用的关键字参数字典。
[*]daemon用于设置将进程是否为守护进程,True - 是 或False - 否。如果为None(默认值),则将从创建进程中继承。
默认情况下,不会向target传递任何参数。
如果子类重写构造函数,则必须确保在对进程执行其他操作之前调用基类构造函数(Process.__init__())。
在版本3.3中更改:添加daemon参数

[*]run()
表示进程活动的方法。
可以在子类中重写此方法。标准run()方法调用作为target参数传递给对象构造函数的可调用对象(如果有的话),其中顺序参数和关键字参数分别取自args和kwargs参数
[*]start()
启动进程活动。
没改进程对下最多只能调用一次。 它安排在单独的进程中调用对象的run()方法。
[*]join()
如果可选参数timeout为None(默认值),则该方法将阻塞,直到调用其join()方法的进程终止为止。如果timeout是一个正数,则表示最多阻塞timeout参数指定的秒数。请注意,如果该方法的进程终止或方法超时,则该方法将返回None。检查进程的退出码以确定它是否已终止。
一个进程可以被join多次。
注意:阻塞表示不继续往下执行,如果阻塞超时,程序继续往下还行,如果此时target未运行完成,主程序会等待其运行完成后才终止。
进程不能join自身,因为这会导致死锁。在进程启动之前尝试join进程是错误的。
[*]name
进程的名称。一个字符串,仅用于识别目的。它没有语义。多个进程可能被赋予相同的名称。
初始名称由构造函数设置。如果没有向构造函数提供显式名称,则进程名被构造为形如Process-N1:N2:…:Nk字符串,其中每个Nk是其父进程的第N个子节点。
[*]is_alive()
返回进程是否还存活
大致上,进程对象从start()方法返回的那一刻起一直处于活动状态,直到子进程终止。
[*]daemon
进程的守护进程标志,一个布尔值。这必须在调用start()之前设置。
初始值是从创建进程时继承的。
当进程退出时,它会尝试终止其所有守护进程子进程。
请注意,守护进程不允许创建子进程。否则,如果守护进程在其父进程退出时被终止,它的子进程将成为孤儿进程。此外,这些不是Unix守护进程或服务,它们是正常进程,如果非守护进程退出,它们将被终止(而不是被join)。
除了threading.Thread API之外,Process对象还支持以下属性和方法:

[*]pid
返回进程ID。进程派生之前,其值为None
[*]exitcode
子进程的退出码。如果进程尚未终止,则其值为None。负值-N表示子进程被信号N终止。
[*]terminate()
终止进程。在Unix上,这是使用SIGTERM信号完成的;在Windows上使用TerminateProcess()。请注意,退出handler和和finally子句等将不会被执行。
请注意,进程的子进程不会被终止,它们只会成为孤儿进程
[*]..略,更多参考请查阅官方文档
示例

Process的一些方法的示例用法
from multiprocessing import Process, Lock

def f(l, i):
    l.acquire()
    try:
      print('hello world', i)
    finally:
      l.release()

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
      Process(target=f, args=(lock, num)).start()异常


[*]exception multiprocessing.ProcessError
所有multiprocessing异常的基类
[*]exception multiprocessing.BufferTooShort
当提供的缓冲区对象太小而无法读取消息时引发的异常。
[*]exceptionmultiprocessing.AuthenticationError
发生身份验证错误时引发的异常
[*]exception multiprocessing.TimeoutError
具有timeout的方法超时引发的异常。
管道和队列


[*]class multiprocessing.Pipe()
返回一对表示管道终端的multiprocessing.Connection对象(conn1,conn2)。如果duplex为True(默认值),则管道为双向管道。如果duplex为False,则管道是单向的:conn1只能用于接收消息,conn2只能用于发送消息
[*]class multiprocessing.Queue()
返回使用管道和一些锁/信号量实现的进程共享队列。当进程第一次将项目放入队列时,会启动一个feeder线程,该线程将对象从缓冲区传输到管道中。来自标准库的queue模块的常见queue.Empty和queue.Full异常被引发以发出超时信号。multiprocessing.Queue实现了Queue.Queue的所有方法,除了task_done()和join()
[*]qsize()
返回队列的大致大小。由于多线程/多进程的语义,这是不可靠的。
请注意,这可能会在Unix平台(如Mac OS X)上触发NotImplementedError,因为其未实现sem_getvalue()。
[*]empty()
如果队列为空,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。
[*]full()
如果队列已满,则返回True,否则返回False。由于多线程/多处理语义的原因,这是不可靠的。
[*]put(obj[, block[, timeout]])
将obj放入队列。如果可选参数block为True(默认值),并且timeout为None(默认值),则必要时阻塞,直到有可用空闲slot。如果timeout是一个正数,最多会阻塞timeout指定秒数,并抛出queue.Full异常,如果在该时间内没有可用slot的话。如果block为False,如果有可用空闲slot,则将项目放入队列中,否则抛出queue.Full异常(在这种情况下会忽略timeout)。
[*]put_nowait(obj)
等价于put(obj, False)
[*]get(])
从队列中删除并返回被删除项目。如果参数block为True(默认值),并且timeout为None(默认值),则获取不到项目时阻塞,直到有可获取项。如果timeout是一个正数,最多会阻塞timeout指定秒数,并抛出queue.Empty异常,如果在超时时间内没有可用项目的话。如果block为False,如果有可获取项,则立即返回项目,否则抛出queue.Empty异常(在这种情况下会忽略timeout)。
[*]get_nowait()
等价于get(False)
[*]..略,更多参考请查阅官方文档
...略,更多参考请查阅官方文档
杂项


[*]multiprocessing.active_children()
返回当前进程的所有活动子进程的列表。调用该方法的副作用是“阻塞”任何已经完成的进程(原文:Calling this has the side effect of “joining” any processes which havealready finished。)
[*]multiprocessing.cpu_count()
返回系统的CPU数量。该数量并不等于当前进程可以使用的CPU数量。可用cpu的数量可以通过len(os.sched_getaffinity(0))获取,不过可能会抛NotImplementedError异常。
[*]multiprocessing.``current_process()
返回当前进程对应的multiprocessing.Process对下。类似threading.current_thread()
[*]multiprocessing.get_all_start_methods()
返回支持的启动方法的列表,其中第一个是默认方法。可能的启动方法有'fork', 'spawn' 和'forkserver'。在Windows上,仅 'spawn'可用。在Unix上,始终支持'fork' 和'spawn',默认值为“'fork'。
3.4版新增
[*]multiprocessing.get_start_method(allow_none=False)
返回用于启动进程的启动方法的名称。如果尚未设置启动方法,且allow_none为False,则返回默认方法名词,如果尚未设置启动方法,并且allow_none为True,则返回None。返回值可以是'fork', 'spawn', 'forkserver' 或 None. 'fork'为Unix上的默认值,而'spawn'则是Windows上的默认值。
3.4版新增。
[*]multiprocessing.``set_start_method(method)
设置应用于启动子进程的方法。method可以是 'fork','spawn'或'forkserver'。请注意,最多只能调用一次,并且应该在主模块的if__name__=='__main__'子句中使用。
3.4版新增。
[*]..略,更多参考请查阅官方文档
..略,更多参考请查阅官方文档
Process工具

可以创建一个进程池,用于执行使用multiprocessing.pool.Pool类提交给它的任务。
Pool类


[*]class multiprocessing.pool.Pool(]]]])
一个进程池对象,用于控制可以向其提交作业的工作进程池。它支持带有超时和回调的异步结果,并具有并行map实现。

[*]processes 是要使用的工作进程的数量。如果processes 为None,则默认使用os.cpu_count()返回的数字。
[*]initializer 如果值不为None,那么每个工作进程在启动时都会调用initializer(*initargs)。
[*]maxtasksperchild 是工作进程在退出并替换为新的工作进程之前可以完成的任务数,以便释放未使用的资源。默认的maxtasksperchild为None,这意味着工作进程存活时间将与进程池一样长。
[*]context用于指定用于启动工作进程的上下文。通常,进程池是使用上下文对象的函数multiprocessing.Pool()或Pool()方法创建的。在这两种情况下,上下文都设置得适当。
请注意,池对象的方法只能由创建池的进程调用。
3.2版新增:maxtasksperchild
3.4版新增:context
注意:
池中的工作进程通常在工作队列的整个持续时间内保持存活。在其他系统(如Apache、mod_wsgi等)中发现的一种释放工作进程所持有资源的常见模式是,允许池中的工作进程在退出、清理和生成新进程以取代旧进程之前只完成一定数量的工作。池的maxtasksperchild参数向最终用户暴露了这一能力。
apply(func[, args[, kwds]])
使用参数args和关键字参数kwds调用func。它会阻塞,直到可获取结果为止。考虑到阻塞问题,apply_async()更适合并行执行工作。此外,func只在池的一个工作进程中执行。
apply_async(func[, args[, kwds[, callback[, error_callback]]]])
apply()方法的变体,返回结果对象。
如果指定了callback,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback,除非调用失败,在这种情况下,将对其应用error_callback。
如果指定了error_callback,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
map(func, iterable[, chunksize])
内置函数map()的并行等价物(不过它只支持一个iterable参数)。它会阻塞,直到可获取结果。
该方法将iterable分割为多个块,并将这些块作为单独的任务提交给进程池。可以通过将chunksize设置为正整数来指定这些块的(近似)大小。
map_async(func, iterable[, chunksize[, callback[, error_callback]]])
map()方法的一个变体,它返回一个结果对象。
如果指定了callback ,那么它应该是一个接受单个参数的可调用函数。当可获取结果时,将对其应用callback,除非调用失败,在这种情况下,将应用error_callback。
如果指定了error_callback,那么它应该是一个接受单个参数的可调用函数。如果目标函数失败,则会使用异常实例调用error_callback。
回调应该立即完成,否则处理结果的线程将被阻塞。
imap(func, iterable[, chunksize])
map()的一个更惰性版本。
chunksize参数与map()方法使用的参数相同。对于非常长的迭代,使用较大的chunksize值可以使作业比使用默认值1更快地完成。
此外,如果chunksize为1,则imap()方法返回的迭代器的next()方法有一个可选的timeout参数:如果无法在timeout秒内返回结果,next(timeout)将引发multiprocessing.TimeoutError
imap_unordered(func, iterable[, chunksize])
与imap()相同,只是返回迭代器的结果的顺序是任意的。(只有当只有一个工作进程时,才能保证顺序“正确”)
starmap(func, iterable[, chunksize])
类似于map(),只是iterable的元素被当做参数,不拆解。
因此,[(1,2), (3,4)]的迭代结果是。
3.3版新增。
starmap_async(func, iterable[, chunksize[, callback[, error_back]]])
starma()和map_async()的组合,对可迭代项中的可迭代项进行迭代,并在未拆解可迭代项的情况下调用func。返回一个结果对象。
3.3版新增。
close()
阻止将更多任务提交到进程池中。完成所有任务后,工作进程将退出。
terminate()
在未完成未完成的工作的情况下立即停止工作进程。当进程池对象被垃圾回收时,将立即调用terminate()。
join()
等待工作进程退出。在使用join()之前,必须调用close()或terminate()。
3.3版新增:进程池对象现在支持上下文管理协议——请参阅上下文管理器类型__enter__()返回池对象,__exit_()调用terminate()

AsyncResult类


[*]class multiprocessing.pool.AsyncResult
Pool.apply_async()和Pool.map_async()返回的结果类。
get()
当结果已准备好时返回结果。如果timeout不是None,并且没有在timeout秒内获取到结果,则会引发multiprocessing.TimeoutError。如果远程调用引发了异常,则该异常将由get()重新抛出。
wait()
等待,直到结果可获取,或者直到超过timeout秒。
ready()
返回调用是否完成
successful()
返回调用是否已完成,不引发异常。如果结果还未准备好,将引发AssertionError。
进程池使用示例

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
      a = -a

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value) # 输出:3.1415927
    print(arr[:]) # 输出:...略

来源:https://www.cnblogs.com/shouke/p/17472025.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: Python 标准类库-并发执行之multiprocessing-基于进程的并行