寒夜星空 发表于 2023-5-25 19:04:56

它来了!真正的 python 多线程

哈喽大家好,我是咸鱼
几天前,IBM 工程师 Martin Heinz 发文表示 python 3.12 版本回引入"Per-Interpreter GIL”,有了这个 Per-Interpreter 全局解释器锁,python 就能实现真正意义上的并行/并发
我们知道,python 的多线程/进程并不是真正意义上的多线程/进程,这是因为 python GIL (Global Interpreter Lock)导致的
而即将发布的 Python 3.12 中引入了名为 "Per-Interpreter GIL" 的新特性,能够实现真正的并发
接下来我们来看下这篇文章,原文链接如下:
https://martinheinz.dev/blog/97
译文

Python 到现在已经 32 岁了,但它到现在还没有实现适当的、真正的并发/并行
由于将在 Python 3.12 (预计 2023 年 10 月发布)中引入 “Per-Interpreter GIL”(全局解释器锁),这种情况将会被改变
虽然距离 python 3.12 的发布还有几个月的时间,但是相关代码已经实现了。所以让我们提前来了解一下如何使用子解释器 API(ub-interpreters API) 来编写出真正的并发Python代码
子解释器(Sub-Interpreters)

我们首先来看下这个 “Per-Interpreter GIL” 是如何解决 Python 缺失适当并发性这个问题的
简单来讲,GIL(全局解释器锁)是一个互斥锁,它只允许一个线程控制 Python 解释器(某个线程想要执行,必须要先拿到 GIL ,在一个 python 解释器里面,GIL 只有一个,拿不到 GIL 的就不允许执行)
这就意味着即使你在 Python 中创建多个线程,也只会有一个线程在运行
随着 “Per-Interpreter GIL” 的引用,单个 python 解释器不再共享同一个 GIL。这种隔离级别允许每个子 python 解释器真正地并发运行
这意味着我们可以通过生成额外的子解释器来绕过 Python 的并发限制,其中每个子解释器都有自己的GIL(拿到一个 GIL 锁)
更详细的说明请参见 PEP 684,该文档描述了此功能/更改:https://peps.python.org/pep-0684/#per-interpreter-state
如何安装

想要使用这个新功能,我们需要安装最新的 python 版本,这需要源码编译安装
# https://devguide.python.org/getting-started/setup-building/#unix-compiling
git clone https://github.com/python/cpython.git
cd cpython

./configure --enable-optimizations --prefix=$(pwd)/python-3.12
make -s -j2
./python
# Python 3.12.0a7+ (heads/main:22f3425c3d, May 10 2023, 12:52:07) on linux
# Type "help", "copyright", "credits" or "license" for more information.C-API 在哪里

现在我们已经安装好了最新版本,那么我们该如何使用子解释器呢?我们可以直接通过 import 来导入吗?不幸的是,还不能
正如 PEP-684 中指出的:...this is an advanced feature meant for a narrow set of users of the C-API.
Per-Interpreter GIL 的特性目前只能通过 C-API 使用,还没有直接的接口供开发人员使用
接口预计会在 PEP 554中出现,如果大家能够接受,它应该会在 Python 3.13 中出现,在这个版本出现之前,我们必须自己想办法来实现子解释器
虽然还没有相关文档,也没有相关模块可以导入,但 CPython 代码库中有一些代码段向我们展示了如何使用它:

[*]方法一:我们可以使用 _xxsubinterpreters 模块(因为是通过 C 实现的,所以命名比较奇怪,而且在 python 中不能够简单地去检查代码)
[*]方法二:可以使用 CPython 的 test 模块,该模块具有用于测试的示例 Interpreter(和 Channel)类
# Choose one of these:
import _xxsubinterpreters as interpreters
from test.support import interpreters通常情况下我们一般用上面的第二种方法来实现
我们已经找到了子解释器,但我们还需要通过 test 模块去借用一些辅助函数,以便将代码传递给子解释器,辅助函数如下
from textwrap import dedent
import os
# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L75
def _captured_script(script):
    r, w = os.pipe()
    indented = script.replace('\n', '\n                ')
    wrapped = dedent(f"""
      import contextlib
      with open({w}, 'w', encoding="utf-8") as spipe:
            with contextlib.redirect_stdout(spipe):
                {indented}
      """)
    return wrapped, open(r, encoding="utf-8")


def _run_output(interp, request, channels=None):
    script, rpipe = _captured_script(request)
    with rpipe:
      interp.run(script, channels=channels)
      return rpipe.read()将 interpreters 模块与上面的辅助函数组合在一起,便可以生成第一个子解释器:
from test.support import interpreters

main = interpreters.get_main()
print(f"Main interpreter ID: {main}")
# Main interpreter ID: Interpreter(id=0, isolated=None)

interp = interpreters.create()

print(f"Sub-interpreter: {interp}")
# Sub-interpreter: Interpreter(id=1, isolated=True)

# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test__xxsubinterpreters.py#L236
code = dedent("""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur.id)
            """)

out = _run_output(interp, code)

