翼度科技»论坛 编程开发 python 查看内容

Flask

4

主题

4

帖子

12

积分

新手上路

Rank: 1

积分
12
Flask


  • 短小精悍,可扩展强的一个web框架。上下文管理
  • 安装:pip3 install Flask,我们做web开发时,是站在两大东西之上做的web框架和wsgi,Flask和Django中都是应用的并不是自己写的。Flask中werkzurg就是。
  • web服务网关接口,wsgi是一个协议,实现该写一个的模块:wsgiref|werkzeug,实现其协议的模块本质上就是socket服务端用于接收用户请求,并处理。一般web框架基于wsgi实现,这样实现关注点分离。
  • flask就是基于Werkzurg搭建起来的
    1. from werkzeug.wrappers import Request, Response
    2. from werkzeug.serving import run_simple
    3. @Request.application
    4. def run(environ, start_response):
    5.    return Response("Hello World!")         # 返回字符串
    6. if __name__=="__main__":
    7.    run_simple("localhost", 4000, run)      # 监听本机的4000端口,如果访问就执行run函数
    复制代码
  • 一个最简单的Flask
    1. from flask import Flask
    2. app = Flask(__name__)                       # flask名称,一般这样写。
    3. @app.route("/index")                        # url路径
    4. def index():                                #所调用的函数
    5.    return "Hello World!"
    6. if __name__ == "__main__":
    7.    app.run()                               # 执行口
    复制代码
  • Flask实现用户登录程序
    1. from flask import Flask, render_template, request, redirect, session
    2. app = Flask(__name__, template_folder="templates", static_folder="static")
    3. app.secret_key = "asdf"                         # session加盐
    4. # request.form          # 请求体
    5. # request.args          # 请求头
    6. # request.method        # 请求类型
    7. @app.route("/login", methods=["GET", "POST"])
    8. def login():
    9.    if request.method == "GET":
    10.        return render_template("login.html")
    11.    user = request.from.get("user")
    12.    pwd= request.from.get("pwd")
    13.    if user == "alex" and pwd == "666":
    14.        session["user"] = user
    15.        return redirect("/index")
    16.    return render_template("login.html", error="用户名或密码错误")
    17. @app.route("/index")
    18. def index():
    19.    user = session.get("user")
    20.    if not user:
    21.        return redirect("/login")
    22.    return render_template("index.html")
    23. if __name__ == "__main__":
    24.    app.run()
    复制代码
1.配置文件


  • settings
    1. class Config(object):
    2.    DEBUG = False
    3.    TESTING = False
    4.    DATABASE_URL = "sqlite://:memory:"
    5. class ProductionConfig(Config):
    6.    DATABASE_URL = "mysql://user@localhost/foo"
    7.    
    8. class DevelopmentConfig(Config):
    9.    DEBUT = True
    10.    
    11. class TestingConfig(Config):
    12.    TESTING = True
    13.    
    14. flask中的配置文件是一个flask.config.Config对象(继承字典),默认配置为:
    15. {
    16.    'DEBUG':                                get_debug_flag(default=False),  是否开启Debug模式
    17.    'TESTING':                              False,                          是否开启测试模式
    18.    'PROPAGATE_EXCEPTIONS':                 None,                          
    19.    'PRESERVE_CONTEXT_ON_EXCEPTION':        None,
    20.    'SECRET_KEY':                           None,
    21.    'PERMANENT_SESSION_LIFETIME':           timedelta(days=31),
    22.    'USE_X_SENDFILE':                       False,
    23.    'LOGGER_NAME':                          None,
    24.    'LOGGER_HANDLER_POLICY':               'always',
    25.    'SERVER_NAME':                          None,
    26.    'APPLICATION_ROOT':                     None,
    27.    'SESSION_COOKIE_NAME':                  'session',
    28.    'SESSION_COOKIE_DOMAIN':                None,
    29.    'SESSION_COOKIE_PATH':                  None,
    30.    'SESSION_COOKIE_HTTPONLY':              True,
    31.    'SESSION_COOKIE_SECURE':                False,
    32.    'SESSION_REFRESH_EACH_REQUEST':         True,
    33.    'MAX_CONTENT_LENGTH':                   None,
    34.    'SEND_FILE_MAX_AGE_DEFAULT':            timedelta(hours=12),
    35.    'TRAP_BAD_REQUEST_ERRORS':              False,
    36.    'TRAP_HTTP_EXCEPTIONS':                 False,
    37.    'EXPLAIN_TEMPLATE_LOADING':             False,
    38.    'PREFERRED_URL_SCHEME':                 'http',
    39.    'JSON_AS_ASCII':                        True,
    40.    'JSON_SORT_KEYS':                       True,
    41.    'JSONIFY_PRETTYPRINT_REGULAR':          True,
    42.    'JSONIFY_MIMETYPE':                     'application/json',
    43.    'TEMPLATES_AUTO_RELOAD':                None,
    44. }
    复制代码
  • xx
    1. # 如何通过代码找到指定的类
    2. import importlib
    3. path = "settings.Foo"
    4. p, c = path.rsplit(".", maxsplit=1)
    5. m = importlib.import_module(p)
    6. cls = getattr(m, c)
    7. # 找出类中的所有静态变量
    8. for key in dir(cls):
    9.    if key.isupper():
    10.        print(key,getattr(cls, key))
    复制代码
  • Flask中有一个配置文件
    1. from flask import Flask, render_template, request, redirect, session
    2. app = Flask(__name__)
    3. print(app.config)                           # 这是flask提供的配置文件
    4. # app.config.["DEBUG"] = True                   # 我们可以通过这样来修改配置,但是flask也提供了另一种方式
    5. app.config.from_object("sttings.DevelopmentConfig")     # 这种方式就是自己创建一个文件和类,你在类中写的所有配置都会被应用。
    6. if __name__ == "__main__":
    7.    app.run()
    复制代码
     
2.路由和视图


  • flask的路由系统
    1. from flask import Flask, render_template, request, redirect, session, url_for
    2. app = Flask(__name__)
    3. # int 是值的类型,nid是名称:/login/1234
    4. """
    5. # 这是常用的类型,不加类型默认是字符串类型
    6. DEFAULT_CONVERTERS = {
    7.    'default':          UnicodeConverter,
    8.    'string':           UnicodeConverter,
    9.    'any':              AnyConverter,
    10.    'path':             PathConverter,
    11.    'int':              IntegerConverter,
    12.    'float':            FloatConverter,
    13.    'uuid':             UUIDConverter,
    14. }
    15. """
    16. @app.route("/login/<int:nid>", endpoint="n1")               # endpoint就是设置名字,不写默认是函数名
    17. def login():
    18.    url_for("n1", nid=122)                      # 反向查找url,nid是携带的值
    19. if __name__ == "__main__":
    20.    app.run()
    复制代码
  • 路由的两种方式
    1. @app.route("/xxx")
    2.     def index():
    3.        return "index"
    4. # 其实到内部调用时就是使用了app.add_url_rule来实现的,为了方便所以在外面套了个装饰器。
    5. def index():
    6.    return "index"
    7. app.add_url_rule("/xxx", None, index)
    8. # 注意事项:
    9.     # - 不用让endpoint重名
    10.     # - 如果重名函数也一定要相同。
    复制代码
  • app.route和app.add_url_rule参数
    1. @app.route和app.add_url_rule参数:
    2.     rule,                       URL规则
    3.     view_func,                  视图函数名称
    4.    defaults=None,              默认值,当URL中无参数,函数需要参数时,使用defaults={'k':'v'}为函数提供参数
    5.     endpoint=None,              名称,用于反向生成URL,即: url_for('名称')
    6.     methods=None,               允许的请求方式,如:["GET","POST"]
    7.            
    8.     strict_slashes=None,        对URL最后的 / 符号是否严格要求,
    9.         如:
    10.             @app.route('/index',strict_slashes=False),
    11.                 访问 http://www.xx.com/index/ 或 http://www.xx.com/index均可
    12.            @app.route('/index',strict_slashes=True)
    13.                仅访问 http://www.xx.com/index
    14.     redirect_to=None,           重定向到指定地址
    15.         如:
    16.             @app.route('/index/<int:nid>', redirect_to='/home/<nid>')
    17.            或
    18.            def func(adapter, nid):
    19.                 return "/home/888"
    20.            @app.route('/index/<int:nid>', redirect_to=func)
    21.     subdomain=None,             子域名访问
    22.         from flask import Flask, views, url_for
    23.        app = Flask(import_name=__name__)
    24.        app.config['SERVER_NAME'] = 'wupeiqi.com:5000'
    25.        @app.route("/", subdomain="admin")
    26.        def static_index():
    27.            """Flask supports static subdomains
    28.            This is available at static.your-domain.tld"""
    29.            return "static.your-domain.tld"
    30.        @app.route("/dynamic", subdomain="<username>")
    31.        def username_index(username):
    32.            """Dynamic subdomains are also supported
    33.            Try going to user1.your-domain.tld/dynamic"""
    34.            return username + ".your-domain.tld"
    35.        if __name__ == '__main__':
    36.        app.run()
    复制代码
  • CBV
    1. import functools
    2. from flask import Flask,views
    3. app = Flask(__name__)
    4. def wrapper(func):
    5.    @functools.wraps(func)
    6.    def inner(*args,**kwargs):
    7.        return func(*args,**kwargs)
    8.    return inner
    9. class UserView(views.MethodView):
    10.    methods = ['GET']
    11.    decorators = [wrapper,]
    12.    def get(self,*args,**kwargs):
    13.        return 'GET'
    14.    def post(self,*args,**kwargs):
    15.        return 'POST'
    16.    app.add_url_rule('/user',None,UserView.as_view('uuuu'))
    17.    if __name__ == '__main__':
    18.        app.run()
    复制代码
  • 自定义正则
    1. from flask import Flask,url_for
    2. app = Flask(__name__)
    3. # 步骤一:定制类
    4. from werkzeug.routing import BaseConverter
    5. class RegexConverter(BaseConverter):
    6.    """
    7.     自定义URL匹配正则表达式
    8.     """
    9.    def __init__(self, map, regex):
    10.        super(RegexConverter, self).__init__(map)
    11.        self.regex = regex
    12.    def to_python(self, value):
    13.        """
    14.         路由匹配时,匹配成功后传递给视图函数中参数的值
    15.         :param value:
    16.         :return:
    17.         """
    18.        return int(value)
    19.    def to_url(self, value):
    20.        """
    21.         使用url_for反向生成URL时,传递的参数经过该方法处理,返回的值用于生成URL中的参数
    22.         :param value:
    23.         :return:
    24.         """
    25.        val = super(RegexConverter, self).to_url(value)
    26.        return val
    27. # 步骤二:添加到转换器
    28. app.url_map.converters['reg'] = RegexConverter
    29. """
    30. 1. 用户发送请求
    31. 2. flask内部进行正则匹配
    32. 3. 调用to_python(正则匹配的结果)方法
    33. 4. to_python方法的返回值会交给视图函数的参数
    34. """
    35. # 步骤三:使用自定义正则
    36. @app.route('/index/<reg("\d+"):nid>')
    37. def index(nid):
    38.     print(nid,type(nid))
    39.    print(url_for('index',nid=987))
    40.    return "index"
    41. if __name__ == '__main__':
    42.    app.run()
    复制代码
     
