安卓的subscribe方法安卓sdk是什么意思思

RxAndroid,subscribe(onNext)中操作 能否移至doOnNext(onNext?
Activity中有多个区域,每个区域进行不同的网络请求(返回格式为json,但model不同),现在需要按从上到下的顺序加载各个区域。首先我想到&br&方式1:&br&&div class=&highlight&&&pre&&code class=&language-text&&concat(observableA,observableB). subscribe(...)
&/code&&/pre&&/div&但最终subscribe时泛型参数没了,都变成Object了&br&&div class=&highlight&&&pre&&code class=&language-java&&&span class=&n&&subscribe&/span&&span class=&o&&(&/span&&span class=&k&&new&/span& &span class=&n&&Action1&/span&&span class=&o&&&&/span&&span class=&n&&Object&/span&&span class=&o&&&()&/span& &span class=&o&&{&/span&
&span class=&nd&&@Override&/span&
&span class=&kd&&public&/span& &span class=&kt&&void&/span& &span class=&nf&&call&/span&&span class=&o&&(&/span&&span class=&n&&Object&/span& &span class=&n&&o&/span&&span class=&o&&)&/span& &span class=&o&&{&/span&
&span class=&k&&if&/span&&span class=&o&&(&/span&&span class=&n&&o&/span& &span class=&k&&instanceof&/span& &span class=&n&&A&/span&&span class=&o&&){&/span&&span class=&c1&&//更新UI区域1} else if (o instanceof B){//更新UI区域2}&/span&
&span class=&o&&}&/span&
&span class=&o&&})&/span&
&/code&&/pre&&/div&为了区分是哪个请求返回的数据,得使用instanceof,然后强制转换类型后进行UI更新,这样不仅失去了泛型的便利,而且“玷污”了代码的风格。&br&&br&之后便尝试了&br&方式2:&br&&div class=&highlight&&&pre&&code class=&language-text&&concat(observableA.doOnNext(a-&{//更新UI区域1}),observableB.doOnNext(b-&{//更新UI区域2})). subscribe(o-&{//空实现})
&/code&&/pre&&/div&&br&其中&br&&div class=&highlight&&&pre&&code class=&language-text&&observableA.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
observableB.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
&/code&&/pre&&/div&困惑:&br&1.doOnNext()所在的Scheduler和observeOn()中的AndroidSchedulers.mainThread()一致么?&br&2.上述情况下,方式2能否取代方式1?
Activity中有多个区域,每个区域进行不同的网络请求(返回格式为json,但model不同),现在需要按从上到下的顺序加载各个区域。首先我想到方式1:concat(observableA,observableB). subscribe(...)
但最终subscribe时泛型参数没了,都变成Object了subscribe(new Action1&Object&() {
public void call(Object o) {
if(o instanceof A){//更新UI区域1} else if (o instanceof B){//更新UI区域2}
为了区分是哪个请求返回的数据,得使用instanceof,然后强制转换类型后进行UI更新,这样不仅失去了泛型的便利,而且“玷污”了代码的风格。之后便尝试了方式2:concat(observableA.doOnNext(a-&{//更新UI区域1}),observableB.doOnNext(b-&{//更新UI区域2})). subscribe(o-&{//空实现})
你要的可是这个效果?Observable.just(1)
.subscribeOn(Schedulers.io())
.doOnNext(i -& printCurrentThreadId())
.observeOn(AndroidSchedulers.mainThread())
.doOnNext(i -& printCurrentThreadId())
.observeOn(Schedulers.io())
.doOnNext(i -& printCurrentThreadId())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(i -& printCurrentThreadId(), throwable -& {});
输出:V/MainActivity: 508
V/MainActivity: 1
V/MainActivity: 507
V/MainActivity: 1
另外,还有两个 operator 分别叫做 map 和 flatMap。
1.你的这个情况没有问题,doOnNext就是在observeOn所指定的线程中工作的。2.没看懂
observableA.flatmap(A -& {updateA();return oberableB}).subscribe()跟上面的答案说的
用flatmap这样似乎能解决问题
但感觉阅读起来不是很好。。
我使用了compose()变换方法,不知是否符合你的需求?Transformer tf = new Observable.Transformer&T, R&() {
public Observable&R& call(Observable&T& observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread());
observableA.compose(tf).subscribe(更新UI区域1);
observableB.compose(tf).subscribe(更新UI区域2);
observableC.compose(tf).subscribe(更新UI区域3);
======分界线以下是对上面代码的升级================Transformer tf = new Observable.Transformer&T, R&() {
public Observable&R& call(Observable&T& observable) {
return observable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread()).doOnext(更新UI区域N);
observableA.compose(tf).subscribe();
observableB.compose(tf).subscribe();
observableC.compose(tf).subscribe();
这次将Transformer的功能增加“更新UI区域”,使得代码更加模块化、内聚。subscribe()没有传入参数,仅仅是起到启动observable的作用。
已有帐号?
无法登录?
社交帐号登录您当前的位置:
> 从案例学RxAndroid开发(上)
总文章数:74894总点阅数:2845851当月点阅:1103442
数据正在载入中…
数据正在载入中…
从案例学RxAndroid开发(上)
数据正在载入中…
版权声明原作者:Kurtis Nusbaum译者:程大治本文由作者授权翻译并发布,由译者投稿发布于本公众号,未经允许禁止转载。RxJava最近很火,由代码家组织的Gank匠心写作上也了不少相关的好文章,RxAndroid是RxJava在Android平台的扩展,将一些类绑定到RxJava,使得在Android应用中编写响应式组件变得非常简单。本文将从一些小案例来讲解如何使用RxAndroid进行开发。以下为正文:为了不让你把我踩过的坑再踩一遍,我会基于我的学习成果写一些例子出来,让你能够对RxJava有足够的了解,并能在你的Android应用中使用它。当我第一次使用RxJava的时候我只是在照搬代码,这些代码能跑起来,但是我对RxJava的基础部分仍然存在误解,而且我找不到好的源码来学习。所以为了理解RxJava,我不得不一点一点学习,踩了不少坑。源码见这里(/klnusbaum/rxandroidexamples)。在每个例子的开始,我会写清每个代码段是属于哪个Activity的。我会将本文分为两个部分,在第一部分里,我会着重讲解如何用RxJava异步加载数据;在第二部分里,我会探索一些更高级的用法。几个概念在开始说代码之前,先澄清几个概念。RxJava最核心的东西就是Observable和Observer。Observable会发出数据,而与之相对的Observer则会通过订阅Observable来进行观察。Observer可以在Observable发出数据、报错或者声明没有数据可以发送时进行相应的操作。这三个操作被封装在Observer接口中,相应的方法为onNext,onError和onCompleted。明确了这些概念以后,让我们来看一些例子。案例1:基础现在我们要写一个用来展示一个颜色列表的Activity,源码见这里(/klnusbaum/rxandroidexamples/blob/master/app/src/main/java/kurtis/rx/androidexamples/Example1Activity.java)。我们要写一个能发送一个字符串列表、然后结束的Observeable。而后我们会通过这个字符串列表来填充颜色列表,这里要使用到Observable.just方法。由这个方法创建的Observable对象的特点是:所有Observer一旦订阅这个Observable就会立即调用onNext方法并传入Observable.just的参数,而后因为Observable没有数据可以发送了,onComplete方法会被调用。Observable&List&String&& listObservable = Observable.just(getColorList);注意这里的getColorList是一个不耗时的方法。虽然现在看来这个方法无足轻重,但一会我们会回到这个方法。下一步,我们写一个Observer来观察Observable。而后神奇的事情就发生了。如我刚才所说,一旦通过subscribe方法订阅Observable,就会发生一系列事情:1. onNext方法被调用,被发送的颜色列表会作为参数传入。2. 既然不再有数据可以发送(我们在Observable.just()中只让Observable发送一个数据),onComplete方法会被调用。请记住:通过Observable被订阅后的行为来区分它们。在这个例子中我们不关心Observable何时完成数据的传输,所以我们不用在onComplete方法里写代码。而且在这里不会有异常抛出,所以我们也不用管onError方法。写了这么多你可能觉得很多余,毕竟我们本可以在adapter中直接设置作为数据源的颜色列表。请带着这个疑问,和我看下面这个更有趣一些的例子。案例2:异步加载在这里我们要写一个显示电视剧列表的Activity,源码见这里(/klnusbaum/rxandroidexamples/blob/master/app/src/main/java/kurtis/rx/androidexamples/Example2Activity.java)。在Android中RxJava的主要用途就在于异步数据加载。首先让我们写一个Observable:在刚才的例子中,我们使用Observable.just来创建Observable,你可能认为在这里可以通过Observable.just(mRestClient.getFavoriteTvShows)来创建Observable。但在这里我们不能这么做,因为mRestClient.getFavoriteTvShows会发起网络请求。如果在这里我们使用Observable.just,mRestClient.getFavoriteTvShows会被立即执行并阻塞UI线程。使用Observable.fromCallable方法有两点好处:1. 获取要发送的数据的代码只会在有Observer订阅之后执行。2. 获取数据的代码可以在子线程中执行。这两点好处有时可能非常重要。现在让我们订阅这个Observable。让我们一个方法一个方法地来看这段代码。subscribeOn会修改我们刚刚创建的Observable。在默认情况下Observable的所有代码,包括刚才说到的只有在被订阅之后才会执行的代码,都会在主线程中运行。而通过subscribeOn方法,这些代码可以在其他线程中执行。但具体是哪个线程呢?在这个例子中我们让代码在"IO Scheduler"中执行(Schedulers.io())。现在我们可以只把Scheduler当做一个可以工作的子线程,这个描述对于现在的我们已经足够了,不过这其中还有更深层次的内容。不过我们的确遇到了一个小障碍。既然Observable会在IO Scheduler中运行,那么它与Observer的连接也会在IO Scheduler中完成。这就意味着Observer的onNext方法也会在IO Scheduler中运行,而onNext方法会操作UI中的View,但View只能在UI主线程中操作。事实上解决这个问题也很简单,我们可以告诉RxJava我们要在UI线程中观察这个Observable,也就是,我们想让onNext方法在UI线程中执行。这一点我们可以通过在observeOn方法中指定另一个Scheduler来完成,在这里也就是AndroidSchedules.mainThread所返回的Scheduler(UI线程的Scheduler)。而后我们调用subscribe方法。这个方法最重要,因为Callable只会在有Observer订阅后运行。还记得刚才我说Observable通过其被订阅后的行为来区分吗?这就是一个很好的例子。还有最后一件事。这个mTvShowSubscription到底是什么?每当Observer订阅Observable时就会生成一个Subscription对象。一个Subscription代表了一个Observer与Observable之间的连接。有时我们需要操作这个连接,这里拿在Activity的onDestroy方法中的代码举个例子:如果你与多线程打过交道,你肯定会意识到一个大坑:当Activity执行onDestroy后线程才结束(甚至永不结束)的话,就有可能发生内存泄漏与PointerException空指针异常。Subscription就可以解决这个问题,我们可以通过调用unsubscribe方法告诉Observable它所发送的数据不再被Observer所接收。在调用unsubscribe方法后,我们创建的Observer就不再会收到数据了,同时也就解决了刚才说的问题。说到这里难点已经过去,让我们来总结一下:Observable.fromCallable方法可以拖延Observable获取数据的操作,这一点在数据需要在其他线程获取时尤其重要。subscribeOn让我们在指定线程中运行获取数据的代码,只要不是UI线程就行。observeOn让我们在合适的线程中接收Observable发送的数据,在这里是UI主线程。记住要让Observer取消订阅以免Observable异步加载数据时发生意外。案例3:使用Single这次我们还是写一个展示电视剧列表的Activity,源码见这里:(/klnusbaum/rxandroidexamples/blob/master/app/src/main/java/kurtis/rx/androidexamples/Example3Activity.java),但这次我们走一种更简单的风格。Observable挺好用的,但在某些情况下过于重量级。比如说,你可能一经发现在过去的两个方法中我们只是让Observable发送一个数据,而且我们从来也没写过onComplete回调方法。其实呢,Observable还有一个精简版,叫做Single。Single几乎和Observable一模一样,但其回调方法不是onComplete/onNext/onError,而是onSuccess/onError。我们现在把刚才写过的Observable用Single重写一遍。首先我们要创建一个Single:然后订阅一下:这段代码和刚才很像,我们调用subscribeOn方法以确保getFavoriteTvShows在子线程中执行。而后我们调用observeOn以确保Single的数据被发送到UI线程。但这次我们不再使用Observer,而是使用一个叫SingleSubscriber的类。这个类和Observer非常像,只不过它只有上述两个方法:onSuccess和onError。SingleSubscriber之于Single就如Observer之于Observable。订阅一个Single的同时也会自动创建一个Subscription对象。这里的Subscription和案例2中没有区别,一定要在onDestroy中解除订阅。最后一点:在这里我们添加了处理异常的代码,所以如果mRestClient出了问题,onError就会被调用。建议你亲手写一个案例玩一玩,体验一下有异常时程序是怎么运行的。结语第一部分就说这么多了,希望这几个案例能对你有所帮助,不要忘了看看第二部分中更加高级的使用方法哦。本周移动开发前线公众号优秀文章:国外iOS大牛的分享经验和对Swift的看法移动携程App架构优化之旅欢迎大家关注:点击下方阅读原文
来源:星空互联
数据正载入中…
重要声明:本站所有文章由会员即时发表,本站对所有文章的真实性、完整性及立场等,不负任何法律责任。所有文章内容只代表发文者个人意见,并非本网站之立场,用户不应信赖内容,并应自行判断内容之真实性。发文者拥有在秀客网发布的文章。 由于本站是受到「即时发表」运作方式所规限,故不能完全监察所有即时文章,如有不适当或对于文章出处有疑虑,请联系我们告知,我们将在最短时间内进行撤除。本站有权删除任何留言及拒绝任何人士发文,同时亦有不删除文章的权利。切勿撰写粗言秽语、诽谤、渲染色情暴力或人身攻击的言论,敬请自律。本网站保留一切法律权利。
COPYRIGHT (C)
秀客网 ALL RIGHTS RESERVED (C)版权所有编写和MQTT服务器通信的Android客户端程序(二)
客户端使用的API,开始我使用的是mqtt-client,使用过后发现问题百出,不能很好的满足要求,后来使用了官方推荐的,下面开始客户端代码的编写,为了方便测试这里有android和j2se两个工程:
1、新建android工程MQTTClient
2、MainActivity代码如下:
package ldw.
import java.util.concurrent.E
import java.util.concurrent.ScheduledExecutorS
import java.util.concurrent.TimeU
import org.eclipse.paho.client.mqttv3.IMqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttE
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.persist.MemoryP
import android.app.A
import android.os.B
import android.os.H
import android.os.M
import android.view.KeyE
import android.widget.TextV
import android.widget.T
public class MainActivity extends Activity {
private TextView resultTv;
private String host = "tcp://127.0.0.1:1883";
private String userName = "admin";
private String passWord = "password";
private MqttC
private String myTopic = "test/topic";
private MqttConnectO
private ScheduledExecutorS
protected void onCreate(Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.main);
resultTv = (TextView) findViewById(R.id.result);
handler = new Handler() {
public void handleMessage(Message msg) {
super.handleMessage(msg);
if(msg.what == 1) {
Toast.makeText(MainActivity.this, (String) msg.obj,
Toast.LENGTH_SHORT).show();
System.out.println("-----------------------------");
} else if(msg.what == 2) {
Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();
client.subscribe(myTopic, 1);
} catch (Exception e) {
e.printStackTrace();
} else if(msg.what == 3) {
Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();
startReconnect();
private void startReconnect() {
scheduler = Executors.newSingleThreadScheduledExecutor();
scheduler.scheduleAtFixedRate(new Runnable() {
public void run() {
if(!client.isConnected()) {
connect();
}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);
private void init() {
//host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存
client = new MqttClient(host, "test",
new MemoryPersistence());
//MQTT的连接设置
options = new MqttConnectOptions();
//设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接
options.setCleanSession(true);
//设置连接的用户名
options.setUserName(userName);
//设置连接的密码
options.setPassword(passWord.toCharArray());
// 设置超时时间 单位为秒
options.setConnectionTimeout(10);
// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
options.setKeepAliveInterval(20);
//设置回调
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
//连接丢失后,一般在这里面进行重连
System.out.println("connectionLost----------");
public void deliveryComplete(IMqttDeliveryToken token) {
//publish后会执行到这里
System.out.println("deliveryComplete---------"
+ token.isComplete());
public void messageArrived(String topicName, MqttMessage message)
throws Exception {
//subscribe后得到的消息会执行到这里面
System.out.println("messageArrived----------");
Message msg = new Message();
msg.what = 1;
msg.obj = topicName+"---"+message.toString();
handler.sendMessage(msg);
connect();
} catch (Exception e) {
e.printStackTrace();
private void connect() {
new Thread(new Runnable() {
public void run() {
client.connect(options);
Message msg = new Message();
msg.what = 2;
handler.sendMessage(msg);
} catch (Exception e) {
e.printStackTrace();
Message msg = new Message();
msg.what = 3;
handler.sendMessage(msg);
}).start();
public boolean onKeyDown(int keyCode, KeyEvent event) {
if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {
client.disconnect();
} catch (Exception e) {
e.printStackTrace();
return super.onKeyDown(keyCode, event);
protected void onDestroy() {
super.onDestroy();
scheduler.shutdown();
client.disconnect();
} catch (MqttException e) {
e.printStackTrace();
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180
package ldw.mqttclient;&import java.util.concurrent.Executors;import java.util.concurrent.ScheduledExecutorService;import java.util.concurrent.TimeUnit;&import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttException;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;&import android.app.Activity;import android.os.Bundle;import android.os.Handler;import android.os.Message;import android.view.KeyEvent;import android.widget.TextView;import android.widget.Toast;&public class MainActivity extends Activity {&&&&&private TextView resultTv;&&&&&private String host = "tcp://127.0.0.1:1883";&&&&private String userName = "admin";&&&&private String passWord = "password";&&&&&private Handler handler;&&&&&private MqttClient client;&&&&&private String myTopic = "test/topic";&&&&&private MqttConnectOptions options;&&&&&private ScheduledExecutorService scheduler;&&&&&@Override&&&&protected void onCreate(Bundle savedInstanceState) {&&&&&&&&super.onCreate(savedInstanceState);&&&&&&&&setContentView(R.layout.main);&&&&&&&&&resultTv = (TextView) findViewById(R.id.result);&&&&&&&&&init();&&&&&&&&&handler = new Handler() {&&&&&&&&&&&&@Override&&&&&&&&&&&&public void handleMessage(Message msg) {&&&&&&&&&&&&&&&&super.handleMessage(msg);&&&&&&&&&&&&&&&&if(msg.what == 1) {&&&&&&&&&&&&&&&&&&&&Toast.makeText(MainActivity.this, (String) msg.obj,&&&&&&&&&&&&&&&&&&&&&&&&&&&&Toast.LENGTH_SHORT).show();&&&&&&&&&&&&&&&&&&&&System.out.println("-----------------------------");&&&&&&&&&&&&&&&&} else if(msg.what == 2) {&&&&&&&&&&&&&&&&&&&&Toast.makeText(MainActivity.this, "连接成功", Toast.LENGTH_SHORT).show();&&&&&&&&&&&&&&&&&&&&try {&&&&&&&&&&&&&&&&&&&&&&&&client.subscribe(myTopic, 1);&&&&&&&&&&&&&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&&&&&&&&&&&&&e.printStackTrace();&&&&&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&} else if(msg.what == 3) {&&&&&&&&&&&&&&&&&&&&Toast.makeText(MainActivity.this, "连接失败,系统正在重连", Toast.LENGTH_SHORT).show();&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&&&&&&&&};&&&&&&&&&startReconnect();&&&&&}&&&&&private void startReconnect() {&&&&&&&&scheduler = Executors.newSingleThreadScheduledExecutor();&&&&&&&&scheduler.scheduleAtFixedRate(new Runnable() {&&&&&&&&&&&&&@Override&&&&&&&&&&&&public void run() {&&&&&&&&&&&&&&&&if(!client.isConnected()) {&&&&&&&&&&&&&&&&&&&&connect();&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&&&&&&&&}, 0 * 1000, 10 * 1000, TimeUnit.MILLISECONDS);&&&&}&&&&&private void init() {&&&&&&&&try {&&&&&&&&&&&&&&&&&&&&&& //host为主机名,test为clientid即连接MQTT的客户端ID,一般以客户端唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存&&&&&&&&&&&&client = new MqttClient(host, "test",&&&&&&&&&&&&&&&&&&&&new MemoryPersistence());&&&&&&&&&&&&&&&&&&&&&& //MQTT的连接设置&&&&&&&&&&&&options = new MqttConnectOptions();&&&&&&&&&&&&&&&&&&&&&& //设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接&&&&&&&&&&&&options.setCleanSession(true);&&&&&&&&&&&&&&&&&&&&&& //设置连接的用户名&&&&&&&&&&&&options.setUserName(userName);&&&&&&&&&&&&&&&&&&&&&& //设置连接的密码&&&&&&&&&&&&options.setPassword(passWord.toCharArray());&&&&&&&&&&&&// 设置超时时间 单位为秒&&&&&&&&&&&&options.setConnectionTimeout(10);&&&&&&&&&&&&// 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制&&&&&&&&&&&&options.setKeepAliveInterval(20);&&&&&&&&&&&&&&&&&&&&&&&&//设置回调&&&&&&&&&&&&client.setCallback(new MqttCallback() {&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void connectionLost(Throwable cause) {&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&//连接丢失后,一般在这里面进行重连&&&&&&&&&&&&&&&&&&&&System.out.println("connectionLost----------");&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void deliveryComplete(IMqttDeliveryToken token) {&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&//publish后会执行到这里&&&&&&&&&&&&&&&&&&&&System.out.println("deliveryComplete---------"&&&&&&&&&&&&&&&&&&&&&&&&&&&&+ token.isComplete());&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void messageArrived(String topicName, MqttMessage message)&&&&&&&&&&&&&&&&&&&&&&&&throws Exception {&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&&//subscribe后得到的消息会执行到这里面&&&&&&&&&&&&&&&&&&&&System.out.println("messageArrived----------");&&&&&&&&&&&&&&&&&&&&Message msg = new Message();&&&&&&&&&&&&&&&&&&&&msg.what = 1;&&&&&&&&&&&&&&&&&&&&msg.obj = topicName+"---"+message.toString();&&&&&&&&&&&&&&&&&&&&handler.sendMessage(msg);&&&&&&&&&&&&&&&&}&&&&&&&&&&&&});//&&&&&&&&&&&&connect();&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&e.printStackTrace();&&&&&&&&}&&&&}&&&&&private void connect() {&&&&&&&&new Thread(new Runnable() {&&&&&&&&&&&&&@Override&&&&&&&&&&&&public void run() {&&&&&&&&&&&&&&&&try {&&&&&&&&&&&&&&&&&&&&client.connect(options);&&&&&&&&&&&&&&&&&&&&Message msg = new Message();&&&&&&&&&&&&&&&&&&&&msg.what = 2;&&&&&&&&&&&&&&&&&&&&handler.sendMessage(msg);&&&&&&&&&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&&&&&&&&&e.printStackTrace();&&&&&&&&&&&&&&&&&&&&Message msg = new Message();&&&&&&&&&&&&&&&&&&&&msg.what = 3;&&&&&&&&&&&&&&&&&&&&handler.sendMessage(msg);&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&&&&&&&&}).start();&&&&}&&&&&@Override&&&&public boolean onKeyDown(int keyCode, KeyEvent event) {&&&&&&&&if(client != null && keyCode == KeyEvent.KEYCODE_BACK) {&&&&&&&&&&&&try {&&&&&&&&&&&&&&&&client.disconnect();&&&&&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&&&&&e.printStackTrace();&&&&&&&&&&&&}&&&&&&&&}&&&&&&&&return super.onKeyDown(keyCode, event);&&&&}&&&&&@Override&&&&protected void onDestroy() {&&&&&&&&super.onDestroy();&&&&&&&&try {&&&&&&&&&&&&scheduler.shutdown();&&&&&&&&&&&&client.disconnect();&&&&&&&&} catch (MqttException e) {&&&&&&&&&&&&e.printStackTrace();&&&&&&&&}&&&&}}
由于项目需要,我用到了心跳重连。根据的解释设置apollo.xml,主要有设置主机连接的地址。另外,options还有个setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。
3、新建j2se工程MQTTServer
4、Server代码如下:
import java.awt.C
import java.awt.event.ActionE
import java.awt.event.ActionL
import javax.swing.JB
import javax.swing.JF
import javax.swing.JP
import org.eclipse.paho.client.mqttv3.IMqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttC
import org.eclipse.paho.client.mqttv3.MqttConnectO
import org.eclipse.paho.client.mqttv3.MqttDeliveryT
import org.eclipse.paho.client.mqttv3.MqttM
import org.eclipse.paho.client.mqttv3.MqttT
import org.eclipse.paho.client.mqttv3.persist.MemoryP
public class Server extends JFrame {
private static final long serialVersionUID = 1L;
private JP
private JB
private MqttC
private String host = "tcp://127.0.0.1:1883";
// private String host = "tcp://localhost:1883";
private String userName = "test";
private String passWord = "test";
private MqttT
private MqttM
private String myTopic = "test/topic";
public Server() {
client = new MqttClient(host, "Server",
new MemoryPersistence());
connect();
} catch (Exception e) {
e.printStackTrace();
Container container = this.getContentPane();
panel = new JPanel();
button = new JButton("发布话题");
button.addActionListener(new ActionListener() {
public void actionPerformed(ActionEvent ae) {
MqttDeliveryToken token = topic.publish(message);
token.waitForCompletion();
System.out.println(token.isComplete()+"========");
} catch (Exception e) {
e.printStackTrace();
panel.add(button);
container.add(panel, "North");
private void connect() {
MqttConnectOptions options = new MqttConnectOptions();
options.setCleanSession(false);
options.setUserName(userName);
options.setPassword(passWord.toCharArray());
// 设置超时时间
options.setConnectionTimeout(10);
// 设置会话心跳时间
options.setKeepAliveInterval(20);
client.setCallback(new MqttCallback() {
public void connectionLost(Throwable cause) {
System.out.println("connectionLost-----------");
public void deliveryComplete(IMqttDeliveryToken token) {
System.out.println("deliveryComplete---------"+token.isComplete());
public void messageArrived(String topic, MqttMessage arg1)
throws Exception {
System.out.println("messageArrived----------");
topic = client.getTopic(myTopic);
message = new MqttMessage();
message.setQos(1);
message.setRetained(true);
System.out.println(message.isRetained()+"------ratained状态");
message.setPayload("eeeeeaaaaaawwwwww---".getBytes());
client.connect(options);
} catch (Exception e) {
e.printStackTrace();
public static void main(String[] args) {
Server s = new Server();
s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);
s.setSize(600, 370);
s.setLocationRelativeTo(null);
s.setVisible(true);
123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117
import java.awt.Container;import java.awt.event.ActionEvent;import java.awt.event.ActionListener;&import javax.swing.JButton;import javax.swing.JFrame;import javax.swing.JPanel;&import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttCallback;import org.eclipse.paho.client.mqttv3.MqttClient;import org.eclipse.paho.client.mqttv3.MqttConnectOptions;import org.eclipse.paho.client.mqttv3.MqttDeliveryToken;import org.eclipse.paho.client.mqttv3.MqttMessage;import org.eclipse.paho.client.mqttv3.MqttTopic;import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;&public class Server extends JFrame {&&&&private static final long serialVersionUID = 1L;&&&&private JPanel panel;&&&&private JButton button;&&&&&private MqttClient client;&&&&private String host = "tcp://127.0.0.1:1883";//&&&&private String host = "tcp://localhost:1883";&&&&private String userName = "test";&&&&private String passWord = "test";&&&&private MqttTopic topic;&&&&private MqttMessage message;&&&&&private String myTopic = "test/topic";&&&&&public Server() {&&&&&&&&&try {&&&&&&&&&&&&client = new MqttClient(host, "Server",&&&&&&&&&&&&&&&&&&&&new MemoryPersistence());&&&&&&&&&&&&connect();&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&e.printStackTrace();&&&&&&&&}&&&&&&&&&Container container = this.getContentPane();&&&&&&&&panel = new JPanel();&&&&&&&&button = new JButton("发布话题");&&&&&&&&button.addActionListener(new ActionListener() {&&&&&&&&&&&&&@Override&&&&&&&&&&&&public void actionPerformed(ActionEvent ae) {&&&&&&&&&&&&&&&&try {&&&&&&&&&&&&&&&&&&&&MqttDeliveryToken token = topic.publish(message);&&&&&&&&&&&&&&&&&&&&token.waitForCompletion();&&&&&&&&&&&&&&&&&&&&System.out.println(token.isComplete()+"========");&&&&&&&&&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&&&&&&&&&e.printStackTrace();&&&&&&&&&&&&&&&&}&&&&&&&&&&&&}&&&&&&&&});&&&&&&&&panel.add(button);&&&&&&&&container.add(panel, "North");&&&&&}&&&&&private void connect() {&&&&&&&&&MqttConnectOptions options = new MqttConnectOptions();&&&&&&&&options.setCleanSession(false);&&&&&&&&options.setUserName(userName);&&&&&&&&options.setPassword(passWord.toCharArray());&&&&&&&&// 设置超时时间&&&&&&&&options.setConnectionTimeout(10);&&&&&&&&// 设置会话心跳时间&&&&&&&&options.setKeepAliveInterval(20);&&&&&&&&try {&&&&&&&&&&&&client.setCallback(new MqttCallback() {&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void connectionLost(Throwable cause) {&&&&&&&&&&&&&&&&&&&&System.out.println("connectionLost-----------");&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void deliveryComplete(IMqttDeliveryToken token) {&&&&&&&&&&&&&&&&&&&&System.out.println("deliveryComplete---------"+token.isComplete());&&&&&&&&&&&&&&&&}&&&&&&&&&&&&&&&&&@Override&&&&&&&&&&&&&&&&public void messageArrived(String topic, MqttMessage arg1)&&&&&&&&&&&&&&&&&&&&&&&&throws Exception {&&&&&&&&&&&&&&&&&&&&System.out.println("messageArrived----------");&&&&&&&&&&&&&&&&&}&&&&&&&&&&&&});&&&&&&&&&&&&&topic = client.getTopic(myTopic);&&&&&&&&&&&&&message = new MqttMessage();&&&&&&&&&&&&message.setQos(1);&&&&&&&&&&&&message.setRetained(true);&&&&&&&&&&&&System.out.println(message.isRetained()+"------ratained状态");&&&&&&&&&&&&message.setPayload("eeeeeaaaaaawwwwww---".getBytes());&&&&&&&&&&&&&client.connect(options);&&&&&&&&} catch (Exception e) {&&&&&&&&&&&&e.printStackTrace();&&&&&&&&}&&&&&}&&&&&public static void main(String[] args) {&&&&&&&&Server s = new Server();&&&&&&&&s.setDefaultCloseOperation(JFrame.EXIT_ON_CLOSE);&&&&&&&&s.setSize(600, 370);&&&&&&&&s.setLocationRelativeTo(null);&&&&&&&&s.setVisible(true);&&&&}}
上面代码跟客户端的代码差不多,这里就不做解释了。
没什么好说的,MQTT就是这么简单,但开始在使用的时候要注意一些参数的设置来适应项目的需求。
此条目发表在
分类目录。将加入收藏夹。}

我要回帖

更多关于 安卓双清是什么意思 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信