翼度科技»论坛 编程开发 python 查看内容

Flask 实现Token认证机制

7

主题

7

帖子

21

积分

新手上路

Rank: 1

积分
21
在Flask框架中,实现Token认证机制并不是一件复杂的事情。除了使用官方提供的flask_httpauth模块或者第三方模块flask-jwt,我们还可以考虑自己实现一个简易版的Token认证工具。自定义Token认证机制的本质是生成一个令牌(Token),并在用户每次请求时验证这个令牌的有效性。
整个过程可以分为以下几个步骤:

  • 用户登录时生成Token,并将Token与用户关联存储在服务器端。
  • 用户在请求时携带Token。
  • 服务器在收到请求后,验证Token的有效性。
  • 如果Token有效,允许用户访问相应资源;否则,拒绝访问。
这种自定义的Token认证机制相对简单,适用于一些小型应用或者对于Token认证机制有特殊需求的场景。搭建这样一个简易的认证系统有助于理解Token认证的基本原理,并可以根据实际需求进行灵活的定制。
创建表结构

通过表结构的创建,建立用户认证和会话管理表。UserAuthDB表存储了用户的账号密码信息,而SessionAuthDB表则存储了用户登录后生成的Token信息,包括用户名、Token本身以及Token的过期时间。这为后续实现用户注册、登录以及Token认证等功能提供了数据库支持。
UserAuthDB表:

  • 用途:存储用户账号密码信息。
  • 字段:

    • id: 主键,自增,唯一标识每个用户。
    • username: 用户名,非空,唯一,用于登录时识别用户。
    • password: 密码,非空,用于验证用户身份。

SessionAuthDB表:

  • 用途:存储登录成功后用户的Token信息。
  • 字段:

    • id: 主键,自增,唯一标识每个登录会话。
    • username: 用户名,非空,唯一,关联到UserAuthDB表的用户名。
    • token: 用户登录后生成的Token,非空,唯一,用于身份验证。
    • invalid_date: Token的过期时间,用于判断Token是否过期。

代码通过Flask路由/create实现了数据库表结构的创建,主要包括两张表,分别是UserAuthDB和SessionAuthDB。
  1. @app.route("/create",methods=["GET"])
  2. def create():
  3.     conn = sqlite3.connect("./database.db")
  4.     cursor = conn.cursor()
  5.     create_auth = "create table UserAuthDB(" \
  6.              "id INTEGER primary key AUTOINCREMENT not null unique," \
  7.              "username varchar(64) not null unique," \
  8.              "password varchar(64) not null" \
  9.              ")"
  10.     cursor.execute(create_auth)
  11.     create_session = "create table SessionAuthDB(" \
  12.                      "id INTEGER primary key AUTOINCREMENT not null unique," \
  13.                      "username varchar(64) not null unique," \
  14.                      "token varchar(128) not null unique," \
  15.                      "invalid_date int not null" \
  16.                      ")"
  17.     cursor.execute(create_session)
  18.     conn.commit()
  19.     cursor.close()
  20.     conn.close()
  21.     return "create success"
复制代码
验证函数

该验证函数用于保证传入的用户名和密码满足一定的安全性和格式要求。通过对长度和字符内容的检查,确保了传入的参数不会导致潜在的安全问题。这样的验证机制在用户注册、登录等场景中可以有效地防止一些常见的安全漏洞。
参数验证:

  • 接受不定数量的参数*kwargs,可传入多个参数。
  • 对于每个传入的参数,首先验证其长度是否在合法范围内(小于128个字符且不为空)。
字符串处理:

  • 将参数转换为小写形式,然后去除两侧空格,并移除所有空格。
字符内容验证:

  • 遍历处理后的字符串,检查其中的字符是否仅包含大写字母、小写字母和数字。如果出现其他字符,则认为非法。
返回结果:

  • 如果所有参数验证通过,即长度合法且字符内容符合要求,则返回True,表示参数合法。
  • 如果有任何一个参数不合法,则返回False,表示参数存在非法字符或超出长度限制。