3.请求和响应


  • 请求
    1. # request.method
    2. # request.args
    3. # request.form
    4. # request.values
    5. # request.cookies
    6. # request.headers
    7. # request.path
    8. # request.full_path
    9. # request.script_root
    10. # request.url
    11. # request.base_url
    12. # request.url_root
    13. # request.host_url
    14. # request.host
    15. # request.files
    16. # obj = request.files['the_file_name']
    17. # obj.save('/var/www/uploads/' + secure_filename(f.filename))
    复制代码
  • 响应
    1. from flask import Flask, render_template, request, redirect, session, url_for, jsonify
    2. # 响应相关信息
    3. # return "字符串"
    4. # return render_template('html模板路径',**{})
    5. # return redirect('/index.html')
    6. # return jsonify({"k1":"v1"})
    7. # response = make_response(render_template('index.html'))
    8. # response是flask.wrappers.Response类型
    9. # response.delete_cookie('key')
    10. # response.set_cookie('key', 'value')
    11. # response.headers['X-Something'] = 'A value'
    12. # return response
    复制代码
  • 实现登录后才可查看的效果
    1. # 版本一:在每个函数中都加入判断
    2. @app.route('/index')
    3. def index():
    4.    if not session.get('user'):
    5.        return redirect(url_for('login'))
    6.    return render_template('index.html',stu_dic=STUDENT_DICT)
    7. # 版本二:使用装饰器,但是需要用到functools的wraps,应为你写的装饰器肯定要先执行,不然无法被应用,但是你写的装饰器到最后被调用的函数是inner,但是我们需要自己的属性才可以实现。wraps的功能就是将inner赋予调用函数的所有属性和特征。
    8. import functools
    9. def auth(func):
    10.    @functools.wraps(func)
    11.    def inner(*args,**kwargs):
    12.        if not session.get('user'):
    13.            return redirect(url_for('login'))
    14.        ret = func(*args,**kwargs)
    15.        return ret
    16.    return inner
    17. @app.route('/index')
    18. @auth
    19. def index():
    20.    return render_template('index.html',stu_dic=STUDENT_DICT)
    21. # 应用场景:比较少的函数中需要额外添加功能。
    22. # 版本三:before_request,在执行函数时会先执行before_request所定义的函数,返回None就会向后执行,如果返回数据就会直接将其返回到前端了。
    23. @app.before_request
    24. def xxxxxx():
    25.    if request.path == '/login':
    26.        return None
    27.    if session.get('user'):
    28.        return None
    29.    return redirect('/login')
    复制代码
     
4.模板渲染


  • 基本数据类型:可以执行python语法,如:dict.get() list['xx']
  • 传入函数:django,自动执行\flask,不自动执行
  • 全局定义函数
    1. @app.template_global()
    2. def sb(a1, a2):
    3.    # {{sb(1,9)}}
    4.    return a1 + a2
    5. @app.template_filter()
    6. def db(a1, a2, a3):
    7.    # {{ 1|db(2,3) }}
    8.    return a1 + a2 + a3
    复制代码
  • 模板继承
    1. layout.html
    2. <!DOCTYPE html>
    3. <html lang="zh-CN">
    4. <head>
    5. <meta charset="UTF-8">
    6. <title>Title</title>
    7. <meta name="viewport" content="width=device-width, initial-scale=1">
    8. </head>
    9. <body>
    10. <h1>模板</h1>
    11. {% block content %}{% endblock %}
    12. </body>
    13. </html>
    14. tpl.html
    15. {% extends "layout.html"%}
    16. {% block content %}
    17. {{users.0}}
    18. {% endblock %}  
    复制代码
  • include
    1. {% include "form.html" %}
    2.                        
    3.                        
    4. form.html
    5.     <form>
    6.         asdfasdf
    7.         asdfasdf
    8.         asdf
    9.         asdf
    10.     </form>
    复制代码
    1. {% macro ccccc(name, type='text', value='') %}
    2.         <h1>宏</h1>
    3.         <input type="{{ type }}" name="{{ name }}" value="{{ value }}">
    4.         <input type="submit" value="提交">
    5. {% endmacro %}
    6. {{ ccccc('n1') }}
    7. {{ ccccc('n2') }}
    复制代码
  • flask默认时设置了xss的后端将html代码返回到前端时需要使用安全MarkUp("asdf"),前端也可以直接将其应用{{u|safe}}。
5.session


  • 当请求刚到来:flask读取cookie中session对应的值:eyJrMiI6NDU2LCJ1c2VyIjoib2xkYm95,将该值解密并反序列化成字典,放入内存以便视图函数使用。
    1. @app.route('/ses')
    2. def ses():
    3.     session['k1'] = 123
    4.     session['k2'] = 456
    5.     del session['k1']
    6.     return "Session"
    7. session['xxx'] = 123
    8. session['xxx']
    复制代码
  • 当请求结束时,flask会读取内存中字典的值,进行序列化+加密,写入到用户cookie中。
  • threading.local【和flask无任何关系】

    • 为每个线程创建一个独立的空间,使得线程对自己的空间中的数据进行操作(数据隔离)。
      1. import threading
      2. from threading import local
      3. import time
      4. obj = local()
      5. def task(i):
      6.     obj.xxxxx = i
      7.     time.sleep(2)
      8.     print(obj.xxxxx,i)
      9.     for i in range(10):
      10.         t = threading.Thread(target=task,args=(i,))
      11.         t.start()
      复制代码
    • 根据字典自定义一个类似于threading.local功能?
      1. import time
      2. import threading
      3. DIC = {}
      4. def task(i):
      5.     ident = threading.get_ident()
      6.     if ident in DIC:
      7.         DIC[ident]['xxxxx'] = i
      8.     else:
      9.         DIC[ident] = {'xxxxx':i }
      10.         time.sleep(2)
      11.         print(DIC[ident]['xxxxx'],i)
      12. for i in range(10):
      13.         t = threading.Thread(target=task,args=(i,))
      14.         t.start()
      复制代码
    • 通过getattr/setattr 构造出来 threading.local的加强版(协程)
      1. import time
      2. import threading
      3. try:
      4.     import greenlet
      5.     get_ident =  greenlet.getcurrent                                                # 获取协程唯一标识
      6. except Exception as e:
      7.     get_ident = threading.get_ident                                                        # 获取线程唯一标识
      8.     class Local(object):
      9.         DIC = {}
      10.         def __getattr__(self, item):                        # obj.xxx时触发,item等于xxx
      11.             ident = get_ident()
      12.             if ident in self.DIC:
      13.                 return self.DIC[ident].get(item)
      14.             return None
      15.         def __setattr__(self, key, value):                # obj.xxx = "123"时触发,key等于xxx,value等于123
      16.             ident = get_ident()
      17.             if ident in self.DIC:
      18.                 self.DIC[ident][key] = value
      19.                 else:
      20.                     self.DIC[ident] = {key:value}
      21.                                                
      22.                                 obj = Local()
      23.                                 def task(i):
      24.                                         obj.xxxxx = i
      25.                                         time.sleep(2)
      26.                                         print(obj.xxxxx,i)
      27.                                 for i in range(10):
      28.                                         t = threading.Thread(target=task,args=(i,))
      29.                                         t.start()
      复制代码

5.1 flask-session


  • 安装:pip3 install flask-session
  • 使用
    1. import redis
    2. from flask import Flask,request,session
    3. from flask.sessions import SecureCookieSessionInterface
    4. from flask_session import Session
    5. app = Flask(__name__)
    6. # app.session_interface = SecureCookieSessionInterface()
    7. # app.session_interface = RedisSessionInterface()
    8. app.config['SESSION_TYPE'] = 'redis'
    9. app.config['SESSION_REDIS'] = redis.Redis(host='140.143.227.206',port=6379,password='1234')
    10. Session(app)
    11. @app.route('/login')
    12. def login():
    13.     session['user'] = 'alex'
    14.     return 'asdfasfd'
    15. @app.route('/home')
    16. def index():
    17.     print(session.get('user'))
    18.     return '...'
    19. if __name__ == '__main__':
    20.     app.run()
    复制代码
     
6.闪现和中间件


  • 闪现,在session中存储一个数据,读取时通过pop将数据移除。
    1. from flask import Flask,flash,get_flashed_messages
    2. @app.route('/page1')
    3. def page1():
    4.         flash('临时数据存储','error')
    5.         flash('sdfsdf234234','error')
    6.         flash('adasdfasdf','info')
    7.         return "Session"
    8. @app.route('/page2')
    9. def page2():
    10.         print(get_flashed_messages(category_filter=['error']))
    11.         return "Session"
    复制代码
  • 中间件
    1. call方法什么时候出发?
    2.         用户发起请求时,才执行。
    3. 任务:在执行call方法之前,做一个操作,call方法执行之后做一个操作。
    4. class Middleware(object):
    5.     def __init__(self,old):
    6.         self.old = old
    7.     def __call__(self, *args, **kwargs):
    8.         ret = self.old(*args, **kwargs)
    9.             return ret
    10. if __name__ == '__main__':
    11.         app.wsgi_app = Middleware(app.wsgi_app)
    12.         app.run()
    复制代码
