因为一直对于服务器端消息推送比较感兴趣,加上正在自学Python、Flask,所以利用Html5的Server-Sent Event以及Redis的pub/sub模型实现了一个基于Flask-SSE-Redis的服务器端推送程序。
Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5[1] by the W3C.
服务器端推送事件(Server-Sent Event)是Html5中的一个API,客户端向服务器发送请求,同时创建一个EventSource对象; 服务器端保持这个连接(Connection:keep-alive),当有事件发生时,服务器端会将数据写入该连接并且传输到客户端。
该事件对象(EventSource)有一个data属性,表示从服务器端接受的数据(同时在服务器端构建事件时也需要有该属性); 还有一个type属性,表示是什么事件,客户端可以为不同的类型的事件注册不同的回调。
var notifications = new EventSource('/notifications')
// addEventListener第一个参数为事件类型,第二个参数为回调函数
notifications.addEventListener('new_question', function(event){
// 回调函数的具体逻辑
});
notifications.addEventListener('new_answer', function(event){
// 回调函数的具体逻辑
});
而对于服务器,需要做的是当有新的内容需要推送时,构建事件(比如说new_question),通过连接发送给客户端。在Flask中提供了一个相应的功能: 流式的内容响应,它是通过响应(Response)和生成器(generator)实现的。
一个简单的案例,向客户端不停的发送1 ~ 6之间的数字:
import itertools
import time
from flask import Flask, Response
app = Flask(__name__)
@app.route("/")
def index():
def generator():
for num in itertools.cycle('123456'):
yield "data: %s \n" % num
time.sleep(0.5)
# 注意响应头中的content_type
return Response(generator(), content_type='text/event-stream')
if __name__ == '__main__':
app.debug = True
app.run(threaded=True)
访问localhost:5000
会看到页面上不停的打印如下数据:
data: 1
data: 2
data: 3
data: 4
data: 5
data: 6
data: 1
data: 2
data: 3
data: 4
data: 5
data: 6
....
类似的如果我们有一个消息订阅,比如说改关注了某个话题,关注了某个用户。当话题有新回复,用户有新活动的时候,所有的订阅者的就会有收到提醒或者是通知,所以很自然的想到Redis的pub/sub模型, Github上已经有用户对于基于此进行封装Flask-see,通过pip install flask_sse
安装。
服务器端
注册一个负责推送的蓝图sse(上面的flask-see包中的对象),定义好相关的endpoint以及视图函数(view_func),上面的版本是一个不断打印数字的。而对于Redis的话,就应该订阅指定Redis频道并且阻塞式获取最新的消息,然后推送到客户端。
在flask-see.py
中的相关代码如下:
# sse.stream从redis的指定channel获取数据
sse.add_url_rule(rule="", endpoint="stream", view_func=sse.stream)
app = Flask(__name__)
# 连接到Redis服务器以发布和订阅消息
app.config['REDIS_URL'] = "redis://127.0.0.1"
# 注册蓝图sse, 并且定义该模块的URL前缀;
app.register_blueprint(sse, url_prefix='/stream')
@app.route('/')
def index():
return render_template('index.html')
@app.route('/notifications')
def fetch_updates():
"""
该步骤实际上是向redis的sse频道(默认是sse), 发布了一条消息, 而且是以SSE格式发送的
即 data: "", 而type为事件类型, 也就是前端监听的事件类型
publish ---> self.redis.publish(channel=channel)
"""
sse.publish({
'question': 'which programming is the best?',
'value': 'Absolutely, PHP',
'who': 'allen'
}, type='new_answers')
return json.dumps({"state": "ok"})
客户端
向指定模块发送一个长连接,等待内容推送。template/index页面的js实现如下:
<script>
// 将指定的endpoint: sse.stream转换成对应的URL
var source = new EventSource("");
source.addEventListener('server_closed', function(event){
source.close();
});
// 注册新回答事件的回调
source.addEventListener('new_answers', function(event){
console.log(event)
var data = JSON.parse(event.data)
$('#data').html(data.question + ' : ' + data.value)
}, false);
</script>
<body>
<div id="data">received pushed data</div>
</body>
启动Redis服务器,订阅sse频道(SUBSCRIBE sse
)。
进入http://localhost:5000/
,然后再发送一个http://localhost:5000/notifications
请求,这时候index页面的中div内容会变成which programming is the best? : Absolutely, PHP
。同时Redis客户端也会显示的内容。
1) "message"
2) "sse"
3) "{\"data\": {\"question\": \"which programming is the best?\", \"value\": \"Absolutely, PHP\", \"who\": \"allen\"}, \"type\": \"new_answers\"}"
实际上fetch_updates
不执行也能到达类似的效果,因为推送的数据源是Redis指定频道的消息,所以直接在Redis客户端向指定频道发布一条消息,页面上同样可以收到消息。
127.0.0.1:6379> publish sse "{\"data\": {\"question\": \"which programming is the best?\", \"value\": \"Absolutely, Python\", \"who\": \"zml\"}, \"type\": \"new_answers\"}"
(integer) 5
上述命令运行之后,页面和Redis订阅了sse的客户端将会发生了相应的改变。
以上就是基于Flask-SSE-Redis的一个简单推送模型,完整版的代码Github