|
Flask
- 短小精悍,可扩展强的一个web框架。上下文管理
- 安装:pip3 install Flask,我们做web开发时,是站在两大东西之上做的web框架和wsgi,Flask和Django中都是应用的并不是自己写的。Flask中werkzurg就是。
- web服务网关接口,wsgi是一个协议,实现该写一个的模块:wsgiref|werkzeug,实现其协议的模块本质上就是socket服务端用于接收用户请求,并处理。一般web框架基于wsgi实现,这样实现关注点分离。
- flask就是基于Werkzurg搭建起来的
- from werkzeug.wrappers import Request, Response
- from werkzeug.serving import run_simple
-
- @Request.application
- def run(environ, start_response):
- return Response("Hello World!") # 返回字符串
- if __name__=="__main__":
- run_simple("localhost", 4000, run) # 监听本机的4000端口,如果访问就执行run函数
复制代码 - 一个最简单的Flask
- from flask import Flask
- app = Flask(__name__) # flask名称,一般这样写。
-
- @app.route("/index") # url路径
- def index(): #所调用的函数
- return "Hello World!"
- if __name__ == "__main__":
- app.run() # 执行口
复制代码 - Flask实现用户登录程序
- from flask import Flask, render_template, request, redirect, session
- app = Flask(__name__, template_folder="templates", static_folder="static")
- app.secret_key = "asdf" # session加盐
- # request.form # 请求体
- # request.args # 请求头
- # request.method # 请求类型
- @app.route("/login", methods=["GET", "POST"])
- def login():
- if request.method == "GET":
- return render_template("login.html")
- user = request.from.get("user")
- pwd= request.from.get("pwd")
- if user == "alex" and pwd == "666":
- session["user"] = user
- return redirect("/index")
- return render_template("login.html", error="用户名或密码错误")
-
- @app.route("/index")
- def index():
- user = session.get("user")
- if not user:
- return redirect("/login")
- return render_template("index.html")
-
- if __name__ == "__main__":
- app.run()
复制代码 1.配置文件
- settings
- class Config(object):
- DEBUG = False
- TESTING = False
- DATABASE_URL = "sqlite://:memory:"
-
- class ProductionConfig(Config):
- DATABASE_URL = "mysql://user@localhost/foo"
-
- class DevelopmentConfig(Config):
- DEBUT = True
-
- class TestingConfig(Config):
- TESTING = True
-
- flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
- {
- 'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
- 'TESTING': False, 是否开启测试模式
- 'PROPAGATE_EXCEPTIONS': None,
- 'PRESERVE_CONTEXT_ON_EXCEPTION': None,
- 'SECRET_KEY': None,
- 'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
- 'USE_X_SENDFILE': False,
- 'LOGGER_NAME': None,
- 'LOGGER_HANDLER_POLICY': 'always',
- 'SERVER_NAME': None,
- 'APPLICATION_ROOT': None,
- 'SESSION_COOKIE_NAME': 'session',
- 'SESSION_COOKIE_DOMAIN': None,
- 'SESSION_COOKIE_PATH': None,
- 'SESSION_COOKIE_HTTPONLY': True,
- 'SESSION_COOKIE_SECURE': False,
- 'SESSION_REFRESH_EACH_REQUEST': True,
- 'MAX_CONTENT_LENGTH': None,
- 'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
- 'TRAP_BAD_REQUEST_ERRORS': False,
- 'TRAP_HTTP_EXCEPTIONS': False,
- 'EXPLAIN_TEMPLATE_LOADING': False,
- 'PREFERRED_URL_SCHEME': 'http',
- 'JSON_AS_ASCII': True,
- 'JSON_SORT_KEYS': True,
- 'JSONIFY_PRETTYPRINT_REGULAR': True,
- 'JSONIFY_MIMETYPE': 'application/json',
- 'TEMPLATES_AUTO_RELOAD': None,
- }
复制代码 - xx
- # 如何通过代码找到指定的类
- import importlib
- path = "settings.Foo"
-
- p, c = path.rsplit(".", maxsplit=1)
- m = importlib.import_module(p)
- cls = getattr(m, c)
- # 找出类中的所有静态变量
- for key in dir(cls):
- if key.isupper():
- print(key,getattr(cls, key))
复制代码 - Flask中有一个配置文件
- from flask import Flask, render_template, request, redirect, session
- app = Flask(__name__)
-
- print(app.config) # 这是flask提供的配置文件
- # app.config.["DEBUG"] = True # 我们可以通过这样来修改配置,但是flask也提供了另一种方式
- app.config.from_object("sttings.DevelopmentConfig") # 这种方式就是自己创建一个文件和类,你在类中写的所有配置都会被应用。
-
-
- if __name__ == "__main__":
- app.run()
复制代码
2.路由和视图
- flask的路由系统
- from flask import Flask, render_template, request, redirect, session, url_for
- app = Flask(__name__)
- # int 是值的类型,nid是名称:/login/1234
- """
- # 这是常用的类型,不加类型默认是字符串类型
- DEFAULT_CONVERTERS = {
- 'default': UnicodeConverter,
- 'string': UnicodeConverter,
- 'any': AnyConverter,
- 'path': PathConverter,
- 'int': IntegerConverter,
- 'float': FloatConverter,
- 'uuid': UUIDConverter,
- }
- """
- @app.route("/login/<int:nid>", endpoint="n1") # endpoint就是设置名字,不写默认是函数名
- def login():
- url_for("n1", nid=122) # 反向查找url,nid是携带的值
-
- if __name__ == "__main__":
- app.run()
复制代码 - 路由的两种方式
- @app.route("/xxx")
- def index():
- return "index"
- # 其实到内部调用时就是使用了app.add_url_rule来实现的,为了方便所以在外面套了个装饰器。
- def index():
- return "index"
- app.add_url_rule("/xxx", None, index)
-
- # 注意事项:
- # - 不用让endpoint重名
- # - 如果重名函数也一定要相同。
复制代码 - app.route和app.add_url_rule参数
- @app.route和app.add_url_rule参数:
- rule, URL规则
- view_func, 视图函数名称
- defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
- endpoint=None, 名称,用于反向生成URL,即: url_for('名称')
- methods=None, 允许的请求方式,如:["GET","POST"]
-
- strict_slashes=None, 对URL最后的 / 符号是否严格要求,
- 如:
- @app.route('/index',strict_slashes=False),
- 访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
- @app.route('/index',strict_slashes=True)
- 仅访问 http://www.xx.com/index
- redirect_to=None, 重定向到指定地址
- 如:
- @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
- 或
- def func(adapter, nid):
- return "/home/888"
- @app.route('/index/<int:nid>', redirect_to=func)
- subdomain=None, 子域名访问
- from flask import Flask, views, url_for
-
- app = Flask(import_name=__name__)
- app.config['SERVER_NAME'] = 'wupeiqi.com:5000'
-
- @app.route("/", subdomain="admin")
- def static_index():
- """Flask supports static subdomains
- This is available at static.your-domain.tld"""
- return "static.your-domain.tld"
-
-
- @app.route("/dynamic", subdomain="<username>")
- def username_index(username):
- """Dynamic subdomains are also supported
- Try going to user1.your-domain.tld/dynamic"""
- return username + ".your-domain.tld"
-
-
- if __name__ == '__main__':
- app.run()
复制代码 - CBV
- import functools
- from flask import Flask,views
- app = Flask(__name__)
-
-
- def wrapper(func):
- @functools.wraps(func)
- def inner(*args,**kwargs):
- return func(*args,**kwargs)
-
- return inner
-
-
-
- class UserView(views.MethodView):
- methods = ['GET']
- decorators = [wrapper,]
-
- def get(self,*args,**kwargs):
- return 'GET'
-
- def post(self,*args,**kwargs):
- return 'POST'
-
- app.add_url_rule('/user',None,UserView.as_view('uuuu'))
-
- if __name__ == '__main__':
- app.run()
复制代码 - 自定义正则
- from flask import Flask,url_for
-
- app = Flask(__name__)
-
- # 步骤一:定制类
- from werkzeug.routing import BaseConverter
- class RegexConverter(BaseConverter):
- """
- 自定义URL匹配正则表达式
- """
-
- def __init__(self, map, regex):
- super(RegexConverter, self).__init__(map)
- self.regex = regex
-
- def to_python(self, value):
- """
- 路由匹配时,匹配成功后传递给视图函数中参数的值
- :param value:
- :return:
- """
- return int(value)
-
- def to_url(self, value):
- """
- 使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
- :param value:
- :return:
- """
- val = super(RegexConverter, self).to_url(value)
- return val
-
- # 步骤二:添加到转换器
- app.url_map.converters['reg'] = RegexConverter
-
- """
- 1. 用户发送请求
- 2. flask内部进行正则匹配
- 3. 调用to_python(正则匹配的结果)方法
- 4. to_python方法的返回值会交给视图函数的参数
- """
-
- # 步骤三:使用自定义正则
- @app.route('/index/<reg("\d+"):nid>')
- def index(nid):
- print(nid,type(nid))
-
- print(url_for('index',nid=987))
- return "index"
-
- if __name__ == '__main__':
- app.run()
复制代码
3.请求和响应
- 请求
- # request.method
- # request.args
- # request.form
- # request.values
- # request.cookies
- # request.headers
- # request.path
- # request.full_path
- # request.script_root
- # request.url
- # request.base_url
- # request.url_root
- # request.host_url
- # request.host
- # request.files
- # obj = request.files['the_file_name']
- # obj.save('/var/www/uploads/' + secure_filename(f.filename))
复制代码 - 响应
- from flask import Flask, render_template, request, redirect, session, url_for, jsonify
-
- # 响应相关信息
- # return "字符串"
- # return render_template('html模板路径',**{})
- # return redirect('/index.html')
- # return jsonify({"k1":"v1"})
-
- # response = make_response(render_template('index.html'))
- # response是flask.wrappers.Response类型
- # response.delete_cookie('key')
- # response.set_cookie('key', 'value')
- # response.headers['X-Something'] = 'A value'
- # return response
复制代码 - 实现登录后才可查看的效果
- # 版本一:在每个函数中都加入判断
- @app.route('/index')
- def index():
- if not session.get('user'):
- return redirect(url_for('login'))
- return render_template('index.html',stu_dic=STUDENT_DICT)
- # 版本二:使用装饰器,但是需要用到functools的wraps,应为你写的装饰器肯定要先执行,不然无法被应用,但是你写的装饰器到最后被调用的函数是inner,但是我们需要自己的属性才可以实现。wraps的功能就是将inner赋予调用函数的所有属性和特征。
- import functools
- def auth(func):
- @functools.wraps(func)
- def inner(*args,**kwargs):
- if not session.get('user'):
- return redirect(url_for('login'))
- ret = func(*args,**kwargs)
- return ret
- return inner
-
- @app.route('/index')
- @auth
- def index():
- return render_template('index.html',stu_dic=STUDENT_DICT)
-
- # 应用场景:比较少的函数中需要额外添加功能。
-
- # 版本三:before_request,在执行函数时会先执行before_request所定义的函数,返回None就会向后执行,如果返回数据就会直接将其返回到前端了。
- @app.before_request
- def xxxxxx():
- if request.path == '/login':
- return None
-
- if session.get('user'):
- return None
-
- return redirect('/login')
复制代码
4.模板渲染
- 基本数据类型:可以执行python语法,如:dict.get() list['xx']
- 传入函数:django,自动执行\flask,不自动执行
- 全局定义函数
- @app.template_global()
- def sb(a1, a2):
- # {{sb(1,9)}}
- return a1 + a2
-
- @app.template_filter()
- def db(a1, a2, a3):
- # {{ 1|db(2,3) }}
- return a1 + a2 + a3
复制代码 - 模板继承
- layout.html
- <!DOCTYPE html>
- <html lang="zh-CN">
- <head>
- <meta charset="UTF-8">
- <title>Title</title>
- <meta name="viewport" content="width=device-width, initial-scale=1">
- </head>
- <body>
- <h1>模板</h1>
- {% block content %}{% endblock %}
- </body>
- </html>
-
- tpl.html
- {% extends "layout.html"%}
-
-
- {% block content %}
- {{users.0}}
-
-
- {% endblock %}
复制代码 - include
- {% include "form.html" %}
-
-
- form.html
- <form>
- asdfasdf
- asdfasdf
- asdf
- asdf
- </form>
复制代码 - 宏
- {% macro ccccc(name, type='text', value='') %}
- <h1>宏</h1>
- <input type="{{ type }}" name="{{ name }}" value="{{ value }}">
- <input type="submit" value="提交">
- {% endmacro %}
- {{ ccccc('n1') }}
- {{ ccccc('n2') }}
复制代码 - flask默认时设置了xss的后端将html代码返回到前端时需要使用安全MarkUp("asdf"),前端也可以直接将其应用{{u|safe}}。
5.session
- 当请求刚到来:flask读取cookie中session对应的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,将该值解密并反序列化成字典,放入内存以便视图函数使用。
- @app.route('/ses')
- def ses():
- session['k1'] = 123
- session['k2'] = 456
- del session['k1']
- return "Session"
- session['xxx'] = 123
- session['xxx']
复制代码 - 当请求结束时,flask会读取内存中字典的值,进行序列化+加密,写入到用户cookie中。
- threading.local【和flask无任何关系】
- 为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。
- import threading
- from threading import local
- import time
- obj = local()
- def task(i):
- obj.xxxxx = i
- time.sleep(2)
- print(obj.xxxxx,i)
- for i in range(10):
- t = threading.Thread(target=task,args=(i,))
- t.start()
复制代码 - 根据字典自定义一个类似于threading.local功能?
- import time
- import threading
- DIC = {}
- def task(i):
- ident = threading.get_ident()
- if ident in DIC:
- DIC[ident]['xxxxx'] = i
- else:
- DIC[ident] = {'xxxxx':i }
- time.sleep(2)
- print(DIC[ident]['xxxxx'],i)
- for i in range(10):
- t = threading.Thread(target=task,args=(i,))
- t.start()
复制代码 - 通过getattr/setattr 构造出来 threading.local的加强版(协程)
- import time
- import threading
- try:
- import greenlet
- get_ident = greenlet.getcurrent # 获取协程唯一标识
- except Exception as e:
- get_ident = threading.get_ident # 获取线程唯一标识
- class Local(object):
- DIC = {}
- def __getattr__(self, item): # obj.xxx时触发,item等于xxx
- ident = get_ident()
- if ident in self.DIC:
- return self.DIC[ident].get(item)
- return None
- def __setattr__(self, key, value): # obj.xxx = "123"时触发,key等于xxx,value等于123
- ident = get_ident()
- if ident in self.DIC:
- self.DIC[ident][key] = value
- else:
- self.DIC[ident] = {key:value}
-
- obj = Local()
- def task(i):
- obj.xxxxx = i
- time.sleep(2)
- print(obj.xxxxx,i)
- for i in range(10):
- t = threading.Thread(target=task,args=(i,))
- t.start()
复制代码
5.1 flask-session
- 安装:pip3 install flask-session
- 使用
- import redis
- from flask import Flask,request,session
- from flask.sessions import SecureCookieSessionInterface
- from flask_session import Session
- app = Flask(__name__)
- # app.session_interface = SecureCookieSessionInterface()
- # app.session_interface = RedisSessionInterface()
- app.config['SESSION_TYPE'] = 'redis'
- app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')
- Session(app)
- @app.route('/login')
- def login():
- session['user'] = 'alex'
- return 'asdfasfd'
- @app.route('/home')
- def index():
- print(session.get('user'))
- return '...'
- if __name__ == '__main__':
- app.run()
复制代码
6.闪现和中间件
- 闪现,在session中存储一个数据,读取时通过pop将数据移除。
- from flask import Flask,flash,get_flashed_messages
- @app.route('/page1')
- def page1():
- flash('临时数据存储','error')
- flash('sdfsdf234234','error')
- flash('adasdfasdf','info')
- return "Session"
- @app.route('/page2')
- def page2():
- print(get_flashed_messages(category_filter=['error']))
- return "Session"
复制代码 - 中间件
- call方法什么时候出发?
- 用户发起请求时,才执行。
- 任务:在执行call方法之前,做一个操作,call方法执行之后做一个操作。
- class Middleware(object):
- def __init__(self,old):
- self.old = old
- def __call__(self, *args, **kwargs):
- ret = self.old(*args, **kwargs)
- return ret
- if __name__ == '__main__':
- app.wsgi_app = Middleware(app.wsgi_app)
- app.run()
复制代码 7.蓝图
- 完整目录图
- ProjectName
- -ProjectName
- --static
- --templates
- --views
- ---account.py
- from flask import Blueprint,render_template
- # 创建蓝图对象
- ac = Blueprint('ac',__name__)
- # 只有调用该文件中的页面时才会触发
- @ac.before_request
- def x1():
- print('app.before_request')
- @ac.route('/login')
- def login():
- return render_template('login.html')
- @ac.route('/logout')
- def logout():
- return 'Logout'
- --__init__.py
- from flask import Flask
- from .views.account import ac
- from .views.user import uc
- def create_app():
- app = Flask(__name__)
- # @app.before_request
- # def x1():
- # print('app.before_request')
- # 注册蓝图
- app.register_blueprint(ac)
- # 注册蓝图和编写url前缀
- app.register_blueprint(uc,url_prefix='/api')
- return app
- -manage.py
- from crm import create_app
- app = create_app()
- if __name__ == '__main__':
- app.run()
复制代码 8.特殊装饰器
- before_request and after_request
- from flask import Flask
- app = Flask(__name__)
- @app.before_request
- def x1():
- print('before:x1')
- return '滚'
- @app.before_request
- def xx1():
- print('before:xx1')
- @app.after_request
- def x2(response):
- print('after:x2')
- return response
- @app.after_request
- def xx2(response):
- print('after:xx2')
- return response
- @app.route('/index')
- def index():
- print('index')
- return "Index"
- @app.route('/order')
- def order():
- print('order')
- return "order"
- if __name__ == '__main__':
- app.run()
- # 执行结果是,before的返回会将其返回到页面,但是after还是会执行,并不像django返回后后面的都不会执行
- before:x1
- before:xx1
- request
- after:xx2
- after:x2
复制代码 - before_first_request,只有第一次时才会被触发
- from flask import Flask
- app = Flask(__name__)
- @app.before_first_request
- def x1():
- print('123123')
- @app.route('/index')
- def index():
- print('index')
- return "Index"
- @app.route('/order')
- def order():
- print('order')
- return "order"
- if __name__ == '__main__':
- app.run()
复制代码 - 页面渲染的全局装饰器
- @app.template_global()
- def sb(a1, a2):
- # {{sb(1,9)}}
- return a1 + a2
- @app.template_filter()
- def db(a1, a2, a3):
- # {{ 1|db(2,3) }}
- return a1 + a2 + a3
复制代码 - errorhandler,当报指定的错误时会触发。
- @app.errorhandler(404)
- def not_found(arg):
- print(arg)
- return "没找到"
复制代码 9.上下文管理
- 偏函数
- import functools
- def index(a1,a2):
- return a1 + a2
- # 原来的调用方式
- # ret = index(1,23)
- # print(ret)
- # 偏函数,帮助开发者自动传递参数
- new_func = functools.partial(index,666)
- ret = new_func(1)
- print(ret)
复制代码 - 父类方法的执行
- """
- class Base(object):
- def func(self):
- print('Base.func')
- class Foo(Base):
- def func(self):
- # 方式一:根据mro的顺序执行方法
- # super(Foo,self).func()
- # 方式二:主动执行Base类的方法
- # Base.func(self)
- print('Foo.func')
- obj = Foo()
- obj.func()
- """
- ####################################
- class Base(object):
- def func(self):
- super(Base, self).func()
- print('Base.func')
- class Bar(object):
- def func(self):
- print('Bar.func')
- class Foo(Base,Bar):
- pass
- # 示例一
- # obj = Foo()
- # obj.func()
- # print(Foo.__mro__)
- # 示例二
- # obj = Base()
- # obj.func()
复制代码 - 栈
- # by luffycity.com
- class Stack(object):
- def __init__(self):
- self.data = []
- def push(self,val):
- self.data.append(val)
- def pop(self):
- return self.data.pop()
- def top(self):
- return self.data[-1]
- _stack = Stack()
- _stack.push('佳俊')
- _stack.push('咸鱼')
- print(_stack.pop())
- print(_stack.pop())
复制代码 - Local
- """
- {
- 1232:{k:v}
- }
- """
- try:
- from greenlet import getcurrent as get_ident
- except:
- from threading import get_ident
- """
- class Local(object):
-
- def __init__(self):
- object.__setattr__(self,'storage',{})
-
- def __setattr__(self, key, value):
- ident = get_ident()
- if ident not in self.storage:
- self.storage[ident] = {key:value}
- else:
- self.storage[ident][key] = value
-
- def __getattr__(self, item):
- ident = get_ident()
- if ident in self.storage:
- return self.storage[ident].get(item)
- """
- class Local(object):
- __slots__ = ('__storage__', '__ident_func__')
- def __init__(self):
- # __storage__ = {1231:{'stack':[]}}
- object.__setattr__(self, '__storage__', {})
- object.__setattr__(self, '__ident_func__', get_ident)
- def __getattr__(self, name):
- try:
- return self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
- def __setattr__(self, name, value):
- ident = self.__ident_func__()
- storage = self.__storage__
- try:
- storage[ident][name] = value
- except KeyError:
- storage[ident] = {name: value}
- def __delattr__(self, name):
- try:
- del self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
- obj = Local()
- obj.stack = []
- obj.stack.append('佳俊')
- obj.stack.append('咸鱼')
- print(obj.stack)
- print(obj.stack.pop())
- print(obj.stack)
复制代码 - LocalStack
- import functools
- try:
- from greenlet import getcurrent as get_ident
- except:
- from threading import get_ident
- class Local(object):
- __slots__ = ('__storage__', '__ident_func__')
- def __init__(self):
- # __storage__ = {1231:{'stack':[]}}
- object.__setattr__(self, '__storage__', {})
- object.__setattr__(self, '__ident_func__', get_ident)
- def __getattr__(self, name):
- try:
- return self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
- def __setattr__(self, name, value):
- # name=stack
- # value=[]
- ident = self.__ident_func__()
- storage = self.__storage__
- try:
- storage[ident][name] = value
- except KeyError:
- storage[ident] = {name: value}
- def __delattr__(self, name):
- try:
- del self.__storage__[self.__ident_func__()][name]
- except KeyError:
- raise AttributeError(name)
- """
- __storage__ = {
- 12312: {stack:[ctx(session/request) ,]}
- }
- """
- # obj = Local()
- # obj.stack = []
- # obj.stack.append('佳俊')
- # obj.stack.append('咸鱼')
- # print(obj.stack)
- # print(obj.stack.pop())
- # print(obj.stack)
- class LocalStack(object):
- def __init__(self):
- self._local = Local()
- def push(self,value):
- rv = getattr(self._local, 'stack', None) # self._local.stack =>local.getattr
- if rv is None:
- self._local.stack = rv = [] # self._local.stack =>local.setattr
- rv.append(value) # self._local.stack.append(666)
- return rv
- def pop(self):
- """Removes the topmost item from the stack, will return the
- old value or `None` if the stack was already empty.
- """
- stack = getattr(self._local, 'stack', None)
- if stack is None:
- return None
- elif len(stack) == 1:
- return stack[-1]
- else:
- return stack.pop()
- def top(self):
- try:
- return self._local.stack[-1]
- except (AttributeError, IndexError):
- return None
- class RequestContext(object):
- def __init__(self):
- self.request = "xx"
- self.session = 'oo'
- _request_ctx_stack = LocalStack()
- _request_ctx_stack.push(RequestContext())
- def _lookup_req_object(arg):
- ctx = _request_ctx_stack.top()
- return getattr(ctx,arg) # ctx.request / ctx.session
- request = functools.partial(_lookup_req_object,'request')
- session = functools.partial(_lookup_req_object,'session')
- print(request())
- print(session())
复制代码 - slots
- class Foo(object):
- __slots__ = ('name',)
- def __init__(self):
- self.name = 'alex'
- # self.age = 18
- obj = Foo()
- print(obj.name)
- # print(obj.age)
复制代码 - 请求上下文管理(ctx):request,session
- - 请求到来之后wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法
- - 在wsgi_app方法中:
- - 首先将 请求相关+空session 封装到一个RequestContext对象中,即:ctx。
- - 将ctx交给LocalStack对象,再由LocalStack将ctx添加到Local中,Local结构:
- __storage__ = {
- 1231:{stack:[ctx,] }
- }
- - 根据请求中的cookie中提取名称为sessionid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session
- -> 视图函数
- - 把session中的数据再次写入到cookie中。
- - 将ctx删除
- - 结果返回给用户浏览器
- - 断开socket连接
复制代码 - LocalProxy
- from flask import Flask,request,session
- app = Flask(__name__)
- @app.route('/index')
- def index():
- # 1. request是LocalProxy对象
- # 2. 对象中有method、执行__getattr__
- print(request.method)
- # request['method']
- # request + 1
- # 1. session是LocalProxy对象
- # 2. LocalProxy对象的__setitem__
- session['x'] = 123
- return "Index"
- if __name__ == '__main__':
- app.run()
- # app.__call__
- # app.wsgi_app
- """
- 第一阶段:请求到来
- 将request和Session相关数据封装到ctx=RequestContext对象中。
- 再通过LocalStack将ctx添加到Local中。
- __storage__ = {
- 1231:{'stack':[ctx(request,session)]}
- }
- 第二阶段:视图函数中获取request或session
- 方式一:直接找LocalStack获取
- from flask.globals import _request_ctx_stack
- print(_request_ctx_stack.top.request.method)
-
- 方式二:通过代理LocalProxy(小东北)获取
- from flask import Flask,request
- print(request.method)
-
- """
复制代码 - g只存在在一次请求中当你return时g就会被清除。
10.文件的上传和解压
- view.py
- from flask import Blueprint, render_template, Flask, request, redirect,session
- import os
- import uuid
- from ..utils import helper
- ind = Blueprint('ind', __name__)
- ind.config["MAX_CONTENT_LENGTH"] = = 1024 * 1024 * 7
- @ind.before_request
- def process_request():
- if not session.get("user_info"):
- return redirect("/login")
- return None
- @ind.route('/home')
- def home():
- return render_template('home.html')
- @ind.route('/user_list')
- def user_list():
- # import pymysql
- # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
- # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
- # cursor.execute("SELECT id,user,nickname FROM userinfo")
- # data_list = cursor.fetchall()
- # cursor.close()
- # conn.close()
- data_list = helper.fetch_all("SELECT id,user,nickname FROM userinfo",[])
- return render_template('user_list.html',data_list=data_list)
- @ind.route('/detail/<int:nid>')
- def detail(nid):
- # import pymysql
- # conn = Config.POOL.connection()
- # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
- # cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
- # record_list = cursor.fetchall()
- # cursor.close()
- # conn.close()
- record_list = helper.fetch_all("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
- return render_template('detail.html',record_list=record_list)
- @ind.route('/upload',methods=['GET','POST'])
- def upload():
- if request.method == "GET":
- return render_template('upload.html')
- from werkzeug.datastructures import FileStorage
- file_obj = request.files.get('code')
- # 1. 检查上传文件后缀名
- name_ext = file_obj.filename.rsplit('.',maxsplit=1)
- if len(name_ext) != 2:
- return "请上传zip压缩文件"
- if name_ext[1] != 'zip':
- return "请上传zip压缩文件"
- """
- # 2. 接收用户上传文件,并写入到服务器本地.
- file_path = os.path.join("files",file_obj.filename)
- # 从file_obj.stream中读取内容,写入到文件
- file_obj.save(file_path)
- # 3. 解压zip文件
- import shutil
- # 通过open打开压缩文件,读取内容再进行解压。
- shutil._unpack_zipfile(file_path,'xsadfasdfasdf')
- """
- # 2+3, 接收用户上传文件,并解压到指定目录
- import shutil
- target_path = os.path.join('files',str(uuid.uuid4()))
- shutil._unpack_zipfile(file_obj.stream,target_path)
- # 4. 遍历某目录下的所有文件
- # for item in os.listdir(target_path):
- # print(item)
- total_num = 0
- for base_path,folder_list,file_list in os.walk(target_path):
- for file_name in file_list:
- file_path = os.path.join(base_path,file_name)
- file_ext = file_path.rsplit('.',maxsplit=1)
- if len(file_ext) != 2:
- continue
- if file_ext[1] != 'py':
- continue
- file_num = 0
- with open(file_path,'rb') as f:
- for line in f:
- line = line.strip()
- if not line:
- continue
- if line.startswith(b'#'):
- continue
- file_num += 1
- total_num += file_num
-
- # 获取当前时间
- import datetime
- ctime = datetime.date.today()
- print(total_num,ctime,session['user_info']['id'])
- # import pymysql
- # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
- # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
- # cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
- # data = cursor.fetchone()
- # cursor.close()
- # conn.close()
- data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
- if data:
- return "今天已经上传"
- # import pymysql
- # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
- # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
- # cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
- # conn.commit()
- # cursor.close()
- # conn.close()
- helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
- return "上传成功"
复制代码 - html.py
- {% extends "layout.html"%}
- {% block content %}
- <h1>代码上传</h1>
- <form method="post" enctype="multipart/form-data">
- <input type="file" name="code">
- <input type="submit" value="上传">
- </form>
- {% endblock %}
复制代码 11.Database Join 池
- 安装pip3 install DBUtils
- 创建连接池可以共享出去,谁要用就来获取一个连接,这样可以管理连接的数量,提高链接的利用率。
- from DBUtils.PooledDB import PooledDB, SharedDBConnection
- import pymysql
- POOL = PooledDB(
- creator=pymysql, # 使用链接数据库的模块
- maxconnections=6, # 连接池允许的最大连接数,0和None表示不限制连接数
- mincached=2, # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
- maxcached=5, # 链接池中最多闲置的链接,0和None不限制
- maxshared=3,
- # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
- blocking=True, # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
- maxusage=None, # 一个链接最多被重复使用的次数,None表示无限制
- setsession=[], # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
- ping=0,
- # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
- host='127.0.0.1',
- port=3306,
- user='root',
- password='123456',
- database='s9day118',
- charset='utf8'
- )
- # 获取连接
- conn = POOL.connection()
- cursor = conn.cursor(pymysql.cursors.DictCursor)
- cursor.execute("select * from tb1")
- result = cursor.fetchall()
- cursor.close()
- # 归还连接
- conn.close()
复制代码 12.wtforms
- WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。
- 安装pip3 install wtforms
12.1 用户登录注册示例
- 用户登录
- 当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如:
- 用户不能为空;用户长度必须大于6;
- 密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from flask import Flask, render_template, request, redirect
- from wtforms import Form
- from wtforms.fields import core
- from wtforms.fields import html5
- from wtforms.fields import simple
- from wtforms import validators
- from wtforms import widgets
- app = Flask(__name__, template_folder='templates')
- app.debug = True
- class LoginForm(Form):
- name = simple.StringField(
- label='用户名',
- validators=[
- validators.DataRequired(message='用户名不能为空.'),
- validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
- ],
- widget=widgets.TextInput(),
- render_kw={'class': 'form-control'}
- )
- pwd = simple.PasswordField(
- label='密码',
- validators=[
- validators.DataRequired(message='密码不能为空.'),
- validators.Length(min=8, message='用户名长度必须大于%(min)d'),
- validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
- message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- @app.route('/login', methods=['GET', 'POST'])
- def login():
- if request.method == 'GET':
- form = LoginForm()
- return render_template('login.html', form=form)
- else:
- form = LoginForm(formdata=request.form)
- if form.validate():
- print('用户提交数据通过格式验证,提交的值为:', form.data)
- else:
- print(form.errors)
- return render_template('login.html', form=form)
- if __name__ == '__main__':
- app.run()
- #######################################################
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>Title</title>
- </head>
- <body>
- <h1>登录</h1>
- <form method="post">
-
- <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
-
- <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
- <input type="submit" value="提交">
- </form>
- </body>
- </html>
复制代码
- 用户注册
- 注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。
- from flask import Flask, render_template, request, redirect
- from wtforms import Form
- from wtforms.fields import core
- from wtforms.fields import html5
- from wtforms.fields import simple
- from wtforms import validators
- from wtforms import widgets
- app = Flask(__name__, template_folder='templates')
- app.debug = True
- class RegisterForm(Form):
- name = simple.StringField(
- label='用户名',
- validators=[
- validators.DataRequired()
- ],
- widget=widgets.TextInput(),
- render_kw={'class': 'form-control'},
- default='alex'
- )
- pwd = simple.PasswordField(
- label='密码',
- validators=[
- validators.DataRequired(message='密码不能为空.')
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- pwd_confirm = simple.PasswordField(
- label='重复密码',
- validators=[
- validators.DataRequired(message='重复密码不能为空.'),
- validators.EqualTo('pwd', message="两次密码输入不一致")
- ],
- widget=widgets.PasswordInput(),
- render_kw={'class': 'form-control'}
- )
- email = html5.EmailField(
- label='邮箱',
- validators=[
- validators.DataRequired(message='邮箱不能为空.'),
- validators.Email(message='邮箱格式错误')
- ],
- widget=widgets.TextInput(input_type='email'),
- render_kw={'class': 'form-control'}
- )
- gender = core.RadioField(
- label='性别',
- choices=(
- (1, '男'),
- (2, '女'),
- ),
- coerce=int
- )
- city = core.SelectField(
- label='城市',
- choices=(
- ('bj', '北京'),
- ('sh', '上海'),
- )
- )
- hobby = core.SelectMultipleField(
- label='爱好',
- choices=(
- (1, '篮球'),
- (2, '足球'),
- ),
- coerce=int
- )
- favor = core.SelectMultipleField(
- label='喜好',
- choices=(
- (1, '篮球'),
- (2, '足球'),
- ),
- widget=widgets.ListWidget(prefix_label=False),
- option_widget=widgets.CheckboxInput(),
- coerce=int,
- default=[1, 2]
- )
- def __init__(self, *args, **kwargs):
- super(RegisterForm, self).__init__(*args, **kwargs)
- self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
- def validate_pwd_confirm(self, field):
- """
- 自定义pwd_confirm字段规则,例:与pwd字段是否一致
- :param field:
- :return:
- """
- # 最开始初始化时,self.data中已经有所有的值
- if field.data != self.data['pwd']:
- # raise validators.ValidationError("密码不一致") # 继续后续验证
- raise validators.StopValidation("密码不一致") # 不再继续后续验证
- @app.route('/register', methods=['GET', 'POST'])
- def register():
- if request.method == 'GET':
- form = RegisterForm(data={'gender': 1})
- return render_template('register.html', form=form)
- else:
- form = RegisterForm(formdata=request.form)
- if form.validate():
- print('用户提交数据通过格式验证,提交的值为:', form.data)
- else:
- print(form.errors)
- return render_template('register.html', form=form)
- if __name__ == '__main__':
- app.run()
-
-
- ############################################################
- <!DOCTYPE html>
- <html lang="en">
- <head>
- <meta charset="UTF-8">
- <title>Title</title>
- </head>
- <body>
- <h1>用户注册</h1>
- <form method="post" novalidate >
- {% for item in form %}
- <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
- {% endfor %}
- <input type="submit" value="提交">
- </form>
- </body>
- </html>
复制代码
- 动态获取数据库数据
- from flask import Flask,request,render_template,session,current_app,g,redirect
- from wtforms import Form
- from wtforms.fields import simple
- from wtforms.fields import html5
- from wtforms.fields import core
- from wtforms import widgets
- from wtforms import validators
- app = Flask(__name__)
- class LoginForm(Form):
- name = simple.StringField(
- validators=[
- validators.DataRequired(message='用户名不能为空.'),
- # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
- ],
- widget=widgets.TextInput(),
- render_kw={'placeholder':'请输入用户名'}
- )
- pwd = simple.PasswordField(
- validators=[
- validators.DataRequired(message='密码不能为空.'),
- # validators.Length(min=8, message='用户名长度必须大于%(min)d'),
- # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
- # message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
- ],
- render_kw={'placeholder':'请输入密码'}
- )
- @app.route('/login',methods=['GET','POST'])
- def login():
- if request.method == "GET":
- form = LoginForm()
- # print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__
- # print(form.pwd,type(form.pwd)) # form.pwd是PasswordField()对象,PasswordField().__str__
- return render_template('login.html',form=form)
- form = LoginForm(formdata=request.form)
- if form.validate():
- print(form.data)
- return redirect('https://www.luffycity.com/home')
- else:
- # print(form.errors)
- return render_template('login.html', form=form)
- import helper
- class UserForm(Form):
- city = core.SelectField(
- label='城市',
- choices=(),
- coerce=int # 从数据库获取的数据返回后是字符串类型的,把他写上后会先将其转为int后再返回到前端
- )
- name = simple.StringField(label='姓名')
- # 当你定义成静态字段是,只有启动时才会加载一次,但是从数据库需要实时拿到数据,下面是解决方法。你应该可以看懂。
- def __init__(self,*args,**kwargs):
- super(UserForm,self).__init__(*args,**kwargs)
- self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)
- @app.route('/user')
- def user():
- if request.method == "GET":
- #form = UserForm(data={'name':'alex','city':3})
- form = UserForm()
- return render_template('user.html',form=form)
- if __name__ == '__main__':
- app.run()
复制代码
- meta
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from flask import Flask, render_template, request, redirect, session
- from wtforms import Form
- from wtforms.csrf.core import CSRF
- from wtforms.fields import core
- from wtforms.fields import html5
- from wtforms.fields import simple
- from wtforms import validators
- from wtforms import widgets
- from hashlib import md5
- app = Flask(__name__, template_folder='templates')
- app.debug = True
- class MyCSRF(CSRF):
- """
- Generate a CSRF token based on the user's IP. I am probably not very
- secure, so don't use me.
- """
- def setup_form(self, form):
- self.csrf_context = form.meta.csrf_context()
- self.csrf_secret = form.meta.csrf_secret
- return super(MyCSRF, self).setup_form(form)
- def generate_csrf_token(self, csrf_token):
- gid = self.csrf_secret + self.csrf_context
- token = md5(gid.encode('utf-8')).hexdigest()
- return token
- def validate_csrf_token(self, form, field):
- print(field.data, field.current_token)
- if field.data != field.current_token:
- raise ValueError('Invalid CSRF')
- class TestForm(Form):
- name = html5.EmailField(label='用户名')
- pwd = simple.StringField(label='密码')
- class Meta:
- # -- CSRF
- # 是否自动生成CSRF标签
- csrf = True
- # 生成CSRF标签name
- csrf_field_name = 'csrf_token'
- # 自动生成标签的值,加密用的csrf_secret
- csrf_secret = 'xxxxxx'
- # 自动生成标签的值,加密用的csrf_context
- csrf_context = lambda x: request.url
- # 生成和比较csrf标签
- csrf_class = MyCSRF
- # -- i18n
- # 是否支持本地化
- # locales = False
- locales = ('zh', 'en')
- # 是否对本地化进行缓存
- cache_translations = True
- # 保存本地化缓存信息的字段
- translations_cache = {}
- @app.route('/index/', methods=['GET', 'POST'])
- def index():
- if request.method == 'GET':
- form = TestForm()
- else:
- form = TestForm(formdata=request.form)
- if form.validate():
- print(form)
- return render_template('index.html', form=form)
- if __name__ == '__main__':
- app.run()
复制代码 12.2 其他
- metaclass
- # new方法的返回值决定对象到底是什么?
- class Bar(object):
- pass
- class Foo(object):
- def __new__(cls, *args, **kwargs):
- # return super(Foo,cls).__new__(cls,*args, **kwargs)
- return Bar()
- obj = Foo()
- print(obj)
- # 当Foo()时先执行__new__方法,返回一个Bar,obj就等于Bar对象
- ##########################################################################
- class MyType(type):
- def __init__(self, *args, **kwargs):
- print('MyType创建类',self)
- super(MyType, self).__init__(*args, **kwargs)
- def __call__(self, *args, **kwargs):
- obj = super(MyType, self).__call__(*args, **kwargs)
- print('类创建对象', self, obj)
- return obj
- class Foo(object,metaclass=MyType):
- user = 'wupeiqi'
- age = 18
- obj = Foo()
- # 在创建Foo对象前先执行mytype的__call__方法,__call__方法后才会执行Foo。不写metaclass默认是由type类来做的。
- # 是先执行父类的__call__后再执行自己的__init__方法。
- """
- 创建类时,先执行type的__init__。
- 类的实例化时,执行type的__call__,__call__方法的的返回值就是实例化的对象。
- __call__内部调用:
- 类.__new__,创建对象
- 类.__init__,对象的初始化
- """
复制代码 - 实例化流程分析
- # 源码流程
- 1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中
- 2. 执行构造方法
-
- a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields[name] 中。
- 即:
- _fields = {
- name: wtforms.fields.core.StringField(),
- }
-
- PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()
-
- b. 循环_fields,为对象设置属性
- for name, field in iteritems(self._fields):
- # Set all the fields to attributes so that they obscure the class
- # attributes with the same names.
- setattr(self, name, field)
- c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)
- 优先级:obj,data,formdata;
-
- 再循环执行每个字段的process方法,为每个字段设置值:
- for name, field, in iteritems(self._fields):
- if obj is not None and hasattr(obj, name):
- field.process(formdata, getattr(obj, name))
- elif name in kwargs:
- field.process(formdata, kwargs[name])
- else:
- field.process(formdata)
-
- 执行每个字段的process方法,为字段的data和字段的raw_data赋值
- def process(self, formdata, data=unset_value):
- self.process_errors = []
- if data is unset_value:
- try:
- data = self.default()
- except TypeError:
- data = self.default
-
- self.object_data = data
-
- try:
- self.process_data(data)
- except ValueError as e:
- self.process_errors.append(e.args[0])
-
- if formdata:
- try:
- if self.name in formdata:
- self.raw_data = formdata.getlist(self.name)
- else:
- self.raw_data = []
- self.process_formdata(self.raw_data)
- except ValueError as e:
- self.process_errors.append(e.args[0])
-
- try:
- for filter in self.filters:
- self.data = filter(self.data)
- except ValueError as e:
- self.process_errors.append(e.args[0])
-
- d. 页面上执行print(form.name) 时,打印标签
-
- 因为执行了:
- 字段的 __str__ 方法
- 字符的 __call__ 方法
- self.meta.render_field(self, kwargs)
- def render_field(self, field, render_kw):
- other_kw = getattr(field, 'render_kw', None)
- if other_kw is not None:
- render_kw = dict(other_kw, **render_kw)
- return field.widget(field, **render_kw)
- 执行字段的插件对象的 __call__ 方法,返回标签字符串
复制代码 - 验证流程分析
- a. 执行form的validate方法,获取钩子方法
- def validate(self):
- extra = {}
- for name in self._fields:
- inline = getattr(self.__class__, 'validate_%s' % name, None)
- if inline is not None:
- extra[name] = [inline]
-
- return super(Form, self).validate(extra)
- b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
- def validate(self, extra_validators=None):
- self._errors = None
- success = True
- for name, field in iteritems(self._fields):
- if extra_validators is not None and name in extra_validators:
- extra = extra_validators[name]
- else:
- extra = tuple()
- if not field.validate(self, extra):
- success = False
- return success
- c. 每个字段进行验证时候
- 字段的pre_validate 【预留的扩展】
- 字段的_run_validation_chain,对正则和字段的钩子函数进行校验
- 字段的post_validate【预留的扩展】
复制代码 13.SQLAlchemy
- MySQLdb,只支持python2.0的,而pymysql是支持python2.0和3.0的。
- import MySQLdb
-
- conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')
-
- cur = conn.cursor()
-
- reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa'))
- # reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'})
-
- conn.commit()
-
- cur.close()
- conn.close()
-
- print reCount
复制代码 - SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
- 安装pip3 install sqlalchemy
- 组成部分:
- Engine,框架的引擎
- Connection Pooling ,数据库连接池
- Dialect,选择连接数据库的DB API种类
- Schema/Types,架构和类型
- SQL Exprression Language,SQL表达式语言
- SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
- MySQL-Python
- mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
-
- pymysql
- mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
-
- MySQL-Connector
- mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
-
- cx_Oracle
- oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
-
- 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
复制代码 - 执行原生SQL语句
- import time
- import threading
- import sqlalchemy
- from sqlalchemy import create_engine
- from sqlalchemy.engine.base import Engine
-
- engine = create_engine(
- "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
-
-
- def task(arg):
- conn = engine.raw_connection()
- cursor = conn.cursor()
- cursor.execute(
- "select * from t1"
- )
- result = cursor.fetchall()
- cursor.close()
- conn.close()
-
-
- for i in range(20):
- t = threading.Thread(target=task, args=(i,))
- t.start()
-
-
- ########################################################################################
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- import sqlalchemy
- from sqlalchemy import create_engine
- from sqlalchemy.engine.base import Engine
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
- def task(arg):
- conn = engine.contextual_connect()
- with conn:
- cur = conn.execute(
- "select * from t1"
- )
- result = cur.fetchall()
- print(result)
- for i in range(20):
- t = threading.Thread(target=task, args=(i,))
- t.start()
-
- ########################################################################################
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- import sqlalchemy
- from sqlalchemy import create_engine
- from sqlalchemy.engine.base import Engine
- from sqlalchemy.engine.result import ResultProxy
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
- def task(arg):
- cur = engine.execute("select * from t1")
- result = cur.fetchall()
- cur.close()
- print(result)
- for i in range(20):
- t = threading.Thread(target=task, args=(i,))
- t.start()
-
-
- # 注意: 查看连接 show status like 'Threads%';
复制代码 13.1 ORM
- 创建单表
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import datetime
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
- Base = declarative_base()
- class Users(Base):
- __tablename__ = 'users'
- id = Column(Integer, primary_key=True)
- name = Column(String(32), index=True, nullable=False)
- # email = Column(String(32), unique=True)
- # ctime = Column(DateTime, default=datetime.datetime.now)
- # extra = Column(Text, nullable=True)
- __table_args__ = (
- # UniqueConstraint('id', 'name', name='uix_id_name'),
- # Index('ix_id_name', 'name', 'email'),
- )
- def init_db():
- """
- 根据类创建数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.create_all(engine)
- def drop_db():
- """
- 根据类删除数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.drop_all(engine)
- if __name__ == '__main__':
- drop_db()
- init_db()
复制代码 - 创建多个表并包含Fk、M2M关系
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import datetime
- from sqlalchemy import create_engine
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
- from sqlalchemy.orm import relationship
- Base = declarative_base()
- # ##################### 单表示例 #########################
- class Users(Base):
- __tablename__ = 'users'
- id = Column(Integer, primary_key=True)
- name = Column(String(32), index=True)
- age = Column(Integer, default=18)
- email = Column(String(32), unique=True)
- ctime = Column(DateTime, default=datetime.datetime.now)
- extra = Column(Text, nullable=True)
- __table_args__ = (
- # UniqueConstraint('id', 'name', name='uix_id_name'),
- # Index('ix_id_name', 'name', 'extra'),
- )
- class Hosts(Base):
- __tablename__ = 'hosts'
- id = Column(Integer, primary_key=True)
- name = Column(String(32), index=True)
- ctime = Column(DateTime, default=datetime.datetime.now)
- # ##################### 一对多示例 #########################
- class Hobby(Base):
- __tablename__ = 'hobby'
- id = Column(Integer, primary_key=True)
- caption = Column(String(50), default='篮球')
- class Person(Base):
- __tablename__ = 'person'
- nid = Column(Integer, primary_key=True)
- name = Column(String(32), index=True, nullable=True)
- hobby_id = Column(Integer, ForeignKey("hobby.id"))
- # 与生成表结构无关,仅用于查询方便
- hobby = relationship("Hobby", backref='pers')
- # ##################### 多对多示例 #########################
- class Server2Group(Base):
- __tablename__ = 'server2group'
- id = Column(Integer, primary_key=True, autoincrement=True)
- server_id = Column(Integer, ForeignKey('server.id'))
- group_id = Column(Integer, ForeignKey('group.id'))
- class Group(Base):
- __tablename__ = 'group'
- id = Column(Integer, primary_key=True)
- name = Column(String(64), unique=True, nullable=False)
- # 与生成表结构无关,仅用于查询方便
- servers = relationship('Server', secondary='server2group', backref='groups')
- class Server(Base):
- __tablename__ = 'server'
- id = Column(Integer, primary_key=True, autoincrement=True)
- hostname = Column(String(64), unique=True, nullable=False)
- def init_db():
- """
- 根据类创建数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.create_all(engine)
- def drop_db():
- """
- 根据类删除数据库表
- :return:
- """
- engine = create_engine(
- "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
- max_overflow=0, # 超过连接池大小外最多创建的连接
- pool_size=5, # 连接池大小
- pool_timeout=30, # 池中没有线程最多等待的时间,否则报错
- pool_recycle=-1 # 多久之后对线程池中的线程进行一次连接的回收(重置)
- )
- Base.metadata.drop_all(engine)
- if __name__ == '__main__':
- drop_db()
- init_db()
复制代码 - 操作数据库表
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from models import Users
-
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
-
- # 每次执行数据库操作时,都需要创建一个session
- session = Session()
-
- # ############# 执行ORM操作 #############
- obj1 = Users(name="alex1")
- session.add(obj1)
-
- # 提交事务
- session.commit()
- # 关闭session
- session.close()
复制代码 - 基于scoped_session实现线程安全
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from sqlalchemy.orm import sessionmaker
- from sqlalchemy import create_engine
- from sqlalchemy.orm import scoped_session
- from models import Users
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- """
- # 线程安全,基于本地线程实现每个线程用同一个session
- # 特殊的:scoped_session中有原来方法的Session中的一下方法:
- public_methods = (
- '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
- 'close', 'commit', 'connection', 'delete', 'execute', 'expire',
- 'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
- 'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
- 'bulk_update_mappings',
- 'merge', 'query', 'refresh', 'rollback',
- 'scalar'
- )
- """
- session = scoped_session(Session)
- # ############# 执行ORM操作 #############
- obj1 = Users(name="alex1")
- session.add(obj1)
- # 提交事务
- session.commit()
- # 关闭session
- session.close()
复制代码 - 多线程执行示例
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from db import Users
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- def task(arg):
- session = Session()
- obj1 = Users(name="alex1")
- session.add(obj1)
- session.commit()
- for i in range(10):
- t = threading.Thread(target=task, args=(i,))
- t.start()
复制代码 - 基本增删改查示例
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from sqlalchemy.sql import text
- from db import Users, Hosts
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- session = Session()
- # ################ 添加 ################
- """
- obj1 = Users(name="wupeiqi")
- session.add(obj1)
- session.add_all([
- Users(name="wupeiqi"),
- Users(name="alex"),
- Hosts(name="c1.com"),
- ])
- session.commit()
- """
- # ################ 删除 ################
- """
- session.query(Users).filter(Users.id > 2).delete()
- session.commit()
- """
- # ################ 修改 ################
- """
- session.query(Users).filter(Users.id > 0).update({"name" : "099"})
- session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
- session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
- session.commit()
- """
- # ################ 查询 ################
- """
- r1 = session.query(Users).all()
- r2 = session.query(Users.name.label('xx'), Users.age).all()
- r3 = session.query(Users).filter(Users.name == "alex").all()
- r4 = session.query(Users).filter_by(name='alex').all()
- r5 = session.query(Users).filter_by(name='alex').first()
- r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
- r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
- """
- session.close()
复制代码 - 原生SQL语句
- # 条件
- ret = session.query(Users).filter_by(name='alex').all()
- ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
- ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
- ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
- ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
- ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
- from sqlalchemy import and_, or_
- ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
- ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
- ret = session.query(Users).filter(
- or_(
- Users.id < 2,
- and_(Users.name == 'eric', Users.id > 3),
- Users.extra != ""
- )).all()
- # 通配符
- ret = session.query(Users).filter(Users.name.like('e%')).all()
- ret = session.query(Users).filter(~Users.name.like('e%')).all()
- # 限制
- ret = session.query(Users)[1:2]
- # 排序
- ret = session.query(Users).order_by(Users.name.desc()).all()
- ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
- # 分组
- from sqlalchemy.sql import func
- ret = session.query(Users).group_by(Users.extra).all()
- ret = session.query(
- func.max(Users.id),
- func.sum(Users.id),
- func.min(Users.id)).group_by(Users.name).all()
- ret = session.query(
- func.max(Users.id),
- func.sum(Users.id),
- func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
- # 连表
- ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
- ret = session.query(Person).join(Favor).all()
- ret = session.query(Person).join(Favor, isouter=True).all()
- # 组合
- q1 = session.query(Users.name).filter(Users.id > 2)
- q2 = session.query(Favor.caption).filter(Favor.nid < 2)
- ret = q1.union(q2).all()
- q1 = session.query(Users.name).filter(Users.id > 2)
- q2 = session.query(Favor.caption).filter(Favor.nid < 2)
- ret = q1.union_all(q2).all()
复制代码 - 基于relationship操作ForeignKey
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from sqlalchemy.sql import text
- from sqlalchemy.engine.result import ResultProxy
- from db import Users, Hosts
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- session = Session()
- # 查询
- # cursor = session.execute('select * from users')
- # result = cursor.fetchall()
- # 添加
- cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
- session.commit()
- print(cursor.lastrowid)
- session.close()
复制代码 - 基于relationship操作m2m
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from sqlalchemy.sql import text
- from sqlalchemy.engine.result import ResultProxy
- from db import Users, Hosts, Hobby, Person
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- session = Session()
- # 添加
- """
- session.add_all([
- Hobby(caption='乒乓球'),
- Hobby(caption='羽毛球'),
- Person(name='张三', hobby_id=3),
- Person(name='李四', hobby_id=4),
- ])
- person = Person(name='张九', hobby=Hobby(caption='姑娘'))
- session.add(person)
- hb = Hobby(caption='人妖')
- hb.pers = [Person(name='文飞'), Person(name='博雅')]
- session.add(hb)
- session.commit()
- """
- # 使用relationship正向查询
- """
- v = session.query(Person).first()
- print(v.name)
- print(v.hobby.caption)
- """
- # 使用relationship反向查询
- """
- v = session.query(Hobby).first()
- print(v.caption)
- print(v.pers)
- """
- session.close()
复制代码 - 其他
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from sqlalchemy.sql import text
- from sqlalchemy.engine.result import ResultProxy
- from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- session = Session()
- # 添加
- """
- session.add_all([
- Server(hostname='c1.com'),
- Server(hostname='c2.com'),
- Group(name='A组'),
- Group(name='B组'),
- ])
- session.commit()
- s2g = Server2Group(server_id=1, group_id=1)
- session.add(s2g)
- session.commit()
- gp = Group(name='C组')
- gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
- session.add(gp)
- session.commit()
- ser = Server(hostname='c6.com')
- ser.groups = [Group(name='F组'),Group(name='G组')]
- session.add(ser)
- session.commit()
- """
- # 使用relationship正向查询
- """
- v = session.query(Group).first()
- print(v.name)
- print(v.servers)
- """
- # 使用relationship反向查询
- """
- v = session.query(Server).first()
- print(v.hostname)
- print(v.groups)
- """
- session.close()
复制代码 14.flask武装
- Flask_SQLAlchemy
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- import time
- import threading
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
- from sqlalchemy.orm import sessionmaker, relationship
- from sqlalchemy import create_engine
- from sqlalchemy.sql import text, func
- from sqlalchemy.engine.result import ResultProxy
- from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
- engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
- Session = sessionmaker(bind=engine)
- session = Session()
- # 关联子查询
- subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
- result = session.query(Group.name, subqry)
- """
- SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
- FROM server
- WHERE server.id = `group`.id) AS anon_1
- FROM `group`
- """
- # 原生SQL
- """
- # 查询
- cursor = session.execute('select * from users')
- result = cursor.fetchall()
- # 添加
- cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
- session.commit()
- print(cursor.lastrowid)
- """
- session.close()
复制代码 - flask-script
- a. 下载安装
- pip3 install flask-sqlalchemy
- b. chun.__init__.py
- 导入并实例化SQLAlchemy
- from flask_sqlalchemy import SQLAlchemy
- db = SQLAlchemy()
-
- 注意事项:
- - 必须在导入蓝图之前
- - 必须导入models.py
- c. 初始化
- db.init_app(app)
-
- d. 在配置文件中写入配置
- # ##### SQLALchemy配置文件 #####
- SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s9day122?charset=utf8"
- SQLALCHEMY_POOL_SIZE = 10
- SQLALCHEMY_MAX_OVERFLOW = 5
- e. 创建models.py中的类(对应数据库表)
- chun/models.py
- from sqlalchemy.ext.declarative import declarative_base
- from sqlalchemy import Column
- from sqlalchemy import Integer,String,Text,Date,DateTime
- from sqlalchemy import create_engine
- from chun import db
- class Users(db.Model):
- __tablename__ = 'users'
- id = Column(Integer, primary_key=True)
- name = Column(String(32), index=True, nullable=False)
- depart_id = Column(Integer)
- f. 生成表(使用app上下文)
- from chun import db,create_app
- app = create_app()
- app_ctx = app.app_context() # app_ctx = app/g
- with app_ctx: # __enter__,通过LocalStack放入Local中
- db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
-
- g. 基于ORM对数据库进行操作。
- from flask import Blueprint
- from chun import db
- from chun import models
- us = Blueprint('us',__name__)
- @us.route('/index')
- def index():
- # 使用SQLAlchemy在数据库中插入一条数据
- # db.session.add(models.Users(name='高件套',depart_id=1))
- # db.session.commit()
- # db.session.remove()
- result = db.session.query(models.Users).all()
- print(result)
- db.session.remove()
- return 'Index'
复制代码 - flask-migrate
- install the module "pip3 install flask-script "
- a. 增加 runserver
- from chun import create_app
- from flask_script import Manager
- app = create_app()
- manager = Manager(app)
- if __name__ == '__main__':
- # app.run()
- manager.run()
-
- b. 位置传参
- from chun import create_app
- from flask_script import Manager
- app = create_app()
- manager = Manager(app)
- @manager.command
- def custom(arg):
- """
- 自定义命令
- python manage.py custom 123
- :param arg:
- :return:
- """
- print(arg)
- if __name__ == '__main__':
- # app.run()
- manager.run()
- c. 关键字传参
- from chun import create_app
- from flask_script import Manager
- app = create_app()
- manager = Manager(app)
- @manager.option('-n', '--name', dest='name')
- @manager.option('-u', '--url', dest='url')
- def cmd(name, url):
- """
- 自定义命令
- 执行: python manage.py cmd -n wupeiqi -u http://www.oldboyedu.com
- :param name:
- :param url:
- :return:
- """
- print(name, url)
- if __name__ == '__main__':
- # app.run()
- manager.run()
复制代码 - 找到项目使用的所有组件和版本。
- install the module "pip3 install flask-migrate"
- flask-migrate rely on flask-script.
- #!/usr/bin/env python
- # -*- coding:utf-8 -*-
- from sansa import create_app
- from sansa import db
- from flask_script import Manager
- from flask_migrate import Migrate, MigrateCommand
- app = create_app()
- manager = Manager(app)
- Migrate(app, db)
- """
- # 数据库迁移命名
- python manage.py db init
- python manage.py db migrate
- python manage.py db upgrade
- """
- manager.add_command('db', MigrateCommand)
- if __name__ == '__main__':
- manager.run()
- # app.run()
复制代码 - python虚拟环境
- install the module "pip install pipreqs"
- # 终端输入后就可以找到所有的依赖
- pipreqs ./ --encoding=utf-8
复制代码
来源:https://www.cnblogs.com/InvincibleGrass/p/17562878.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作! |
|