7.蓝图


  • 完整目录图
    1. ProjectName
    2. -ProjectName
    3. --static
    4. --templates
    5. --views
    6. ---account.py
    7.         from flask import Blueprint,render_template
    8.         # 创建蓝图对象
    9.     ac = Blueprint('ac',__name__)
    10.         # 只有调用该文件中的页面时才会触发
    11.     @ac.before_request
    12.     def x1():
    13.         print('app.before_request')
    14.     @ac.route('/login')
    15.     def login():
    16.         return render_template('login.html')
    17.     @ac.route('/logout')
    18.     def logout():
    19.         return 'Logout'
    20. --__init__.py
    21.         from flask import Flask
    22.     from .views.account import ac
    23.     from .views.user import uc
    24.     def create_app():
    25.         app = Flask(__name__)
    26.         # @app.before_request
    27.         # def x1():
    28.         #     print('app.before_request')
    29.                 # 注册蓝图
    30.         app.register_blueprint(ac)
    31.         # 注册蓝图和编写url前缀
    32.         app.register_blueprint(uc,url_prefix='/api')
    33.         return app
    34. -manage.py
    35.         from crm import create_app
    36.     app = create_app()
    37.     if __name__ == '__main__':
    38.         app.run()
    复制代码
8.特殊装饰器


  • before_request and after_request
    1. from flask import Flask
    2. app = Flask(__name__)
    3. @app.before_request
    4. def x1():
    5.     print('before:x1')
    6.     return '滚'
    7. @app.before_request
    8. def xx1():
    9.     print('before:xx1')
    10. @app.after_request
    11. def x2(response):
    12.     print('after:x2')
    13.     return response
    14. @app.after_request
    15. def xx2(response):
    16.     print('after:xx2')
    17.     return response
    18. @app.route('/index')
    19. def index():
    20.     print('index')
    21.     return "Index"
    22. @app.route('/order')
    23. def order():
    24.     print('order')
    25.     return "order"
    26. if __name__ == '__main__':
    27.     app.run()
    28. # 执行结果是,before的返回会将其返回到页面,但是after还是会执行,并不像django返回后后面的都不会执行
    29. before:x1
    30. before:xx1
    31. request
    32. after:xx2
    33. after:x2
    复制代码
  • before_first_request,只有第一次时才会被触发
    1. from flask import Flask
    2. app = Flask(__name__)
    3. @app.before_first_request
    4. def x1():
    5.     print('123123')
    6. @app.route('/index')
    7. def index():
    8.     print('index')
    9.     return "Index"
    10. @app.route('/order')
    11. def order():
    12.     print('order')
    13.     return "order"
    14. if __name__ == '__main__':
    15.     app.run()
    复制代码
  • 页面渲染的全局装饰器
    1. @app.template_global()
    2. def sb(a1, a2):
    3.     # {{sb(1,9)}}
    4.     return a1 + a2
    5. @app.template_filter()
    6. def db(a1, a2, a3):
    7.     # {{ 1|db(2,3) }}
    8.     return a1 + a2 + a3
    复制代码
  • errorhandler,当报指定的错误时会触发。
    1. @app.errorhandler(404)
    2. def not_found(arg):
    3.     print(arg)
    4.     return "没找到"
    复制代码
9.上下文管理


  • 偏函数
    1. import functools
    2. def index(a1,a2):
    3.     return a1 + a2
    4. # 原来的调用方式
    5. # ret = index(1,23)
    6. # print(ret)
    7. # 偏函数,帮助开发者自动传递参数
    8. new_func = functools.partial(index,666)
    9. ret = new_func(1)
    10. print(ret)
    复制代码
  • 父类方法的执行
    1. """
    2. class Base(object):
    3.     def func(self):
    4.         print('Base.func')
    5. class Foo(Base):
    6.     def func(self):
    7.         # 方式一:根据mro的顺序执行方法
    8.         # super(Foo,self).func()
    9.         # 方式二:主动执行Base类的方法
    10.         # Base.func(self)
    11.         print('Foo.func')
    12. obj = Foo()
    13. obj.func()
    14. """
    15. ####################################
    16. class Base(object):
    17.     def func(self):
    18.         super(Base, self).func()
    19.         print('Base.func')
    20. class Bar(object):
    21.     def func(self):
    22.         print('Bar.func')
    23. class Foo(Base,Bar):
    24.     pass
    25. # 示例一
    26. # obj = Foo()
    27. # obj.func()
    28. # print(Foo.__mro__)
    29. # 示例二
    30. # obj = Base()
    31. # obj.func()
    复制代码
    1. # by luffycity.com
    2. class Stack(object):
    3.     def __init__(self):
    4.         self.data = []
    5.     def push(self,val):
    6.         self.data.append(val)
    7.     def pop(self):
    8.         return self.data.pop()
    9.     def top(self):
    10.         return self.data[-1]
    11. _stack = Stack()
    12. _stack.push('佳俊')
    13. _stack.push('咸鱼')
    14. print(_stack.pop())
    15. print(_stack.pop())
    复制代码
  • Local
    1. """
    2. {
    3.     1232:{k:v}
    4. }
    5. """
    6. try:
    7.     from greenlet import getcurrent as get_ident
    8. except:
    9.     from threading import get_ident
    10. """
    11. class Local(object):
    12.    
    13.     def __init__(self):
    14.         object.__setattr__(self,'storage',{})
    15.    
    16.     def __setattr__(self, key, value):
    17.         ident = get_ident()
    18.         if ident not in self.storage:
    19.             self.storage[ident] = {key:value}
    20.         else:
    21.             self.storage[ident][key] = value
    22.             
    23.     def __getattr__(self, item):
    24.         ident = get_ident()
    25.         if ident in self.storage:
    26.             return self.storage[ident].get(item)
    27. """
    28. class Local(object):
    29.     __slots__ = ('__storage__', '__ident_func__')
    30.     def __init__(self):
    31.         # __storage__ = {1231:{'stack':[]}}
    32.         object.__setattr__(self, '__storage__', {})
    33.         object.__setattr__(self, '__ident_func__', get_ident)
    34.     def __getattr__(self, name):
    35.         try:
    36.             return self.__storage__[self.__ident_func__()][name]
    37.         except KeyError:
    38.             raise AttributeError(name)
    39.     def __setattr__(self, name, value):
    40.         ident = self.__ident_func__()
    41.         storage = self.__storage__
    42.         try:
    43.             storage[ident][name] = value
    44.         except KeyError:
    45.             storage[ident] = {name: value}
    46.     def __delattr__(self, name):
    47.         try:
    48.             del self.__storage__[self.__ident_func__()][name]
    49.         except KeyError:
    50.             raise AttributeError(name)
    51. obj = Local()
    52. obj.stack = []
    53. obj.stack.append('佳俊')
    54. obj.stack.append('咸鱼')
    55. print(obj.stack)
    56. print(obj.stack.pop())
    57. print(obj.stack)
    复制代码
  • LocalStack
    1. import functools
    2. try:
    3.     from greenlet import getcurrent as get_ident
    4. except:
    5.     from threading import get_ident
    6. class Local(object):
    7.     __slots__ = ('__storage__', '__ident_func__')
    8.     def __init__(self):
    9.         # __storage__ = {1231:{'stack':[]}}
    10.         object.__setattr__(self, '__storage__', {})
    11.         object.__setattr__(self, '__ident_func__', get_ident)
    12.     def __getattr__(self, name):
    13.         try:
    14.             return self.__storage__[self.__ident_func__()][name]
    15.         except KeyError:
    16.             raise AttributeError(name)
    17.     def __setattr__(self, name, value):
    18.         # name=stack
    19.         # value=[]
    20.         ident = self.__ident_func__()
    21.         storage = self.__storage__
    22.         try:
    23.             storage[ident][name] = value
    24.         except KeyError:
    25.             storage[ident] = {name: value}
    26.     def __delattr__(self, name):
    27.         try:
    28.             del self.__storage__[self.__ident_func__()][name]
    29.         except KeyError:
    30.             raise AttributeError(name)
    31. """
    32. __storage__ = {
    33.     12312: {stack:[ctx(session/request) ,]}
    34. }
    35. """
    36. # obj = Local()
    37. # obj.stack = []
    38. # obj.stack.append('佳俊')
    39. # obj.stack.append('咸鱼')
    40. # print(obj.stack)
    41. # print(obj.stack.pop())
    42. # print(obj.stack)
    43. class LocalStack(object):
    44.     def __init__(self):
    45.         self._local = Local()
    46.     def push(self,value):
    47.         rv = getattr(self._local, 'stack', None) # self._local.stack =>local.getattr
    48.         if rv is None:
    49.             self._local.stack = rv = [] #  self._local.stack =>local.setattr
    50.         rv.append(value) # self._local.stack.append(666)
    51.         return rv
    52.     def pop(self):
    53.         """Removes the topmost item from the stack, will return the
    54.         old value or `None` if the stack was already empty.
    55.         """
    56.         stack = getattr(self._local, 'stack', None)
    57.         if stack is None:
    58.             return None
    59.         elif len(stack) == 1:
    60.             return stack[-1]
    61.         else:
    62.             return stack.pop()
    63.     def top(self):
    64.         try:
    65.             return self._local.stack[-1]
    66.         except (AttributeError, IndexError):
    67.             return None
    68. class RequestContext(object):
    69.     def __init__(self):
    70.         self.request = "xx"
    71.         self.session = 'oo'
    72. _request_ctx_stack = LocalStack()
    73. _request_ctx_stack.push(RequestContext())
    74. def _lookup_req_object(arg):
    75.     ctx = _request_ctx_stack.top()
    76.     return getattr(ctx,arg) # ctx.request / ctx.session
    77. request = functools.partial(_lookup_req_object,'request')
    78. session = functools.partial(_lookup_req_object,'session')
    79. print(request())
    80. print(session())
    复制代码
  • slots
    1. class Foo(object):
    2.     __slots__ = ('name',)
    3.     def __init__(self):
    4.         self.name = 'alex'
    5.         # self.age = 18
    6. obj = Foo()
    7. print(obj.name)
    8. # print(obj.age)
    复制代码
  • 请求上下文管理(ctx):request,session
    1. - 请求到来之后wsgi会触发__call__方法,由__call__方法再次调用wsgi_app方法
    2. - 在wsgi_app方法中:
    3. - 首先将 请求相关+空session 封装到一个RequestContext对象中,即:ctx。
    4. - 将ctx交给LocalStack对象,再由LocalStack将ctx添加到Local中,Local结构:
    5.     __storage__ = {
    6.         1231:{stack:[ctx,] }
    7.     }
    8. - 根据请求中的cookie中提取名称为sessionid对应的值,对cookie进行加密+反序列化,再次赋值给ctx中的session
    9. -> 视图函数
    10. - 把session中的数据再次写入到cookie中。
    11. - 将ctx删除
    12. - 结果返回给用户浏览器
    13. - 断开socket连接
    复制代码
  • LocalProxy
    1. from flask import Flask,request,session
    2. app = Flask(__name__)
    3. @app.route('/index')
    4. def index():
    5.     # 1. request是LocalProxy对象
    6.     # 2. 对象中有method、执行__getattr__
    7.     print(request.method)
    8.     # request['method']
    9.     # request + 1
    10.     # 1. session是LocalProxy对象
    11.     # 2. LocalProxy对象的__setitem__
    12.     session['x'] = 123
    13.     return "Index"
    14. if __name__ == '__main__':
    15.     app.run()
    16.     # app.__call__
    17.     # app.wsgi_app
    18. """
    19. 第一阶段:请求到来
    20.     将request和Session相关数据封装到ctx=RequestContext对象中。
    21.     再通过LocalStack将ctx添加到Local中。
    22.     __storage__ = {
    23.         1231:{'stack':[ctx(request,session)]}
    24.     }
    25. 第二阶段:视图函数中获取request或session
    26.     方式一:直接找LocalStack获取
    27.             from flask.globals import _request_ctx_stack
    28.             print(_request_ctx_stack.top.request.method)
    29.             
    30.     方式二:通过代理LocalProxy(小东北)获取
    31.             from flask import Flask,request
    32.             print(request.method)
    33.             
    34. """
    复制代码
  • g只存在在一次请求中当你return时g就会被清除。
