翼度科技»论坛 编程开发 python 查看内容

Python使用WebSocket和SSE实现HTTP服务器消息推送方式

5

主题

5

帖子

15

积分

新手上路

Rank: 1

积分
15
很多时候我们需要实时获取最新数据,但是传统意义上的HTTP请求,必须由客户端向服务端发起请求,服务端再返回相应的数据。
那如果我们需要获取实时数据,就要通过HTTP轮询,客户端不间断的向服务器发起请求。
这样不断的的请求不但严重加大服务器的压力,还可能因为网络延迟而影响数据的时效性。
下面介绍两种方法能够很好的满足业务的需求。

一、WebSocket

WebSocket是HTML5开始提供的一种在单个 TCP 链接上进行全双工通信的协议。

  • 优点:双工通信
  • 缺点:需专门定义数据协议,解析数据流,且部分服务器支持不完善,后台例如java spring boot 2.1.2 仅支持websocket 1.0(最高已达1.3)


1.客户端代码

python 3+ 代码
  1. #python 3+
  2. import threading
  3. import websocket


  4. class Client:
  5.     def __init__(self,url):
  6.             self.url = url
  7.         self.ws = None
  8.         self.enable = True


  9.     def on_message(self,response):
  10.                 self.enable = False
  11.         print(response)


  12.     def on_error(self,error):
  13.         # print(ws)
  14.         print(error)


  15.     def on_close(self):
  16.             self.enable = True
  17.         print(ws)
  18.       

  19.     def on_open(self):
  20.                 print('open')
  21.    
  22.     def start_func(self):

  23.         while self.enable:
  24.             websocket.enableTrace(True)
  25.             self.ws = websocket.WebSocketApp(self.url,
  26.                                         on_message=self.on_message,
  27.                                         # on_data=on_data,
  28.                                         on_error=self.on_error,
  29.                                         on_open=self.on_open,
  30.                                         on_close=self.on_close, )


  31.             self.ws.run_forever(ping_interval=60, ping_timeout=5)

  32. if __name__ == "__main__":
  33.         cli = Client(url = 'wss://api.zb.live/websocket' )
  34.     t1 = threading.Thread(target=cli.start_func_zb)
  35.     t1.start()
复制代码
javascript 代码
  1. var ws = new WebSocket("wss://echo.websocket.org");

  2. ws.onopen = function(evt) {
  3.   console.log("Connection open ...");
  4.   ws.send("Hello WebSockets!");
  5. };

  6. ws.onmessage = function(evt) {
  7.   console.log( "Received Message: " + evt.data);
  8.   ws.close();
  9. };

  10. ws.onclose = function(evt) {
  11.   console.log("Connection closed.");
  12. };      
复制代码
2.服务端代码
  1. from websocket_server import WebsocketServer

  2. class WSSocketObj:
  3.     def __init__(self,host=None,port = 8131):
  4.        self.host = host if host else '127.0.0.1'
  5.        self.port = port

  6.     # 当新的客户端连接时会提示
  7.     def new_client(self,client, server,):
  8.         print("当新的客户端连接时会提示:%s" % client['id'])
  9.         dd = 122
  10.         server.send_message_to_all("Hey all, a new client has joined us")

  11.     # 当旧的客户端离开
  12.     def client_left(self,client, server):
  13.         print("客户端%s断开" % client['id'])

  14.     # 接收客户端的信息。
  15.     def message_received(self,client, server, message):
  16.         print("Client(%d) said: %s" % (client['id'], message))
  17.         # server.send_message_to_all(message) #发送消息给 全部客户端
  18.         server.send_message(client, 'hello,client')  # 发送消息给指定客户端

  19.     def run_server(self):
  20.         server = WebsocketServer(self.port, self.host)
  21.         server.set_fn_new_client(self.new_client)
  22.         server.set_fn_client_left(self.client_left)
  23.         server.set_fn_message_received(self.message_received)
  24.         server.run_forever()

  25. if __name__ == '__main__':

  26.     WSSocketObj().run_server()
复制代码
二、SSE(Server-Sent Events,服务器发送事件)

SSE ( Server-sent Events )通俗解释起来就是一种基于HTTP的,以流的形式由服务端持续向客户端发送数据的技术,是 WebSocket 的一种轻量代替方案。

  • 优点:开发简单,和传统的http开发几乎无任何差别,客户端开发简单,有标准支持(EventSource)
  • 缺点:和websocket相比,只能单工通信,建立连接后,只能由服务端发往客户端,且占用一个连接,如需客户端向服务端通信,需额外打开一个连接