代码定义了一个名为CheckParameters的验证函数,该函数用于验证传入的参数是否合法。主要验证的对象是用户名和密码,具体概述如下:
  1. def CheckParameters(*kwargs):
  2.     for item in range(len(kwargs)):
  3.         # 先验证长度
  4.         if len(kwargs[item]) >= 128 or len(kwargs[item]) == 0:
  5.             return False
  6.         # 先小写,然后去掉两侧空格,去掉所有空格
  7.         local_string = kwargs[item].lower().strip().replace(" ","")
  8.         # 判断是否只包含 大写 小写 数字
  9.         for kw in local_string:
  10.             if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:
  11.                 return False
  12.     return True
复制代码
登录认证函数

该函数实现了用户登录认证的核心逻辑。首先对输入的用户名和密码进行验证,然后检查用户是否存在以及是否已经有生成的Token。如果用户存在但Token不存在,生成一个新的Token并存入数据库,最终返回生成的Token。
路由定义:

  • 使用@app.route("/login", methods=["POST"])定义了一个POST请求的路由,用于处理用户登录请求。
参数获取:

  • 通过request.form.to_dict()获取POST请求中的参数,包括用户名(username)和密码(password)。
参数验证:

  • 调用之前定义的CheckParameters函数对获取的用户名和密码进行合法性验证,确保其符合安全性和格式要求。
用户存在性验证:

  • 调用RunSqlite函数查询UserAuthDB表,验证用户名和密码是否匹配。如果存在匹配的用户,则继续执行下一步。
生成Token:

  • 查询SessionAuthDB表,检查是否存在该用户的Token记录。如果存在,则直接返回该Token。
  • 如果不存在Token记录,则生成一个32位的随机Token,并设置过期时间为当前时间戳加上360秒(6分钟)。
Token写入数据库:

  • 将生成的Token和过期时间写入SessionAuthDB表。
返回结果:

  • 返回生成的Token,作为登录成功的标识。
  1. @app.route("/login",methods=["POST"])
  2. def login():
  3.     if request.method == "POST":
  4.         # 获取参数信息
  5.         obtain_dict = request.form.to_dict()
  6.         if len(obtain_dict) != 0 and len(obtain_dict) == 2:
  7.             username = obtain_dict["username"]
  8.             password = obtain_dict["password"]
  9.             # 验证是否合法
  10.             is_true = CheckParameters(username,password)
  11.             if is_true == True:
  12.                 # 查询是否存在该用户
  13.                 select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
  14.                 if select[0][0] == username and select[0][1] == password:
  15.                     # 查询Session列表是否存在
  16.                     select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
  17.                     if select_session != []:
  18.                         ref = {"message": ""}
  19.                         ref["message"] = select_session[0][0]
  20.                         return json.dumps(ref, ensure_ascii=False)
  21.                     # Session不存在则需要重新生成
  22.                     else:
  23.                         # 生成并写入token和过期时间戳
  24.                         token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
  25.                         # 设置360秒周期,过期时间
  26.                         time_stamp = int(time.time()) + 360
  27.                         insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
  28.                         if insert == True:
  29.                             ref = {"message": ""}
  30.                             ref["message"] = token
  31.                             return json.dumps(ref, ensure_ascii=False)
  32.                 else:
  33.                     return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
  34.             else:
  35.                 return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)
  36.     return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
复制代码
登录认证装饰器

检查用户登录状态Token是否过期的装饰器,装饰器用于装饰某一些函数,当主调函数被调用时,会优先执行装饰器内的代码,执行后根据装饰器执行结果返回或退出,装饰器分为两种模式,一种是FBV模式,另一种是CBV模式。
FBV(Function-Based Views)和CBV(Class-Based Views)是两种不同的视图设计模式,用于处理Web框架中的请求和生成响应。这两种模式在Django框架中被广泛使用。
FBV(Function-Based Views)


  • 定义: FBV是指使用普通的Python函数来处理请求和生成响应的视图设计模式。
  • 特点:

    • 每个视图对应一个函数,函数接收请求作为参数,返回响应。
    • 简单,易于理解和使用。
    • 视图的逻辑和处理集中在一个函数中。