10.文件的上传和解压


  • view.py
    1. from flask import Blueprint, render_template, Flask, request, redirect,session
    2. import os
    3. import uuid
    4. from ..utils import helper
    5. ind = Blueprint('ind', __name__)
    6. ind.config["MAX_CONTENT_LENGTH"] =  = 1024 * 1024 * 7
    7. @ind.before_request
    8. def process_request():
    9.     if not session.get("user_info"):
    10.         return redirect("/login")
    11.     return None
    12. @ind.route('/home')
    13. def home():
    14.     return render_template('home.html')
    15. @ind.route('/user_list')
    16. def user_list():
    17.     # import pymysql
    18.     # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
    19.     # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    20.     # cursor.execute("SELECT id,user,nickname FROM userinfo")
    21.     # data_list = cursor.fetchall()
    22.     # cursor.close()
    23.     # conn.close()
    24.     data_list = helper.fetch_all("SELECT id,user,nickname FROM userinfo",[])
    25.     return render_template('user_list.html',data_list=data_list)
    26. @ind.route('/detail/<int:nid>')
    27. def detail(nid):
    28.     # import pymysql
    29.     # conn = Config.POOL.connection()
    30.     # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    31.     # cursor.execute("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
    32.     # record_list = cursor.fetchall()
    33.     # cursor.close()
    34.     # conn.close()
    35.     record_list = helper.fetch_all("SELECT id,line,ctime FROM record where user_id=%s",(nid,))
    36.     return render_template('detail.html',record_list=record_list)
    37. @ind.route('/upload',methods=['GET','POST'])
    38. def upload():
    39.     if request.method == "GET":
    40.         return render_template('upload.html')
    41.     from werkzeug.datastructures import FileStorage
    42.     file_obj = request.files.get('code')
    43.     # 1. 检查上传文件后缀名
    44.     name_ext = file_obj.filename.rsplit('.',maxsplit=1)
    45.     if len(name_ext) != 2:
    46.         return "请上传zip压缩文件"
    47.     if name_ext[1] != 'zip':
    48.         return "请上传zip压缩文件"
    49.     """
    50.     # 2. 接收用户上传文件,并写入到服务器本地.
    51.     file_path = os.path.join("files",file_obj.filename)
    52.     # 从file_obj.stream中读取内容,写入到文件
    53.     file_obj.save(file_path)
    54.     # 3. 解压zip文件
    55.     import shutil
    56.     # 通过open打开压缩文件,读取内容再进行解压。
    57.     shutil._unpack_zipfile(file_path,'xsadfasdfasdf')
    58.     """
    59.     # 2+3, 接收用户上传文件,并解压到指定目录
    60.     import shutil
    61.     target_path = os.path.join('files',str(uuid.uuid4()))
    62.     shutil._unpack_zipfile(file_obj.stream,target_path)
    63.     # 4. 遍历某目录下的所有文件
    64.     # for item in os.listdir(target_path):
    65.     #     print(item)
    66.     total_num = 0
    67.     for base_path,folder_list,file_list in os.walk(target_path):
    68.         for file_name in file_list:
    69.             file_path = os.path.join(base_path,file_name)
    70.             file_ext = file_path.rsplit('.',maxsplit=1)
    71.             if len(file_ext) != 2:
    72.                 continue
    73.             if file_ext[1] != 'py':
    74.                 continue
    75.             file_num = 0
    76.             with open(file_path,'rb') as f:
    77.                 for line in f:
    78.                     line = line.strip()
    79.                     if not line:
    80.                         continue
    81.                     if line.startswith(b'#'):
    82.                         continue
    83.                     file_num += 1
    84.             total_num += file_num
    85.    
    86.     # 获取当前时间
    87.     import datetime
    88.     ctime = datetime.date.today()
    89.     print(total_num,ctime,session['user_info']['id'])
    90.     # import pymysql
    91.     # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
    92.     # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    93.     # cursor.execute("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
    94.     # data = cursor.fetchone()
    95.     # cursor.close()
    96.     # conn.close()
    97.     data = helper.fetch_one("select id from record where ctime=%s and user_id=%s",(ctime,session['user_info']['id']))
    98.     if data:
    99.         return "今天已经上传"
    100.     # import pymysql
    101.     # conn = pymysql.Connect(host='127.0.0.1', user='root', password='123456', database='s9day118', charset='utf8')
    102.     # cursor = conn.cursor(cursor=pymysql.cursors.DictCursor)
    103.     # cursor.execute("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
    104.     # conn.commit()
    105.     # cursor.close()
    106.     # conn.close()
    107.     helper.insert("insert into record(line,ctime,user_id)values(%s,%s,%s)",(total_num,ctime,session['user_info']['id']))
    108.     return "上传成功"
    复制代码
  • html.py
    1. {% extends "layout.html"%}
    2. {% block content %}
    3.     <h1>代码上传</h1>
    4.     <form method="post" enctype="multipart/form-data">
    5.         <input type="file" name="code">
    6.         <input type="submit" value="上传">
    7.     </form>
    8. {% endblock %}
    复制代码
11.Database Join 池


  • 安装pip3 install DBUtils
  • 创建连接池可以共享出去,谁要用就来获取一个连接,这样可以管理连接的数量,提高链接的利用率。
    1. from DBUtils.PooledDB import PooledDB, SharedDBConnection
    2. import pymysql
    3. POOL = PooledDB(
    4.         creator=pymysql,  # 使用链接数据库的模块
    5.         maxconnections=6,  # 连接池允许的最大连接数,0和None表示不限制连接数
    6.         mincached=2,  # 初始化时,链接池中至少创建的空闲的链接,0表示不创建
    7.         maxcached=5,  # 链接池中最多闲置的链接,0和None不限制
    8.         maxshared=3,
    9.         # 链接池中最多共享的链接数量,0和None表示全部共享。PS: 无用,因为pymysql和MySQLdb等模块的 threadsafety都为1,所有值无论设置为多少,_maxcached永远为0,所以永远是所有链接都共享。
    10.         blocking=True,  # 连接池中如果没有可用连接后,是否阻塞等待。True,等待;False,不等待然后报错
    11.         maxusage=None,  # 一个链接最多被重复使用的次数,None表示无限制
    12.         setsession=[],  # 开始会话前执行的命令列表。如:["set datestyle to ...", "set time zone ..."]
    13.         ping=0,
    14.         # ping MySQL服务端,检查是否服务可用。# 如:0 = None = never, 1 = default = whenever it is requested, 2 = when a cursor is created, 4 = when a query is executed, 7 = always
    15.         host='127.0.0.1',
    16.         port=3306,
    17.         user='root',
    18.         password='123456',
    19.         database='s9day118',
    20.         charset='utf8'
    21.     )
    22. # 获取连接
    23. conn = POOL.connection()
    24. cursor = conn.cursor(pymysql.cursors.DictCursor)
    25. cursor.execute("select * from tb1")
    26. result = cursor.fetchall()
    27. cursor.close()
    28. # 归还连接
    29. conn.close()
    复制代码
12.wtforms


  • WTForms是一个支持多个web框架的form组件,主要用于对用户请求数据进行验证。
  • 安装pip3 install wtforms
