翼度科技»论坛 编程开发 python 查看内容

乾坤大挪移,如何将同步阻塞(sync)三方库包转换为异步非阻塞(async)模式?Py

12

主题

12

帖子

36

积分

新手上路

Rank: 1

积分
36
众所周知,异步并发编程可以帮助程序更好地处理阻塞操作,比如网络 IO 操作或文件 IO 操作,避免因等待这些操作完成而导致程序卡住的情况。云存储文件传输场景正好包含网络 IO 操作和文件 IO 操作,比如业内相对著名的七牛云存储,官方sdk的默认阻塞传输模式虽然差强人意,但未免有些循规蹈矩,不够锐意创新。在全球同性交友网站Github上找了一圈,也没有找到异步版本,那么本次我们来自己动手将同步阻塞版本改造为异步非阻塞版本,并上传至Python官方库。
异步改造

首先参见七牛云官方接口文档:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:
  1. # @Author:Liu Yue (v3u.cn)  
  2. # @Software:Vscode  
  3. # @Time:2022/12/30  
  4.   
  5. import base64  
  6. import hmac  
  7. import time  
  8. from hashlib import sha1  
  9. import json  
  10. import httpx  
  11. import aiofiles  
  12.   
  13.   
  14.   
  15. class Qiniu:  
  16.   
  17.     def __init__(self, access_key, secret_key):  
  18.         """初始化"""  
  19.         self.__checkKey(access_key, secret_key)  
  20.         self.__access_key = access_key  
  21.         self.__secret_key = secret_key.encode('utf-8')  
  22.   
  23.     def get_access_key(self):  
  24.         return self.__access_key  
  25.   
  26.     def get_secret_key(self):  
  27.         return self.__secret_key  
  28.   
  29.     def __token(self, data):  
  30.         hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)  
  31.         return self.urlsafe_base64_encode(hashed.digest())  
  32.   
  33.     def token(self, data):  
  34.         return '{0}:{1}'.format(self.__access_key, self.__token(data))  
  35.   
  36.     def token_with_data(self, data):  
  37.         data = self.urlsafe_base64_encode(data)  
  38.         return '{0}:{1}:{2}'.format(  
  39.             self.__access_key, self.__token(data), data)  
  40.   
  41.     def urlsafe_base64_encode(self,data):  
  42.   
  43.         if isinstance(data, str):  
  44.             data = data.encode('utf-8')  
  45.   
  46.         ret = base64.urlsafe_b64encode(data)  
  47.   
  48.         data = ret.decode('utf-8')  
  49.   
  50.         return data  
  51.   
  52.   
  53.     @staticmethod  
  54.     def __checkKey(access_key, secret_key):  
  55.         if not (access_key and secret_key):  
  56.             raise ValueError('invalid key')  
  57.   
  58.   
  59.     def upload_token(  
  60.             self,  
  61.             bucket,  
  62.             key=None,  
  63.             expires=3600,  
  64.             policy=None,  
  65.             strict_policy=True):  
  66.         """生成上传凭证  
  67.   
  68.         Args:  
  69.             bucket:  上传的空间名  
  70.             key:     上传的文件名,默认为空  
  71.             expires: 上传凭证的过期时间,默认为3600s  
  72.             policy:  上传策略,默认为空  
  73.   
  74.         Returns:  
  75.             上传凭证  
  76.         """  
  77.         if bucket is None or bucket == '':  
  78.             raise ValueError('invalid bucket name')  
  79.   
  80.         scope = bucket  
  81.         if key is not None:  
  82.             scope = '{0}:{1}'.format(bucket, key)  
  83.   
  84.         args = dict(  
  85.             scope=scope,  
  86.             deadline=int(time.time()) + expires,  
  87.         )  
  88.   
  89.         return self.__upload_token(args)  
  90.   
  91.     @staticmethod  
  92.     def up_token_decode(up_token):  
  93.         up_token_list = up_token.split(':')  
  94.         ak = up_token_list[0]  
  95.         sign = base64.urlsafe_b64decode(up_token_list[1])  
  96.         decode_policy = base64.urlsafe_b64decode(up_token_list[2])  
  97.         decode_policy = decode_policy.decode('utf-8')  
  98.         dict_policy = json.loads(decode_policy)  
  99.         return ak, sign, dict_policy  
  100.   
  101.     def __upload_token(self, policy):  
  102.         data = json.dumps(policy, separators=(',', ':'))  
  103.         return self.token_with_data(data)  
  104.   
  105.   
  106.     @staticmethod  
  107.     def __copy_policy(policy, to, strict_policy):  
  108.         for k, v in policy.items():  
  109.             if (not strict_policy) or k in _policy_fields:  
  110.                 to[k] = v
