如何打造一个高性能,高大并发推送服务器源码的消息推送系统

Worktile中百万级实时消息推送服务的实现
发表于 11:43|
来源伯乐在线|
作者Worktile研发团队
摘要:相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果。
在团队协同工具
的使用过程中,你会发现无论是右上角的消息通知,还是在任务面板中拖动任务,还有用户的在线状态,都是实时刷新。Worktile中的推送服务是采用的是基于XMPP协议、Erlang语言实现的Ejabberd,并在其源码基础上,结合我们的业务,对源码作了修改以适配我们自身的需求。另外,基于AMQP协议也可以作为实时消息推送的一种选择,踢踢网就是采用
RabbitMQ+STOMP协议实现的消息推送服务。本文将结合我在Worktile和踢踢网的项目实践,介绍下消息推送服务的具体实现。
实时推送的几种实现方式
相较于手机端的消息推送(一般都是以Socket方式实现),WEB端是基于HTTP协议,很难像TCP一样保持长连接。但随着技术的发展,出现了WebSocket,Comet等新的技术可以达到类似长连接的效果,这些技术大体可分为以下几类:
1)短轮询。页面端通过JS定时异步刷新,这种方式实时效果较差。
2)长轮询。页面端通过JS异步请求服务端,服务端在接收到请求后,如果该次请求没有数据,则挂起这次请求,直到有数据到达或时间片(服务端设定)到,则返回本次请求,客户端接着下一次请求。示例如下:
3)WebSocket。浏览器通过WebSocket协议连接服务端,实现了浏览器和服务器端的全双工通信。需要服务端和浏览器都支持WebSocket协议。
以上几种方式中,方式1实现较简单,但效率和实时效果较差。方式2对服务端实现的要求比较高,尤其是并发量大的情况下,对服务端的压力很大。方式3效率较高,但对较低版本的浏览器不支持,另外服务端也需要有支持WebSocket的实现。Worktile的WEB端实时消息推送,采用的是XMPP扩展协议XEP-0124
),本质是采用方式2长轮询的方式。踢踢网则采用了WebSocket连接RabbitMQ的方式实现,下面我会具体介绍如何用这两种方式实现Server
运行时环境准备
服务端的实现中,无论采用Ejabberd还是RabbitMQ,都是基于Erlang语言开发的,所以必须安装Erlang运行时环境。Erlang是一种函数式语言,具有容错、高并发的特点,借助OTP的函数库,很容易构建一个健壮的分布式系统。目前,基于Erlang开发的产品有,数据库方面:Riak(Dynamo实现)、CouchDB,
Webserver方面:Cowboy、Mochiweb, 消息中间件有RabbitMQ等。对于服务端程序员来说,Erlang提供的高并发、容错、热部署等特性是其他语言无法达到的。无论在实时通信还是在游戏程序中,用Erlang可以很容易为每一个上线用户创建一个对应的Process,对一台4核8个G的服务器来说,承载上百万个这样的Process是非常轻松的事。下图是Erlang程序发起Process的一般性示意图:
如图所示,Session Manager(or Gateway)负责为每个用户(UID)创建相对应的Process, 并把这个对应关系(MAP)存放到数据表中。每个Process则对应用户数据,并且他们之间可以相互发送消息。Erlang的优势就是在内存足够的情况下创建上百万个这样的Process,而且它的创建和销毁比JAVA的Thread要轻量的多,两者不是一个数量级的。
好了,我们现在开始着手Erlang环境的搭建(实验的系统为Ubuntu&12.04, 4核8个G内存):
1、依赖库安装
sudo apt-get install build-essential
sudo apt-get install libncurses5-dev
sudo apt-get install libssl-dev libyaml-dev
sudo apt-get install m4
sudo apt-get install unixodbc unixodbc-dev
sudo apt-get install freeglut3-dev libwxgtk2.8-dev
sudo apt-get install xsltproc
sudo apt-get install fop tk8.5 libxml2-utils
2、官网下载OTP源码包(
), 解压并安装:
tar zxvf otpsrcR16B01.tar.gz
cd otpsrcR16B01
make & make install
至此,erlang运行环境就完成了。下面将分别介绍rabbitmq和ejabberd构建实时消息服务。
基于RabbitMQ的实时消息服务
RabbitMQ是在业界广泛应用的消息中间件,也是对AMQP协议实现最好的一种中间件。AMQP协议中定义了Producer、 Consumer、MessageQueue、Exchange、Binding、Virtual
Host等实体,他们的关系如下图所示:
消息发布者(Producer)连接交换器(Exchange), 交换器和消息队列(Message Queue)通过KEY进行Binding,Binding是根据Exchange的类型(分为Fanout、Direct、Topic、Header)分别对消息作不同形式的派发。Message
Queue又分为Durable、Temporary、Auto-Delete三种类型,Durable Queue是持久化队列,不会因为服务ShutDown而消失,Temporary
Queue则服务重启后会消失,Auto-Delete则是在没有Consumer连接时自动删除。另外RabbitMQ有很多第三方插件,可以基于AMQP协议基础之上做出很多扩展的应用。下面我们将介绍WEB
STOMP插件构建基于AMQP之上的STOMP文本协议,通过浏览器WebSocket达到实时的消息传输。系统的结构如图:
如图所示,WEB端我们使用STOMP.JS和SockJS.JS与RabbitMQ的WEB STOMP Plugin通信,手机端可以用STOMPj,
Gozirra(Android)或者Objc-STOMP(IOS)通过STOMP协议与RabbitMQ收发消息。因为我们是实时消息系统通常都是要与已有的用户系统结合,RabbitMQ可以通过第三方插件RabbitMQ-AYTH-Backend-HTTP来适配已有的用户系统,这个插件可以通过HTTP接口完成用户连接时的认证过程。当然,认证方式还有LDAP等其他方式。下面介绍具体步骤:
)下载最新版本的源码包,解压并安装:
tar zxf rabbitmq-server-x.x.x.tar.gz
cd rabbitmq-server-x.x.x
make & make install为RabbitMQ安装WEB-STOMP插件
cd /path/to/your/rabbitmq
./sbin/rabbitmq-plugins enable rabbitmq_web_stomp
./sbin/rabbitmq-plugins enable rabbitmq_web_stomp_examples
./sbin/rabbitmqctl stop
./sbin/rabbitmqctl start
./sbin/rabbitmqctl status
将会显示下图所示的运行的插件列表
安装用户授权插件
cd /path/to/your/rabbitmq/plugins
wget &a href="/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez"&/community-plugins/v3.3.x/rabbitmq_auth_backend_http-3.3.x-e7ac6289.ez&/a&
./sbin/rabbitmq-plugins enable rabbitmq_auth_backend_http编辑RabbitMQ.Config文件(默认存放于/ECT/RabbitMQ/下),添加:
{rabbit, [{auth_backends, [rabbit_auth_backend_http]}]},
{rabbitmq_auth_backend_http,
[{user_path, “http://your-server/auth/user”},
{vhost_path, “http://your-server/auth/vhost”},
{resource_path, “http://your-server/auth/resource”}
其中,User_Path是根据用户名密码进行校验,VHOST_Path是校验是否有权限访问VHOST, Resource_Path是校验用户对传入的Exchange、Queue是否有权限。我下面的代码是用Node.js实现的这三个接口的示例:
var express = require('express');
var app = express();
app.get('/auth/user', function(req, res){
var name = req.query.
var pass = req.query.
console.log("name : " + name + ", pass : " + pass);
if(name === 'guest' && pass === "guest"){
console.log("allow");
res.send("allow");
res.send('deny');
app.get('/auth/vhost', function(req, res){
console.log("/auth/vhost");
res.send("allow");
app.get('/auth/resource', function(req, res){
console.log("/auth/resource");
res.send("allow");
app.listen(3000);浏览器端JS实现,示例代码如下:
var ws = new SockJS('http://' + window.location.hostname + ':15674/stomp');
var client = Stomp.over(ws);
// SockJS does not support heart-beat: disable heart-beats
client.heartbeat.outgoing = 0;
client.heartbeat.incoming = 0;
client.debug = pipe('#second');
var print_first = pipe('#first', function(data) {
client.send('/exchange/feed/user_x', {"content-type":"text/plain"}, data);
var on_connect = function(x) {
id = client.subscribe("/exchange/feed/user_x", function(d) {
print_first(d.body);
var on_error = function() {
console.log('error');
client.connect('guest1', 'guest1', on_connect, on_error, '/');
需要说明的时,在这里我们首先要在RabbitMQ实例中创建Feed这个Exchange,我们用STOMP.JS连接成功后,根据当前登陆用户的ID(user_x)绑定到这个Exchange,即Subscribe(“/exchange/feed/user_x”,
…) 这个操作的行为,这样在向RabbitMQ中Feed Exchange发送消息并指定用户ID(user_x)为KEY,页面端就会通过WEB
Socket实时接收到这条消息。
到目前为止,基于RabbitMQ+STOMP实现WEB端消息推送就已经完成,其中很多的细节需要小伙伴们亲自去实践了,这里就不多说了。实践过程中可以参照官方文档:
以上的实现是我本人在踢踢网时采用的方式,下面接着介绍一下现在在Worktile中如何通过Ejabberd实现消息推送。
基于Ejabberd的实时消息推送
与RabbitMQ不同,Ejabberd是XMPP协议的一种实现,与AMQP相比,XMPP广泛应用于即时通信领域。XMPP协议的实现有很多种,比如JAVA的OpenFire,但相较其他实现,Ejabberd的并发性能无疑使最优秀的。XMPP协议的前身是Jabber协议,早期的Jabber协议主要包括在线状态(Presence)、好友花名册(Roster)、IQ(Info/Query)几个部分。现在Jabber已经成为RFC的官方标准,如RFC2799,RFC4622,RFC6121,以及XMPP的扩展协议(XEP)。Worktile
Web端的消息提醒功能就是基于XEP-0124、XEP-0206定义的BOSH扩展协议。
由于自身业务的需要,我们对Ejabberd的用户认证和好友列表模块的源码进行修改,通过Redis保存用户的在线状态,而不是Mnesia和MySQL。另外好友这块我们是从已有的数据库中(MongoDB)中获取项目或团队的成员。Web端通过Strophe.JS来连接(HTTP-BIND),Strophe.JS可以以长轮询和WebSocket两种方式来连接,由于Ejabberd还没有好的WebSocket的实现,就采用了BOSH的方式模拟长连接。整个系统的结构如下:
Web端用Strophe.JS通过HTTP-BIND进行连接Nginx代理,Nginx反向代理EjabberdCluster。iOS用XMPP-FramWork连接,
Android可以用Smack直接连Ejabberd服务器集群。这些都是现有的库,无需对Client进行开发。在线状态根据用户UID作为KEY定义了在线、离线、忙等状态存放于Redis中。好友列表从MongoDB的Project表中获取。用户认证直接修改了Ejabberd_Auth_Internal.erl文件,通过MongoDB驱动连接用户库,在线状态等功能是新加了模块,其部分代码如下:
-module(wt_mod_proj).
-behaviour(gen_mod).
-behaviour(gen_server).
-include("ejabberd.hrl").
-include("logger.hrl").
-include("jlib.hrl").
-define(SUPERVISOR, ejabberd_sup).
-define(ONLINE, 1).
-define(OFFLINE, 0).
-define(BUSY, 2).
-define(LEAVE, 3).
-export([start_link/2, get_proj_online_users/2]).
%% gen_mod callbacks
-export([start/2, stop/1]).
%% gen_server callbacks
-export([init/1, terminate/2, handle_call/3, handle_cast/2, handle_info/2, code_change/3]).
%% Hook callbacks
-export([user_available/1, unset_presence/3, set_presence/4]).
-export([get_redis/1, remove_online_user/3, append_online_user/3]).
-record(state,{host = &&""&&, server_host, rconn, mconn}).
start_link(Host, Opts) -&
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:start_link({local, Proc}, ?MODULE, [Host, Opts], []).
user_available(New) -&
LUser = New#jid.luser, LServer = New#jid.lserver,
Proc = gen_mod:get_module_proc(LServer, ?MODULE),
gen_server:cast(Proc, {user_available, LUser, LServer}).
append_online_user(Uid, Proj, Host) -&
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, {append_online_user, Uid, Proj}).
remove_online_user(Uid, Proj, Host) -&
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, {remove_online_user, Uid, Proj}).
set_presence(User, Server, Resource, Packet) -&
Proc = gen_mod:get_module_proc(Server, ?MODULE),
gen_server:cast(Proc, {set_presence, User, Server, Resource, Packet}).
start(Host, Opts) -&
Proc = gen_mod:get_module_proc(Host, ?MODULE),
ChildSpec = {Proc, {?MODULE, start_link, [Host, Opts]},
transient, 2000, worker, [?MODULE]},
supervisor:start_child(?SUPERVISOR, ChildSpec).
stop(Host) -&
Proc = gen_mod:get_module_proc(Host, ?MODULE),
gen_server:call(Proc, stop),
supervisor:delete_child(?SUPERVISOR, Proc).
init([Host, Opts]) -&
MyHost = gen_mod:get_opt_host(Host, Opts, &&"wtmuc.@HOST@"&&),
RedisHost = gen_mod:get_opt(redis_host, Opts, fun(B) -& B end,?REDIS_HOST),
RedisPort = gen_mod:get_opt(redis_port, Opts, fun(I) when is_integer(I), I&0 -& I end, ?REDIS_PORT),
ejabberd_hooks:add(set_presence_hook, Host, ?MODULE, set_presence, 100),
ejabberd_hooks:add(user_available_hook, Host, ?MODULE, user_available, 50),
ejabberd_hooks:add(sm_remove_connection_hook, Host, ?MODULE, unset_presence, 50),
MongoHost = gen_mod:get_opt(mongo_host, Opts, fun(B) -& binary_to_list(B) end, ?MONGO_HOST),
MongoPort = gen_mod:get_opt(mongo_port, Opts, fun(I) when is_integer(I), I&0 -& I end, ?MONGO_PORT),
{ok, Mongo} = mongo_connection:start_link({MongoHost, MongoPort}),
C = c(RedisHost, RedisPort),
ejabberd_router:register_route(MyHost), {ok, #state{host = Host, server_host = MyHost, rconn = C, mconn = Mongo}}.
terminate(_Reason, #state{host = Host, rconn = C, mconn = Mongo}) -&
ejabberd_hooks:delete(set_presence_hook, Host, ?MODULE, set_presence, 100),
ejabberd_hooks:delete(user_available_hook, Host, ?MODULE, user_available, 50),
ejabberd_hooks:delete(unset_presence_hook, Host, ?MODULE, unset_presence, 50),
eredis:stop(C),
handle_call({append_online_user, Uid, ProjId}, _From, State) -&
C = State#state.rconn,
Key = &&!--?PRE_RPOJ_ONLINE_USERS /binary, ProjId/binary--&&,
Resp = eredis:q(C, ["SADD", Key, Uid]),
{reply, Resp, State};
handle_call({remove_online_user, Uid, ProjId}, _From, State) -&
handle_call({get_proj_online_users, ProjId}, _From, State) -&
handle_cast({set_presence, User, Server, Resource, Packet}, #state{mconn = Mongo} = State) -&
C = State#state.rconn,
Key = &&!--?USER_PRESENCE /binary, User/binary--&&,
Pids = get_user_projs(User, Mongo),
Cmd = get_proj_key(Pids, ["SUNION"]),
case xml:get_subtag_cdata(Packet, &&"show"&&) of
&&"away"&& -&
eredis:q(C, ["SET", Key, ?LEAVE]);
&&"offline"&& -&
handle_cast(_Msg, State) -& {noreply, State}.
handle_info({route, From, To, Packet}, #state{host = Host, server_host = MyHost, rconn = RedisConn, mconn = Mongo} = State) -&
case catch do_route(Host, MyHost, From, To, Packet, RedisConn, Mongo) of
{'EXIT', Reason} -&
?ERROR_MSG("~p", [Reason]);
{noreply, State};
handle_info(_Info, State) -& {noreply, State}.
code_change(_OldVsn, State, _Extra) -& {ok, State}.
其中,User\_Available\_HOOK和SM\_Remove\_Connection\_HOOK 就是用户上线和用户断开连接触发的事件,Ejabberd
中正是由于这些HOOK,才能很容易扩展功能。
在用Tsung对Ejabberd进行压力测试,测试机器为4核心8G内存的普通PC,以3台客户机模拟用户登录、设置在线状态、发送一条文本消息、关闭连接操作,在同时在线达到30w时,CPU占用不到3%,内存大概到3个G左右,随着用户数增多,主要内存的损耗较大。由于压力测试比较耗时,再等到有时间的时候,会在做一些更深入的测试。
免费订阅“CSDN云计算(左)和CSDN大数据(右)”微信公众号,实时掌握第一手云中消息,了解最新的大数据进展!
CSDN发布虚拟化、Docker、OpenStack、CloudStack、数据中心等相关云计算资讯, & & 分享Hadoop、Spark、NoSQL/NewSQL、HBase、Impala、内存计算、流计算、机器学习和智能算法等相关大数据观点,提供云计算和大数据技术、平台、实践和产业信息等服务。 & & & &
推荐阅读相关主题:
CSDN官方微信
扫描二维码,向CSDN吐槽
微信号:CSDNnews
相关热门文章-------------
新增文件夹...
新增文件夹
(多个标签用逗号分隔)
go语言高并发实战:构建千万级在线的实时消息推送服务.ppt
一个 Go 进程可以轻易支撑几十万上百万并发运行的 Go 例程(只要你内存足够大)
一个 Go 进程可以轻易支撑几十万上百万并发运行的 Go 例程(只要你内存足够大)
加载中...!
如果长时间没有加载,请点击
来安装或允许flash插件运行!
下载本文档需要登录,并付出相应积分()。
文件大小:178.00 KB
所需积分:& 10
相关资讯  — 
相关讨论话题  — 
浏览:0次&& 下载:0次
上传时间: 09:30:09
同类热门文档
45091次浏览 &30次下载
7026次浏览 &9次下载
0次浏览 &20次下载
15952次浏览 &7次下载
0次浏览 &0次下载
0次浏览 &0次下载
相关经验 -
& 0人评&5页
& 0人评&19页
& 0人评&14页
& 4人评&26页
& 0人评&11页
OPEN-OPEN, all rights reserved.如何构建一套高可用的移动消息推送平台?-闽南网
如何构建一套高可用的移动消息推送平台?
&  编者按:本文来自微信公众号“InfoQ”(ID:infoqchina),作者李晓清、董泽光;36氪经授权发布。&  消息推送作为移动 APP 运营中的一项关键技术,已经被越来越广泛的运用。本文追溯了推送技术的发展历史,剖析了其核心原理,并对推送服务的关键技术进行深入剖析,围绕消息推送时产生的服务不稳定性,消息丢失、延迟,接入复杂性,统计缺失等问题,提供了一整套平台级的高可用消息推送解决方案。实践中,借助于该平台,不仅能提能显著提高消息到达率,还能提高研发效率,并道出了移动开发基础设施的平台化架构思路。 推送基础 &  移动互联网蓬勃发展的今天,大部分手机 APP 都提供了消息推送功能,如新闻客户端的热点新闻推荐,IM 工具的聊天消息提醒,电商产品促销信息,企业应用的通知和审批流程等等。推送对于提高产品活跃度、提高功能模块使用率、提升用户粘性、提升用户留存率起到了重要作用,作为 APP 运营中一个关键的免费渠道,对消息推送的合理运用能有效促进目标的实现。&  推送最早诞生于 Email 中,用于提醒新的消息,而移动互联网时代则更多的运用在了移动客户端程序。要获取服务器的数据,通常有两种方式:第一种是客户端 PULL(拉)方式,即每隔一段时间去服务器获取是否有数据;第二种是服务端 PUSH(推)方式,服务器在有数据的时候主动发给客户端。&  很显然,PULL 方案优点是简单但是实时性较差,我们也可以通过提高查询频率来提高实时性,但这又会造成电量、流量的消耗过高,反之 PUSH 方案基于 TCP 长连接方式实现,消息实时性好,但是由于要保持 APP 客户端和服务端的长连接心跳,也会带来额外的电量和流量消耗。因此在整体架构设计中需要折中平衡,目前主流的推送实现方式都是基于 PUSH 的方案。&  移动推送的三种实现方式 &  目前移动推送技术实现方式主要有以下三种:&  轮询方式(PULL)&  客户端和服务器定期的建立连接,通过消息队列等方式来查询是否有新的消息,需要控制连接和查询的频率,频率不能过慢或过快,过慢会导致部分消息更新不及时,过快会消耗更多的资源(流量、电量等),对用户体验有较大伤害。&  短信推送方式(SMS PUSH)&  通过短信发送推送消息,并在客户端植入短信拦截模块(主要针对 Android 平台),可以实现对短信进行拦截并提取其中的内容转发给 App 应用处理,这个方案借助于运营商的短消息,能够保证最好的实时性和到达率,但此方案对于成本要求较高,开发者需要为每一条 SMS 支付费用。&  长连接方式(PUSH)&  移动 Push 推送基于 TCP 长连接实现, 客户端主动和服务器建立 TCP 长连接之后, 客户端定期向服务器发送心跳包用于保持连接, 有消息的时候, 服务器直接通过这个已经建立好的 TCP 连接通知客户端。尽管长连接也会造成一定的开销,对于轮询和 SMS 方案的硬伤来说,目前已经是最优的方式,而且通过良好的设计,可以将损耗降至最低。不过,随着客户端数量和消息并发量的上升,对于消息服务器的性能和稳定性要求提出了非常大的考验。因此,就难度而言,此方式代价最高。&  推送解决方案&  基于 TCP 长连接的方式是主流的推送方式,基于该推送方式逐步发展出系统级、应用级一系列的推送解决方案。&  系统级方案&  iOS 平台(APNs)&  iOS 在系统层面与苹果 APNs(Apple Push Notification service)服务器建立连接,应用通过观察者模式向 ioS 系统注册关注的消息,系统收到 APNs Server 消息后转发到相应的应用程序,整个过程很清晰,并且所有 APP 都共用同一个系统级的连接,减少了系统开销,虽然 APNs 能无障碍的访问,但实际使用过程中,发现延时和丢消息的情况偶有发生。&  Android 平台(C2DM)
&  Android 的 C2DM(Android Cloud to Device Messaging)采取与 iOS 类似的机制,都是由系统层面来支持消息推送,但是由于 Google 的服务在国内不能稳定的访问,此方案对于中国用户来说基本是无法使用的。&  除了 Google 官方提供的方案,中国众多的手机厂商在其定制的系统中也内置了推送功能,如小米、华为等。&  应用级方案&  第三方推送服务&  鉴于 Android 平台 C2DM 推送的不可用性,国内涌现出大量的第三方推送服务提供商,采用第三方推送服务的系统流程如下图:&  图 1:消息推送流程&  目前应用最为广泛的第三方推送服务提供商包括个推、极光、友盟、小米、华为、BAT 等,绝大部分 APP 都会优先考虑采用第三方推送服务。&  自建推送服务
&  第三方服务在开发成本和消息到达率上表现都不错,但所有信息会经过第三方服务器,对于信息敏感类 APP 而言,有必要考虑自建一套消息推送服务,能最大化保证安全,但对于自建推送服务,如果从零开始来做需要解决几个难点:&  第一,移动推送服务器对 App 客户端海量长连接的维护管理。第二,App 客户端如何保证 Push Service 常驻,对于 Android 我们可以通过发现 push service 不存在可以定时拉起的方式。第三,通信协议的制定,我们可以采用开源的 XMPP 方式实现,也可以自定义协议,不管哪种方式我们都要保证消息传送的到达率的准确性。第四,在移动互联网网络环境下,经常出现弱网环境,特别是 2G、3G 等网络不稳定的情况下,如果保证消息在弱网环境下不重、不丢也是一个挑战。&  存在问题 &  无论是第三方推送服务,还是自建推送服务,在实际的使用过程中,发现都存在以下问题:&  应用服务端与推送服务强耦合。当推送服务不可用时,造成整个业务系统无法推送,甚无法正常工作。&  缺乏 ACK 机制。推送的过程是异步的,从应用服务端发送到推送服务时,可以得知发送是否成功,但是从第三方推送服务下发到 APP 时,无法得知客户端是否接收到。iOS 平台中,从推送服务发送到苹果 APNs 服务时,同样无法确定 APNs 是否收到。同时,第三方推送服务通常使用共享的推送通道,受其他推送方的影响,可能造成消息的延迟和丢失。&  服务会被杀死。尤其在 Android 平台上,后台推送 service 会被各种主动或者被动原因 kill 掉,导致消息丢失。&  缺乏消息的持久化。对于推送服务而言,消息推送是来一条推一条,无法追溯历史消息和消息状态。&  缺乏重传机制。整个推送过程涉及多个环节,当其中某个环节出现问题,造成客户端接收不到推送的消息时,就导致消息丢失,再无法接收到。&  客户端接入逻辑复杂。每接入一个新的 APP,都要进行重复的接入工作,接入逻辑完全一致,代码无法复用,需要在不同项目中拷贝。&  客户端与推送服务的 SDK 强耦合。客户端使用推送服务的接口,而各推送服务提供的接口不统一,如果需要替换推送服务,那么接入部分代码需完全重写。&  缺乏数据监控和统计。每个应用每天推送了多少消息,成功到达 app 多少,失败多少,目前均没有统计。&  解决之道 &  为了解决以上问题,我们考虑基于第三方消息推送服务构建一套移动消息推送中间件平台,该消息平台采用了低耦合的分层架构设计(如图 2 所示),分为三层:接入层、传输层和应用层。其中接入层是业务方调用的入口,我们采用异步消息队列的方式提供了较高的业务系统发送消息的速度,并且具备了消息缓冲功能,即使高峰期的海量消息推送对整个平台冲击较少,保护了推送系统;&  传输层会从接入层接收消息并进行解析,对推送消息进行合法性检查校验,如果消息不合法直接丢弃,同时将合法的消息进行协议转换并发送到对应的第三方推送平台;应用层主要是提供统一的 SDK 供业务使用,封装适配第三方推送平台的 SDK 接口到统一的接口 SDK 中,这样业务 APP 使用方只关注统一封装的 SDK 即可实现业务消息的操作,而不需要考虑各种滤重、校验等通用操作。主要功能包括:&  屏蔽推送接口,实现业务与推送服务解耦,提供一套通用的客户端 SDK,简化客户端接入。&  实现多点接入,可同时接入多套推送服务,根据历史推送成功率动态选择最优推送路径,当一条路径失效可选择备用路径进行推送,保证消息推送万无一失。&  引入消息持久化机制,方便追溯和统计。&  引入消息的 ACK 机制和重传机制,提高消息的到达率。&  实现数据监控和统计机制,提供相关数据的统计分析,和报警预警功能。&  提供 web 管理后台,便于进行 APP 设置、推送设置、查看数据报表,提高系统维护的工作效率。&  整个系统设计由三部分组成:移动推送平台、客户端 SDK、应用管理界面(第三方推送服务和自建推送服务统称为推送服务)。&  图 2:系统架构&  移动推送平台提供统一的服务,对于应用层屏蔽推送服务接口,且实现推送服务可动态轮替。推送平台将接收到的消息持久化到数据库中,方便进行消息推送失败后的重发,以及后续数据的统计分析。&  客户端 SDK 对 App 提供统一的使用接口,屏蔽推送服务 SDK 使用细节,且实现多种推送 SDK 可替换,隐藏 SDK 复杂的接入过程,方便使用。&  应用管理系统面向 App 开发人员,实现应用申请,推送服务配置,消息查询与管理,数据统计与分析。&  主要流程 &  消息推送涉及的主要模块是消息推送平台和客户端 SDK,主要流程如下图所示:&  图 3:消息推送中间件核心流程&  正常情况下,消息推送过程如下:&  系统接收到业务方的推送请求后,首先进行权限的验证,这包括应用 appKey 的验证、接口参数的验证、黑名单验证等。&  验证不通过,返回错误信息;验证通过后,为此条消息分配一个唯一 id(uuid),将消息内容持久化到数据库中,此时消息的状态为待发送。&  消息进入推送队列中,将之后推送接口请求的响应返回给业务方。&  推送队列的消费者从队列中取出待发送的消息,标记该条消息的状态为发送中,然后调用第三方推送服务接口进行发送。&  如果调用成功,那么标记该消息的状态为发送成功客户端未收到。&  客户端 SDK 在收到推送后,回调服务端接口,发送收到推送的回执;服务端收到客户端回执后,标记消息状态为发送成功客户端已收到。&  对于推送过程中可能出现的异常情况,总结如下:&  在调用第三方推送服务接口时,可能出现调用失败的情况;此时需要标记消息的状态为发送失败,留待重发。&  在调用第三方推送服务接口成功后、第三方推送服务在下发至客户端的过程中,可能由于某种原因,造成客户端无法收到消息;此时消息的状态为发送成功客户端未收到,对于这种状态,需要重发。&  客户端在收到推送的消息后、向服务端发送 ACK 回执时,可能由于网络环境的问题,造成服务端没有收到客户端发送的回执,此时消息的状态为发送成功客户端未收到,对于这种状态,需要重发。&  消息在重发 N 次(N 次可配置)、仍然没有进入发送成功客户端已收到的状态,那么将不再进行自动重发;管理界面将提供手动重发消息的操作入口,如有需要,可以手动再进行重发。监控平台对于一直重复不成功的消息会报警通知操作人员,这样操作人员可以及时通过手动方式处理。&  根据消息发送流程,可以得到消息在生命周期中状态的变迁如下图:&  图 4:消息状态机&  重发机制 &  消息重发主要存在三种场景:系统启动时,查询所有的发送失败或发送成功未收到客户端回执的消息,加载到推送队列重发;系统运行时,后台线程定时查询需要重发的消息,进入推送队列;手动触发时,直接将消息加入推送队列。&  由于消息推送中间件服务通常要求高可用,为分布式部署,消息重发必须保证在单一节点执行,且保证只发送一次。需采用分布式锁的方式,保证重发只发一次,主流实现方式有三种:&  ZooKeeper:通过竞争创建临时节点的方式获取锁。&  Redis:Redlock 是 Redis 作者的提出了一种分布式锁的算法,基于 Redis 实现,该算法实现了一种更安全、可靠的分布式锁管理。&  数据库:如使用 MySQL 的 GET_LOCK 函数&  对于每种锁机制的特点本文不详细介绍,根据实际应用需要任选一种即可。&  由于 iOS 平台和 Android 平台的差异,消息重发需要考虑平台差异性。&  使用第三方推送时,如果 iOS 应用在前台运行,那么将通过第三方推送维护的长连接,以透传的方式直接下发到 APP,称为应用内消息;而当 APP 在后台时,则第三方推送将消息推送到 APNs,由 APNs 推送到 APP,称为 APNs 通知。当通过 APNs 推送时,手机在收到消息后将在顶部的通知栏出现相关推送内容,这一行为是系统级别的,APP 无法控制。可能会出现这一问题:当 APP 在后台或者手机锁屏的情况下,如果服务端重发了消息,手机的通知栏将出现多条通知。&  因此,考虑当 APP 在后台时,针对 iOS 平台的消息不再进行重发;只有当 APP 进入前台,才重新进行重发。APP 的活动状态通过第三方推送服务的 api 可以获取到。&  Android 平台不存在该问题。&  由于消息重发可能会造成客户端收到重复消息,需要在客户端进行消息去重。服务端为每一条消息分配了一个唯一 id,重发时唯一 id 不变。客户端需要保存收到的每一条消息,在接收到新消息时首先根据唯一 id 判断是否已经收到了这条消息,如收到则不响应。客户端保存消息可以采用 sqlite 数据库。&  安全和控制 &  客户端 SDK 与服务端的通信过程使用 appKey 和 appSecret 进行权限控制。appKey 是服务端为每个 app 分配的唯一标识,appSecret 是服务端为每个 app 分配的秘钥。&  客户端 SDK 在请求服务端 HTTP 接口时,会将 appKey+appSecret 做一次签名,将签名值作为签名 sign 参数,与其他请求参数(业务参数 +appKey)一同传到服务端;服务端拿到请求参数后,也先用 appKey+appSecret 做一次签名,比较和客户端传来的 sign 参数是否一致,从而完成权限验证过程。为了能够实现灵活控制推送与否,可实现黑名单管理的功能。处于黑名单内的 app 客户端不再进行消息的推送。黑名单控制的粒度到账号级别,也可以根据实际业务需要进行分组管理。&  在某些业务场景中,需要对消息进行过滤,分析,做出相应的处理甚至预警,借助于消息推送平台,都能方便的实现。&  SDK 设计 &  客户端 SDK 是基于推送服务的 SDK 封装实现,对外提供统一的使用接口。SDK 的使用者不再关注具体使用了哪些第三方推送、推送服务的接入细节。实现与推送服务的充分解耦,降低开发和使用成本。&  由于 iOS 和 Android 平台的差异性,在客户端 SDK 的封装上存在差异,下面分别介绍两个平台的 SDK 封装方式。&  iOS 平台&  SDK 提供启动和停止的方法;同时定义一个 protocol,包含 SDK 提供的接口。SDK 在收到消息或出现错误时将会回调 protocol 中的接口。&  由于推送的接入涉及 AppDelegate 的生命周期方法,为避免 SDK 使用者关注这些繁琐的细节,SDK 使用 Aspects 的方式,将推送时相应的处理函数 hook 到 AppDelegate 的生命周期方法上。&  Android 平台&  在 Android 中使用 Receiver 组件来接收收到的消息。一个基本的配置如下所示:&  流程如下:当推送服务的 SDK 在接收到推送过来的消息后,将发送广播,这个广播的用 intent-filter 标识,当应用中的 Receiver 代码注册了这个 intent-filter,就可以接收到广播,并进行后续处理。&  系统管理 &  图 5:后台管理示意图&  消息后台管理系统提供应用申请、应用服务配置、推送服务配置、消息查询与管理等功能。&  1、应用申请&  填写应用名、应用描述等信息后,生成该应用唯一的 appKey 和 appSecret。&  2、应用服务配置&  为应用选择要使用的移动端通用服务,可供选择的有推送、反馈、版本发布。&  3、推送服务配置&  为应用配置推送服务,可供选择个推、极光等;以及推送时使用的优先级顺序。&  4、消息查询与管理&  查看应用所发出的消息,包括消息所属应用、所属账号、消息的状态、最终发送成功的第三方渠道、消息的来源、发送者 ip 等信息&  5、数据统计&  通过分析 message 表中的各消息的状态,可统计各应用消息的发送成功率和到达率,以及哪个第三方推送的更优,方便选择。同时,提供每日、每周、每月推送消息量的统计,并提供统计图表。&  高可用、高性能、高稳定性 &  消息推送平台通过无状态设计、统一存储、冗余部署方式保证了高可用,对应的状态数据统一存储到 MySQL、Redis 中保证各个无状态实例共享数据。&  对于消息的接收处理我们通过纯异步、动态多线程的方式提供了推送平台的高性能。同时对于异步接收的消息我们通过 log append 的方式保证消息先落地然后再进行处理,进一步确保系统在异常过程中我们可以随时恢复消息,保证不丢失。&  通过质量保障、全方位多维度监控体系(基础监控、错误日志监控、发送数据波动监控、进程监控等监控指标)保障系统在出现问题时实现秒级报警、及时处理保证了消息推送平台的高稳定性。&  写在最后 &  本文介绍了一种基于第三方或自建推送服务、但又不强依赖特定推送服务的通用移动消息推送中间件平台,可以实现安全、稳定、可靠的消息推送功能,并提供完善的数据统计,在实际应用中,可以结合邮件、短信、网站消息、用户留言等打造成更加通用的企业消息平台。
中国共产党第十九次全国代表大会18日至24日召开。
48小时点击排行榜}

我要回帖

更多关于 微信小程序 消息推送 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信