求助,activemq 查看队列监控界面点击队列的消息ID查看消息内容报错

> 博客详情
初次发博文,勿喷~~
&&& 最近老大让我使用ActiveMQ实现这么个东东:1.查询消息队列中还有多少任务没有执行;2.消息队列的持久化;
&&& 真是愁杀我也,以前没见过啊,于是又看文档,又百度又google的,最终还是在一天半之后整出来鸟~~
&&& 首先向大家介绍一本书籍《ActiveMQ in Action》,我大部分代码都是参考这本书实现的。好了,废话少说,看代码:
&&& 1.首先启动activeMQ的服务
public class RunServer {
/** 启动activeMQ服务 */
public static void main(String[] args) throws Exception {
RunServer rs = new RunServer();
BrokerService broker = rs.startServer();
public BrokerService startServer() throws Exception{
// java代码调用activemq相关的类来构造并启动brokerService
BrokerService broker = new BrokerService();
// 以下是持久化的配置
// 持久化文件存储位置
File dataFilterDir = new File("targer/amq-in-action/kahadb");
KahaDBStore kaha = new KahaDBStore();
kaha.setDirectory(dataFilterDir);
// use a bigger journal file
kaha.setJournalMaxFileLength();
// small batch means more frequent and smaller writes
kaha.setIndexWriteBatchSize(100);
// do the index write in a separate thread
kaha.setEnableIndexWriteAsync(true);
broker.setPersistenceAdapter(kaha);
// create a transport connector
broker.addConnector("tcp://localhost:61616");
broker.setUseJmx(true);
//broker.setDataDirectory("data/");
// 以下是ManagementContext的配置,从这个容器中可以取得消息队列中未执行的消息数、消费者数、出队数等等
// 设置ManagementContext
ManagementContext context = broker.getManagementContext();
context.setConnectorPort(2011);
context.setJmxDomainName("my-broker");
context.setConnectorPath("/jmxrmi");
broker.start();
System.in.read();
&&& 2.发送消息
public class Sender {
private static final int SEND_NUMBER = 1;
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection =
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// MessageProducer:消息发送者
// 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD,
"tcp://localhost:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
// 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
destination = session.createQueue("test-persistence");
// 得到消息生成者【发送者】
producer = session.createProducer(destination);
// 设置不持久化,可以更改
producer.setDeliveryMode(DeliveryMode.PERSISTENT);
// 构造消息
sendMessage(session, producer);
session.commit();
} catch (Exception e) {
e.printStackTrace();
} finally {
if (null != connection)
connection.close();
} catch (Throwable ignore) {
public static void sendMessage(Session session, MessageProducer producer)
throws Exception {
for (int i = 1; i &= SEND_NUMBER; i++) {
TextMessage message = session
.createTextMessage("ActiveMq 发送的消息" + i);
// 发送消息到目的地方
System.out.println("发送消息:" + i);
producer.send(message);
&&& 3.收消息
public class Receiver {
public static void main(String[] args) {
// ConnectionFactory :连接工厂,JMS 用它创建连接
ConnectionFactory connectionF
// Connection :JMS 客户端到JMS Provider 的连接
Connection connection =
// Session: 一个发送或接收消息的线程
// Destination :消息的目的地;消息发送给谁.
// 消费者,消息接收者
connectionFactory = new ActiveMQConnectionFactory(
ActiveMQConnection.DEFAULT_USER,
ActiveMQConnection.DEFAULT_PASSWORD, "tcp://localhost:61616");
// 构造从工厂得到连接对象
connection = connectionFactory.createConnection();
connection.start();
// 获取操作连接
session = connection.createSession(Boolean.TRUE,
Session.AUTO_ACKNOWLEDGE);
//test-queue跟sender的保持一致,一个创建一个来接收
destination = session.createQueue("test-persistence");
consumer = session.createConsumer(destination);
consumer.setMessageListener(new MessageListener() {
public void onMessage(Message arg0) {
Thread.sleep(1000);
} catch (InterruptedException e1) {
e1.printStackTrace();
System.out.println("==================");
System.out.println("RECEIVE1第一个获得者:"
+ ((TextMessage) arg0).getText());
} catch (JMSException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (Exception e) {
e.printStackTrace();
if (null != connection)
connection.close();
} catch (Throwable ignore) {
&&& 4.获取消息的状态,也就是上面所说的获得消息队列中未执行的消息数、消费者数、出队数等等
public class StateTest {
* 获取状态
* @throws Exception
public static void main(String[] args) throws Exception {
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:2011/jmxrmi");
JMXConnector connector = JMXConnectorFactory.connect(url, null);
connector.connect();
MBeanServerConnection connection = connector.getMBeanServerConnection();
// 需要注意的是,这里的my-broker必须和上面配置的名称相同
ObjectName name = new ObjectName("my-broker:BrokerName=localhost,Type=Broker");
BrokerViewMBean mBean =
(BrokerViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection,
name, BrokerViewMBean.class, true);
// System.out.println(mBean.getBrokerName());
for(ObjectName queueName : mBean.getQueues()) {
QueueViewMBean queueMBean =
(QueueViewMBean)MBeanServerInvocationHandler.newProxyInstance(connection, queueName, QueueViewMBean.class, true);
System.out.println("\n------------------------------\n");
// 消息队列名称
System.out.println("States for queue --- " + queueMBean.getName());
// 队列中剩余的消息数
System.out.println("Size --- " + queueMBean.getQueueSize());
// 消费者数
System.out.println("Number of consumers --- " + queueMBean.getConsumerCount());
System.out.println("Number of dequeue ---" + queueMBean.getDequeueCount() );
到此结束,希望可以为大家做个参考~~
”在线下联结了各位 OSCer,推广开源项目和理念,很荣幸有你的参与~
领取条件:参与过开源中国“源创会”的 OSCer 可以领取
如果active mq嵌入到自己项目中就不行了呀
大小写改一下:
my-broker:type=Broker,brokerName=localhost
支付宝支付
微信扫码支付
打赏金额: ¥
已支付成功
打赏金额: ¥帐号:密码:下次自动登录{url:/nForum/slist.json?uid=guest&root=list-section}{url:/nForum/nlist.json?uid=guest&root=list-section}
贴数:20&分页:水木总版主发信人: aaa888 (aaa888), 信区: Java
标&&题: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sat Dec 10 21:16:30 2016), 站内 && activemq消息队列和kafka有什么区别呢
-- && ※ 来源:·水木社区 ·[FROM: 111.192.57.*]
我的月份又来了发信人: JulyClyde (我的月份又来了), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 14:04:21 2016), 站内 && activeMQ是标准消息队列的一个实现产品
kafka是什么?
【 在 aaa888 (aaa888) 的大作中提到: 】
: activemq消息队列和kafka有什么区别呢
发信人: suzhe (烦着呢), 信区: LinuxApp
标&&题: Re: Fail to update gadget data,Please try again later.
发信站: 水木社区 (Mon Jun&&2 21:50:33 2008), 站内
【 在 JulyClyde (七月) 的大作中提到: 】
: redhat的太监操作系统!
&&&& ※ 来源:·水木社区 newsmth.net·[FROM: 219.143.89.*]
天下无云发信人: quhw (天下无云), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 14:28:55 2016), 站内 && ActiveMQ实现了JMS规范,而Kafka没有,消息模型不同,API设计不同。 && Kafka设计上支持集群并发操作,吞吐率远高于ActiveMQ(以及其他JMS实现)。 && 【 在 aaa888 的大作中提到: 】
: activemq消息队列和kafka有什么区别呢
&& -- && ※ 来源:·水木社区 ·[FROM: 210.22.91.74]
水木总版主发信人: aaa888 (aaa888), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 19:13:25 2016), 站内 &&&& 【 在 quhw 的大作中提到: 】
: ActiveMQ实现了JMS规范,而Kafka没有,消息模型不同,API设计不同。
: Kafka设计上支持集群并发操作,吞吐率远高于ActiveMQ(以及其他JMS实现)。
那你的意思是kafka完胜activeMQ&&那为啥我看到有的公司又用Kafaka&&又用activeMQ&& 这两者使用场景有啥区别呢
-- && ※ 来源:·水木社区 ·[FROM: 111.192.57.*]
天下无云发信人: quhw (天下无云), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 21:23:24 2016), 站内 && kafka可靠性上还有些问题,不支持事务,不是JMS规范的东西。 &&&& 【 在 aaa888 的大作中提到: 】
: 那你的意思是kafka完胜activeMQ&&那为啥我看到有的公司又用Kafaka&&又用activeMQ&& 这两者使用场景有啥区别呢
&& -- && ※ 来源:·水木社区 ·[FROM: 61.171.25.219]
Helsinki发信人: Helsinki (鬱鬱寡歡的龜龜在吃biangbiang麵), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 21:56:23 2016), 站内 && kafka主要可以分布式运行吧&& &&&& 【 在 aaa888 () 的大作中提到: 】
: activemq消息队列和kafka有什么区别呢
发自xsmth (iOS版)
bluex () 回复 ···
等我另外一个id放出来以后我再虐你 我记住你了 赫赫 黑心司机 && 【 在 Helsinki 的大作中提到: 】
: 蓝x没在本贴出现?&&
&&&& ※ 来源:·水木社区 ·[FROM: 223.104.19.*]
Helsinki发信人: Helsinki (鬱鬱寡歡的龜龜在吃biangbiang麵), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Sun Dec 11 21:57:06 2016), 站内 && 搜噶&& &&&& 【 在 quhw () 的大作中提到: 】
: kafka可靠性上还有些问题,不支持事务,不是JMS规范的东西。
: 【 在 aaa888 的大作中提到: 】
发自xsmth (iOS版)
bluex () 回复 ···
等我另外一个id放出来以后我再虐你 我记住你了 赫赫 黑心司机 && 【 在 Helsinki 的大作中提到: 】
: 蓝x没在本贴出现?&&
&&&& ※ 来源:·水木社区 ·[FROM: 223.104.19.*]
骑白马的发信人: emirbobo (骑白马的), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Mon Dec 12 09:35:22 2016), 站内 && 可靠性不如mq,kafka是客户端拉数据,mq是服务端主动推,kafka吞吐量比mq高一个数量级 &&&& 【 在 aaa888 () 的大作中提到: 】
: activemq消息队列和kafka有什么区别呢
发自xsmth (iOS版)
-- && ※ 来源:·水木社区 ·[FROM: 61.148.242.*]
fisherMartyn发信人: wFisher (fisherMartyn), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Mon Dec 12 15:26:18 2016), 站内 && activemq是一个支持amqp协议的消息队列实现。
kafka是一个分布式数据流处理平台,当然很多人用它的队列功能,不考虑amqp这种。 && 是否符合业务场景和轮子本身是什么,这是两个问题。
【 在 aaa888 的大作中提到: 】
: activemq消息队列和kafka有什么区别呢
&& - 来自「最水木 for iOS」
-- && ※ 来源:·最水木 客户端·[FROM: 117.136.0.*]
过眼云烟发信人: graceman (过眼云烟), 信区: Java
标&&题: Re: activemq消息队列和kafka有什么区别呢
发信站: 水木社区 (Tue Dec 13 08:25:43 2016), 站内 && kafaka最初被用来同意归档存储日志等非关键数据的...你感受下 && 【 在 aaa888 的大作中提到: 】
: 【 在 quhw 的大作中提到: 】&&
: : ActiveMQ实现了JMS规范,而Kafka没有,消息模型不同,API设计不同。&&
: : Kafka设计上支持集群并发操作,吞吐率远高于ActiveMQ(以及其他JMS实现)。&&
: :&& && #发自zSMTH@android 6.0
-- && ※ 来源:·水木社区 ·[FROM: 106.38.54.*]
文章数:20&分页:&&精spring整合apache activemq实现消息发送的三种方式代码配置实例我们项目中发送事件告警要用到消息队列,所以学习了下activemq,整理如下:activemq的介绍就不用说了,官网上大家可以详细的看到。1.下载并安装activemq:地址,我下面的例子用的是5.9.0的版本。下载后解压就完成安装了。进入解压目录的bin目录,选择windows位数(32/64),启动activemq.bat就可以开启activemq服务了。登陆就可以进入mq的界面了,官方默认登录名/密码是:admin/admin,也可以在conf/jetty-realm.properties中自行修改,activemq界面如下:简单介绍下导航栏:Queues:队列方式消息。Topics:主题方式消息。Subscribers:消息订阅监控查询。Connections:查看链接数,分别可以查看xmpp、ssl、stomp、openwire、ws和网络链接。Network:网络链接数监控。Scheduled:没有用到,不太清楚。Send:发送消息数据2.发送和接受消息的步骤:&a.发送消息(1)创建连接使用的工厂类JMS ConnectionFactory(2)使用管理对象JMS ConnectionFactory建立连接Connection,并启动(3)使用连接Connection 建立会话Session(4)使用会话Session和管理对象Destination创建消息生产者MessageSender(5)使用消息生产者MessageSender发送消息b.接收消息(1)创建连接使用的工厂类JMS ConnectionFactory(2)使用管理对象JMS ConnectionFactory建立连接Connection,并启动(3)使用连接Connection建立会话Session(4)使用会话Session和管理对象Destination创建消息接收者MessageReceiver(5)使用消息接收者MessageReceiver接受消息,需要用setMessageListener将MessageListener接口绑定到MessageReceiver,消息接收者必须实现了MessageListener接口,需要定义onMessage事件方法。3.spring的集成:spring集成所需要的jar包我已经在pom.xml中配置了,大家可以看看都需要哪些jar。当然下载的activemq的lib下也有这些jar。spring的整合比较简单,只需在spring的配置文件中配置消息模板JmsTemplete就可以了,具体如下:
&?xml version=&1.0& encoding=&UTF-8&?&
&beans xmlns=&http://www.springframework.org/schema/beans&
xmlns:context=&http://www.springframework.org/schema/context&
xmlns:xsi=&http://www.w3.org/2001/XMLSchema-instance&
xsi:schemaLocation=&http://www.springframework.org/schema/beans
http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/context
http://www.springframework.org/schema/context/spring-context-3.0.xsd&&
&!-- 连接池
&bean id=&pooledConnectionFactory& class=&org.apache.activemq.pool.PooledConnectionFactory& destroy-method=&stop&&
&property name=&connectionFactory&&
&bean class=&org.apache.activemq.ActiveMQConnectionFactory&&
&property name=&brokerURL& value=&tcp://localhost:61616& /&
&/property&
&!-- 连接工厂 --&
&bean id=&activeMQConnectionFactory& class=&org.apache.activemq.ActiveMQConnectionFactory&&
&property name=&brokerURL& value=&tcp://localhost:61616& /&
&!-- 配置消息目标 --&
&bean id=&destination& class=&org.apache.activemq.command.ActiveMQQueue&&
&constructor-arg index=&0& value=&com.zuidaima.spring& /&
&!-- 消息模板 --&
&bean id=&jmsTemplate& class=&org.springframework.jms.core.JmsTemplate&&
&property name=&connectionFactory& ref=&activeMQConnectionFactory& /&
&property name=&defaultDestination& ref=&destination& /&
&property name=&messageConverter&&
&bean class=&org.springframework.jms.support.converter.SimpleMessageConverter& /&
&/property&
&/beans&4.项目运行截图:JMS其他几种方式我就不一一截图了,请具体运行项目查看。5.开发环境:win7 32位+eclipse kepler + jdk7 + maven由编辑于 9:44:15猜你喜欢9个牛币请下载代码后再发表评论//zuidaima_activemq/zuidaima_activemq/.classpath/zuidaima_activemq/.project/zuidaima_activemq/.settings/zuidaima_activemq/.settings/org.eclipse.jdt.core.prefs/zuidaima_activemq/.settings/org.eclipse.m2e.core.prefs/zuidaima_activemq/pom.xml/zuidaima_activemq/src/zuidaima_activemq/src/applicationContext.xml/zuidaima_activemq/src/com/zuidaima_activemq/src/com/zuidamai/zuidaima_activemq/src/com/zuidamai/jms/zuidaima_activemq/src/com/zuidamai/pointToPoint精精精精原精精原精原精原精原精原原原原原相关分享最近下载暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级最近浏览暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级暂无贡献等级扫描二维码关注最代码为好友"/>扫描二维码关注最代码为好友activemq消息队列阻塞 怎么处理_百度知道
activemq消息队列阻塞 怎么处理
我有更好的答案
PTP模式下,连到同一个destination的两端,可以通过broker中转来传输大文件。发送端使用connection。这种方式仅仅适用于小文件的传输.createOutputStream(destination)。对于大文件;接收端则简单的使用connection,message序列化以后存放于blob字段。特别是如果broker端使用数据库作为存储按照JMS规范,为了保证可靠性,所有的消息都应该是发送到broker,像正常的message一样处理。对于比较小的文件,简单的处理方式是先读取所有的文件成byte[],然后使用ByteMessage,把文件数据发送到broker。InputStream in = connection,写入效率极低。直接传输文件为了解决传输大文件的问题.createInputStream拿到一个输入流,从中读取文件数据即可,往流里写文件。OutputStream out =connection,例如1GB以上的文件,这么搞直接把client或是broker给oom掉了.createOutputStream打开一个输出流,ActiveMQ在jms规范之外引入了jms streams的概念,文件传输频繁或是稍微有点大,然后交由broker来投递的。也即是说其实JMS是不建议或不支持传输文件的
为您推荐:
其他类似问题
换一换
回答问题,赢新手礼包
个人、企业类
违法有害信息,请在下方选择后提交
色情、暴力
我们会通过消息、邮箱等方式尽快将举报结果通知您。查看: 3831|回复: 1
java消息队列 ActiveMQ实例
TA的每日心情开心 20:47签到天数: 5 天[LV.2]偶尔看看I
1.下载ActiveMQ
去官方网站下载:
我下载的时候是 ActiveMQ 5.8.0 Release版
2.运行ActiveMQ
解压缩apache-activemq-5.8.0-bin.zip,然后双击apache-activemq-5.5.1\bin\activemq.bat运行ActiveMQ程序。
启动ActiveMQ以后,登陆:http://localhost:8161/admin/,创建一个Queue,命名为FirstQueue。
3.创建Eclipse项目并运行
创建java project:ActiveMQ-5.8,新建lib文件夹
打开apache-activemq-5.8.0\lib目录
activemq-broker-5.8.0.jar
activemq-client-5.8.0.jar
geronimo-j2ee-management_1.1_spec-1.0.1.jar
geronimo-jms_1.1_spec-1.1.1.jar
slf4j-api-1.6.6.jar
这5个jar文件到lib文件夹中,并Build Path-&Add to Build Path
08127d9f-fa80-37b0-a73a-473276bfe5a4.jpg (26.08 KB, 下载次数: 11)
22:52 上传
package com.lm.
* @Header: Sender.java
* 类描述:
* @author: lm
上午10:52:42
* @company 欢
* @addr 北京市朝阳区劲松
*/
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.DeliveryM
import javax.jms.D
import javax.jms.MessageP
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Sender {
& & & & private static final int SEND_NUMBER = 5;
& & & & public static void main(String[] args) {
& & & & & & & & // ConnectionFactory :连接工厂,JMS 用它创建连接
& & & & & & & & ConnectionFactory connectionF // Connection :JMS 客户端到JMS
& & & & & & & & // Provider 的连接
& & & & & & & & Connection connection = // Session: 一个发送或接收消息的线程
& & & & & & & & S // Destination :消息的目的地;消息发送给谁.
& & & & & & & & Des // MessageProducer:消息发送者
& & & & & & & & MessageP // TextM
& & & & & & & & // 构造ConnectionFactory实例对象,此处采用ActiveMq的实现jar
& & & & & & & & connectionFactory = new ActiveMQConnectionFactory(
& & & & & & & & & & & & & & & & ActiveMQConnection.DEFAULT_USER,
& & & & & & & & & & & & & & & & ActiveMQConnection.DEFAULT_PASSWORD, &tcp://localhost:61616&);
& & & & & & & & try { // 构造从工厂得到连接对象
& & & & & & & & & & & & connection = connectionFactory.createConnection();
& & & & & & & & & & & & // 启动
& & & & & & & & & & & & connection.start();
& & & & & & & & & & & & // 获取操作连接
& & & & & & & & & & & & session = connection.createSession(Boolean.TRUE,
& & & & & & & & & & & & & & & & & & & & Session.AUTO_ACKNOWLEDGE);
& & & & & & & & & & & & // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
& & & & & & & & & & & & destination = session.createQueue(&FirstQueue&);
& & & & & & & & & & & & // 得到消息生成者【发送者】
& & & & & & & & & & & & producer = session.createProducer(destination);
& & & & & & & & & & & & // 设置不持久化,此处学习,实际根据项目决定
& & & & & & & & & & & & producer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
& & & & & & & & & & & & // 构造消息,此处写死,项目就是参数,或者方法获取
& & & & & & & & & & & & sendMessage(session, producer);
& & & & & & & & & & & & session.commit();
& & & & & & & & } catch (Exception e) {
& & & & & & & & & & & & e.printStackTrace();
& & & & & & & & } finally {
& & & & & & & & & & & & try {
& & & & & & & & & & & & & & & & if (null != connection)
& & & & & & & & & & & & & & & & & & & & connection.close();
& & & & & & & & & & & & } catch (Throwable ignore) {
& & & & & & & & & & & & }
& & & & & & & & }
& & & & }
& & & & public static void sendMessage(Session session, MessageProducer producer)
& & & & & & & & & & & & throws Exception {
& & & & & & & & for (int i = 1; i &= SEND_NUMBER; i++) {
& & & & & & & & & & & & TextMessage message = session.createTextMessage(&ActiveMq 发送的消息&
& & & & & & & & & & & & & & & & & & & & + i);
& & & & & & & & & & & & // 发送消息到目的地方
& & & & & & & & & & & & System.out.println(&发送消息:& + &ActiveMq 发送的消息& + i);
& & & & & & & & & & & & producer.send(message);
& & & & & & & & }
& & & & }
}
复制代码
package com.lm.
* @Header: Receiver.java
* 类描述:
* @author: lm
上午10:52:58
* @company 欢
* @addr 北京市朝阳区劲松
*/
import javax.jms.C
import javax.jms.ConnectionF
import javax.jms.D
import javax.jms.MessageC
import javax.jms.S
import javax.jms.TextM
import org.apache.activemq.ActiveMQC
import org.apache.activemq.ActiveMQConnectionF
public class Receiver {
& & & & public static void main(String[] args) {
& & & & & & & & // ConnectionFactory :连接工厂,JMS 用它创建连接
& & & & & & & & ConnectionFactory connectionF
& & & & & & & & // Connection :JMS 客户端到JMS Provider 的连接
& & & & & & & & Connection connection =
& & & & & & & & // Session: 一个发送或接收消息的线程
& & & & & & & & S
& & & & & & & & // Destination :消息的目的地;消息发送给谁.
& & & & & & & & Des
& & & & & & & & // 消费者,消息接收者
& & & & & & & & MessageC
& & & & & & & & connectionFactory = new ActiveMQConnectionFactory(
& & & & & & & & & & & & & & & & ActiveMQConnection.DEFAULT_USER,
& & & & & & & & & & & & & & & & ActiveMQConnection.DEFAULT_PASSWORD, &tcp://localhost:61616&);
& & & & & & & & try {
& & & & & & & & & & & & // 构造从工厂得到连接对象
& & & & & & & & & & & & connection = connectionFactory.createConnection();
& & & & & & & & & & & & // 启动
& & & & & & & & & & & & connection.start();
& & & & & & & & & & & & // 获取操作连接
& & & & & & & & & & & & session = connection.createSession(Boolean.FALSE,
& & & & & & & & & & & & & & & & & & & & Session.AUTO_ACKNOWLEDGE);
& & & & & & & & & & & & // 获取session注意参数值xingbo.xu-queue是一个服务器的queue,须在在ActiveMq的console配置
& & & & & & & & & & & & destination = session.createQueue(&FirstQueue&);
& & & & & & & & & & & & consumer = session.createConsumer(destination);
& & & & & & & & & & & & while (true) {
& & & & & & & & & & & & & & & & // 设置接收者接收消息的时间,为了便于测试,这里谁定为100s
& & & & & & & & & & & & & & & & TextMessage message = (TextMessage) consumer.receive(100000);
& & & & & & & & & & & & & & & & if (null != message) {
& & & & & & & & & & & & & & & & & & & & System.out.println(&收到消息& + message.getText());
& & & & & & & & & & & & & & & & } else {
& & & & & & & & & & & & & & & & & & & &
& & & & & & & & & & & & & & & & }
& & & & & & & & & & & & }
& & & & & & & & } catch (Exception e) {
& & & & & & & & & & & & e.printStackTrace();
& & & & & & & & } finally {
& & & & & & & & & & & & try {
& & & & & & & & & & & & & & & & if (null != connection)
& & & & & & & & & & & & & & & & & & & & connection.close();
& & & & & & & & & & & & } catch (Throwable ignore) {
& & & & & & & & & & & & }
& & & & & & & & }
& & & & }
}
5.测试过程
先运行:Receiver.java
再运行:Sender.java
可以看到结果
Sender运行后:
发送消息:ActiveMq 发送的消息1
发送消息:ActiveMq 发送的消息2
发送消息:ActiveMq 发送的消息3
发送消息:ActiveMq 发送的消息4
发送消息:ActiveMq 发送的消息5
Receiver运行后:
收到消息ActiveMq 发送的消息1
收到消息ActiveMq 发送的消息2
收到消息ActiveMq 发送的消息3
收到消息ActiveMq 发送的消息4
收到消息ActiveMq 发送的消息5
TA的每日心情郁闷 13:55签到天数: 4 天[LV.2]偶尔看看I
这个好像有点难呀
Beijing Aptech Beida Jade Bird Information Technology Co.,Ltd
北大青鸟IT教育 北京阿博泰克北大青鸟信息技术有限公司 版权所有}

我要回帖

更多关于 activemq消息队列 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信