示例:
  1. def my_view(request):
  2.     # 处理逻辑
  3.     return HttpResponse("Hello, World!")
复制代码
CBV(Class-Based Views)


  • 定义: CBV是指使用基于类的Python类来处理请求和生成响应的视图设计模式。
  • 特点:

    • 视图是类,每个类中可以包含多个方法来处理不同HTTP方法(GET、POST等)的请求。
    • 提供了更多的代码组织和复用的可能性,可以使用类的继承、Mixin等方式。
    • 更灵活,适用于复杂的业务逻辑和共享逻辑。

示例:
  1. class MyView(View):
  2.     def get(self, request):
  3.         # 处理 GET 请求的逻辑
  4.         return HttpResponse("Hello, World!")
  5.     def post(self, request):
  6.         # 处理 POST 请求的逻辑
  7.         return HttpResponse("Received a POST request")
复制代码
FBV与CBV区别


  • 结构差异: FBV使用函数,逻辑较为集中;CBV使用类,允许通过类的继承和Mixin等方式更好地组织代码。
  • 代码复用: CBV更容易实现代码复用,可以通过继承和Mixin在不同的类之间共享逻辑;而FBV需要显式地将共享逻辑提取为函数。
  • 装饰器: 在FBV中,使用装饰器来添加额外的功能;而在CBV中,通过类的继承和Mixin来实现相似的功能。
  • 可读性: 对于简单的视图逻辑,FBV可能更直观易懂;对于较为复杂的业务逻辑,CBV提供了更好的组织和扩展性。
在Flask中,两种设计模式都可以使用,开发者可以根据项目的需求和个人喜好选择使用FBV或CBV。
基于FBV的装饰器设置使用时,需要注意装饰器嵌入的位置,装饰器需要在请求进入路由之前,即在请求未走原逻辑代码的时候介入,对原业务逻辑进行业务拓展。
  1. from flask import Flask, request,render_template
  2. from functools import wraps
  3. app = Flask(__name__)
  4. def login(func):
  5.     @wraps(func)
  6.     def wrapper(*args, **kwargs):
  7.         print("登录请求: {}".format(request.url))
  8.         value = request.form.get("value")
  9.         if value == "lyshark":
  10.             # 调用原函数,并返回
  11.             function_ptr = func(*args, **kwargs)
  12.             return function_ptr
  13.         else:
  14.             return "登录失败"
  15.     return wrapper
  16. @app.route('/', methods=['GET', 'POST'])
  17. @login
  18. def index():
  19.     if request.method == "POST":
  20.         value = request.form.get("value")
  21.     return "index"
  22. if __name__ == '__main__':
  23.     app.run()
复制代码
而基于CBV的装饰器设置,使用就显得更加细分化,可以定制管理专属功能,在外部定义装饰器可以全局使用,内部定义可以针对特定路由函数特殊处理。
  1. from flask import Flask, request,render_template,views
  2. from functools import wraps
  3. app = Flask(__name__)
  4. # 装饰器
  5. def login(func):
  6.     @wraps(func)
  7.     def wrapper(*args, **kwargs):
  8.         print("登录请求: {}".format(request.url))
  9.         value = request.form.get("value")
  10.         if value == "lyshark":
  11.             # 调用原函数,并返回
  12.             function_ptr = func(*args, **kwargs)
  13.             return function_ptr
  14.         else:
  15.             return "登录失败"
  16.     return wrapper
  17. # 类视图
  18. class index(views.MethodView):
  19.     @login
  20.     def get(self):
  21.         return request.args
  22.     @login
  23.     def post(self):
  24.         return "success"
  25. # 增加路由
  26. app.add_url_rule(rule='/', view_func=index.as_view('index'))
  27. if __name__ == '__main__':
  28.     app.run()