12.1 用户登录注册示例


  • 用户登录

    • 当用户登录时候,需要对用户提交的用户名和密码进行多种格式校验。如:
    • 用户不能为空;用户长度必须大于6;
    • 密码不能为空;密码长度必须大于12;密码必须包含 字母、数字、特殊字符等(自定义正则);
      1. #!/usr/bin/env python
      2. # -*- coding:utf-8 -*-
      3. from flask import Flask, render_template, request, redirect
      4. from wtforms import Form
      5. from wtforms.fields import core
      6. from wtforms.fields import html5
      7. from wtforms.fields import simple
      8. from wtforms import validators
      9. from wtforms import widgets
      10. app = Flask(__name__, template_folder='templates')
      11. app.debug = True
      12. class LoginForm(Form):
      13.     name = simple.StringField(
      14.         label='用户名',
      15.         validators=[
      16.             validators.DataRequired(message='用户名不能为空.'),
      17.             validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
      18.         ],
      19.         widget=widgets.TextInput(),
      20.         render_kw={'class': 'form-control'}
      21.     )
      22.     pwd = simple.PasswordField(
      23.         label='密码',
      24.         validators=[
      25.             validators.DataRequired(message='密码不能为空.'),
      26.             validators.Length(min=8, message='用户名长度必须大于%(min)d'),
      27.             validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
      28.                               message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
      29.         ],
      30.         widget=widgets.PasswordInput(),
      31.         render_kw={'class': 'form-control'}
      32.     )
      33. @app.route('/login', methods=['GET', 'POST'])
      34. def login():
      35.     if request.method == 'GET':
      36.         form = LoginForm()
      37.         return render_template('login.html', form=form)
      38.     else:
      39.         form = LoginForm(formdata=request.form)
      40.         if form.validate():
      41.             print('用户提交数据通过格式验证,提交的值为:', form.data)
      42.         else:
      43.             print(form.errors)
      44.         return render_template('login.html', form=form)
      45. if __name__ == '__main__':
      46.     app.run()
      47. #######################################################
      48. <!DOCTYPE html>
      49. <html lang="en">
      50. <head>
      51.     <meta charset="UTF-8">
      52.     <title>Title</title>
      53. </head>
      54. <body>
      55. <h1>登录</h1>
      56. <form method="post">
      57.    
      58.     <p>{{form.name.label}} {{form.name}} {{form.name.errors[0] }}</p>
      59.    
      60.     <p>{{form.pwd.label}} {{form.pwd}} {{form.pwd.errors[0] }}</p>
      61.     <input type="submit" value="提交">
      62. </form>
      63. </body>
      64. </html>
      复制代码

  • 用户注册

    • 注册页面需要让用户输入:用户名、密码、密码重复、性别、爱好等。
      1. from flask import Flask, render_template, request, redirect
      2. from wtforms import Form
      3. from wtforms.fields import core
      4. from wtforms.fields import html5
      5. from wtforms.fields import simple
      6. from wtforms import validators
      7. from wtforms import widgets
      8. app = Flask(__name__, template_folder='templates')
      9. app.debug = True
      10. class RegisterForm(Form):
      11.     name = simple.StringField(
      12.         label='用户名',
      13.         validators=[
      14.             validators.DataRequired()
      15.         ],
      16.         widget=widgets.TextInput(),
      17.         render_kw={'class': 'form-control'},
      18.         default='alex'
      19.     )
      20.     pwd = simple.PasswordField(
      21.         label='密码',
      22.         validators=[
      23.             validators.DataRequired(message='密码不能为空.')
      24.         ],
      25.         widget=widgets.PasswordInput(),
      26.         render_kw={'class': 'form-control'}
      27.     )
      28.     pwd_confirm = simple.PasswordField(
      29.         label='重复密码',
      30.         validators=[
      31.             validators.DataRequired(message='重复密码不能为空.'),
      32.             validators.EqualTo('pwd', message="两次密码输入不一致")
      33.         ],
      34.         widget=widgets.PasswordInput(),
      35.         render_kw={'class': 'form-control'}
      36.     )
      37.     email = html5.EmailField(
      38.         label='邮箱',
      39.         validators=[
      40.             validators.DataRequired(message='邮箱不能为空.'),
      41.             validators.Email(message='邮箱格式错误')
      42.         ],
      43.         widget=widgets.TextInput(input_type='email'),
      44.         render_kw={'class': 'form-control'}
      45.     )
      46.     gender = core.RadioField(
      47.         label='性别',
      48.         choices=(
      49.             (1, '男'),
      50.             (2, '女'),
      51.         ),
      52.         coerce=int
      53.     )
      54.     city = core.SelectField(
      55.         label='城市',
      56.         choices=(
      57.             ('bj', '北京'),
      58.             ('sh', '上海'),
      59.         )
      60.     )
      61.     hobby = core.SelectMultipleField(
      62.         label='爱好',
      63.         choices=(
      64.             (1, '篮球'),
      65.             (2, '足球'),
      66.         ),
      67.         coerce=int
      68.     )
      69.     favor = core.SelectMultipleField(
      70.         label='喜好',
      71.         choices=(
      72.             (1, '篮球'),
      73.             (2, '足球'),
      74.         ),
      75.         widget=widgets.ListWidget(prefix_label=False),
      76.         option_widget=widgets.CheckboxInput(),
      77.         coerce=int,
      78.         default=[1, 2]
      79.     )
      80.     def __init__(self, *args, **kwargs):
      81.         super(RegisterForm, self).__init__(*args, **kwargs)
      82.         self.favor.choices = ((1, '篮球'), (2, '足球'), (3, '羽毛球'))
      83.     def validate_pwd_confirm(self, field):
      84.         """
      85.         自定义pwd_confirm字段规则,例:与pwd字段是否一致
      86.         :param field:
      87.         :return:
      88.         """
      89.         # 最开始初始化时,self.data中已经有所有的值
      90.         if field.data != self.data['pwd']:
      91.             # raise validators.ValidationError("密码不一致") # 继续后续验证
      92.             raise validators.StopValidation("密码不一致")  # 不再继续后续验证
      93. @app.route('/register', methods=['GET', 'POST'])
      94. def register():
      95.     if request.method == 'GET':
      96.         form = RegisterForm(data={'gender': 1})
      97.         return render_template('register.html', form=form)
      98.     else:
      99.         form = RegisterForm(formdata=request.form)
      100.         if form.validate():
      101.             print('用户提交数据通过格式验证,提交的值为:', form.data)
      102.         else:
      103.             print(form.errors)
      104.         return render_template('register.html', form=form)
      105. if __name__ == '__main__':
      106.     app.run()
      107.    
      108.    
      109. ############################################################
      110. <!DOCTYPE html>
      111. <html lang="en">
      112. <head>
      113.     <meta charset="UTF-8">
      114.     <title>Title</title>
      115. </head>
      116. <body>
      117. <h1>用户注册</h1>
      118. <form method="post" novalidate >
      119.     {% for item in form %}
      120.     <p>{{item.label}}: {{item}} {{item.errors[0] }}</p>
      121.     {% endfor %}
      122.     <input type="submit" value="提交">
      123. </form>
      124. </body>
      125. </html>
      复制代码

  • 动态获取数据库数据
    1. from flask import Flask,request,render_template,session,current_app,g,redirect
    2. from wtforms import Form
    3. from wtforms.fields import simple
    4. from wtforms.fields import html5
    5. from wtforms.fields import core
    6. from wtforms import widgets
    7. from wtforms import validators
    8. app = Flask(__name__)
    9. class LoginForm(Form):
    10.     name = simple.StringField(
    11.         validators=[
    12.             validators.DataRequired(message='用户名不能为空.'),
    13.             # validators.Length(min=6, max=18, message='用户名长度必须大于%(min)d且小于%(max)d')
    14.         ],
    15.         widget=widgets.TextInput(),
    16.         render_kw={'placeholder':'请输入用户名'}
    17.     )
    18.     pwd = simple.PasswordField(
    19.         validators=[
    20.             validators.DataRequired(message='密码不能为空.'),
    21.             # validators.Length(min=8, message='用户名长度必须大于%(min)d'),
    22.             # validators.Regexp(regex="^(?=.*[a-z])(?=.*[A-Z])(?=.*\d)(?=.*[$@$!%*?&])[A-Za-z\d$@$!%*?&]{8,}",
    23.             #                   message='密码至少8个字符,至少1个大写字母,1个小写字母,1个数字和1个特殊字符')
    24.         ],
    25.         render_kw={'placeholder':'请输入密码'}
    26.     )
    27. @app.route('/login',methods=['GET','POST'])
    28. def login():
    29.     if request.method == "GET":
    30.         form = LoginForm()
    31.         # print(form.name,type(form.name)) # form.name是StringField()对象, StringField().__str__
    32.         # print(form.pwd,type(form.pwd))   # form.pwd是PasswordField()对象,PasswordField().__str__
    33.         return render_template('login.html',form=form)
    34.     form = LoginForm(formdata=request.form)
    35.     if form.validate():
    36.         print(form.data)
    37.         return redirect('https://www.luffycity.com/home')
    38.     else:
    39.         # print(form.errors)
    40.         return render_template('login.html', form=form)
    41. import helper
    42. class UserForm(Form):
    43.     city = core.SelectField(
    44.         label='城市',
    45.         choices=(),
    46.         coerce=int                # 从数据库获取的数据返回后是字符串类型的,把他写上后会先将其转为int后再返回到前端
    47.     )
    48.     name = simple.StringField(label='姓名')
    49.         # 当你定义成静态字段是,只有启动时才会加载一次,但是从数据库需要实时拿到数据,下面是解决方法。你应该可以看懂。
    50.     def __init__(self,*args,**kwargs):
    51.         super(UserForm,self).__init__(*args,**kwargs)
    52.         self.city.choices=helper.fetch_all('select id,name from tb1',[],type=None)
    53. @app.route('/user')
    54. def user():
    55.     if request.method == "GET":
    56.         #form = UserForm(data={'name':'alex','city':3})
    57.         form = UserForm()
    58.         return render_template('user.html',form=form)
    59. if __name__ == '__main__':
    60.     app.run()
    复制代码
     
  • meta
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. from flask import Flask, render_template, request, redirect, session
    4. from wtforms import Form
    5. from wtforms.csrf.core import CSRF
    6. from wtforms.fields import core
    7. from wtforms.fields import html5
    8. from wtforms.fields import simple
    9. from wtforms import validators
    10. from wtforms import widgets
    11. from hashlib import md5
    12. app = Flask(__name__, template_folder='templates')
    13. app.debug = True
    14. class MyCSRF(CSRF):
    15.     """
    16.     Generate a CSRF token based on the user's IP. I am probably not very
    17.     secure, so don't use me.
    18.     """
    19.     def setup_form(self, form):
    20.         self.csrf_context = form.meta.csrf_context()
    21.         self.csrf_secret = form.meta.csrf_secret
    22.         return super(MyCSRF, self).setup_form(form)
    23.     def generate_csrf_token(self, csrf_token):
    24.         gid = self.csrf_secret + self.csrf_context
    25.         token = md5(gid.encode('utf-8')).hexdigest()
    26.         return token
    27.     def validate_csrf_token(self, form, field):
    28.         print(field.data, field.current_token)
    29.         if field.data != field.current_token:
    30.             raise ValueError('Invalid CSRF')
    31. class TestForm(Form):
    32.     name = html5.EmailField(label='用户名')
    33.     pwd = simple.StringField(label='密码')
    34.     class Meta:
    35.         # -- CSRF
    36.         # 是否自动生成CSRF标签
    37.         csrf = True
    38.         # 生成CSRF标签name
    39.         csrf_field_name = 'csrf_token'
    40.         # 自动生成标签的值,加密用的csrf_secret
    41.         csrf_secret = 'xxxxxx'
    42.         # 自动生成标签的值,加密用的csrf_context
    43.         csrf_context = lambda x: request.url
    44.         # 生成和比较csrf标签
    45.         csrf_class = MyCSRF
    46.         # -- i18n
    47.         # 是否支持本地化
    48.         # locales = False
    49.         locales = ('zh', 'en')
    50.         # 是否对本地化进行缓存
    51.         cache_translations = True
    52.         # 保存本地化缓存信息的字段
    53.         translations_cache = {}
    54. @app.route('/index/', methods=['GET', 'POST'])
    55. def index():
    56.     if request.method == 'GET':
    57.         form = TestForm()
    58.     else:
    59.         form = TestForm(formdata=request.form)
    60.         if form.validate():
    61.             print(form)
    62.     return render_template('index.html', form=form)
    63. if __name__ == '__main__':
    64.     app.run()
    复制代码
