|
众所周知,异步并发编程可以帮助程序更好地处理阻塞操作,比如网络 IO 操作或文件 IO 操作,避免因等待这些操作完成而导致程序卡住的情况。云存储文件传输场景正好包含网络 IO 操作和文件 IO 操作,比如业内相对著名的七牛云存储,官方sdk的默认阻塞传输模式虽然差强人意,但未免有些循规蹈矩,不够锐意创新。在全球同性交友网站Github上找了一圈,也没有找到异步版本,那么本次我们来自己动手将同步阻塞版本改造为异步非阻塞版本,并上传至Python官方库。
异步改造
首先参见七牛云官方接口文档:https://developer.qiniu.com/kodo,新建qiniu_async.py文件:
- # @Author:Liu Yue (v3u.cn)
- # @Software:Vscode
- # @Time:2022/12/30
-
- import base64
- import hmac
- import time
- from hashlib import sha1
- import json
- import httpx
- import aiofiles
-
-
-
- class Qiniu:
-
- def __init__(self, access_key, secret_key):
- """初始化"""
- self.__checkKey(access_key, secret_key)
- self.__access_key = access_key
- self.__secret_key = secret_key.encode('utf-8')
-
- def get_access_key(self):
- return self.__access_key
-
- def get_secret_key(self):
- return self.__secret_key
-
- def __token(self, data):
- hashed = hmac.new(self.__secret_key,data.encode('utf-8'), sha1)
- return self.urlsafe_base64_encode(hashed.digest())
-
- def token(self, data):
- return '{0}:{1}'.format(self.__access_key, self.__token(data))
-
- def token_with_data(self, data):
- data = self.urlsafe_base64_encode(data)
- return '{0}:{1}:{2}'.format(
- self.__access_key, self.__token(data), data)
-
- def urlsafe_base64_encode(self,data):
-
- if isinstance(data, str):
- data = data.encode('utf-8')
-
- ret = base64.urlsafe_b64encode(data)
-
- data = ret.decode('utf-8')
-
- return data
-
-
- @staticmethod
- def __checkKey(access_key, secret_key):
- if not (access_key and secret_key):
- raise ValueError('invalid key')
-
-
- def upload_token(
- self,
- bucket,
- key=None,
- expires=3600,
- policy=None,
- strict_policy=True):
- """生成上传凭证
-
- Args:
- bucket: 上传的空间名
- key: 上传的文件名,默认为空
- expires: 上传凭证的过期时间,默认为3600s
- policy: 上传策略,默认为空
-
- Returns:
- 上传凭证
- """
- if bucket is None or bucket == '':
- raise ValueError('invalid bucket name')
-
- scope = bucket
- if key is not None:
- scope = '{0}:{1}'.format(bucket, key)
-
- args = dict(
- scope=scope,
- deadline=int(time.time()) + expires,
- )
-
- return self.__upload_token(args)
-
- @staticmethod
- def up_token_decode(up_token):
- up_token_list = up_token.split(':')
- ak = up_token_list[0]
- sign = base64.urlsafe_b64decode(up_token_list[1])
- decode_policy = base64.urlsafe_b64decode(up_token_list[2])
- decode_policy = decode_policy.decode('utf-8')
- dict_policy = json.loads(decode_policy)
- return ak, sign, dict_policy
-
- def __upload_token(self, policy):
- data = json.dumps(policy, separators=(',', ':'))
- return self.token_with_data(data)
-
-
- @staticmethod
- def __copy_policy(policy, to, strict_policy):
- for k, v in policy.items():
- if (not strict_policy) or k in _policy_fields:
- to[k] = v
复制代码 这里有两个很关键的异步非阻塞三方库,分别是httpx和aiofiles,对应处理网络IO和文件IO阻塞问题:
- pip3 install httpx
- pip3 install aiofiles
复制代码 随后按照文档流程通过加密方法获取文件上传token,这里无须进行异步改造,因为并不涉及IO操作:
- q = Qiniu(access_key,access_secret)
-
- token = q.upload_token("空间名称")
-
- print(token)
复制代码 程序返回:
- ➜ mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
- q06bq54Ps5JLfZyP8Ax-qvByMBdu8AoIVJpMco2m:8RjIo9a4CxHM3009DwjbMxDzlU8=:eyJzY29wZSI6ImFkLWgyMTEyIiwiZGVhZGxpbmUiOjE2NzIzNjg2NTd9
复制代码 接着添加文件流推送方法,先看官方原版逻辑:
- def put_data(
- up_token, key, data, params=None, mime_type='application/octet-stream', check_crc=False, progress_handler=None,
- fname=None, hostscache_dir=None, metadata=None):
- """上传二进制流到七牛
- Args:
- up_token: 上传凭证
- key: 上传文件名
- data: 上传二进制流
- params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar
- mime_type: 上传数据的mimeType
- check_crc: 是否校验crc32
- progress_handler: 上传进度
- hostscache_dir: host请求 缓存文件保存位置
- metadata: 元数据
- Returns:
- 一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
- 一个ResponseInfo对象
- """
- final_data = b''
- if hasattr(data, 'read'):
- while True:
- tmp_data = data.read(config._BLOCK_SIZE)
- if len(tmp_data) == 0:
- break
- else:
- final_data += tmp_data
- else:
- final_data = data
-
- crc = crc32(final_data)
- return _form_put(up_token, key, final_data, params, mime_type,
- crc, hostscache_dir, progress_handler, fname, metadata=metadata)
-
- def _form_put(up_token, key, data, params, mime_type, crc, hostscache_dir=None, progress_handler=None, file_name=None,
- modify_time=None, keep_last_modified=False, metadata=None):
- fields = {}
- if params:
- for k, v in params.items():
- fields[k] = str(v)
- if crc:
- fields['crc32'] = crc
- if key is not None:
- fields['key'] = key
-
- fields['token'] = up_token
- if config.get_default('default_zone').up_host:
- url = config.get_default('default_zone').up_host
- else:
- url = config.get_default('default_zone').get_up_host_by_token(up_token, hostscache_dir)
- # name = key if key else file_name
-
- fname = file_name
- if not fname or not fname.strip():
- fname = 'file_name'
-
- # last modify time
- if modify_time and keep_last_modified:
- fields['x-qn-meta-!Last-Modified'] = rfc_from_timestamp(modify_time)
-
- if metadata:
- for k, v in metadata.items():
- if k.startswith('x-qn-meta-'):
- fields[k] = str(v)
-
- r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
- if r is None and info.need_retry():
- if info.connect_failed:
- if config.get_default('default_zone').up_host_backup:
- url = config.get_default('default_zone').up_host_backup
- else:
- url = config.get_default('default_zone').get_up_host_backup_by_token(up_token, hostscache_dir)
- if hasattr(data, 'read') is False:
- pass
- elif hasattr(data, 'seek') and (not hasattr(data, 'seekable') or data.seekable()):
- data.seek(0)
- else:
- return r, info
- r, info = http._post_file(url, data=fields, files={'file': (fname, data, mime_type)})
-
- return r, info
复制代码 这里官方使用两个方法,先试用put_data方法将字符串转换为二进制文件流,随后调用_form_put进行同步上传操作,这里_form_put这个私有方法是可复用的,既兼容文件流也兼容文件实体,写法上非常值得我们借鉴,弄明白了官方原版的流程后,让我们撰写文件流传输的异步版本:
- # 上传文件流
- async def upload_data(self,up_token, key,data,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):
-
- data.encode('utf-8')
-
- fields = {}
- if params:
- for k, v in params.items():
- fields[k] = str(v)
-
- if key is not None:
- fields['key'] = key
- fields['token'] = up_token
-
- fname = file_name
- if not fname or not fname.strip():
- fname = 'file_name'
-
- async with httpx.AsyncClient() as client:
-
- # 调用异步使用await关键字
- res = await client.post(url,data=fields,files={'file': (fname,data,mime_type)})
-
- print(res.text)
复制代码 这里我们声明异步方法upload_data,通过encode直接转换文件流,并使用异步httpx.AsyncClient()对象将文件流推送到官网接口地址:up-z1.qiniup.com
随后进行测试:
- import asyncio
- q = qiniu_async.Qiniu("accesskey","accesssecret")
-
- token = q.upload_token("空间名称")
-
- #文件流上传
- asyncio.run(q.upload_data(token,"3343.txt","123测试"))
复制代码 程序返回:
- ➜ mydemo git:(master) ✗ /opt/homebrew/bin/python3.10 "/Users/liuyue/wodfan/work/mydemo/src/test.py"
- {"hash":"FtnQXAXft5AsOH1mrmXGaRzSt-95","key":"33434.txt"}
复制代码 接口会返回文件流的hash编码,没有问题。
接着查看文件上传流程:
- def put_file(up_token, key, file_path, params=None,
- mime_type='application/octet-stream', check_crc=False,
- progress_handler=None, upload_progress_recorder=None, keep_last_modified=False, hostscache_dir=None,
- part_size=None, version=None, bucket_name=None, metadata=None):
- """上传文件到七牛
- Args:
- up_token: 上传凭证
- key: 上传文件名
- file_path: 上传文件的路径
- params: 自定义变量,规格参考 https://developer.qiniu.com/kodo/manual/vars#xvar
- mime_type: 上传数据的mimeType
- check_crc: 是否校验crc32
- progress_handler: 上传进度
- upload_progress_recorder: 记录上传进度,用于断点续传
- hostscache_dir: host请求 缓存文件保存位置
- version: 分片上传版本 目前支持v1/v2版本 默认v1
- part_size: 分片上传v2必传字段 默认大小为4MB 分片大小范围为1 MB - 1 GB
- bucket_name: 分片上传v2字段 空间名称
- metadata: 元数据信息
- Returns:
- 一个dict变量,类似 {"hash": "<Hash string>", "key": "<Key string>"}
- 一个ResponseInfo对象
- """
- ret = {}
- size = os.stat(file_path).st_size
- with open(file_path, 'rb') as input_stream:
- file_name = os.path.basename(file_path)
- modify_time = int(os.path.getmtime(file_path))
- if size > config.get_default('default_upload_threshold'):
- ret, info = put_stream(up_token, key, input_stream, file_name, size, hostscache_dir, params,
- mime_type, progress_handler,
- upload_progress_recorder=upload_progress_recorder,
- modify_time=modify_time, keep_last_modified=keep_last_modified,
- part_size=part_size, version=version, bucket_name=bucket_name, metadata=metadata)
- else:
- crc = file_crc32(file_path)
- ret, info = _form_put(up_token, key, input_stream, params, mime_type,
- crc, hostscache_dir, progress_handler, file_name,
- modify_time=modify_time, keep_last_modified=keep_last_modified, metadata=metadata)
- return ret, info
复制代码 这里官方使用的是标准库上下文管理器同步读取文件,改写为异步方法:
- # 上传文件实体
- async def upload_file(self,up_token,key,path,url="http://up-z1.qiniup.com",params=None,mime_type='application/octet-stream',file_name=None,metadata=None):
-
-
- async with aiofiles.open(path, mode='rb') as f:
- contents = await f.read()
-
- fields = {}
- if params:
- for k, v in params.items():
- fields[k] = str(v)
-
- if key is not None:
- fields['key'] = key
- fields['token'] = up_token
-
- fname = file_name
- if not fname or not fname.strip():
- fname = 'file_name'
-
- async with httpx.AsyncClient() as client:
-
- # 调用异步使用await关键字
- res = await client.post(url,data=fields,files={'file': (fname,contents,mime_type)})
-
- print(res.text)
复制代码 通过aiofiles异步读取文件后,在通过httpx.AsyncClient()进行异步传输。
需要注意的是,这里默认传输到up-z1.qiniup.com接口,如果是不同区域的云存储服务器,需要更改url参数的值,具体服务器接口列表请参照官网文档。
至此,文件流和文件异步传输就改造好了。
上传至Python官方库
为了方便广大七牛云用户使用异步传输版本库,可以将qiniu-async上传到Python官方库,首先注册成为Python官方库的开发者:pypi.org/
随后在项目根目录下新建setup.py文件:
- import setuptools
- import pathlib
-
- here = pathlib.Path(__file__).parent.resolve()
- long_description = (here / "README.md").read_text(encoding="utf-8")
-
- setuptools.setup(
- name="qiniu-async",
- version="1.0.1",
- author="LiuYue",
- author_email="zcxey2911@gmail.com",
- description="qiniu_async python library",
- long_description=long_description,
- long_description_content_type="text/markdown",
- url="https://github.com/qiniu-async",
- packages=setuptools.find_packages(),
- license="Apache 2.0",
- classifiers=[
- "Development Status :: 3 - Alpha",
- "Intended Audience :: Developers",
- "Programming Language :: Python :: 3",
- "Programming Language :: Python :: 3.7",
- "Programming Language :: Python :: 3.8",
- "Programming Language :: Python :: 3.9",
- "Programming Language :: Python :: 3.10",
- "Programming Language :: Python :: 3 :: Only",
- "License :: OSI Approved :: MIT License",
- "Operating System :: OS Independent",
-
- ],
- keywords="qiniu, qiniu_async, async",
- py_modules=[
- 'qiniu_async'
- ],
- install_requires=["aiofiles","httpx"],
- )
复制代码 这是安装文件,主要为了声明该模块的名称、作者、版本以及依赖库。
随后本地打包文件:
程序会根据setup.py文件生成压缩包:
- ➜ qiniu_async tree
- .
- ├── README.md
- ├── dist
- │ └── qiniu-async-1.0.1.tar.gz
- ├── https:
- │ └── github.com
- │ └── zcxey2911
- │ └── qiniu-async.git
- ├── qiniu_async.egg-info
- │ ├── PKG-INFO
- │ ├── SOURCES.txt
- │ ├── dependency_links.txt
- │ ├── requires.txt
- │ └── top_level.txt
- ├── qiniu_async.py
- └── setup.py
复制代码 接着安装twine库, 准备提交Python官网:
随后在根目录运行命令提交:
在官网进行查看:https://pypi.org/project/qiniu-async/
随后本地就可以直接通过pip命令句进行安装了:
- pip install qiniu-async -i https://pypi.org/simple
复制代码 非常方便。
结语
云端存储,异步加持,猛虎添翼,未敢拥钵独飨,除了通过pip安装qiniu-async库,也奉上Github项目地址:https://github.com/zcxey2911/qiniu-async ,与众乡亲同飨。
来源:https://www.cnblogs.com/v3ucn/p/17014299.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|