复制代码
此处为了实现起来更简单一些此处直接使用FBV模式,我们实现的login_check装饰器通过FVB模式构建,代码中取得用户的Token以及用户名对用户身份进行验证。
  1. def login_check(func):
  2.     @wraps(func)
  3.     def wrapper(*args, **kwargs):
  4.         print("处理登录逻辑部分: {}".format(request.url))
  5.         # 得到token 验证是否登陆了,且token没有过期
  6.         local_timestamp = int(time.time())
  7.         get_token = request.headers.get("token")
  8.         # 验证传入参数是否合法
  9.         if CheckParameters(get_token) == True:
  10.             select = RunSqlite("database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")
  11.             print(select)
  12.             # 判断是否存在记录,如果存在,在判断时间戳是否合理
  13.             if select != []:
  14.                 # 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录
  15.                 if local_timestamp >= int(select[0][1]):
  16.                     print("时间戳过期了")
  17.                     # 删除原来的Token
  18.                     delete = RunSqlite("database.db","SessionAuthDB","delete",f"token='{get_token}'","none")
  19.                     if delete == True:
  20.                         return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)
  21.                     else:
  22.                         return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)
  23.                 else:
  24.                     # 验证Token是否一致
  25.                     if select[0][0] == get_token:
  26.                         print("Token验证正常,继续执行function_ptr指向代码.")
  27.                         # 返回到原函数
  28.                         return func(*args, **kwargs)
  29.                     else:
  30.                         print("Token验证错误 {}".format(select))
  31.                         return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)
  32.             # 装饰器调用原函数
  33.             # function_ptr = func(*args, **kwargs)
  34.         return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)
  35.     return wrapper
复制代码
调用演示

主调用函数则是具体的功能实现可以自定义扩展,当用户访问该路由时会优先调用login_check装饰器来验证用户携带Token的合法性,如果合法则会通过return func(*args, **kwargs)返回执行主调函数,否则直接返回验证失败的消息。
  1. # 获取参数函数
  2. @app.route("/GetPage", methods=["POST"])
  3. @login_check
  4. def GetPage():
  5.     if request.method == "POST":
  6.         # 获取参数信息
  7.         obtain_dict = request.form.to_dict()
  8.         if len(obtain_dict) != 0 and len(obtain_dict) == 1:
  9.             pagename = obtain_dict["pagename"]
  10.             print("查询名称: {}".format(obtain_dict["pagename"]))
  11.             # 相应头的完整写法
  12.             req = Response(response="ok", status=200, mimetype="application/json")
  13.             req.headers["Content-Type"] = "text/json; charset=utf-8"
  14.             req.headers["Server"] = "LyShark Server 1.0"
  15.             req.data = json.dumps("{'message': 'hello world'}")
  16.             return req
  17.         else:
  18.             return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)
  19.     return json.dumps("{'token': '未知错误'}", ensure_ascii=False)
  20. # 用户注册函数
  21. @app.route("/register", methods=["POST"])
  22. def Register():
  23.     if request.method == "POST":
  24.         obtain_dict = request.form.to_dict()
  25.         if len(obtain_dict) != 0 and len(obtain_dict) == 2:
  26.             print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))
  27.             reg_username = obtain_dict["username"]
  28.             reg_password = obtain_dict["password"]
  29.             # 验证是否合法
  30.             if CheckParameters(reg_username, reg_password) == False:
  31.                 return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)
  32.             # 查询用户是否存在
  33.             select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")
  34.             if select != []:
  35.                 return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)
  36.             else:
  37.                 insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")
  38.                 if insert == True:
  39.                     return json.dumps("{'message': '注册成功'}", ensure_ascii=False)
  40.                 else:
  41.                     return json.dumps("{'message': '注册失败'}", ensure_ascii=False)
  42.         else:
  43.             return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
  44.     return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
  45. # 密码修改函数
  46. @app.route("/modify", methods=["POST"])
  47. @login_check
  48. def modify():
  49.     if request.method == "POST":
  50.         obtain_dict = request.form.to_dict()
  51.         if len(obtain_dict) != 0 and len(obtain_dict) == 1:
  52.             mdf_password = obtain_dict["password"]
  53.             get_token = request.headers.get("token")
  54.             print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))
  55.             # 验证是否合法
  56.             if CheckParameters(get_token, mdf_password) == False:
  57.                 return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)
  58.             # 先得到token对应用户名
  59.             select = RunSqlite("database.db","SessionAuthDB","select","username",f"token='{get_token}'")
  60.             if select != []:
  61.                 # 接着直接修改密码即可
  62.                 modify_username = str(select[0][0])
  63.                 print("得到的用户名: {}".format(modify_username))
  64.                 update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")
  65.                 if update == True:
  66.                     # 删除原来的token,让用户重新获取
  67.                     delete = RunSqlite("database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")
  68.                     print("删除token状态: {}".format(delete))
  69.                     return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)
  70.                 else:
  71.                     return json.dumps("{'message': '修改失败'}", ensure_ascii=False)
  72.             else:
  73.                 return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)
  74.         else:
  75.             return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)
  76.     return json.dumps("{'message': '未知错误'}", ensure_ascii=False)