12.2 其他


  • metaclass
    1. # new方法的返回值决定对象到底是什么?
    2. class Bar(object):
    3.     pass
    4. class Foo(object):
    5.     def __new__(cls, *args, **kwargs):
    6.         # return super(Foo,cls).__new__(cls,*args, **kwargs)
    7.         return Bar()
    8.     obj = Foo()
    9.     print(obj)
    10. # 当Foo()时先执行__new__方法,返回一个Bar,obj就等于Bar对象
    11. ##########################################################################
    12. class MyType(type):
    13.     def __init__(self, *args, **kwargs):
    14.         print('MyType创建类',self)
    15.         super(MyType, self).__init__(*args, **kwargs)
    16.     def __call__(self, *args, **kwargs):
    17.         obj = super(MyType, self).__call__(*args, **kwargs)
    18.         print('类创建对象', self, obj)
    19.         return obj
    20. class Foo(object,metaclass=MyType):
    21.     user = 'wupeiqi'
    22.     age = 18
    23. obj = Foo()
    24. # 在创建Foo对象前先执行mytype的__call__方法,__call__方法后才会执行Foo。不写metaclass默认是由type类来做的。
    25. # 是先执行父类的__call__后再执行自己的__init__方法。
    26. """
    27. 创建类时,先执行type的__init__。
    28. 类的实例化时,执行type的__call__,__call__方法的的返回值就是实例化的对象。
    29. __call__内部调用:
    30.         类.__new__,创建对象
    31.         类.__init__,对象的初始化
    32. """
    复制代码
  • 实例化流程分析
    1.     # 源码流程
    2.     1. 执行type的 __call__ 方法,读取字段到静态字段 cls._unbound_fields 中; meta类读取到cls._wtforms_meta中
    3.     2. 执行构造方法
    4.         
    5.         a. 循环cls._unbound_fields中的字段,并执行字段的bind方法,然后将返回值添加到 self._fields[name] 中。
    6.             即:
    7.                 _fields = {
    8.                     name: wtforms.fields.core.StringField(),
    9.                 }
    10.                
    11.             PS:由于字段中的__new__方法,实例化时:name = simple.StringField(label='用户名'),创建的是UnboundField(cls, *args, **kwargs),当执行完bind之后,才变成执行 wtforms.fields.core.StringField()
    12.         
    13.         b. 循环_fields,为对象设置属性
    14.             for name, field in iteritems(self._fields):
    15.                 # Set all the fields to attributes so that they obscure the class
    16.                 # attributes with the same names.
    17.                 setattr(self, name, field)
    18.         c. 执行process,为字段设置默认值:self.process(formdata, obj, data=data, **kwargs)
    19.             优先级:obj,data,formdata;
    20.             
    21.             再循环执行每个字段的process方法,为每个字段设置值:
    22.             for name, field, in iteritems(self._fields):
    23.                 if obj is not None and hasattr(obj, name):
    24.                     field.process(formdata, getattr(obj, name))
    25.                 elif name in kwargs:
    26.                     field.process(formdata, kwargs[name])
    27.                 else:
    28.                     field.process(formdata)
    29.             
    30.             执行每个字段的process方法,为字段的data和字段的raw_data赋值
    31.             def process(self, formdata, data=unset_value):
    32.                 self.process_errors = []
    33.                 if data is unset_value:
    34.                     try:
    35.                         data = self.default()
    36.                     except TypeError:
    37.                         data = self.default
    38.         
    39.                 self.object_data = data
    40.         
    41.                 try:
    42.                     self.process_data(data)
    43.                 except ValueError as e:
    44.                     self.process_errors.append(e.args[0])
    45.         
    46.                 if formdata:
    47.                     try:
    48.                         if self.name in formdata:
    49.                             self.raw_data = formdata.getlist(self.name)
    50.                         else:
    51.                             self.raw_data = []
    52.                         self.process_formdata(self.raw_data)
    53.                     except ValueError as e:
    54.                         self.process_errors.append(e.args[0])
    55.         
    56.                 try:
    57.                     for filter in self.filters:
    58.                         self.data = filter(self.data)
    59.                 except ValueError as e:
    60.                     self.process_errors.append(e.args[0])
    61.                
    62.         d. 页面上执行print(form.name) 时,打印标签
    63.             
    64.             因为执行了:
    65.                 字段的 __str__ 方法
    66.                 字符的 __call__ 方法
    67.                 self.meta.render_field(self, kwargs)
    68.                     def render_field(self, field, render_kw):
    69.                         other_kw = getattr(field, 'render_kw', None)
    70.                         if other_kw is not None:
    71.                             render_kw = dict(other_kw, **render_kw)
    72.                         return field.widget(field, **render_kw)
    73.                 执行字段的插件对象的 __call__ 方法,返回标签字符串
    复制代码
  • 验证流程分析
    1.         a. 执行form的validate方法,获取钩子方法
    2.             def validate(self):
    3.                 extra = {}
    4.                 for name in self._fields:
    5.                     inline = getattr(self.__class__, 'validate_%s' % name, None)
    6.                     if inline is not None:
    7.                         extra[name] = [inline]
    8.         
    9.                 return super(Form, self).validate(extra)
    10.         b. 循环每一个字段,执行字段的 validate 方法进行校验(参数传递了钩子函数)
    11.             def validate(self, extra_validators=None):
    12.                 self._errors = None
    13.                 success = True
    14.                 for name, field in iteritems(self._fields):
    15.                     if extra_validators is not None and name in extra_validators:
    16.                         extra = extra_validators[name]
    17.                     else:
    18.                         extra = tuple()
    19.                     if not field.validate(self, extra):
    20.                         success = False
    21.                 return success
    22.         c. 每个字段进行验证时候
    23.             字段的pre_validate 【预留的扩展】
    24.             字段的_run_validation_chain,对正则和字段的钩子函数进行校验
    25.             字段的post_validate【预留的扩展】
    复制代码
13.SQLAlchemy


  • MySQLdb,只支持python2.0的,而pymysql是支持python2.0和3.0的。
    1. import MySQLdb
    2.   
    3. conn = MySQLdb.connect(host='127.0.0.1',user='root',passwd='1234',db='mydb')
    4.   
    5. cur = conn.cursor()
    6.   
    7. reCount = cur.execute('insert into UserInfo(Name,Address) values(%s,%s)',('alex','usa'))
    8. # reCount = cur.execute('insert into UserInfo(Name,Address) values(%(id)s, %(name)s)',{'id':12345,'name':'wupeiqi'})
    9.   
    10. conn.commit()
    11.   
    12. cur.close()
    13. conn.close()
    14.   
    15. print reCount
    复制代码
  • SQLAlchemy是一个基于Python实现的ORM框架。该框架建立在 DB API之上,使用关系对象映射进行数据库操作,简言之便是:将类和对象转换成SQL,然后使用数据API执行SQL并获取执行结果。
  • 安装pip3 install sqlalchemy
  • 组成部分:

    • Engine,框架的引擎
    • Connection Pooling ,数据库连接池
    • Dialect,选择连接数据库的DB API种类
    • Schema/Types,架构和类型
    • SQL Exprression Language,SQL表达式语言

  • SQLAlchemy本身无法操作数据库,其必须以来pymsql等第三方插件,Dialect用于和数据API进行交流,根据配置文件的不同调用不同的数据库API,从而实现对数据库的操作,如:
    1. MySQL-Python
    2.     mysql+mysqldb://<user>:<password>@<host>[:<port>]/<dbname>
    3.    
    4. pymysql
    5.     mysql+pymysql://<username>:<password>@<host>/<dbname>[?<options>]
    6.    
    7. MySQL-Connector
    8.     mysql+mysqlconnector://<user>:<password>@<host>[:<port>]/<dbname>
    9.    
    10. cx_Oracle
    11.     oracle+cx_oracle://user:pass@host:port/dbname[?key=value&key=value...]
    12.    
    13. 更多:http://docs.sqlalchemy.org/en/latest/dialects/index.html
    复制代码
  • 执行原生SQL语句
    1. import time
    2. import threading
    3. import sqlalchemy
    4. from sqlalchemy import create_engine
    5. from sqlalchemy.engine.base import Engine
    6. engine = create_engine(
    7.     "mysql+pymysql://root:123@127.0.0.1:3306/t1?charset=utf8",
    8.     max_overflow=0,  # 超过连接池大小外最多创建的连接
    9.     pool_size=5,  # 连接池大小
    10.     pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    11.     pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    12. )
    13. def task(arg):
    14.     conn = engine.raw_connection()
    15.     cursor = conn.cursor()
    16.     cursor.execute(
    17.         "select * from t1"
    18.     )
    19.     result = cursor.fetchall()
    20.     cursor.close()
    21.     conn.close()
    22. for i in range(20):
    23.     t = threading.Thread(target=task, args=(i,))
    24.     t.start()
    25.    
    26.    
    27. ########################################################################################
    28. #!/usr/bin/env python
    29. # -*- coding:utf-8 -*-
    30. import time
    31. import threading
    32. import sqlalchemy
    33. from sqlalchemy import create_engine
    34. from sqlalchemy.engine.base import Engine
    35. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
    36. def task(arg):
    37.     conn = engine.contextual_connect()
    38.     with conn:
    39.         cur = conn.execute(
    40.             "select * from t1"
    41.         )
    42.         result = cur.fetchall()
    43.         print(result)
    44. for i in range(20):
    45.     t = threading.Thread(target=task, args=(i,))
    46.     t.start()
    47.    
    48. ########################################################################################
    49. #!/usr/bin/env python
    50. # -*- coding:utf-8 -*-
    51. import time
    52. import threading
    53. import sqlalchemy
    54. from sqlalchemy import create_engine
    55. from sqlalchemy.engine.base import Engine
    56. from sqlalchemy.engine.result import ResultProxy
    57. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/t1", max_overflow=0, pool_size=5)
    58. def task(arg):
    59.     cur = engine.execute("select * from t1")
    60.     result = cur.fetchall()
    61.     cur.close()
    62.     print(result)
    63. for i in range(20):
    64.     t = threading.Thread(target=task, args=(i,))
    65.     t.start()
    66.    
    67.    
    68. # 注意: 查看连接 show status like 'Threads%';
    复制代码