复制代码
这里有两个很关键的异步非阻塞三方库,分别是httpx和aiofiles,对应处理网络IO和文件IO阻塞问题:
  1. pip3 install httpx  
  2. pip3 install aiofiles
复制代码
随后按照文档流程通过加密方法获取文件上传token,这里无须进行异步改造,因为并不涉及IO操作:
  1. q = Qiniu(access_key,access_secret)  
  2.   
  3. token = q.upload_token("空间名称")  
  4.   
  5. print(token)
复制代码
程序返回:
  1. ➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
  2. q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9
复制代码
接着添加文件流推送方法,先看官方原版逻辑:
  1. def put_data(  
  2.         up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,  
  3.         fname=None, hostscache_dir=None, metadata=None):  
  4.     """上传二进制流到七牛  
  5.     Args:  
  6.         up_token:         上传凭证  
  7.         key:              上传文件名  
  8.         data:             上传二进制流  
  9.         params:           自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar  
  10.         mime_type:        上传数据的mimeType  
  11.         check_crc:        是否校验crc32  
  12.         progress_handler: 上传进度  
  13.         hostscache_dir:   host请求 缓存文件保存位置  
  14.         metadata:         元数据  
  15.     Returns:  
  16.         一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}  
  17.         一个ResponseInfo对象  
  18.     """  
  19.     final_data = b''  
  20.     if hasattr(data, 'read'):  
  21.         while True:  
  22.             tmp_data = data.read(config._BLOCK_SIZE)  
  23.             if len(tmp_data) == 0:  
  24.                 break  
  25.             else:  
  26.                 final_data += tmp_data  
  27.     else:  
  28.         final_data = data  
  29.   
  30.     crc = crc32(final_data)  
  31.     return _form_put(up_token, key, final_data, params, mime_type,  
  32.                      crc, hostscache_dir, progress_handler, fname, metadata=metadata)  
  33.   
  34. def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,  
  35.               modify_time=None, keep_last_modified=False, metadata=None):  
  36.     fields = {}  
  37.     if params:  
  38.         for k, v in params.items():  
  39.             fields[k] = str(v)  
  40.     if crc:  
  41.         fields['crc32'] = crc  
  42.     if key is not None:  
  43.         fields['key'] = key  
  44.   
  45.     fields['token'] = up_token  
  46.     if config.get_default('default_zone').up_host:  
  47.         url = config.get_default('default_zone').up_host  
  48.     else:  
  49.         url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)  
  50.     # name = key if key else file_name  
  51.   
  52.     fname = file_name  
  53.     if not fname or not fname.strip():  
  54.         fname = 'file_name'  
  55.   
  56.     # last modify time  
  57.     if modify_time and keep_last_modified:  
  58.         fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)  
  59.   
  60.     if metadata:  
  61.         for k, v in metadata.items():  
  62.             if k.startswith('x-qn-meta-'):  
  63.                 fields[k] = str(v)  
  64.   
  65.     r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
  66.     if r is None and info.need_retry():  
  67.         if info.connect_failed:  
  68.             if config.get_default('default_zone').up_host_backup:  
  69.                 url = config.get_default('default_zone').up_host_backup  
  70.             else:  
  71.                 url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)  
  72.         if hasattr(data, 'read') is False:  
  73.             pass  
  74.         elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):  
  75.             data.seek(0)  
  76.         else:  
  77.             return r, info  
  78.         r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})  
  79.   
  80.     return r, info