print(f"All Interpreters: {interpreters.list_all()}")
# All Interpreters:
print(f"Output: {out}")# Result of 'print(cur.id)'
# Output: 1生成和运行新解释器的一种方法是使用 create() 函数,然后将解释器与我们想要执行的代码一起传递给 _run_output() 辅助函数
还有一种更简单的方法,如下所示
interp = interpreters.create()
interp.run(code)直接使用 interpreters模块的 run 方法。
但如果我们运行上面这两段代码时,会收到以下报错
Fatal Python error: PyInterpreterState_Delete: remaining subinterpreters
Python runtime state: finalizing (tstate=0x000055b5926bf398)为了避免这个报错,我们还需要清理一些悬挂的解释器:
def cleanup_interpreters():
    for i in interpreters.list_all():
      if i.id == 0:# main
            continue
      try:
            print(f"Cleaning up interpreter: {i}")
            i.close()
      except RuntimeError:
            pass# already destroyed

cleanup_interpreters()
# Cleaning up interpreter: Interpreter(id=1, isolated=None)
# Cleaning up interpreter: Interpreter(id=2, isolated=None)线程

虽然使用上面的辅助函数运行代码是可行的,但在 threading 模块中使用熟悉的接口可能会更方便
import threading

def run_in_thread():
    t = threading.Thread(target=interpreters.create)
    print(t)
    t.start()
    print(t)
    t.join()
    print(t)

run_in_thread()
run_in_thread()

# <Thread(Thread-1 (create), initial)>
# <Thread(Thread-1 (create), started 139772371633728)>
# <Thread(Thread-1 (create), stopped 139772371633728)>
# <Thread(Thread-2 (create), initial)>
# <Thread(Thread-2 (create), started 139772371633728)>
# <Thread(Thread-2 (create), stopped 139772371633728)>我们通过把 interpreters.create 函数传递给Thread,它会自动在线程内部生成新的子解释器
我们也可以结合这两种方法,并将辅助函数传递给 threading.Thread:
import time

def run_in_thread():
    interp = interpreters.create(isolated=True)
    t = threading.Thread(target=_run_output, args=(interp, dedent("""
            import _xxsubinterpreters as _interpreters
            cur = _interpreters.get_current()

            import time
            time.sleep(2)
            # Can't print from here, won't bubble-up to main interpreter

            assert isinstance(cur, _interpreters.InterpreterID)
            """)))
    print(f"Created Thread: {t}")
    t.start()
    return t


t1 = run_in_thread()
print(f"First running Thread: {t1}")
t2 = run_in_thread()
print(f"Second running Thread: {t2}")
time.sleep(4)# Need to sleep to give Threads time to complete
cleanup_interpreters()上面的代码中演示了如何使用 _xxsubinterpreters 模块来实现 (方法一)
我们还在每个线程中休眠 2 秒来模拟“工作”状态
请注意,我们甚至不必调用 join() 函数等待线程完成,只需在线程完成时清理解释器即可
Channels

如果我们进一步挖掘 CPythontest 模块,我们还会发现 RecvChannel 和 SendChannel 类的实现类似于 Golang 中已知的通道
# https://github.com/python/cpython/blob/
#   15665d896bae9c3d8b60bd7210ac1b7dc533b093/Lib/test/test_interpreters.py#L583
r, s = interpreters.create_channel()

print(f"Channel: {r}, {s}")
# Channel: RecvChannel(id=0), SendChannel(id=0)

orig = b'spam'
s.send_nowait(orig)
obj = r.recv()
print(f"Received: {obj}")
# Received: b'spam'

cleanup_interpreters()
# Need clean up, otherwise:

# free(): invalid pointer
# Aborted (core dumped)上面的例子介绍了如何创建一个接收端通道(r)和发送端通道(s),然后我们使用 send_nowait 方法将数据发送,通过 recv 方法来接收数据
这个通道实际上只是另一个解释器,和以前一样,我们需要在处理完它之后进行清理
Digging Deeper

如果我们想要修改或者调整子解释器的选项(这些选项通常在 C 代码中设置),我们可以使用
test.support 模块中的代码,具体来说是run_in_subinterp_with_config
import test.support

def run_in_thread(script):
    test.support.run_in_subinterp_with_config(
      script,
      use_main_obmalloc=True,
      allow_fork=True,
      allow_exec=True,
      allow_threads=True,
      allow_daemon_threads=False,
      check_multi_interp_extensions=False,
      own_gil=True,
    )

code = dedent(f"""
            from test.support import interpreters
            cur = interpreters.get_current()
            print(cur)
            """)

run_in_thread(code)
# Interpreter(id=7, isolated=None)
run_in_thread(code)
# Interpreter(id=8, isolated=None)上面这个run_in_subinterp_with_config函数是 C 函数的 Python API。它提供了一些子解释器选项,如 own_gil,指定子解释器是否应该拥有自己的 GIL

来源:https://www.cnblogs.com/edisonfish/p/17431316.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页: [1]
查看完整版本: 它来了!真正的 python 多线程