13.1 ORM


  • 创建单表
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import datetime
    4. from sqlalchemy import create_engine
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    7. Base = declarative_base()
    8. class Users(Base):
    9.     __tablename__ = 'users'
    10.     id = Column(Integer, primary_key=True)
    11.     name = Column(String(32), index=True, nullable=False)
    12.     # email = Column(String(32), unique=True)
    13.     # ctime = Column(DateTime, default=datetime.datetime.now)
    14.     # extra = Column(Text, nullable=True)
    15.     __table_args__ = (
    16.         # UniqueConstraint('id', 'name', name='uix_id_name'),
    17.         # Index('ix_id_name', 'name', 'email'),
    18.     )
    19. def init_db():
    20.     """
    21.     根据类创建数据库表
    22.     :return:
    23.     """
    24.     engine = create_engine(
    25.         "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
    26.         max_overflow=0,  # 超过连接池大小外最多创建的连接
    27.         pool_size=5,  # 连接池大小
    28.         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    29.         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    30.     )
    31.     Base.metadata.create_all(engine)
    32. def drop_db():
    33.     """
    34.     根据类删除数据库表
    35.     :return:
    36.     """
    37.     engine = create_engine(
    38.         "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
    39.         max_overflow=0,  # 超过连接池大小外最多创建的连接
    40.         pool_size=5,  # 连接池大小
    41.         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    42.         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    43.     )
    44.     Base.metadata.drop_all(engine)
    45. if __name__ == '__main__':
    46.     drop_db()
    47.     init_db()
    复制代码
  • 创建多个表并包含Fk、M2M关系
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import datetime
    4. from sqlalchemy import create_engine
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, Text, ForeignKey, DateTime, UniqueConstraint, Index
    7. from sqlalchemy.orm import relationship
    8. Base = declarative_base()
    9. # ##################### 单表示例 #########################
    10. class Users(Base):
    11.     __tablename__ = 'users'
    12.     id = Column(Integer, primary_key=True)
    13.     name = Column(String(32), index=True)
    14.     age = Column(Integer, default=18)
    15.     email = Column(String(32), unique=True)
    16.     ctime = Column(DateTime, default=datetime.datetime.now)
    17.     extra = Column(Text, nullable=True)
    18.     __table_args__ = (
    19.         # UniqueConstraint('id', 'name', name='uix_id_name'),
    20.         # Index('ix_id_name', 'name', 'extra'),
    21.     )
    22. class Hosts(Base):
    23.     __tablename__ = 'hosts'
    24.     id = Column(Integer, primary_key=True)
    25.     name = Column(String(32), index=True)
    26.     ctime = Column(DateTime, default=datetime.datetime.now)
    27. # ##################### 一对多示例 #########################
    28. class Hobby(Base):
    29.     __tablename__ = 'hobby'
    30.     id = Column(Integer, primary_key=True)
    31.     caption = Column(String(50), default='篮球')
    32. class Person(Base):
    33.     __tablename__ = 'person'
    34.     nid = Column(Integer, primary_key=True)
    35.     name = Column(String(32), index=True, nullable=True)
    36.     hobby_id = Column(Integer, ForeignKey("hobby.id"))
    37.     # 与生成表结构无关,仅用于查询方便
    38.     hobby = relationship("Hobby", backref='pers')
    39. # ##################### 多对多示例 #########################
    40. class Server2Group(Base):
    41.     __tablename__ = 'server2group'
    42.     id = Column(Integer, primary_key=True, autoincrement=True)
    43.     server_id = Column(Integer, ForeignKey('server.id'))
    44.     group_id = Column(Integer, ForeignKey('group.id'))
    45. class Group(Base):
    46.     __tablename__ = 'group'
    47.     id = Column(Integer, primary_key=True)
    48.     name = Column(String(64), unique=True, nullable=False)
    49.     # 与生成表结构无关,仅用于查询方便
    50.     servers = relationship('Server', secondary='server2group', backref='groups')
    51. class Server(Base):
    52.     __tablename__ = 'server'
    53.     id = Column(Integer, primary_key=True, autoincrement=True)
    54.     hostname = Column(String(64), unique=True, nullable=False)
    55. def init_db():
    56.     """
    57.     根据类创建数据库表
    58.     :return:
    59.     """
    60.     engine = create_engine(
    61.         "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
    62.         max_overflow=0,  # 超过连接池大小外最多创建的连接
    63.         pool_size=5,  # 连接池大小
    64.         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    65.         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    66.     )
    67.     Base.metadata.create_all(engine)
    68. def drop_db():
    69.     """
    70.     根据类删除数据库表
    71.     :return:
    72.     """
    73.     engine = create_engine(
    74.         "mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8",
    75.         max_overflow=0,  # 超过连接池大小外最多创建的连接
    76.         pool_size=5,  # 连接池大小
    77.         pool_timeout=30,  # 池中没有线程最多等待的时间,否则报错
    78.         pool_recycle=-1  # 多久之后对线程池中的线程进行一次连接的回收(重置)
    79.     )
    80.     Base.metadata.drop_all(engine)
    81. if __name__ == '__main__':
    82.     drop_db()
    83.     init_db()
    复制代码
  • 操作数据库表
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. from sqlalchemy.orm import sessionmaker
    4. from sqlalchemy import create_engine
    5. from models import Users
    6.   
    7. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    8. Session = sessionmaker(bind=engine)
    9.   
    10. # 每次执行数据库操作时,都需要创建一个session
    11. session = Session()
    12.   
    13. # ############# 执行ORM操作 #############
    14. obj1 = Users(name="alex1")
    15. session.add(obj1)
    16.   
    17. # 提交事务
    18. session.commit()
    19. # 关闭session
    20. session.close()
    复制代码
  • 基于scoped_session实现线程安全
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. from sqlalchemy.orm import sessionmaker
    4. from sqlalchemy import create_engine
    5. from sqlalchemy.orm import scoped_session
    6. from models import Users
    7. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    8. Session = sessionmaker(bind=engine)
    9. """
    10. # 线程安全,基于本地线程实现每个线程用同一个session
    11. # 特殊的:scoped_session中有原来方法的Session中的一下方法:
    12. public_methods = (
    13.     '__contains__', '__iter__', 'add', 'add_all', 'begin', 'begin_nested',
    14.     'close', 'commit', 'connection', 'delete', 'execute', 'expire',
    15.     'expire_all', 'expunge', 'expunge_all', 'flush', 'get_bind',
    16.     'is_modified', 'bulk_save_objects', 'bulk_insert_mappings',
    17.     'bulk_update_mappings',
    18.     'merge', 'query', 'refresh', 'rollback',
    19.     'scalar'
    20. )
    21. """
    22. session = scoped_session(Session)
    23. # ############# 执行ORM操作 #############
    24. obj1 = Users(name="alex1")
    25. session.add(obj1)
    26. # 提交事务
    27. session.commit()
    28. # 关闭session
    29. session.close()
    复制代码
  • 多线程执行示例
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from db import Users
    10. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    11. Session = sessionmaker(bind=engine)
    12. def task(arg):
    13.     session = Session()
    14.     obj1 = Users(name="alex1")
    15.     session.add(obj1)
    16.     session.commit()
    17. for i in range(10):
    18.     t = threading.Thread(target=task, args=(i,))
    19.     t.start()
    复制代码
  • 基本增删改查示例
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from sqlalchemy.sql import text
    10. from db import Users, Hosts
    11. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    12. Session = sessionmaker(bind=engine)
    13. session = Session()
    14. # ################ 添加 ################
    15. """
    16. obj1 = Users(name="wupeiqi")
    17. session.add(obj1)
    18. session.add_all([
    19.     Users(name="wupeiqi"),
    20.     Users(name="alex"),
    21.     Hosts(name="c1.com"),
    22. ])
    23. session.commit()
    24. """
    25. # ################ 删除 ################
    26. """
    27. session.query(Users).filter(Users.id > 2).delete()
    28. session.commit()
    29. """
    30. # ################ 修改 ################
    31. """
    32. session.query(Users).filter(Users.id > 0).update({"name" : "099"})
    33. session.query(Users).filter(Users.id > 0).update({Users.name: Users.name + "099"}, synchronize_session=False)
    34. session.query(Users).filter(Users.id > 0).update({"age": Users.age + 1}, synchronize_session="evaluate")
    35. session.commit()
    36. """
    37. # ################ 查询 ################
    38. """
    39. r1 = session.query(Users).all()
    40. r2 = session.query(Users.name.label('xx'), Users.age).all()
    41. r3 = session.query(Users).filter(Users.name == "alex").all()
    42. r4 = session.query(Users).filter_by(name='alex').all()
    43. r5 = session.query(Users).filter_by(name='alex').first()
    44. r6 = session.query(Users).filter(text("id<:value and name=:name")).params(value=224, name='fred').order_by(Users.id).all()
    45. r7 = session.query(Users).from_statement(text("SELECT * FROM users where name=:name")).params(name='ed').all()
    46. """
    47. session.close()
    复制代码
  • 原生SQL语句
    1. # 条件
    2. ret = session.query(Users).filter_by(name='alex').all()
    3. ret = session.query(Users).filter(Users.id > 1, Users.name == 'eric').all()
    4. ret = session.query(Users).filter(Users.id.between(1, 3), Users.name == 'eric').all()
    5. ret = session.query(Users).filter(Users.id.in_([1,3,4])).all()
    6. ret = session.query(Users).filter(~Users.id.in_([1,3,4])).all()
    7. ret = session.query(Users).filter(Users.id.in_(session.query(Users.id).filter_by(name='eric'))).all()
    8. from sqlalchemy import and_, or_
    9. ret = session.query(Users).filter(and_(Users.id > 3, Users.name == 'eric')).all()
    10. ret = session.query(Users).filter(or_(Users.id < 2, Users.name == 'eric')).all()
    11. ret = session.query(Users).filter(
    12.     or_(
    13.         Users.id < 2,
    14.         and_(Users.name == 'eric', Users.id > 3),
    15.         Users.extra != ""
    16.     )).all()
    17. # 通配符
    18. ret = session.query(Users).filter(Users.name.like('e%')).all()
    19. ret = session.query(Users).filter(~Users.name.like('e%')).all()
    20. # 限制
    21. ret = session.query(Users)[1:2]
    22. # 排序
    23. ret = session.query(Users).order_by(Users.name.desc()).all()
    24. ret = session.query(Users).order_by(Users.name.desc(), Users.id.asc()).all()
    25. # 分组
    26. from sqlalchemy.sql import func
    27. ret = session.query(Users).group_by(Users.extra).all()
    28. ret = session.query(
    29.     func.max(Users.id),
    30.     func.sum(Users.id),
    31.     func.min(Users.id)).group_by(Users.name).all()
    32. ret = session.query(
    33.     func.max(Users.id),
    34.     func.sum(Users.id),
    35.     func.min(Users.id)).group_by(Users.name).having(func.min(Users.id) >2).all()
    36. # 连表
    37. ret = session.query(Users, Favor).filter(Users.id == Favor.nid).all()
    38. ret = session.query(Person).join(Favor).all()
    39. ret = session.query(Person).join(Favor, isouter=True).all()
    40. # 组合
    41. q1 = session.query(Users.name).filter(Users.id > 2)
    42. q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    43. ret = q1.union(q2).all()
    44. q1 = session.query(Users.name).filter(Users.id > 2)
    45. q2 = session.query(Favor.caption).filter(Favor.nid < 2)
    46. ret = q1.union_all(q2).all()
    复制代码
  • 基于relationship操作ForeignKey
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from sqlalchemy.sql import text
    10. from sqlalchemy.engine.result import ResultProxy
    11. from db import Users, Hosts
    12. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6", max_overflow=0, pool_size=5)
    13. Session = sessionmaker(bind=engine)
    14. session = Session()
    15. # 查询
    16. # cursor = session.execute('select * from users')
    17. # result = cursor.fetchall()
    18. # 添加
    19. cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    20. session.commit()
    21. print(cursor.lastrowid)
    22. session.close()
    复制代码
  • 基于relationship操作m2m
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from sqlalchemy.sql import text
    10. from sqlalchemy.engine.result import ResultProxy
    11. from db import Users, Hosts, Hobby, Person
    12. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    13. Session = sessionmaker(bind=engine)
    14. session = Session()
    15. # 添加
    16. """
    17. session.add_all([
    18.     Hobby(caption='乒乓球'),
    19.     Hobby(caption='羽毛球'),
    20.     Person(name='张三', hobby_id=3),
    21.     Person(name='李四', hobby_id=4),
    22. ])
    23. person = Person(name='张九', hobby=Hobby(caption='姑娘'))
    24. session.add(person)
    25. hb = Hobby(caption='人妖')
    26. hb.pers = [Person(name='文飞'), Person(name='博雅')]
    27. session.add(hb)
    28. session.commit()
    29. """
    30. # 使用relationship正向查询
    31. """
    32. v = session.query(Person).first()
    33. print(v.name)
    34. print(v.hobby.caption)
    35. """
    36. # 使用relationship反向查询
    37. """
    38. v = session.query(Hobby).first()
    39. print(v.caption)
    40. print(v.pers)
    41. """
    42. session.close()
    复制代码
  • 其他
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from sqlalchemy.sql import text
    10. from sqlalchemy.engine.result import ResultProxy
    11. from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    12. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    13. Session = sessionmaker(bind=engine)
    14. session = Session()
    15. # 添加
    16. """
    17. session.add_all([
    18.     Server(hostname='c1.com'),
    19.     Server(hostname='c2.com'),
    20.     Group(name='A组'),
    21.     Group(name='B组'),
    22. ])
    23. session.commit()
    24. s2g = Server2Group(server_id=1, group_id=1)
    25. session.add(s2g)
    26. session.commit()
    27. gp = Group(name='C组')
    28. gp.servers = [Server(hostname='c3.com'),Server(hostname='c4.com')]
    29. session.add(gp)
    30. session.commit()
    31. ser = Server(hostname='c6.com')
    32. ser.groups = [Group(name='F组'),Group(name='G组')]
    33. session.add(ser)
    34. session.commit()
    35. """
    36. # 使用relationship正向查询
    37. """
    38. v = session.query(Group).first()
    39. print(v.name)
    40. print(v.servers)
    41. """
    42. # 使用relationship反向查询
    43. """
    44. v = session.query(Server).first()
    45. print(v.hostname)
    46. print(v.groups)
    47. """
    48. session.close()
    复制代码