复制代码
这里官方使用两个方法,先试用put_data方法将字符串转换为二进制文件流,随后调用_form_put进行同步上传操作,这里_form_put这个私有方法是可复用的,既兼容文件流也兼容文件实体,写法上非常值得我们借鉴,弄明白了官方原版的流程后,让我们撰写文件流传输的异步版本:
  1. # 上传文件流  
  2.     async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  3.   
  4.         data.encode('utf-8')  
  5.          
  6.         fields = {}  
  7.         if params:  
  8.             for k, v in params.items():  
  9.                 fields[k] = str(v)  
  10.   
  11.         if key is not None:  
  12.             fields['key'] = key  
  13.         fields['token'] = up_token  
  14.   
  15.         fname = file_name  
  16.         if not fname or not fname.strip():  
  17.             fname = 'file_name'  
  18.   
  19.         async with httpx.AsyncClient() as client:  
  20.   
  21.             # 调用异步使用await关键字  
  22.             res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})  
  23.   
  24.             print(res.text)
复制代码
这里我们声明异步方法upload_data,通过encode直接转换文件流,并使用异步httpx.AsyncClient()对象将文件流推送到官网接口地址:up-z1.qiniup.com
随后进行测试:
  1. import asyncio
  2. q = qiniu_async.Qiniu("accesskey","accesssecret")  
  3.   
  4. token = q.upload_token("空间名称")  
  5.   
  6. #文件流上传  
  7. asyncio.run(q.upload_data(token,"3343.txt","123测试"))
复制代码
程序返回:
  1. ➜  mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"  
  2. {"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}
复制代码
接口会返回文件流的hash编码,没有问题。
接着查看文件上传流程:
  1. def put_file(up_token, key, file_path, params=None,  
  2.              mime_type='application/octet-stream', check_crc=False,  
  3.              progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,  
  4.              part_size=None, version=None, bucket_name=None, metadata=None):  
  5.     """上传文件到七牛  
  6.     Args:  
  7.         up_token:                 上传凭证  
  8.         key:                      上传文件名  
  9.         file_path:                上传文件的路径  
  10.         params:                   自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar  
  11.         mime_type:                上传数据的mimeType  
  12.         check_crc:                是否校验crc32  
  13.         progress_handler:         上传进度  
  14.         upload_progress_recorder: 记录上传进度,用于断点续传  
  15.         hostscache_dir:           host请求 缓存文件保存位置  
  16.         version:                  分片上传版本 目前支持v1/v2版本 默认v1  
  17.         part_size:                分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB  
  18.         bucket_name:              分片上传v2字段 空间名称  
  19.         metadata:                 元数据信息  
  20.     Returns:  
  21.         一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}  
  22.         一个ResponseInfo对象  
  23.     """  
  24.     ret = {}  
  25.     size = os.stat(file_path).st_size  
  26.     with open(file_path, 'rb') as input_stream:  
  27.         file_name = os.path.basename(file_path)  
  28.         modify_time = int(os.path.getmtime(file_path))  
  29.         if size > config.get_default('default_upload_threshold'):  
  30.             ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,  
  31.                                    mime_type, progress_handler,  
  32.                                    upload_progress_recorder=upload_progress_recorder,  
  33.                                    modify_time=modify_time, keep_last_modified=keep_last_modified,  
  34.                                    part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)  
  35.         else:  
  36.             crc = file_crc32(file_path)  
  37.             ret, info = _form_put(up_token, key, input_stream, params, mime_type,  
  38.                                   crc, hostscache_dir, progress_handler, file_name,  
  39.                                   modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)  
  40.     return ret, info
复制代码
这里官方使用的是标准库上下文管理器同步读取文件,改写为异步方法:
  1. # 上传文件实体  
  2.     async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):  
  3.   
  4.   
  5.         async with aiofiles.open(path, mode='rb') as f:  
  6.             contents = await f.read()  
  7.          
  8.         fields = {}  
  9.         if params:  
  10.             for k, v in params.items():  
  11.                 fields[k] = str(v)  
  12.   
  13.         if key is not None:  
  14.             fields['key'] = key  
  15.         fields['token'] = up_token  
  16.   
  17.         fname = file_name  
  18.         if not fname or not fname.strip():  
  19.             fname = 'file_name'  
  20.   
  21.         async with httpx.AsyncClient() as client:  
  22.   
  23.             # 调用异步使用await关键字  
  24.             res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})  
  25.   
  26.             print(res.text)