1.客户端代码

python
  1. # 第一种方式
  2. def sse_client():
  3.     """
  4.     pip install sseclient-py
  5.     只对于返回标准SSE格式的请求可用 格式:event: {EVENT}\nretry: 10000\ndata: {DATA}\n\n
  6.     :return:
  7.     """
  8.     import requests
  9.     # res = requests.request('get', url, json=data, stream=True, headers={'Accept': 'text/event-stream'})
  10.     client = requests.post(url, json=data, stream=True, headers={'Accept': 'text/event-stream'})
  11.     client = sseclient.SSEClient(client)
  12.     for i in client.events():
  13.         print(i.data)

  14. # 第二种方式
  15. def sse_with_requests():
  16.     headers = {"Accept": "text/event-stream"}
  17.     r = requests.post(url, headers=headers, json=data, stream=True)
  18.     r.encoding = 'utf-8'
  19.     for chunk in r.iter_content(chunk_size=None, decode_unicode=True):
  20.         # 处理接收到的数据块
  21.         print("Received:", chunk)
复制代码
javascript
第一种方式:
  1. //判断是否支持SSE
  2. if('EventSource' in window){

  3. //初始化SSE
  4. var url="http:localhost:8000/stream";
  5. var source=new EventSource(url);

  6. // 连接成功后会触发 open 事件
  7. source.onopen=(event)=>{

  8. console.log("开启SSE");

  9. }

  10. // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
  11. source.onmessage=(event)=>{

  12. var data=event.data;

  13. $("body").append($("<p>").text(data));

  14. }

  15. //监听like事件
  16. source.addEventListener('like',function(event){

  17. var data=event.data;

  18. $("body").append($("<p>").text(data));
  19. },false);

  20. // 连接异常时会触发 error 事件并自动重连
  21. source.onerror=(event)=>{

  22. console.log(event);

  23. }
复制代码
第二种方式:使用 addEventListener 方法来添加相应的事件处理方法
  1. if (window.EventSource) {
  2.   // 创建 EventSource 对象连接服务器
  3.   const source = new EventSource('http://localhost:2000');

  4.   // 连接成功后会触发 open 事件
  5.   source.addEventListener('open', () => {
  6.     console.log('Connected');
  7.   }, false);

  8.   // 服务器发送信息到客户端时,如果没有 event 字段,默认会触发 message 事件
  9.   source.addEventListener('message', e => {
  10.     console.log(`data: ${e.data}`);
  11.   }, false);

  12.   // 自定义 EventHandler,在收到 event 字段为 slide 的消息时触发
  13.   source.addEventListener('slide', e => {
  14.     console.log(`data: ${e.data}`); // => data: 7
  15.   }, false);

  16.   // 连接异常时会触发 error 事件并自动重连
  17.   source.addEventListener('error', e => {
  18.     if (e.target.readyState === EventSource.CLOSED) {
  19.       console.log('Disconnected');
  20.     } else if (e.target.readyState === EventSource.CONNECTING) {
  21.       console.log('Connecting...');
  22.     }
  23.   }, false);
  24. } else {
  25.   console.error('Your browser doesn\'t support SSE');
  26. }
复制代码
EventSource从父接口
  1. EventTarget
复制代码
中继承了属性和方法,其内置了 3 个
  1. EventHandler
复制代码
属性、2 个只读属性和 1 个方法:
EventHandler 属性

    1. EventSource.onopen
    复制代码
    在连接打开时被调用。
    1. EventSource.onmessage
    复制代码
    在收到一个没有 event 属性的消息时被调用。
    1. EventSource.onerror
    复制代码
    在连接异常时被调用。 只读属性
    1. EventSource.readyState
    复制代码
    一个 unsigned short 值,代表连接状态。可能值是 CONNECTING (0), OPEN (1), 或者 CLOSED (2)。
    1. EventSource.url
    复制代码
    连接的 URL。 方法
    1. EventSource.close()
    复制代码
    关闭连接
EventSource 对象的
  1. onmessage
复制代码
属性的作用类似于
  1. addEventListener( ‘ message ’ )
复制代码

2.服务端代码(基于Flask)
  1. import json
  2. import time

  3. from flask import Flask, request
  4. from flask import Response
  5. from flask import render_template

  6. app = Flask(__name__)


  7. def get_message():
  8.     """this could be any function that blocks until data is ready"""
  9.     time.sleep(1)
  10.     s = time.ctime(time.time())
  11.     return json.dumps(['当前时间:' + s , 'a'], ensure_ascii=False)


  12. @app.route('/')
  13. def hello_world():
  14.     return render_template('index.html')


  15. @app.route('/stream')
  16. def stream():
  17.     user_id = request.args.get('user_id')
  18.     print(user_id)
  19.     def eventStream():
  20.         id = 0
  21.         while True:
  22.             id +=1
  23.             # wait for source data to be available, then push it

  24.             yield 'id: {}\nevent: add\ndata: {}\n\n'.format(id,get_message())


  25.     return Response(eventStream(), mimetype="text/event-stream")


  26. if __name__ == '__main__':
  27.     app.run()
复制代码
因为SSE是http请求,可是又限定是一个长链接,因此要设置MIME类型为text/event-stream。返回的为字符串。
消息的格式
服务器向浏览器发送的 SSE 数据,必须是 UTF-8 编码的文本;
每一次发送的信息,由若干个message组成,每一个message之间用\n\n分隔。每一个message内部由若干行组成

  • 格式
  1. [field]:value\n
复制代码
其中在规范中为消息定义了 4 个字段

  • id 表明id
  • event 表明消息的事件类型
  • data 消息的数据字段
  • retry 客户端重连的时间。只接受整数,单位是毫秒。如果这个值不是整数则会被自动忽略
需要注意的是,id字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
一个很有意思的地方是,规范中规定以冒号开头的消息都会被当作注释,一条普通的注释(
  1. :\n\n
复制代码
)对于服务器来说只占 5个字符,但是发送到客户端上的时候不会触发任何事件,这对客户端来说是非常友好的。所以注释一般被用于维持服务器和客户端的长连接。

3.SSE使用注意事项

1、SSE 如何保证数据完整性
客户端在每次接收到消息时,会把消息的 id 字段作为内部属性 Last-Event-ID 储存起来。
SSE 默认支持断线重连机制,在连接断开时会 触发 EventSource 的 error 事件,同时自动重连。再次连接成功时EventSource 会把 Last-Event-ID 属性作为请求头发送给服务器,这样服务器就可以根据这个 Last-Event-ID作出相应的处理。
这里需要注意的是,id 字段不是必须的,服务器有可能不会在消息中带上 id 字段,这样子客户端就不会存在 Last-Event-ID这个属性。所以为了保证数据可靠,我们需要在每条消息上带上 id 字段。
2、减少开销
在 SSE 的草案中提到,“text/event-stream” 的 MIME 类型传输应当在静置 15秒后自动断开。在实际的项目中也会有这个机制,但是断开的时间没有被列入标准中。
为了减少服务器的开销,我们也可以有目的的断开和重连。
简单的办法是服务器发送一个 关闭消息并指定一个重连的时间戳,客户端在触发关闭事件时关闭当前连接并创建 一个计时器,在重连时把计时器销毁。
  1. function connectSSE() {
  2.   if (window.EventSource) {
  3.     const source = new EventSource('http://localhost:2000');
  4.     let reconnectTimeout;

  5.     source.addEventListener('open', () => {
  6.       console.log('Connected');
  7.       clearTimeout(reconnectTimeout);
  8.     }, false);

  9.     source.addEventListener('pause', e => {
  10.       source.close();
  11.       const reconnectTime = +e.data;
  12.       const currentTime = +new Date();
  13.       reconnectTimeout = setTimeout(() => {
  14.         connectSSE();
  15.       }, reconnectTime - currentTime);
  16.     }, false);
  17.   } else {
  18.     console.error('Your browser doesn\'t support SSE');
  19.   }
  20. }

  21. connectSSE();
复制代码
总结

以上为个人经验,希望能给大家一个参考,也希望大家多多支持脚本之家。

来源:https://www.jb51.net/python/33122674h.htm
免责声明:由于采集信息均来自互联网,如果侵犯了您的权益,请联系我们【E-Mail:cb@itdo.tech】 我们会及时删除侵权内容,谢谢合作!

举报 回复 使用道具