14.flask武装


  • Flask_SQLAlchemy
    1. #!/usr/bin/env python
    2. # -*- coding:utf-8 -*-
    3. import time
    4. import threading
    5. from sqlalchemy.ext.declarative import declarative_base
    6. from sqlalchemy import Column, Integer, String, ForeignKey, UniqueConstraint, Index
    7. from sqlalchemy.orm import sessionmaker, relationship
    8. from sqlalchemy import create_engine
    9. from sqlalchemy.sql import text, func
    10. from sqlalchemy.engine.result import ResultProxy
    11. from db import Users, Hosts, Hobby, Person, Group, Server, Server2Group
    12. engine = create_engine("mysql+pymysql://root:123@127.0.0.1:3306/s6?charset=utf8", max_overflow=0, pool_size=5)
    13. Session = sessionmaker(bind=engine)
    14. session = Session()
    15. # 关联子查询
    16. subqry = session.query(func.count(Server.id).label("sid")).filter(Server.id == Group.id).correlate(Group).as_scalar()
    17. result = session.query(Group.name, subqry)
    18. """
    19. SELECT `group`.name AS group_name, (SELECT count(server.id) AS sid
    20. FROM server
    21. WHERE server.id = `group`.id) AS anon_1
    22. FROM `group`
    23. """
    24. # 原生SQL
    25. """
    26. # 查询
    27. cursor = session.execute('select * from users')
    28. result = cursor.fetchall()
    29. # 添加
    30. cursor = session.execute('insert into users(name) values(:value)',params={"value":'wupeiqi'})
    31. session.commit()
    32. print(cursor.lastrowid)
    33. """
    34. session.close()
    复制代码
  • flask-script
    1. a. 下载安装
    2.     pip3 install flask-sqlalchemy
    3. b. chun.__init__.py
    4.     导入并实例化SQLAlchemy
    5.     from flask_sqlalchemy import SQLAlchemy
    6.     db = SQLAlchemy()
    7.    
    8.     注意事项:
    9.          - 必须在导入蓝图之前
    10.          - 必须导入models.py
    11. c. 初始化
    12.     db.init_app(app)
    13.    
    14. d. 在配置文件中写入配置
    15.     # ##### SQLALchemy配置文件 #####
    16.     SQLALCHEMY_DATABASE_URI = "mysql+pymysql://root:123456@127.0.0.1:3306/s9day122?charset=utf8"
    17.     SQLALCHEMY_POOL_SIZE = 10
    18.     SQLALCHEMY_MAX_OVERFLOW = 5
    19. e. 创建models.py中的类(对应数据库表)
    20.     chun/models.py
    21.         from sqlalchemy.ext.declarative import declarative_base
    22.         from sqlalchemy import Column
    23.         from sqlalchemy import Integer,String,Text,Date,DateTime
    24.         from sqlalchemy import create_engine
    25.         from chun import db
    26.         class Users(db.Model):
    27.             __tablename__ = 'users'
    28.             id = Column(Integer, primary_key=True)
    29.             name = Column(String(32), index=True, nullable=False)
    30.             depart_id = Column(Integer)
    31. f. 生成表(使用app上下文)
    32.     from chun import db,create_app
    33.     app = create_app()
    34.     app_ctx = app.app_context() # app_ctx = app/g
    35.     with app_ctx: # __enter__,通过LocalStack放入Local中
    36.         db.create_all() # 调用LocalStack放入Local中获取app,再去app中获取配置
    37.         
    38. g. 基于ORM对数据库进行操作。
    39.     from flask import Blueprint
    40.     from chun import db
    41.     from chun import models
    42.     us = Blueprint('us',__name__)
    43.     @us.route('/index')
    44.     def index():
    45.         # 使用SQLAlchemy在数据库中插入一条数据
    46.         # db.session.add(models.Users(name='高件套',depart_id=1))
    47.         # db.session.commit()
    48.         # db.session.remove()
    49.         result = db.session.query(models.Users).all()
    50.         print(result)
    51.         db.session.remove()
    52.         return 'Index'
    复制代码
  • flask-migrate
    1. install the module "pip3 install flask-script "
    2. a. 增加 runserver
    3.     from chun import create_app
    4.     from flask_script import Manager
    5.     app = create_app()
    6.     manager = Manager(app)
    7.     if __name__ == '__main__':
    8.         # app.run()
    9.         manager.run()
    10.         
    11. b. 位置传参
    12.     from chun import create_app
    13.     from flask_script import Manager
    14.     app = create_app()
    15.     manager = Manager(app)
    16.     @manager.command
    17.     def custom(arg):
    18.         """
    19.         自定义命令
    20.         python manage.py custom 123
    21.         :param arg:
    22.         :return:
    23.         """
    24.         print(arg)
    25.     if __name__ == '__main__':
    26.         # app.run()
    27.         manager.run()
    28. c. 关键字传参
    29.     from chun import create_app
    30.     from flask_script import Manager
    31.     app = create_app()
    32.     manager = Manager(app)
    33.     @manager.option('-n', '--name', dest='name')
    34.     @manager.option('-u', '--url', dest='url')
    35.     def cmd(name, url):
    36.         """
    37.         自定义命令
    38.         执行: python manage.py  cmd -n wupeiqi -u http://www.oldboyedu.com
    39.         :param name:
    40.         :param url:
    41.         :return:
    42.         """
    43.         print(name, url)
    44.     if __name__ == '__main__':
    45.         # app.run()
    46.         manager.run()
    复制代码
  • 找到项目使用的所有组件和版本。
    1. install the module "pip3 install flask-migrate"
    2. flask-migrate rely on flask-script.
    3. #!/usr/bin/env python
    4. # -*- coding:utf-8 -*-
    5. from sansa import create_app
    6. from sansa import db
    7. from flask_script import Manager
    8. from flask_migrate import Migrate, MigrateCommand
    9. app = create_app()
    10. manager = Manager(app)
    11. Migrate(app, db)
    12. """
    13. # 数据库迁移命名
    14.     python manage.py db init
    15.     python manage.py db migrate
    16.     python manage.py db upgrade
    17. """
    18. manager.add_command('db', MigrateCommand)
    19. if __name__ == '__main__':
    20.     manager.run()
    21.     # app.run()
    复制代码
  • python虚拟环境
    1. install the module "pip install pipreqs"
    2. # 终端输入后就可以找到所有的依赖
    3. pipreqs ./ --encoding=utf-8
    复制代码

来源:https://www.cnblogs.com/InvincibleGrass/p/17562878.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

上一篇: Python基础笔记

下一篇: Python基础(2)

举报 回复 使用道具