复制代码
通过aiofiles异步读取文件后,在通过httpx.AsyncClient()进行异步传输。
需要注意的是,这里默认传输到up-z1.qiniup.com接口,如果是不同区域的云存储服务器,需要更改url参数的值,具体服务器接口列表请参照官网文档。
至此,文件流和文件异步传输就改造好了。
上传至Python官方库

为了方便广大七牛云用户使用异步传输版本库,可以将qiniu-async上传到Python官方库,首先注册成为Python官方库的开发者:pypi.org/
随后在项目根目录下新建setup.py文件:
  1. import setuptools  
  2. import pathlib  
  3.   
  4. here = pathlib.Path(__file__).parent.resolve()  
  5. long_description = (here / "README.md").read_text(encoding="utf-8")  
  6.   
  7. setuptools.setup(  
  8.     name="qiniu-async",  
  9.     version="1.0.1",  
  10.     author="LiuYue",  
  11.     author_email="zcxey2911@gmail.com",  
  12.     description="qiniu_async python library",  
  13.     long_description=long_description,  
  14.     long_description_content_type="text/markdown",  
  15.     url="https://github.com/qiniu-async",  
  16.     packages=setuptools.find_packages(),  
  17.     license="Apache 2.0",  
  18.     classifiers=[  
  19.         "Development Status :: 3 - Alpha",  
  20.         "Intended Audience :: Developers",  
  21.         "Programming Language :: Python :: 3",  
  22.         "Programming Language :: Python :: 3.7",  
  23.         "Programming Language :: Python :: 3.8",  
  24.         "Programming Language :: Python :: 3.9",  
  25.         "Programming Language :: Python :: 3.10",  
  26.         "Programming Language :: Python :: 3 :: Only",  
  27.         "License :: OSI Approved :: MIT License",  
  28.         "Operating System :: OS Independent",  
  29.   
  30.     ],  
  31.     keywords="qiniu, qiniu_async, async",  
  32.     py_modules=[  
  33.         'qiniu_async'  
  34.     ],  
  35.     install_requires=["aiofiles","httpx"],  
  36. )
复制代码
这是安装文件,主要为了声明该模块的名称、作者、版本以及依赖库。
随后本地打包文件:
  1. python3 setup.py sdist
复制代码
程序会根据setup.py文件生成压缩包:
  1. ➜  qiniu_async tree  
  2. .  
  3. ├── README.md  
  4. ├── dist  
  5. │ └── qiniu-async-1.0.1.tar.gz  
  6. ├── https:  
  7. │ └── github.com  
  8. │     └── zcxey2911  
  9. │         └── qiniu-async.git  
  10. ├── qiniu_async.egg-info  
  11. │ ├── PKG-INFO  
  12. │ ├── SOURCES.txt  
  13. │ ├── dependency_links.txt  
  14. │ ├── requires.txt  
  15. │ └── top_level.txt  
  16. ├── qiniu_async.py  
  17. └── setup.py
复制代码
接着安装twine库, 准备提交Python官网:
  1. pip3 install twine
复制代码
随后在根目录运行命令提交:
  1. twine upload dist/*
复制代码
在官网进行查看:https://pypi.org/project/qiniu-async/
随后本地就可以直接通过pip命令句进行安装了:
  1. pip install qiniu-async -i https://pypi.org/simple
复制代码
非常方便。
结语

云端存储,异步加持,猛虎添翼,未敢拥钵独飨,除了通过pip安装qiniu-async库,也奉上Github项目地址:https://github.com/zcxey2911/qiniu-async ,与众乡亲同飨。

来源:https://www.cnblogs.com/v3ucn/p/17014299.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具