复制代码
FBV模式下的完整代码,以下是对代码的概述:
主要功能:


  • 数据库操作: 封装了对 SQLite 数据库的基本增删改查操作(RunSqlite 函数)。
  • 用户认证: 提供了用户登录、注册和密码修改的功能。使用了 Token 机制进行登录认证,并通过装饰器 login_check 来验证 Token 的有效性。
  • 创建数据库表: 提供了一个用于初始化数据库表结构的接口 /create。
  • 获取页面信息: 通过 /GetPage 接口,使用了 login_check 装饰器来验证用户登录状态,仅对已登录用户提供页面信息。
主要路由


  • /create:创建数据库表结构。
  • /login:用户登录接口,返回用户的 Token。
  • /GetPage:获取页面信息,需要用户登录并携带有效 Token。
  • /register:用户注册接口。
  • /modify:修改用户密码接口,需要用户登录并携带有效 Token。
代码结构


  • 数据库操作:

    • 提供了对 SQLite 数据库的基本操作,包括插入、更新、查询和删除。

  • 用户认证:

    • 使用了装饰器 login_check 对需要登录的路由进行认证。
    • 提供了用户登录、注册和密码修改的路由。

  • 创建数据库表:

    • 提供了一个用于初始化数据库表结构的路由。

  • 获取页面信息:

    • 提供了一个用于获取页面信息的路由,需要用户登录并携带有效 Token。

  1. from flask import Flask,render_template,request,Response,redirect,jsonifyfrom functools import wrapsimport json,sqlite3,random,string,timeapp = Flask(__name__)# 增删改查简单封装def RunSqlite(db,table,action,field,value):    connect = sqlite3.connect(db)    cursor = connect.cursor()    # 执行插入动作    if action == "insert":        insert = f"insert into {table}({field}) values({value});"        if insert == None or len(insert) == 0:            return False        try:            cursor.execute(insert)        except Exception:            return False    # 执行更新操作    elif action == "update":        update = f"update {table} set {value} where {field};"        if update == None or len(update) == 0:            return False        try:            cursor.execute(update)        except Exception:            return False    # 执行查询操作    elif action == "select":        # 查询条件是否为空        if value == "none":            select = f"select {field} from {table};"        else:            select = f"select {field} from {table} where {value};"        try:            ref = cursor.execute(select)            ref_data = ref.fetchall()            connect.commit()            connect.close()            return ref_data        except Exception:            return False    # 执行删除操作    elif action == "delete":        delete = f"delete from {table} where {field};"        if delete == None or len(delete) == 0:            return False        try:            cursor.execute(delete)        except Exception:            return False    try:        connect.commit()        connect.close()        return True    except Exception:        return False@app.route("/create",methods=["GET"])
  2. def create():
  3.     conn = sqlite3.connect("./database.db")
  4.     cursor = conn.cursor()
  5.     create_auth = "create table UserAuthDB(" \
  6.              "id INTEGER primary key AUTOINCREMENT not null unique," \
  7.              "username varchar(64) not null unique," \
  8.              "password varchar(64) not null" \
  9.              ")"
  10.     cursor.execute(create_auth)
  11.     create_session = "create table SessionAuthDB(" \
  12.                      "id INTEGER primary key AUTOINCREMENT not null unique," \
  13.                      "username varchar(64) not null unique," \
  14.                      "token varchar(128) not null unique," \
  15.                      "invalid_date int not null" \
  16.                      ")"
  17.     cursor.execute(create_session)
  18.     conn.commit()
  19.     cursor.close()
  20.     conn.close()
  21.     return "create success"# 验证用户名密码是否合法def CheckParameters(*kwargs):    for item in range(len(kwargs)):        # 先验证长度        if len(kwargs[item]) >= 256 or len(kwargs[item]) == 0:            return False        # 先小写,然后去掉两侧空格,去掉所有空格        local_string = kwargs[item].lower().strip().replace(" ","")        # 判断是否只包含 大写 小写 数字        for kw in local_string:            if kw.isupper() != True and kw.islower() != True and kw.isdigit() != True:                return False    return True# 登录认证模块@app.route("/login",methods=["POST"])
  22. def login():
  23.     if request.method == "POST":
  24.         # 获取参数信息
  25.         obtain_dict = request.form.to_dict()
  26.         if len(obtain_dict) != 0 and len(obtain_dict) == 2:
  27.             username = obtain_dict["username"]
  28.             password = obtain_dict["password"]
  29.             # 验证是否合法
  30.             is_true = CheckParameters(username,password)
  31.             if is_true == True:
  32.                 # 查询是否存在该用户
  33.                 select = RunSqlite("./database.db", "UserAuthDB", "select", "username,password", f"username='{username}'")
  34.                 if select[0][0] == username and select[0][1] == password:
  35.                     # 查询Session列表是否存在
  36.                     select_session = RunSqlite("./database.db","SessionAuthDB","select","token",f"username='{username}'")
  37.                     if select_session != []:
  38.                         ref = {"message": ""}
  39.                         ref["message"] = select_session[0][0]
  40.                         return json.dumps(ref, ensure_ascii=False)
  41.                     # Session不存在则需要重新生成
  42.                     else:
  43.                         # 生成并写入token和过期时间戳
  44.                         token = ''.join(random.sample(string.ascii_letters + string.digits, 32))
  45.                         # 设置360秒周期,过期时间
  46.                         time_stamp = int(time.time()) + 360
  47.                         insert = RunSqlite("./database.db", "SessionAuthDB", "insert", "username,token,invalid_date", f"'{username}','{token}',{time_stamp}")
  48.                         if insert == True:
  49.                             ref = {"message": ""}
  50.                             ref["message"] = token
  51.                             return json.dumps(ref, ensure_ascii=False)
  52.                 else:
  53.                     return json.dumps("{'message': '用户名或密码错误'}", ensure_ascii=False)
  54.             else:
  55.                 return json.dumps("{'message': '输入参数不可用'}", ensure_ascii=False)
  56.     return json.dumps("{'message': '未知错误'}", ensure_ascii=False)# 检查登录状态 token是否过期的装饰器def login_check(func):    @wraps(func)    def wrapper(*args, **kwargs):        print("处理登录逻辑部分: {}".format(request.url))        # 得到token 验证是否登陆了,且token没有过期        local_timestamp = int(time.time())        get_token = request.headers.get("token")        # 验证传入参数是否合法        if CheckParameters(get_token) == True:            select = RunSqlite("./database.db","SessionAuthDB","select","token,invalid_date",f"token='{get_token}'")            print(select)            # 判断是否存在记录,如果存在,在判断时间戳是否合理            if select != []:                # 如果当前时间与数据库比对,大于说明过期了需要删除原来的,让用户重新登录                if local_timestamp >= int(select[0][1]):                    print("时间戳过期了")                    # 删除原来的Token                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"token='{get_token}'","none")                    if delete == True:                        return json.dumps("{'token': 'Token 已过期,请重新登录获取'}", ensure_ascii=False)                    else:                        return json.dumps("{'token': '数据库删除异常,请联系开发者'}", ensure_ascii=False)                else:                    # 验证Token是否一致                    if select[0][0] == get_token:                        print("Token验证正常,继续执行function_ptr指向代码.")                        # 返回到原函数                        return func(*args, **kwargs)                    else:                        print("Token验证错误 {}".format(select))                        return json.dumps("{'token': 'Token 传入错误'}", ensure_ascii=False)            # 装饰器调用原函数            # function_ptr = func(*args, **kwargs)        return json.dumps("{'token': 'Token 验证失败'}", ensure_ascii=False)    return wrapper# 获取参数函数@app.route("/GetPage", methods=["POST"])@login_checkdef GetPage():    if request.method == "POST":        # 获取参数信息        obtain_dict = request.form.to_dict()        if len(obtain_dict) != 0 and len(obtain_dict) == 1:            pagename = obtain_dict["pagename"]            print("查询名称: {}".format(obtain_dict["pagename"]))            # 相应头的完整写法            req = Response(response="ok", status=200, mimetype="application/json")            req.headers["Content-Type"] = "text/json; charset=utf-8"            req.headers["Server"] = "LyShark Server 1.0"            req.data = json.dumps("{'message': 'hello world'}")            return req        else:            return json.dumps("{'message': '传入参数错误,请携带正确参数请求'}", ensure_ascii=False)    return json.dumps("{'token': '未知错误'}", ensure_ascii=False)# 用户注册函数@app.route("/register", methods=["POST"])def Register():    if request.method == "POST":        obtain_dict = request.form.to_dict()        if len(obtain_dict) != 0 and len(obtain_dict) == 2:            print("用户名: {} 密码: {}".format(obtain_dict["username"], obtain_dict["password"]))            reg_username = obtain_dict["username"]            reg_password = obtain_dict["password"]            # 验证是否合法            if CheckParameters(reg_username, reg_password) == False:                return json.dumps("{'message': '传入用户名密码不合法'}", ensure_ascii=False)            # 查询用户是否存在            select = RunSqlite("database.db","UserAuthDB","select","id",f"username='{reg_username}'")            if select != []:                return json.dumps("{'message': '用户名已被注册'}", ensure_ascii=False)            else:                insert = RunSqlite("database.db","UserAuthDB","insert","username,password",f"'{reg_username}','{reg_password}'")                if insert == True:                    return json.dumps("{'message': '注册成功'}", ensure_ascii=False)                else:                    return json.dumps("{'message': '注册失败'}", ensure_ascii=False)        else:            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)# 密码修改函数@app.route("/modify", methods=["POST"])@login_checkdef modify():    if request.method == "POST":        obtain_dict = request.form.to_dict()        if len(obtain_dict) != 0 and len(obtain_dict) == 1:            mdf_password = obtain_dict["password"]            get_token = request.headers.get("token")            print("获取token: {} 修改后密码: {}".format(get_token,mdf_password))            # 验证是否合法            if CheckParameters(get_token, mdf_password) == False:                return json.dumps("{'message': '传入密码不合法'}", ensure_ascii=False)            # 先得到token对应用户名            select = RunSqlite("./database.db","SessionAuthDB","select","username",f"token='{get_token}'")            if select != []:                # 接着直接修改密码即可                modify_username = str(select[0][0])                print("得到的用户名: {}".format(modify_username))                update = RunSqlite("database.db","UserAuthDB","update",f"username='{modify_username}'",f"password='{mdf_password}'")                if update == True:                    # 删除原来的token,让用户重新获取                    delete = RunSqlite("./database.db","SessionAuthDB","delete",f"username='{modify_username}'","none")                    print("删除token状态: {}".format(delete))                    return json.dumps("{'message': '修改成功,请重新登录获取Token'}", ensure_ascii=False)                else:                    return json.dumps("{'message': '修改失败'}", ensure_ascii=False)            else:                return json.dumps("{'message': '不存在该Token,无法修改密码'}", ensure_ascii=False)        else:            return json.dumps("{'message': '传入参数个数不正确'}", ensure_ascii=False)    return json.dumps("{'message': '未知错误'}", ensure_ascii=False)if __name__ == '__main__':    app.run(debug=True)
复制代码
首先需要在Web页面访问http://127.0.0.1/create路径实现对数据库的初始化,并打开Postman工具,通过传入参数来使用这个案例。


来源:https://www.cnblogs.com/LyShark/p/17859045.html
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

本帖子中包含更多资源

您需要 登录 才可以下载或查看,没有账号?立即注册

x

举报 回复 使用道具