Flask
Flask[*]短小精悍,可扩展强的一个web框架。上下文管理
[*]安装:pip3 install Flask,我们做web开发时,是站在两大东西之上做的web框架和wsgi,Flask和Django中都是应用的并不是自己写的。Flask中werkzurg就是。
[*]web服务网关接口,wsgi是一个协议,实现该写一个的模块:wsgiref|werkzeug,实现其协议的模块本质上就是socket服务端用于接收用户请求,并处理。一般web框架基于wsgi实现,这样实现关注点分离。
[*]flask就是基于Werkzurg搭建起来的
from werkzeug.wrappers import Request, Response
from werkzeug.serving import run_simple
@Request.application
def run(environ, start_response):
return Response("Hello World!") # 返回字符串
if __name__=="__main__":
run_simple("localhost", 4000, run) # 监听本机的4000端口,如果访问就执行run函数
[*]一个最简单的Flask
from flask import Flask
app = Flask(__name__) # flask名称,一般这样写。
@app.route("/index") # url路径
def index(): #所调用的函数
return "Hello World!"
if __name__ == "__main__":
app.run() # 执行口
[*]Flask实现用户登录程序
from flask import Flask, render_template, request, redirect, session
app = Flask(__name__, template_folder="templates", static_folder="static")
app.secret_key = "asdf" # session加盐
# request.form # 请求体
# request.args # 请求头
# request.method # 请求类型
@app.route("/login", methods=["GET", "POST"])
def login():
if request.method == "GET":
return render_template("login.html")
user = request.from.get("user")
pwd= request.from.get("pwd")
if user == "alex" and pwd == "666":
session["user"] = user
return redirect("/index")
return render_template("login.html", error="用户名或密码错误")
@app.route("/index")
def index():
user = session.get("user")
if not user:
return redirect("/login")
return render_template("index.html")
if __name__ == "__main__":
app.run()
1.配置文件
[*]settings
class Config(object):
DEBUG = False
TESTING = False
DATABASE_URL = "sqlite://:memory:"
class ProductionConfig(Config):
DATABASE_URL = "mysql://user@localhost/foo"
class DevelopmentConfig(Config):
DEBUT = True
class TestingConfig(Config):
TESTING = True
flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
{
'DEBUG': get_debug_flag(default=False), 是否开启Debug模式
'TESTING': False, 是否开启测试模式
'PROPAGATE_EXCEPTIONS': None,
'PRESERVE_CONTEXT_ON_EXCEPTION': None,
'SECRET_KEY': None,
'PERMANENT_SESSION_LIFETIME': timedelta(days=31),
'USE_X_SENDFILE': False,
'LOGGER_NAME': None,
'LOGGER_HANDLER_POLICY': 'always',
'SERVER_NAME': None,
'APPLICATION_ROOT': None,
'SESSION_COOKIE_NAME': 'session',
'SESSION_COOKIE_DOMAIN': None,
'SESSION_COOKIE_PATH': None,
'SESSION_COOKIE_HTTPONLY': True,
'SESSION_COOKIE_SECURE': False,
'SESSION_REFRESH_EACH_REQUEST': True,
'MAX_CONTENT_LENGTH': None,
'SEND_FILE_MAX_AGE_DEFAULT': timedelta(hours=12),
'TRAP_BAD_REQUEST_ERRORS': False,
'TRAP_HTTP_EXCEPTIONS': False,
'EXPLAIN_TEMPLATE_LOADING': False,
'PREFERRED_URL_SCHEME': 'http',
'JSON_AS_ASCII': True,
'JSON_SORT_KEYS': True,
'JSONIFY_PRETTYPRINT_REGULAR': True,
'JSONIFY_MIMETYPE': 'application/json',
'TEMPLATES_AUTO_RELOAD': None,
}
[*]xx
# 如何通过代码找到指定的类
import importlib
path = "settings.Foo"
p, c = path.rsplit(".", maxsplit=1)
m = importlib.import_module(p)
cls = getattr(m, c)
# 找出类中的所有静态变量
for key in dir(cls):
if key.isupper():
print(key,getattr(cls, key))
[*]Flask中有一个配置文件
from flask import Flask, render_template, request, redirect, session
app = Flask(__name__)
print(app.config) # 这是flask提供的配置文件
# app.config.["DEBUG"] = True # 我们可以通过这样来修改配置,但是flask也提供了另一种方式
app.config.from_object("sttings.DevelopmentConfig") # 这种方式就是自己创建一个文件和类,你在类中写的所有配置都会被应用。
if __name__ == "__main__":
app.run()
2.路由和视图
[*]flask的路由系统
from flask import Flask, render_template, request, redirect, session, url_for
app = Flask(__name__)
# int 是值的类型,nid是名称:/login/1234
"""
# 这是常用的类型,不加类型默认是字符串类型
DEFAULT_CONVERTERS = {
'default': UnicodeConverter,
'string': UnicodeConverter,
'any': AnyConverter,
'path': PathConverter,
'int': IntegerConverter,
'float': FloatConverter,
'uuid': UUIDConverter,
}
"""
@app.route("/login/<int:nid>", endpoint="n1") # endpoint就是设置名字,不写默认是函数名
def login():
url_for("n1", nid=122) # 反向查找url,nid是携带的值
if __name__ == "__main__":
app.run()
[*]路由的两种方式
@app.route("/xxx")
def index():
return "index"
# 其实到内部调用时就是使用了app.add_url_rule来实现的,为了方便所以在外面套了个装饰器。
def index():
return "index"
app.add_url_rule("/xxx", None, index)
# 注意事项:
# - 不用让endpoint重名
# - 如果重名函数也一定要相同。
[*]app.route和app.add_url_rule参数
@app.route和app.add_url_rule参数:
rule, URL规则
view_func, 视图函数名称
defaults=None, 默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
endpoint=None, 名称,用于反向生成URL,即: url_for('名称')
methods=None, 允许的请求方式,如:["GET","POST"]
strict_slashes=None, 对URL最后的 / 符号是否严格要求,
如:
@app.route('/index',strict_slashes=False),
访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
@app.route('/index',strict_slashes=True)
仅访问 http://www.xx.com/index
redirect_to=None, 重定向到指定地址
如:
@app.route('/index/<int:nid>', redirect_to='/home/<nid>')
或
def func(adapter, nid):
return "/home/888"
@app.route('/index/<int:nid>', redirect_to=func)
subdomain=None, 子域名访问
from flask import Flask, views, url_for
app = Flask(import_name=__name__)
app.config['SERVER_NAME'] = 'wupeiqi.com:5000'
@app.route("/", subdomain="admin")
def static_index():
"""Flask supports static subdomains
This is available at static.your-domain.tld"""
return "static.your-domain.tld"
@app.route("/dynamic", subdomain="<username>")
def username_index(username):
"""Dynamic subdomains are also supported
Try going to user1.your-domain.tld/dynamic"""
return username + ".your-domain.tld"
if __name__ == '__main__':
app.run()
[*]CBV
import functools
from flask import Flask,views
app = Flask(__name__)
def wrapper(func):
@functools.wraps(func)
def inner(*args,**kwargs):
return func(*args,**kwargs)
return inner
class UserView(views.MethodView):
methods = ['GET']
decorators =
def get(self,*args,**kwargs):
return 'GET'
def post(self,*args,**kwargs):
return 'POST'
app.add_url_rule('/user',None,UserView.as_view('uuuu'))
if __name__ == '__main__':
app.run()
[*]自定义正则
from flask import Flask,url_for
app = Flask(__name__)
# 步骤一:定制类
from werkzeug.routing import BaseConverter
class RegexConverter(BaseConverter):
"""
自定义URL匹配正则表达式
"""
def __init__(self, map, regex):
super(RegexConverter, self).__init__(map)
self.regex = regex
def to_python(self, value):
"""
路由匹配时,匹配成功后传递给视图函数中参数的值
:param value:
:return:
"""
return int(value)
def to_url(self, value):
"""
使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
:param value:
:return:
"""
val = super(RegexConverter, self).to_url(value)
return val
# 步骤二:添加到转换器
app.url_map.converters['reg'] = RegexConverter
"""
1. 用户发送请求
2. flask内部进行正则匹配
3. 调用to_python(正则匹配的结果)方法
4. to_python方法的返回值会交给视图函数的参数
"""
# 步骤三:使用自定义正则
@app.route('/index/<reg("\d+"):nid>')
def index(nid):
print(nid,type(nid))
print(url_for('index',nid=987))
return "index"
if __name__ == '__main__':
app.run()
3.请求和响应
[*]请求
# request.method
# request.args
# request.form
# request.values
# request.cookies
# request.headers
# request.path
# request.full_path
# request.script_root
# request.url
# request.base_url
# request.url_root
# request.host_url
# request.host
# request.files
# obj = request.files['the_file_name']
# obj.save('/var/www/uploads/' + secure_filename(f.filename))
[*]响应
from flask import Flask, render_template, request, redirect, session, url_for, jsonify
# 响应相关信息
# return "字符串"
# return render_template('html模板路径',**{})
# return redirect('/index.html')
# return jsonify({"k1":"v1"})
# response = make_response(render_template('index.html'))
# response是flask.wrappers.Response类型
# response.delete_cookie('key')
# response.set_cookie('key', 'value')
# response.headers['X-Something'] = 'A value'
# return response
[*]实现登录后才可查看的效果
# 版本一:在每个函数中都加入判断
@app.route('/index')
def index():
if not session.get('user'):
return redirect(url_for('login'))
return render_template('index.html',stu_dic=STUDENT_DICT)
# 版本二:使用装饰器,但是需要用到functools的wraps,应为你写的装饰器肯定要先执行,不然无法被应用,但是你写的装饰器到最后被调用的函数是inner,但是我们需要自己的属性才可以实现。wraps的功能就是将inner赋予调用函数的所有属性和特征。
import functools
def auth(func):
@functools.wraps(func)
def inner(*args,**kwargs):
if not session.get('user'):
return redirect(url_for('login'))
ret = func(*args,**kwargs)
return ret
return inner
@app.route('/index')
@auth
def index():
return render_template('index.html',stu_dic=STUDENT_DICT)
# 应用场景:比较少的函数中需要额外添加功能。
# 版本三:before_request,在执行函数时会先执行before_request所定义的函数,返回None就会向后执行,如果返回数据就会直接将其返回到前端了。
@app.before_request
def xxxxxx():
if request.path == '/login':
return None
if session.get('user'):
return None
return redirect('/login')
4.模板渲染
[*]基本数据类型:可以执行python语法,如:dict.get() list['xx']
[*]传入函数:django,自动执行\flask,不自动执行
[*]全局定义函数
@app.template_global()
def sb(a1, a2):
# {{sb(1,9)}}
return a1 + a2
@app.template_filter()
def db(a1, a2, a3):
# {{ 1|db(2,3) }}
return a1 + a2 + a3
[*]模板继承
layout.html
<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<title>Title</title>
<meta name="viewport" content="width=device-width, initial-scale=1">
</head>
<body>
<h1>模板</h1>
{% block content %}{% endblock %}
</body>
</html>
tpl.html
{% extends "layout.html"%}
{% block content %}
{{users.0}}
{% endblock %}
[*]include
{% include "form.html" %}
form.html
<form>
asdfasdf
asdfasdf
asdf
asdf
</form>
[*]宏
{% macro ccccc(name, type='text', value='') %}
<h1>宏</h1>
<input type="{{ type }}" name="{{ name }}" value="{{ value }}">
<input type="submit" value="提交">
{% endmacro %}
{{ ccccc('n1') }}
{{ ccccc('n2') }}
[*]flask默认时设置了xss的后端将html代码返回到前端时需要使用安全MarkUp("asdf"),前端也可以直接将其应用{{u|safe}}。
5.session
[*]当请求刚到来:flask读取cookie中session对应的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,将该值解密并反序列化成字典,放入内存以便视图函数使用。
@app.route('/ses')
def ses():
session['k1'] = 123
session['k2'] = 456
del session['k1']
return "Session"
session['xxx'] = 123
session['xxx']
[*]当请求结束时,flask会读取内存中字典的值,进行序列化+加密,写入到用户cookie中。
[*]threading.local【和flask无任何关系】
[*]为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。
import threading
from threading import local
import time
obj = local()
def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i)
for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
[*]根据字典自定义一个类似于threading.local功能?
import time
import threading
DIC = {}
def task(i):
ident = threading.get_ident()
if ident in DIC:
DIC['xxxxx'] = i
else:
DIC = {'xxxxx':i }
time.sleep(2)
print(DIC['xxxxx'],i)
for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
[*]通过getattr/setattr 构造出来 threading.local的加强版(协程)
import time
import threading
try:
import greenlet
get_ident =greenlet.getcurrent # 获取协程唯一标识
except Exception as e:
get_ident = threading.get_ident # 获取线程唯一标识
class Local(object):
DIC = {}
def __getattr__(self, item): # obj.xxx时触发,item等于xxx
ident = get_ident()
if ident in self.DIC:
return self.DIC.get(item)
return None
def __setattr__(self, key, value): # obj.xxx = "123"时触发,key等于xxx,value等于123
ident = get_ident()
if ident in self.DIC:
self.DIC = value
else:
self.DIC = {key:value}
obj = Local()
def task(i):
obj.xxxxx = i
time.sleep(2)
print(obj.xxxxx,i)
for i in range(10):
t = threading.Thread(target=task,args=(i,))
t.start()
5.1 flask-session
[*]安装:pip3 install flask-session
[*]使用
import redis
from flask import Flask,request,session
from flask.sessions import SecureCookieSessionInterface
from flask_session import Session
app = Flask(__name__)
# app.session_interface = SecureCookieSessionInterface()
# app.session_interface = RedisSessionInterface()
app.config['SESSION_TYPE'] = 'redis'
app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')
Session(app)
@app.route('/login')
def login():
session['user'] = 'alex'
return 'asdfasfd'
@app.route('/home')
def index():
print(session.get('user'))
return '...'
if __name__ == '__main__':
app.run()
6.闪现和中间件
[*]闪现,在session中存储一个数据,读取时通过pop将数据移除。
from flask import Flask,flash,get_flashed_messages
@app.route('/page1')
def page1():
flash('临时数据存储','error')
flash('sdfsdf234234','error')
flash('adasdfasdf','info')
return "Session"
@app.route('/page2')
def page2():
print(get_flashed_messages(category_filter=['error']))
return "Session"
[*]中间件
call方法什么时候出发?
用户发起请求时,才执行。
任务:在执行call方法之前,做一个操作,call方法执行之后做一个操作。
class Middleware(object):
def __init__(self,old):
self.old = old
def __call__(self, *args, **kwargs):
ret = self.old(*args, **kwargs)
return ret
if __name__ == '__main__':
app.wsgi_app = Middleware(app.wsgi_app)
app.run()
7.蓝图
[*]完整目录图
ProjectName
-ProjectName
--static
--templates
--views
---account.py
from flask import Blueprint,render_template
# 创建蓝图对象
ac = Blueprint('ac',__name__)
# 只有调用该文件中的页面时才会触发
@ac.before_request
def x1():
print('app.before_request')
@ac.route('/login')
def login():
return render_template('login.html')
@ac.route('/logout')
def logout():
return 'Logout'
--__init__.py
from flask import Flask
from .views.account import ac
from .views.user import uc
def create_app():
app = Flask(__name__)
# @app.before_request
# def x1():
# print('app.before_request')
# 注册蓝图
app.register_blueprint(ac)
# 注册蓝图和编写url前缀
app.register_blueprint(uc,url_prefix='/api')
return app
-manage.py
from crm import create_app
app = create_app()
if __name__ == '__main__':
app.run()
8.特殊装饰器
[*]before_request and after_request
from flask import Flask
app = Flask(__name__)
@app.before_request
def x1():
print('before:x1')
return '滚'
@app.before_request
def xx1():
print('before:xx1')
@app.after_request
def x2(response):
print('after:x2')
return response
@app.after_request
def xx2(response):
print('after:xx2')
return response
@app.route('/index')
def index():
print('index')
return "Index"
@app.route('/order')
def order():
print('order')
return "order"
if __name__ == '__main__':
app.run()
# 执行结果是,before的返回会将其返回到页面,但是after还是会执行,并不像django返回后后面的都不会执行
before:x1
before:xx1
request
after:xx2
after:x2
[*]before_first_request,只有第一次时才会被触发
from flask import Flask
app = Flask(__name__)
@app.before_first_request
def x1():
print('123123')
@app.route('/index')
def index():
print('index')
return "Index"
@app.route('/order')
def order():
print('order')
return "order"
if __name__ == '__main__':
app.run()
[*]页面渲染的全局装饰器
@app.template_global()
def sb(a1, a2):
# {{sb(1,9)}}
return a1 + a2
@app.template_filter()
def db(a1, a2, a3):
# {{ 1|db(2,3) }}
return a1 + a2 + a3
[*]errorhandler,当报指定的错误时会触发。
@app.errorhandler(404)
def not_found(arg):
print(arg)
return "没找到"
9.上下文管理
[*]偏函数
import functools
def index(a1,a2):
return a1 + a2
# 原来的调用方式
# ret = index(1,23)
# print(ret)
# 偏函数,帮助开发者自动传递参数
new_func = functools.partial(index,666)
ret = new_func(1)
print(ret)
[*]父类方法的执行
"""
class Base(object):
def func(self):
print('Base.func')
class Foo(Base):
def func(self):
# 方式一:根据mro的顺序执行方法
# super(Foo,self).func()
# 方式二:主动执行Base类的方法
# Base.func(self)
print('Foo.func')
obj = Foo()
obj.func()
"""
####################################
class Base(object):
def func(self):
super(Base, self).func()
print('Base.func')
class Bar(object):
def func(self):
print('Bar.func')
class Foo(Base,Bar):
pass
# 示例一
# obj = Foo()
# obj.func()
# print(Foo.__mro__)
# 示例二
# obj = Base()
# obj.func()
[*]栈
# by luffycity.com
class Stack(object):
def __init__(self):
self.data = []
def push(self,val):
self.data.append(val)
def pop(self):
return self.data.pop()
def top(self):
return self.data[-1]
_stack = Stack()
_stack.push('佳俊')
_stack.push('咸鱼')
print(_stack.pop())
print(_stack.pop())
[*]Local
"""
{
1232:{k:v}
}
"""
try:
from greenlet import getcurrent as get_ident
except:
from threading import get_ident
"""
class Local(object):
def __init__(self):
object.__setattr__(self,'storage',{})
def __setattr__(self, key, value):
ident = get_ident()
if ident not in self.storage:
self.storage = {key:value}
else:
self.storage = value
def __getattr__(self, item):
ident = get_ident()
if ident in self.storage:
return self.storage.get(item)
"""
class Local(object):
__slots__ = ('__storage__', '__ident_func__')
def __init__(self):
# __storage__ = {1231:{'stack':[]}}
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__ident_func__', get_ident)
def __getattr__(self, name):
try:
return self.__storage__
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
ident = self.__ident_func__()
storage = self.__storage__
try:
storage = value
except KeyError:
storage = {name: value}
def __delattr__(self, name):
try:
del self.__storage__
except KeyError:
raise AttributeError(name)
obj = Local()
obj.stack = []
obj.stack.append('佳俊')
obj.stack.append('咸鱼')
print(obj.stack)
print(obj.stack.pop())
print(obj.stack)
[*]LocalStack
import functools
try:
from greenlet import getcurrent as get_ident
except:
from threading import get_ident
class Local(object):
__slots__ = ('__storage__', '__ident_func__')
def __init__(self):
# __storage__ = {1231:{'stack':[]}}
object.__setattr__(self, '__storage__', {})
object.__setattr__(self, '__ident_func__', get_ident)
def __getattr__(self, name):
try:
return self.__storage__
except KeyError:
raise AttributeError(name)
def __setattr__(self, name, value):
# name=stack
# value=[]
ident = self.__ident_func__()
storage = self.__storage__
try:
storage = value
except KeyError:
storage = {name: value}
def __delattr__(self, name):
try:
del self.__storage__
except KeyError:
raise AttributeError(name)
"""
__storage__ = {
12312: {stack:}
}
"""
# obj = Local()
# obj.stack = []
# obj.stack.append('佳俊')
# obj.stack.append('咸鱼')
# print(obj.stack)
# print(obj.stack.pop())
# print(obj.stack)
class LocalStack(object):
def __init__(self):
self._local = Local()
def push(self,value):
rv = getattr(self._local, 'stack', None) # self._local.stack =>local.getattr
if rv is None:
self._local.stack = rv = [] #self._local.stack =>local.setattr
rv.append(value) # self._local.stack.append(666)
return rv
def pop(self):
"""Removes the topmost item from the stack, will return the
old value or `None` if the stack was already empty.
"""
stack = getattr(self._local, 'stack', None)
if stack is None:
return None
elif len(stack) == 1:
return stack[-1]
else:
return stack.pop()
def top(self):
try:
return self._local.stack[-1]
except (AttributeError, IndexError):
return None
class RequestContext(object):
def __init__(self):
self.request = "xx"
self.session = 'oo'
_request_ctx_stack = LocalStack()
_request_ctx_stack.push(RequestContext())
def _lookup_req_object(arg):
ctx = _request_ctx_stack.top()
return getattr(ctx,arg) # ctx.request / ctx.session
request = functools.partial(_lookup_req_object,'request')
session = functools.partial(_lookup_req_object,'session')
print(request())
print(session())
[*]slots
class Foo(object):
__slots__ = ('name',)
def __init__(self):
self.name = 'alex'
# self.age = 18
obj = Foo()
print(obj.name)
# print(obj.age)
[*]请求上下文管理(ctx):request,session
- 请求到来之后wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法
- 在wsgi_app方法中:
- 首先将 请求相关+空session 封装到一个RequestContext对象中,即:ctx。
- 将ctx交给LocalStack对象,再由LocalStack将ctx添加到Local中,Local结构:
__storage__ = {
1231:{stack: }
}
- 根据请求中的cookie中提取名称为sessionid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session
-> 视图函数
- 把session中的数据再次写入到cookie中。
- 将ctx删除
- 结果返回给用户浏览器
- 断开socket连接
[*]LocalProxy
from flask import Flask,request,session
app = Flask(__name__)
@app.route('/index')
def index():
# 1. request是LocalProxy对象
# 2. 对象中有method、执行__getattr__
print(request.method)
# request['method']
# request + 1
# 1. session是LocalProxy对象
# 2. LocalProxy对象的__setitem__
session['x'] = 123
return "Index"
if __name__ == '__main__':
app.run()
# app.__call__
# app.wsgi_app
"""
第一阶段:请求到来
将request和Session相关数据封装到ctx=RequestContext对象中。
再通过LocalStack将ctx添加到Local中。
__storage__ = {
1231:{'stack':}
}
第二阶段:视图函数中获取request或session
方式一:直接找LocalStack获取
from flask.globals import _request_ctx_stack
print(_request_ctx_stack.top.request.method)
方式二:通过代理LocalProxy(小东北)获取
from flask import Flask,request
print(request.method)
"""
[*]g只存在在一次请求中当你return时g就会被清除。
10.文件的上传和解压
[*]view.py
from flask import Blueprint, render_template, Flask, request, redirect,session
import os
import uuid
from ..utils import helper
ind = Blueprint('ind', __name__)
ind.config["MAX_CONTENT_LENGTH"] == 1024 * 1024 * 7
@ind.before_request
def process_request():
if not session.get("user_info"):
return redirect("/login")
return None
@ind.route('/home')
def home():
return render_template('home.html')
@ind.route('/user_list')
def user_list():
# import pymysql
# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("SELECT id,user,nickname FROM userinfo")
# data_list = cursor.fetchall()
# cursor.close()
# conn.close()
data_list = helper.fetch_all("SELECT id,user,nickname FROM userinfo",[])
return render_template('user_list.html',data_list=data_list)
@ind.route('/detail/<int:nid>')
def detail(nid):
# import pymysql
# conn = Config.POOL.connection()
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
# record_list = cursor.fetchall()
# cursor.close()
# conn.close()
record_list = helper.fetch_all("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
return render_template('detail.html',record_list=record_list)
@ind.route('/upload',methods=['GET','POST'])
def upload():
if request.method == "GET":
return render_template('upload.html')
from werkzeug.datastructures import FileStorage
file_obj = request.files.get('code')
# 1. 检查上传文件后缀名
name_ext = file_obj.filename.rsplit('.',maxsplit=1)
if len(name_ext) != 2:
return "请上传zip压缩文件"
if name_ext != 'zip':
return "请上传zip压缩文件"
"""
# 2. 接收用户上传文件,并写入到服务器本地.
file_path = os.path.join("files",file_obj.filename)
# 从file_obj.stream中读取内容,写入到文件
file_obj.save(file_path)
# 3. 解压zip文件
import shutil
# 通过open打开压缩文件,读取内容再进行解压。
shutil._unpack_zipfile(file_path,'xsadfasdfasdf')
"""
# 2+3, 接收用户上传文件,并解压到指定目录
import shutil
target_path = os.path.join('files',str(uuid.uuid4()))
shutil._unpack_zipfile(file_obj.stream,target_path)
# 4. 遍历某目录下的所有文件
# for item in os.listdir(target_path):
# print(item)
total_num = 0
for base_path,folder_list,file_list in os.walk(target_path):
for file_name in file_list:
file_path = os.path.join(base_path,file_name)
file_ext = file_path.rsplit('.',maxsplit=1)
if len(file_ext) != 2:
continue
if file_ext != 'py':
continue
file_num = 0
with open(file_path,'rb') as f:
for line in f:
line = line.strip()
if not line:
continue
if line.startswith(b'#'):
continue
file_num += 1
total_num += file_num
# 获取当前时间
import datetime
ctime = datetime.date.today()
print(total_num,ctime,session['user_info']['id'])
# import pymysql
# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
# data = cursor.fetchone()
# cursor.close()
# conn.close()
data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
if data:
return "今天已经上传"
# import pymysql
# conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
# cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
# cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
# conn.commit()
# cursor.close()
# conn.close()
helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
return "上传成功"
[*]html.py
{% extends "layout.html"%}
{% block content %}
<h1>代码上传</h1>
<form method="post" enctype="multipart/form-data">
<input type="file" name="code">
<input type="submit" value="上传">
</form>
{% endblock %}
11.Database Join 池
[*]安装pip3 install DBUtils
[*]创建连接池可以共享出去,谁要用就来获取一个连接,这样可以管理连接的数量,提高链接的利用率。
from DBUtils.PooledDB import PooledDB, SharedDBConnection
import pymysql
POOL = PooledDB(
creator=pymysql,# 使用链接数据库的模块
maxconnections=6,# 连接池允许的最大连接数,0和None表示不限制连接数
mincached=2,# 初始化时,链接池中至少创建的空闲的链接,0表示不创建
maxcached=5,# 链接池中最多闲置的链接,0和None不限制
maxshared=3,
# 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
blocking=True,# 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
maxusage=None,# 一个链接最多被重复使用的次数,None表示无限制
setsession=[],# 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
ping=0,
# ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
host='127.0.0.1',
port=3306,
user='root',
password='123456',
database='s9day118',
charset='utf8'
)
# 获取连接
conn = POOL.connection()
cursor = conn.cursor(pymysql.cursors.DictCursor)
cursor.execute("select * from tb1")
result = cursor.fetchall()
cursor.close()
# 归还连接
conn.close()
12.wtforms
[*]WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。
[*]安装pip3 install wtforms
12.1 用户登录注册示例
[*]用户登录
[*]当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如:
[*]用户不能为空;用户长度必须大于6;
[*]密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class LoginForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired(message='用户名不能为空.'),
validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'}
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.'),
validators.Length(min=8, message='用户名长度必须大于%(min)d'),
validators.Regexp(regex="^(?=.*)(?=.*)(?=.*\d)(?=.*[$@$!%*?&]){8,}",
message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
@app.route('/login', methods=['GET', 'POST'])
def login():
if request.method == 'GET':
form = LoginForm()
return render_template('login.html', form=form)
else:
form = LoginForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('login.html', form=form)
if __name__ == '__main__':
app.run()
#######################################################
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>登录</h1>
<form method="post">
<p>{{form.name.label}} {{form.name}} {{form.name.errors }}</p>
<p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors }}</p>
<input type="submit" value="提交">
</form>
</body>
</html>
[*]用户注册
[*]注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。
from flask import Flask, render_template, request, redirect
from wtforms import Form
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
app = Flask(__name__, template_folder='templates')
app.debug = True
class RegisterForm(Form):
name = simple.StringField(
label='用户名',
validators=[
validators.DataRequired()
],
widget=widgets.TextInput(),
render_kw={'class': 'form-control'},
default='alex'
)
pwd = simple.PasswordField(
label='密码',
validators=[
validators.DataRequired(message='密码不能为空.')
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
pwd_confirm = simple.PasswordField(
label='重复密码',
validators=[
validators.DataRequired(message='重复密码不能为空.'),
validators.EqualTo('pwd', message="两次密码输入不一致")
],
widget=widgets.PasswordInput(),
render_kw={'class': 'form-control'}
)
email = html5.EmailField(
label='邮箱',
validators=[
validators.DataRequired(message='邮箱不能为空.'),
validators.Email(message='邮箱格式错误')
],
widget=widgets.TextInput(input_type='email'),
render_kw={'class': 'form-control'}
)
gender = core.RadioField(
label='性别',
choices=(
(1, '男'),
(2, '女'),
),
coerce=int
)
city = core.SelectField(
label='城市',
choices=(
('bj', '北京'),
('sh', '上海'),
)
)
hobby = core.SelectMultipleField(
label='爱好',
choices=(
(1, '篮球'),
(2, '足球'),
),
coerce=int
)
favor = core.SelectMultipleField(
label='喜好',
choices=(
(1, '篮球'),
(2, '足球'),
),
widget=widgets.ListWidget(prefix_label=False),
option_widget=widgets.CheckboxInput(),
coerce=int,
default=
)
def __init__(self, *args, **kwargs):
super(RegisterForm, self).__init__(*args, **kwargs)
self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
def validate_pwd_confirm(self, field):
"""
自定义pwd_confirm字段规则,例:与pwd字段是否一致
:param field:
:return:
"""
# 最开始初始化时,self.data中已经有所有的值
if field.data != self.data['pwd']:
# raise validators.ValidationError("密码不一致") # 继续后续验证
raise validators.StopValidation("密码不一致")# 不再继续后续验证
@app.route('/register', methods=['GET', 'POST'])
def register():
if request.method == 'GET':
form = RegisterForm(data={'gender': 1})
return render_template('register.html', form=form)
else:
form = RegisterForm(formdata=request.form)
if form.validate():
print('用户提交数据通过格式验证,提交的值为:', form.data)
else:
print(form.errors)
return render_template('register.html', form=form)
if __name__ == '__main__':
app.run()
############################################################
<!DOCTYPE html>
<html lang="en">
<head>
<meta charset="UTF-8">
<title>Title</title>
</head>
<body>
<h1>用户注册</h1>
<form method="post" novalidate >
{% for item in form %}
<p>{{item.label}}: {{item}} {{item.errors }}</p>
{% endfor %}
<input type="submit" value="提交">
</form>
</body>
</html>
[*]动态获取数据库数据
from flask import Flask,request,render_template,session,current_app,g,redirect
from wtforms import Form
from wtforms.fields import simple
from wtforms.fields import html5
from wtforms.fields import core
from wtforms import widgets
from wtforms import validators
app = Flask(__name__)
class LoginForm(Form):
name = simple.StringField(
validators=[
validators.DataRequired(message='用户名不能为空.'),
# validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
],
widget=widgets.TextInput(),
render_kw={'placeholder':'请输入用户名'}
)
pwd = simple.PasswordField(
validators=[
validators.DataRequired(message='密码不能为空.'),
# validators.Length(min=8, message='用户名长度必须大于%(min)d'),
# validators.Regexp(regex="^(?=.*)(?=.*)(?=.*\d)(?=.*[$@$!%*?&]){8,}",
# message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
],
render_kw={'placeholder':'请输入密码'}
)
@app.route('/login',methods=['GET','POST'])
def login():
if request.method == "GET":
form = LoginForm()
# print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__
# print(form.pwd,type(form.pwd)) # form.pwd是PasswordField()对象,PasswordField().__str__
return render_template('login.html',form=form)
form = LoginForm(formdata=request.form)
if form.validate():
print(form.data)
return redirect('https://www.luffycity.com/home')
else:
# print(form.errors)
return render_template('login.html', form=form)
import helper
class UserForm(Form):
city = core.SelectField(
label='城市',
choices=(),
coerce=int # 从数据库获取的数据返回后是字符串类型的,把他写上后会先将其转为int后再返回到前端
)
name = simple.StringField(label='姓名')
# 当你定义成静态字段是,只有启动时才会加载一次,但是从数据库需要实时拿到数据,下面是解决方法。你应该可以看懂。
def __init__(self,*args,**kwargs):
super(UserForm,self).__init__(*args,**kwargs)
self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)
@app.route('/user')
def user():
if request.method == "GET":
#form = UserForm(data={'name':'alex','city':3})
form = UserForm()
return render_template('user.html',form=form)
if __name__ == '__main__':
app.run()
[*]meta
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from flask import Flask, render_template, request, redirect, session
from wtforms import Form
from wtforms.csrf.core import CSRF
from wtforms.fields import core
from wtforms.fields import html5
from wtforms.fields import simple
from wtforms import validators
from wtforms import widgets
from hashlib import md5
app = Flask(__name__, template_folder='templates')
app.debug = True
class MyCSRF(CSRF):
"""
Generate a CSRF token based on the user's IP. I am probably not very
secure, so don't use me.
"""
def setup_form(self, form):
self.csrf_context = form.meta.csrf_context()
self.csrf_secret = form.meta.csrf_secret
return super(MyCSRF, self).setup_form(form)
def generate_csrf_token(self, csrf_token):
gid = self.csrf_secret + self.csrf_context
token = md5(gid.encode('utf-8')).hexdigest()
return token
def validate_csrf_token(self, form, field):
print(field.data, field.current_token)
if field.data != field.current_token:
raise ValueError('Invalid CSRF')
class TestForm(Form):
name = html5.EmailField(label='用户名')
pwd = simple.StringField(label='密码')
class Meta:
# -- CSRF
# 是否自动生成CSRF标签
csrf = True
# 生成CSRF标签name
csrf_field_name = 'csrf_token'
# 自动生成标签的值,加密用的csrf_secret
csrf_secret = 'xxxxxx'
# 自动生成标签的值,加密用的csrf_context
csrf_context = lambda x: request.url
# 生成和比较csrf标签
csrf_class = MyCSRF
# -- i18n
# 是否支持本地化
# locales = False
locales = ('zh', 'en')
# 是否对本地化进行缓存
cache_translations = True
# 保存本地化缓存信息的字段
translations_cache = {}
@app.route('/index/', methods=['GET', 'POST'])
def index():
if request.method == 'GET':
form = TestForm()
else:
form = TestForm(formdata=request.form)
if form.validate():
print(form)
return render_template('index.html', form=form)
if __name__ == '__main__':
app.run()
12.2 其他
[*]metaclass
# new方法的返回值决定对象到底是什么?
class Bar(object):
pass
class Foo(object):
def __new__(cls, *args, **kwargs):
# return super(Foo,cls).__new__(cls,*args, **kwargs)
return Bar()
obj = Foo()
print(obj)
# 当Foo()时先执行__new__方法,返回一个Bar,obj就等于Bar对象
##########################################################################
class MyType(type):
def __init__(self, *args, **kwargs):
print('MyType创建类',self)
super(MyType, self).__init__(*args, **kwargs)
def __call__(self, *args, **kwargs):
obj = super(MyType, self).__call__(*args, **kwargs)
print('类创建对象', self, obj)
return obj
class Foo(object,metaclass=MyType):
user = 'wupeiqi'
age = 18
obj = Foo()
# 在创建Foo对象前先执行mytype的__call__方法,__call__方法后才会执行Foo。不写metaclass默认是由type类来做的。
# 是先执行父类的__call__后再执行自己的__init__方法。
"""
创建类时,先执行type的__init__。
类的实例化时,执行type的__call__,__call__方法的的返回值就是实例化的对象。
__call__内部调用:
类.__new__,创建对象
类.__init__,对象的初始化
"""
[*]实例化流程分析
# 源码流程
1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中
2. 执行构造方法
a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields 中。
即:
_fields = {
name: wtforms.fields.core.StringField(),
}
PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()
b. 循环_fields,为对象设置属性
for name, field in iteritems(self._fields):
# Set all the fields to attributes so that they obscure the class
# attributes with the same names.
setattr(self, name, field)
c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)
优先级:obj,data,formdata;
再循环执行每个字段的process方法,为每个字段设置值:
for name, field, in iteritems(self._fields):
if obj is not None and hasattr(obj, name):
field.process(formdata, getattr(obj, name))
elif name in kwargs:
field.process(formdata, kwargs)
else:
field.process(formdata)
执行每个字段的process方法,为字段的data和字段的raw_data赋值
def process(self, formdata, data=unset_value):
self.process_errors = []
if data is unset_value:
try:
data = self.default()
except TypeError:
data = self.default
self.object_data = data
try:
self.process_data(data)
except ValueError as e:
self.process_errors.append(e.args)
if formdata:
try:
if self.name in formdata:
self.raw_data = formdata.getlist(self.name)
else:
self.raw_data = []
self.process_formdata(self.raw_data)
except ValueError as e:
self.process_errors.append(e.args)
try:
for filter in self.filters:
self.data = filter(self.data)
except ValueError as e:
self.process_errors.append(e.args)
d. 页面上执行print(form.name) 时,打印标签
因为执行了:
字段的 __str__ 方法
字符的 __call__ 方法
self.meta.render_field(self, kwargs)
def render_field(self, field, render_kw):
other_kw = getattr(field, 'render_kw', None)
if other_kw is not None:
render_kw = dict(other_kw, **render_kw)
return field.widget(field, **render_kw)
执行字段的插件对象的 __call__ 方法,返回标签字符串
[*]验证流程分析
a. 执行form的validate方法,获取钩子方法
def validate(self):
extra = {}
for name in self._fields:
inline = getattr(self.__class__, 'validate_%s' % name, None)
if inline is not None:
extra =
return super(Form, self).validate(extra)
b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
def validate(self, extra_validators=None):
self._errors = None
success = True
for name, field in iteritems(self._fields):
if extra_validators is not None and name in extra_validators:
extra = extra_validators
else:
extra = tuple()
if not field.validate(self, extra):
success = False
return success
c. 每个字段进行验证时候
字段的pre_validate 【预留的扩展】
字段的_run_validation_chain,对正则和字段的钩子函数进行校验
字段的post_validate【预留的扩展】
13.SQLAlchemy
[*]MySQLdb,只支持python2.0的,而pymysql是支持python2.0和3.0的。
import MySQLdb
conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')
cur = conn.cursor()
reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa'))
# reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'})
conn.commit()
cur.close()
conn.close()
print reCount
[*]SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
[*]安装pip3 install sqlalchemy
[*]组成部分:
[*]Engine,框架的引擎
[*]Connection Pooling ,数据库连接池
[*]Dialect,选择连接数据库的DB API种类
[*]Schema/Types,架构和类型
[*]SQL Exprression Language,SQL表达式语言
[*]SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
MySQL-Python
mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
pymysql
mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
MySQL-Connector
mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
cx_Oracle
oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
[*]执行原生SQL语句
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5,# 连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1# 多久之后对线程池中的线程进行一次连接的回收(重置)
)
def task(arg):
conn = engine.raw_connection()
cursor = conn.cursor()
cursor.execute(
"select * from t1"
)
result = cursor.fetchall()
cursor.close()
conn.close()
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
########################################################################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
def task(arg):
conn = engine.contextual_connect()
with conn:
cur = conn.execute(
"select * from t1"
)
result = cur.fetchall()
print(result)
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
########################################################################################
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.engine.base import Engine
from sqlalchemy.engine.result import ResultProxy
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
def task(arg):
cur = engine.execute("select * from t1")
result = cur.fetchall()
cur.close()
print(result)
for i in range(20):
t = threading.Thread(target=task, args=(i,))
t.start()
# 注意: 查看连接 show status like 'Threads%';
13.1 ORM
[*]创建单表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
Base = declarative_base()
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
# email = Column(String(32), unique=True)
# ctime = Column(DateTime, default=datetime.datetime.now)
# extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'email'),
)
def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5,# 连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1# 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5,# 连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1# 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_db()
init_db()
[*]创建多个表并包含Fk、M2M关系
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import datetime
from sqlalchemy import create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import relationship
Base = declarative_base()
# ##################### 单表示例 #########################
class Users(Base):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True)
age = Column(Integer, default=18)
email = Column(String(32), unique=True)
ctime = Column(DateTime, default=datetime.datetime.now)
extra = Column(Text, nullable=True)
__table_args__ = (
# UniqueConstraint('id', 'name', name='uix_id_name'),
# Index('ix_id_name', 'name', 'extra'),
)
class Hosts(Base):
__tablename__ = 'hosts'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True)
ctime = Column(DateTime, default=datetime.datetime.now)
# ##################### 一对多示例 #########################
class Hobby(Base):
__tablename__ = 'hobby'
id = Column(Integer, primary_key=True)
caption = Column(String(50), default='篮球')
class Person(Base):
__tablename__ = 'person'
nid = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=True)
hobby_id = Column(Integer, ForeignKey("hobby.id"))
# 与生成表结构无关,仅用于查询方便
hobby = relationship("Hobby", backref='pers')
# ##################### 多对多示例 #########################
class Server2Group(Base):
__tablename__ = 'server2group'
id = Column(Integer, primary_key=True, autoincrement=True)
server_id = Column(Integer, ForeignKey('server.id'))
group_id = Column(Integer, ForeignKey('group.id'))
class Group(Base):
__tablename__ = 'group'
id = Column(Integer, primary_key=True)
name = Column(String(64), unique=True, nullable=False)
# 与生成表结构无关,仅用于查询方便
servers = relationship('Server', secondary='server2group', backref='groups')
class Server(Base):
__tablename__ = 'server'
id = Column(Integer, primary_key=True, autoincrement=True)
hostname = Column(String(64), unique=True, nullable=False)
def init_db():
"""
根据类创建数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5,# 连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1# 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.create_all(engine)
def drop_db():
"""
根据类删除数据库表
:return:
"""
engine = create_engine(
"mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
max_overflow=0,# 超过连接池大小外最多创建的连接
pool_size=5,# 连接池大小
pool_timeout=30,# 池中没有线程最多等待的时间,否则报错
pool_recycle=-1# 多久之后对线程池中的线程进行一次连接的回收(重置)
)
Base.metadata.drop_all(engine)
if __name__ == '__main__':
drop_db()
init_db()
[*]操作数据库表
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
# 每次执行数据库操作时,都需要创建一个session
session = Session()
# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)
# 提交事务
session.commit()
# 关闭session
session.close()
[*]基于scoped_session实现线程安全
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sqlalchemy.orm import sessionmaker
from sqlalchemy import create_engine
from sqlalchemy.orm import scoped_session
from models import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
"""
# 线程安全,基于本地线程实现每个线程用同一个session
# 特殊的:scoped_session中有原来方法的Session中的一下方法:
public_methods = (
'__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
'close', 'commit', 'connection', 'delete', 'execute', 'expire',
'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
'bulk_update_mappings',
'merge', 'query', 'refresh', 'rollback',
'scalar'
)
"""
session = scoped_session(Session)
# ############# 执行ORM操作 #############
obj1 = Users(name="alex1")
session.add(obj1)
# 提交事务
session.commit()
# 关闭session
session.close()
[*]多线程执行示例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from db import Users
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
def task(arg):
session = Session()
obj1 = Users(name="alex1")
session.add(obj1)
session.commit()
for i in range(10):
t = threading.Thread(target=task, args=(i,))
t.start()
[*]基本增删改查示例
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# ################ 添加 ################
"""
obj1 = Users(name="wupeiqi")
session.add(obj1)
session.add_all([
Users(name="wupeiqi"),
Users(name="alex"),
Hosts(name="c1.com"),
])
session.commit()
"""
# ################ 删除 ################
"""
session.query(Users).filter(Users.id > 2).delete()
session.commit()
"""
# ################ 修改 ################
"""
session.query(Users).filter(Users.id > 0).update({"name" : "099"})
session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
session.commit()
"""
# ################ 查询 ################
"""
r1 = session.query(Users).all()
r2 = session.query(Users.name.label('xx'), Users.age).all()
r3 = session.query(Users).filter(Users.name == "alex").all()
r4 = session.query(Users).filter_by(name='alex').all()
r5 = session.query(Users).filter_by(name='alex').first()
r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
"""
session.close()
[*]原生SQL语句
# 条件
ret = session.query(Users).filter_by(name='alex').all()
ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
ret = session.query(Users).filter(Users.id.in_()).all()
ret = session.query(Users).filter(~Users.id.in_()).all()
ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
from sqlalchemy import and_, or_
ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
ret = session.query(Users).filter(
or_(
Users.id < 2,
and_(Users.name == 'eric', Users.id > 3),
Users.extra != ""
)).all()
# 通配符
ret = session.query(Users).filter(Users.name.like('e%')).all()
ret = session.query(Users).filter(~Users.name.like('e%')).all()
# 限制
ret = session.query(Users)
# 排序
ret = session.query(Users).order_by(Users.name.desc()).all()
ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
# 分组
from sqlalchemy.sql import func
ret = session.query(Users).group_by(Users.extra).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).all()
ret = session.query(
func.max(Users.id),
func.sum(Users.id),
func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
# 连表
ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
ret = session.query(Person).join(Favor).all()
ret = session.query(Person).join(Favor, isouter=True).all()
# 组合
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union(q2).all()
q1 = session.query(Users.name).filter(Users.id > 2)
q2 = session.query(Favor.caption).filter(Favor.nid < 2)
ret = q1.union_all(q2).all()
[*]基于relationship操作ForeignKey
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 查询
# cursor = session.execute('select * from users')
# result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
session.close()
[*]基于relationship操作m2m
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Hobby(caption='乒乓球'),
Hobby(caption='羽毛球'),
Person(name='张三', hobby_id=3),
Person(name='李四', hobby_id=4),
])
person = Person(name='张九', hobby=Hobby(caption='姑娘'))
session.add(person)
hb = Hobby(caption='人妖')
hb.pers =
session.add(hb)
session.commit()
"""
# 使用relationship正向查询
"""
v = session.query(Person).first()
print(v.name)
print(v.hobby.caption)
"""
# 使用relationship反向查询
"""
v = session.query(Hobby).first()
print(v.caption)
print(v.pers)
"""
session.close()
[*]其他
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 添加
"""
session.add_all([
Server(hostname='c1.com'),
Server(hostname='c2.com'),
Group(name='A组'),
Group(name='B组'),
])
session.commit()
s2g = Server2Group(server_id=1, group_id=1)
session.add(s2g)
session.commit()
gp = Group(name='C组')
gp.servers =
session.add(gp)
session.commit()
ser = Server(hostname='c6.com')
ser.groups =
session.add(ser)
session.commit()
"""
# 使用relationship正向查询
"""
v = session.query(Group).first()
print(v.name)
print(v.servers)
"""
# 使用relationship反向查询
"""
v = session.query(Server).first()
print(v.hostname)
print(v.groups)
"""
session.close()
14.flask武装
[*]Flask_SQLAlchemy
#!/usr/bin/env python
# -*- coding:utf-8 -*-
import time
import threading
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
from sqlalchemy.orm import sessionmaker, relationship
from sqlalchemy import create_engine
from sqlalchemy.sql import text, func
from sqlalchemy.engine.result import ResultProxy
from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
Session = sessionmaker(bind=engine)
session = Session()
# 关联子查询
subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
result = session.query(Group.name, subqry)
"""
SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
FROM server
WHERE server.id = `group`.id) AS anon_1
FROM `group`
"""
# 原生SQL
"""
# 查询
cursor = session.execute('select * from users')
result = cursor.fetchall()
# 添加
cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
session.commit()
print(cursor.lastrowid)
"""
session.close()
[*]flask-script
a. 下载安装
pip3 install flask-sqlalchemy
b. chun.__init__.py
导入并实例化SQLAlchemy
from flask_sqlalchemy import SQLAlchemy
db = SQLAlchemy()
注意事项:
- 必须在导入蓝图之前
- 必须导入models.py
c. 初始化
db.init_app(app)
d. 在配置文件中写入配置
# ##### SQLALchemy配置文件 #####
SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s9day122?charset=utf8"
SQLALCHEMY_POOL_SIZE = 10
SQLALCHEMY_MAX_OVERFLOW = 5
e. 创建models.py中的类(对应数据库表)
chun/models.py
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column
from sqlalchemy import Integer,String,Text,Date,DateTime
from sqlalchemy import create_engine
from chun import db
class Users(db.Model):
__tablename__ = 'users'
id = Column(Integer, primary_key=True)
name = Column(String(32), index=True, nullable=False)
depart_id = Column(Integer)
f. 生成表(使用app上下文)
from chun import db,create_app
app = create_app()
app_ctx = app.app_context() # app_ctx = app/g
with app_ctx: # __enter__,通过LocalStack放入Local中
db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
g. 基于ORM对数据库进行操作。
from flask import Blueprint
from chun import db
from chun import models
us = Blueprint('us',__name__)
@us.route('/index')
def index():
# 使用SQLAlchemy在数据库中插入一条数据
# db.session.add(models.Users(name='高件套',depart_id=1))
# db.session.commit()
# db.session.remove()
result = db.session.query(models.Users).all()
print(result)
db.session.remove()
return 'Index'
[*]flask-migrate
install the module "pip3 install flask-script "
a. 增加 runserver
from chun import create_app
from flask_script import Manager
app = create_app()
manager = Manager(app)
if __name__ == '__main__':
# app.run()
manager.run()
b. 位置传参
from chun import create_app
from flask_script import Manager
app = create_app()
manager = Manager(app)
@manager.command
def custom(arg):
"""
自定义命令
python manage.py custom 123
:param arg:
:return:
"""
print(arg)
if __name__ == '__main__':
# app.run()
manager.run()
c. 关键字传参
from chun import create_app
from flask_script import Manager
app = create_app()
manager = Manager(app)
@manager.option('-n', '--name', dest='name')
@manager.option('-u', '--url', dest='url')
def cmd(name, url):
"""
自定义命令
执行: python manage.pycmd -n wupeiqi -u http://www.oldboyedu.com
:param name:
:param url:
:return:
"""
print(name, url)
if __name__ == '__main__':
# app.run()
manager.run()
[*]找到项目使用的所有组件和版本。
install the module "pip3 install flask-migrate"
flask-migrate rely on flask-script.
#!/usr/bin/env python
# -*- coding:utf-8 -*-
from sansa import create_app
from sansa import db
from flask_script import Manager
from flask_migrate import Migrate, MigrateCommand
app = create_app()
manager = Manager(app)
Migrate(app, db)
"""
# 数据库迁移命名
python manage.py db init
python manage.py db migrate
python manage.py db upgrade
"""
manager.add_command('db', MigrateCommand)
if __name__ == '__main__':
manager.run()
# app.run()
[*]python虚拟环境
install the module "pip install pipreqs"
# 终端输入后就可以找到所有的依赖
pipreqs ./ --encoding=utf-8
来源:https://www.cnblogs.com/InvincibleGrass/p/17562878.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!
页:
[1]