Home > Archives > Flask基础之利用SSE实现服务器推送

Flask基础之利用SSE实现服务器推送

Published on

因为一直对于服务器端消息推送比较感兴趣,加上正在自学Python、Flask,所以利用Html5的Server-Sent Event以及Redis的pub/sub模型实现了一个基于Flask-SSE-Redis的服务器端推送程序。

Server-sent events (SSE) is a technology where a browser receives automatic updates from a server via HTTP connection. The Server-Sent Events EventSource API is standardized as part of HTML5[1] by the W3C.

服务器端推送事件(Server-Sent Event)是Html5中的一个API,客户端向服务器发送请求,同时创建一个EventSource对象; 服务器端保持这个连接(Connection:keep-alive),当有事件发生时,服务器端会将数据写入该连接并且传输到客户端。

该事件对象(EventSource)有一个data属性,表示从服务器端接受的数据(同时在服务器端构建事件时也需要有该属性); 还有一个type属性,表示是什么事件,客户端可以为不同的类型的事件注册不同的回调。

var notifications = new EventSource('/notifications')

// addEventListener第一个参数为事件类型,第二个参数为回调函数
notifications.addEventListener('new_question', function(event){
     // 回调函数的具体逻辑
});

notifications.addEventListener('new_answer', function(event){
     // 回调函数的具体逻辑
});

而对于服务器,需要做的是当有新的内容需要推送时,构建事件(比如说new_question),通过连接发送给客户端。在Flask中提供了一个相应的功能: 流式的内容响应,它是通过响应(Response)和生成器(generator)实现的。

一个简单的案例,向客户端不停的发送1 ~ 6之间的数字:

import itertools
import time
from flask import Flask, Response

app = Flask(__name__)

@app.route("/")
def index():
    def generator():
        for num in itertools.cycle('123456'):
            yield "data: %s \n" % num
            time.sleep(0.5)
    # 注意响应头中的content_type
    return Response(generator(), content_type='text/event-stream')

if __name__ == '__main__':
    app.debug = True
    app.run(threaded=True)

访问localhost:5000会看到页面上不停的打印如下数据:

data: 1 
data: 2 
data: 3 
data: 4 
data: 5 
data: 6 
data: 1 
data: 2 
data: 3 
data: 4 
data: 5 
data: 6 
....

类似的如果我们有一个消息订阅,比如说改关注了某个话题,关注了某个用户。当话题有新回复,用户有新活动的时候,所有的订阅者的就会有收到提醒或者是通知,所以很自然的想到Redis的pub/sub模型, Github上已经有用户对于基于此进行封装Flask-see,通过pip install flask_sse安装。

服务器端

注册一个负责推送的蓝图sse(上面的flask-see包中的对象),定义好相关的endpoint以及视图函数(view_func),上面的版本是一个不断打印数字的。而对于Redis的话,就应该订阅指定Redis频道并且阻塞式获取最新的消息,然后推送到客户端。

flask-see.py中的相关代码如下:

# sse.stream从redis的指定channel获取数据
sse.add_url_rule(rule="", endpoint="stream", view_func=sse.stream)
app = Flask(__name__)
# 连接到Redis服务器以发布和订阅消息
app.config['REDIS_URL'] = "redis://127.0.0.1"
# 注册蓝图sse, 并且定义该模块的URL前缀;
app.register_blueprint(sse, url_prefix='/stream')

@app.route('/')
def index():
    return render_template('index.html')

@app.route('/notifications')
def fetch_updates():
    """
         该步骤实际上是向redis的sse频道(默认是sse), 发布了一条消息, 而且是以SSE格式发送的
         即 data: "", 而type为事件类型, 也就是前端监听的事件类型
         publish ---> self.redis.publish(channel=channel)
    """
    sse.publish({
        'question': 'which programming is the best?',
        'value': 'Absolutely, PHP',
        'who': 'allen'
    }, type='new_answers')
    return json.dumps({"state": "ok"})

客户端

向指定模块发送一个长连接,等待内容推送。template/index页面的js实现如下:

<script>
    // 将指定的endpoint: sse.stream转换成对应的URL
    var source = new EventSource("");

    source.addEventListener('server_closed', function(event){
        source.close();
    });

    // 注册新回答事件的回调
    source.addEventListener('new_answers', function(event){
        console.log(event)
        var data = JSON.parse(event.data)
        $('#data').html(data.question + ' : ' + data.value)
    }, false);
</script>

<body>
    <div id="data">received pushed data</div>
</body>

启动Redis服务器,订阅sse频道(SUBSCRIBE sse)。 进入http://localhost:5000/,然后再发送一个http://localhost:5000/notifications请求,这时候index页面的中div内容会变成which programming is the best? : Absolutely, PHP。同时Redis客户端也会显示的内容。

1) "message"
2) "sse"
3) "{\"data\": {\"question\": \"which programming is the best?\", \"value\": \"Absolutely, PHP\", \"who\": \"allen\"}, \"type\": \"new_answers\"}"

实际上fetch_updates不执行也能到达类似的效果,因为推送的数据源是Redis指定频道的消息,所以直接在Redis客户端向指定频道发布一条消息,页面上同样可以收到消息。

127.0.0.1:6379> publish sse "{\"data\": {\"question\": \"which programming is the best?\", \"value\": \"Absolutely, Python\", \"who\": \"zml\"}, \"type\": \"new_answers\"}"
(integer) 5

上述命令运行之后,页面和Redis订阅了sse的客户端将会发生了相应的改变。

以上就是基于Flask-SSE-Redis的一个简单推送模型,完整版的代码Github

参考

> Wiki中SSE的定义

声明: 本文采用 BY-NC-SA 授权。转载请注明转自: Allen写字的地方