netty心跳包,只要在netty 多线程服务端端设置吗

温馨提示!由于新浪微博认证机制调整,您的新浪微博帐号绑定已过期,请重新绑定!&&|&&
人生有两种生活方式:腐烂或燃烧。胆怯懒惰的人选择前者,勇敢而胸怀博大的人选择后者。
LOFTER精选
网易考拉推荐
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
&通过以上三种方式,可以轻松实现SOCKET长连接的心跳包机制。
另要注意的是这里所说的超时是指逻辑上的超时,而非TCP连接超时。
ReadTimeoutHandler,WriteTimeoutHandler, IdleStateHandler这三种方式都是通过在数据处理管道ChannelPipelineFactory的实现类里配置相关代码来实现。
一般使用根据需求使用一种方式即可。
&它主要使用Timer定时器用自定义的时间,去定时检测相应的状态,检测到后会根据选择的处理方式,是报错或回调指定的接口实现类处理。
&参考源码包
IdleStateHandler 空闲状态的处理代码
* Creates a new instance.
* @param timer
the Timer that is used to trigger the scheduled event.
The recommended Timer implementation is HashedWheelTimer.
* @param readerIdleTimeSeconds
an IdleStateEvent whose state is IdleState.READER_IDLE
will be triggered when no read was performed for the specified
period of time.
Specify 0 to disable.
* @param writerIdleTimeSeconds
an IdleStateEvent whose state is IdleState.WRITER_IDLE
will be triggered when no write was performed for the specified
period of time.
Specify {@code 0} to disable.
* @param allIdleTimeSeconds
an IdleStateEvent whose state is IdleState.ALL_IDLE
will be triggered when neither read nor write was performed
for the specified period of time.
Specify 0 to disable.
public IdleStateHandler(
Timer timer,
int readerIdleTimeSeconds,
int writerIdleTimeSeconds,
int allIdleTimeSeconds) {
this(timer, readerIdleTimeSeconds, writerIdleTimeSeconds, allIdleTimeSeconds, TimeUnit.SECONDS);
&ReadTimeoutHandler 读取数据超时的处理代码
* Creates a new instance.
* @param timer
the Timer that is used to trigger the scheduled event.
The recommended Timer implementation is
HashedWheelTimer.
* @param timeoutSeconds
read timeout in seconds
public ReadTimeoutHandler(Timer timer, int timeoutSeconds) {
this(timer, timeoutSeconds, TimeUnit.SECONDS);
&WriteTimeoutHandler 写数据超时的处理代码
* Creates a new instance.
* @param timer
the Timer that is used to trigger the scheduled event.
The recommended Timer implementation is HashedWheelTimer.
* @param timeoutSeconds
write timeout in seconds
public WriteTimeoutHandler(Timer timer, int timeoutSeconds) {
this(timer, timeoutSeconds, TimeUnit.SECONDS);
IdleStateEvent 空闲事件的接口类
ChannelEvent that is triggered when a Channel has been idle
* for a while.
* @apiviz.landmark
* @apiviz.has org.jboss.netty.handler.timeout.IdleState oneway -
interface IdleStateEvent extends ChannelEvent {
* Returns the detailed idle state.
IdleState getState();
* Returns the last time when I/O occurred in milliseconds.
long getLastActivityTimeMillis();
&DefaultIdleStateEvent空闲事件的实现类
IdleStateAwareChannelHandler 处理空闲事件的接口.
* An extended SimpleChannelHandler that adds the handler method for
* an IdleStateEvent.
* @apiviz.uses org.jboss.netty.handler.timeout.IdleStateEvent
class IdleStateAwareChannelHandler extends SimpleChannelHandler {
* Creates a new instance.
public IdleStateAwareChannelHandler() {
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof IdleStateEvent) {
channelIdle(ctx, (IdleStateEvent) e);
super.handleUpstream(ctx, e);
* Invoked when a Channel has been idle for a while.
void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
ctx.sendUpstream(e);
&IdleStateAwareChannelUpstreamHandler处理空闲事件的接口.
IdleState 定义了三种空闲状态的枚举类
* An Enum that represents the idle state of a Channel.
enum IdleState {
* No data was received for a while.
READER_IDLE,
* No data was sent for a while.
WRITER_IDLE,
* No data was either received or sent for a while.
&ReadTimeoutException 读取超时异常类
WriteTimeoutException 写数据超时异常类
TimeoutException 读取数据和写数据异常类的父类
以下是关于三种方式的关键代码实现讲解。
&IdleStateHandler
&//第一步,在服务启动类中设置数据处理管道.
bootstrap.setPipelineFactory(new TCPServerPipelineFactory());
&//第二步,在数据处理管道实现类里配置空闲状态处理代码.
TCPServerPipelineFactory
implements ChannelPipelineFactory {
&&&&@Override
&&&&public ChannelPipeline getPipeline() throws Exception {
&&&&&&&&// Create a default pipeline implementation.
&&&&&&&&ChannelPipeline pipeline = Channels.pipeline();
&&&&&&&&//设置空闲状态处理操作
pipeline.addLast("idlehandler", new
IdleStateHandler(new HashedWheelTimer(), 10, 5 , 0));
&pipeline.addLast("hearbeat", new Heartbeat());
&&&&&&&&pipeline.addLast("handler", new TCPServerHandler());
&&&&&&&&return
&//第三步,实现方式1:自定义IdleStateAwareChannelHandler的实现类,
//当Channel处理空闲状态时,会触发此方法.
class Heartbeat extends
IdleStateAwareChannelHandler {
&&&&@Override
channelIdle(ChannelHandlerContext ctx, IdleStateEvent e) throws Exception {
&&&&&&&&super.channelIdle(ctx, e);&&&&&&&&
&&&&&&&&if (e.getState() == IdleState.READER_IDLE){
&&&&byte[] test = " ac0bce6".getBytes();
ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);
&&&&&&&&&&&&channelBuffer.writeBytes(upData);
&&&&&&&&&&&&//发送超时数据到终端.
&&&&&&&&&&&&e.getChannel().write(channelBuffer);
&//第三步, 实现方式2:在扩展SimpleChannelHandler的handler类里,如下操作:
//记得注释掉第二步中的代码pipeline.addLast("hearbeat", new Heartbeat());
void handleUpstream(ChannelHandlerContext ctx, ChannelEvent e) throws Exception {
if (e instanceof IdleStateEvent) {
IdleStateEvent ise = (IdleStateEvent)e;
if (ise.getState() == IdleState.READER_IDLE){
byte[] test = "超时 ...".getBytes();
ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);
channelBuffer.writeBytes(test);
//发送超时数据到终端.
ctx.getChannel().write(channelBuffer);
super.handleUpstream(ctx, e);
&ReadTimeoutHandler
&//第一步,在服务启动类中设置数据处理管道.
bootstrap.setPipelineFactory(new TCPServerPipelineFactory());
&//第二步,在数据处理管道实现类里配置空闲状态处理代码.
TCPServerPipelineFactory
implements ChannelPipelineFactory {
&&&&@Override
&&&&public ChannelPipeline getPipeline() throws Exception {
&&&&&&&&// Create a default pipeline implementation.
&&&&&&&&ChannelPipeline pipeline = Channels.pipeline();
&&&&&&&&//设置读取数据超时处理
pipeline.addLast("readTimeOut",new ReadTimeoutHandler(new HashedWheelTimer(),10));
&&&&&&&&pipeline.addLast("handler", new TCPServerHandler());
&&&&&&&&return
&//第三步, 在扩展SimpleChannelHandler的handler类里,如下操作:
&@Override
exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) {
&&&&//对读取超时异常进行判断
&&&&if (e.getCause() instanceof ReadTimeoutException) {
byte[] test = "超时 ...".getBytes();
ChannelBuffer channelBuffer = ChannelBuffers.buffer(test.length);
channelBuffer.writeBytes(test);
ctx.getChannel().write(channelBuffer);
&&&&} else {
logger.log(Level.WARN, "Unexpected exception from downstream.",e.getCause());
WriteTimeoutHandler
略,原理同ReadTimeoutHandler。
阅读(19162)|
用微信&&“扫一扫”
将文章分享到朋友圈。
用易信&&“扫一扫”
将文章分享到朋友圈。
历史上的今天
在LOFTER的更多文章
loftPermalink:'',
id:'fks_',
blogTitle:'Netty的超时机制',
blogAbstract:'\n&Netty超时机制学习\n技术点描述\nReadTimeoutHandler读取数据超时处理\nWriteTimeoutHandler写数据超时处理\nIdleStateHandler状态空闲处理\n\n&通过以上三种方式,可以轻松实现SOCKET长连接的心跳包机制。\n另要注意的是这里所说的超时是指逻辑上的超时,而非TCP连接超时。\n实现方案\nReadTimeoutHandler,WriteTimeoutHandler,',
blogTag:'netty超时,idlestatehandler,writetimeouthandler,readtimeouthandler',
blogUrl:'blog/static/',
isPublished:1,
istop:false,
modifyTime:2,
publishTime:5,
permalink:'blog/static/',
commentCount:0,
mainCommentCount:0,
recommendCount:0,
bsrk:-100,
publisherId:0,
recomBlogHome:false,
currentRecomBlog:false,
attachmentsFileIds:[],
groupInfo:{},
friendstatus:'none',
followstatus:'unFollow',
pubSucc:'',
visitorProvince:'',
visitorCity:'',
visitorNewUser:false,
postAddInfo:{},
mset:'000',
remindgoodnightblog:false,
isBlackVisitor:false,
isShowYodaoAd:false,
hostIntro:'人生有两种生活方式:腐烂或燃烧。胆怯懒惰的人选择前者,勇敢而胸怀博大的人选择后者。',
hmcon:'1',
selfRecomBlogCount:'0',
lofter_single:''
{list a as x}
{if x.moveFrom=='wap'}
{elseif x.moveFrom=='iphone'}
{elseif x.moveFrom=='android'}
{elseif x.moveFrom=='mobile'}
${a.selfIntro|escape}{if great260}${suplement}{/if}
{list a as x}
推荐过这篇日志的人:
{list a as x}
{if !!b&&b.length>0}
他们还推荐了:
{list b as y}
转载记录:
{list d as x}
{list a as x}
{list a as x}
{list a as x}
{list a as x}
{if x_index>4}{break}{/if}
${fn2(x.publishTime,'yyyy-MM-dd HH:mm:ss')}
{list a as x}
{if !!(blogDetail.preBlogPermalink)}
{if !!(blogDetail.nextBlogPermalink)}
{list a as x}
{if defined('newslist')&&newslist.length>0}
{list newslist as x}
{if x_index>7}{break}{/if}
{list a as x}
{var first_option =}
{list x.voteDetailList as voteToOption}
{if voteToOption==1}
{if first_option==false},{/if}&&“${b[voteToOption_index]}”&&
{if (x.role!="-1") },“我是${c[x.role]}”&&{/if}
&&&&&&&&${fn1(x.voteTime)}
{if x.userName==''}{/if}
网易公司版权所有&&
{list x.l as y}
{if defined('wl')}
{list wl as x}{/list}用Netty实现聊天功能
用Netty实现聊天功能
围观2024次
编辑日期: 字体:
[Netty](http://netty.io/) 是一个 Java NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。更多关于 Netty 的知识,可以参阅《Netty 4.x 用户指南》()
Netty 是一个 Java NIO 客户端服务器框架,使用它可以快速简单地开发网络应用程序,比如服务器和客户端的协议。Netty 大大简化了网络程序的开发过程比如 TCP 和 UDP 的 socket 服务的开发。更多关于 Netty 的知识,可以参阅《Netty 4.x 用户指南》(/waylau/netty-4-user-guide)
下面,就基于 Netty 快速实现一个聊天小程序。
Maven 3.2.x
Eclipse 4.x
让我们从 handler (处理器)的实现开始,handler 是由 Netty 生成用来处理 I/O 事件的。
SimpleChatServerHandler.java
public class SimpleChatServerHandler extends SimpleChannelInboundHandler&String& { // (1)
public static ChannelGroup channels = new DefaultChannelGroup(GlobalEventExecutor.INSTANCE);
public void handlerAdded(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(&[SERVER] - & + incoming.remoteAddress() + & 加入\n&);
channels.add(ctx.channel());
public void handlerRemoved(ChannelHandlerContext ctx) throws Exception {
Channel incoming = ctx.channel();
for (Channel channel : channels) {
channel.writeAndFlush(&[SERVER] - & + incoming.remoteAddress() + & 离开\n&);
channels.remove(ctx.channel());
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception { // (4)
Channel incoming = ctx.channel();
for (Channel channel : channels) {
if (channel != incoming){
channel.writeAndFlush(&[& + incoming.remoteAddress() + &]& + s + &\n&);
channel.writeAndFlush(&[you]& + s + &\n&);
public void channelActive(ChannelHandlerContext ctx) throws Exception { // (5)
Channel incoming = ctx.channel();
System.out.println(&SimpleChatClient:&+incoming.remoteAddress()+&在线&);
public void channelInactive(ChannelHandlerContext ctx) throws Exception { // (6)
Channel incoming = ctx.channel();
System.out.println(&SimpleChatClient:&+incoming.remoteAddress()+&掉线&);
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) { // (7)
Channel incoming = ctx.channel();
System.out.println(&SimpleChatClient:&+incoming.remoteAddress()+&异常&);
// 当出现异常就关闭连接
cause.printStackTrace();
ctx.close();
1.SimpleChatServerHandler 继承自 ,这个类实现了
接口,ChannelInboundHandler 提供了许多事件处理的接口方法,然后你可以覆盖这些方法。现在仅仅只需要继承 SimpleChannelInboundHandler 类而不是你自己去实现接口方法。
2.覆盖了 handlerAdded() 事件处理方法。每当从服务端收到新的客户端连接时,客户端的 Channel 存入
列表中,并通知列表中的其他客户端 Channel
3.覆盖了 handlerRemoved() 事件处理方法。每当从服务端收到客户端断开时,客户端的 Channel 移除 ChannelGroup 列表中,并通知列表中的其他客户端 Channel
4.覆盖了 channelRead0() 事件处理方法。每当从服务端读到客户端写入信息时,将信息转发给其他客户端的 Channel。其中如果你使用的是 Netty 5.x 版本时,需要把 channelRead0() 重命名为messageReceived()
5.覆盖了 channelActive() 事件处理方法。服务端监听到客户端活动
6.覆盖了 channelInactive() 事件处理方法。服务端监听到客户端不活动
7.exceptionCaught() 事件处理方法是当出现 Throwable 对象才会被调用,即当 Netty 由于 IO 错误或者处理器在处理事件时抛出的异常时。在大部分情况下,捕获的异常应该被记录下来并且把关联的 channel 给关闭掉。然而这个方法的处理方式会在遇到不同异常的情况下有不同的实现,比如你可能想在关闭连接之前发送一个错误码的响应消息。
SimpleChatServerInitializer.java
SimpleChatServerInitializer 用来增加多个的处理类到 ChannelPipeline 上,包括编码、解码、SimpleChatServerHandler 等。
public class SimpleChatServerInitializer extends
ChannelInitializer&SocketChannel& {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(&framer&, new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(&decoder&, new StringDecoder());
pipeline.addLast(&encoder&, new StringEncoder());
pipeline.addLast(&handler&, new SimpleChatServerHandler());
System.out.println(&SimpleChatClient:&+ch.remoteAddress() +&连接上&);
SimpleChatServer.java
编写一个 main() 方法来启动服务端。
public class SimpleChatServer {
public SimpleChatServer(int port) {
this.port =
public void run() throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(); // (1)
EventLoopGroup workerGroup = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap(); // (2)
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class) // (3)
.childHandler(new SimpleChatServerInitializer())
.option(ChannelOption.SO_BACKLOG, 128)
.childOption(ChannelOption.SO_KEEPALIVE, true); // (6)
System.out.println(&SimpleChatServer 启动了&);
// 绑定端口,开始接收进来的连接
ChannelFuture f = b.bind(port).sync(); // (7)
// 等待服务器
socket 关闭 。
// 在这个例子中,这不会发生,但你可以优雅地关闭你的服务器。
f.channel().closeFuture().sync();
} finally {
workerGroup.shutdownGracefully();
bossGroup.shutdownGracefully();
System.out.println(&SimpleChatServer 关闭了&);
public static void main(String[] args) throws Exception {
if (args.length & 0) {
port = Integer.parseInt(args[0]);
port = 8080;
new SimpleChatServer(port).run();
1. 是用来处理I/O操作的多线程事件循环器,Netty 提供了许多不同的
的实现用来处理不同的传输。在这个例子中我们实现了一个服务端的应用,因此会有2个 NioEventLoopGroup 会被使用。第一个经常被叫做‘boss’,用来接收进来的连接。第二个经常被叫做‘worker’,用来处理已经被接收的连接,一旦‘boss’接收到连接,就会把连接信息注册到‘worker’上。如何知道多少个线程已经被使用,如何映射到已经创建的 上都需要依赖于 EventLoopGroup 的实现,并且可以通过构造函数来配置他们的关系。
2. 是一个启动 NIO 服务的辅助启动类。你可以在这个服务中直接使用 Channel,但是这会是一个复杂的处理过程,在很多情况下你并不需要这样做。
3.这里我们指定使用
类来举例说明一个新的 Channel 如何接收进来的连接。
4.这里的事件处理类经常会被用来处理一个最近的已经接收的 Channel。SimpleChatServerInitializer 继承自 是一个特殊的处理类,他的目的是帮助使用者配置一个新的 Channel。也许你想通过增加一些处理类比如 SimpleChatServerHandler 来配置一个新的 Channel 或者其对应的 来实现你的网络程序。当你的程序变的复杂时,可能你会增加更多的处理类到 pipline 上,然后提取这些匿名类到最顶层的类上。
5.你可以设置这里指定的 Channel 实现的配置参数。我们正在写一个TCP/IP 的服务端,因此我们被允许设置 socket 的参数选项比如tcpNoDelay 和 keepAlive。请参考
实现的接口文档以此可以对ChannelOption 的有一个大概的认识。
6.option() 是提供给 用来接收进来的连接。childOption() 是提供给由父管道
接收到的连接,在这个例子中也是 NioServerSocketChannel。
7.我们继续,剩下的就是绑定端口然后启动服务。这里我们在机器上绑定了机器所有网卡上的 8080 端口。当然现在你可以多次调用 bind() 方法(基于不同绑定地址)。
恭喜!你已经完成了基于 Netty 聊天服务端程序。
SimpleChatClientHandler.java
客户端的处理类比较简单,只需要将读到的信息打印出来即可
public class SimpleChatClientHandler extends
SimpleChannelInboundHandler&String& {
protected void channelRead0(ChannelHandlerContext ctx, String s) throws Exception {
System.out.println(s);
SimpleChatClientInitializer.java
与服务端类似
public class SimpleChatClientInitializer extends ChannelInitializer&SocketChannel& {
public void initChannel(SocketChannel ch) throws Exception {
ChannelPipeline pipeline = ch.pipeline();
pipeline.addLast(&framer&, new DelimiterBasedFrameDecoder(8192, Delimiters.lineDelimiter()));
pipeline.addLast(&decoder&, new StringDecoder());
pipeline.addLast(&encoder&, new StringEncoder());
pipeline.addLast(&handler&, new SimpleChatClientHandler());
SimpleChatClient.java
编写一个 main() 方法来启动客户端。
public class SimpleChatClient {
public static void main(String[] args) throws Exception{
new SimpleChatClient("localhost", 8080).run();
private final S
public SimpleChatClient(String host, int port){
this.host =
this.port =
public void run() throws Exception{
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap bootstrap
= new Bootstrap()
.group(group)
.channel(NioSocketChannel.class)
.handler(new SimpleChatClientInitializer());
Channel channel = bootstrap.connect(host, port).sync().channel();
BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
while(true){
channel.writeAndFlush(in.readLine() + "\r\n");
} catch (Exception e) {
e.printStackTrace();
} finally {
group.shutdownGracefully();
先运行 SimpleChatServer,再可以运行多个 SimpleChatClient,控制台输入文本继续测试
中 simplechat
Netty 4.x 用户指南
本文固定链接:
转载请注明:
作者:leehom
本博客主要是把自己的经验记录于此,方便自己以后查阅及其他遇到类似问题的朋友参考。如果你有觉得不错的文章,可以注册会员发布文章或者邮箱发给我文章地址,谢谢!
如果觉得文章还不错,请麻烦点下广告,算是赞助下本站服务器费用,谢谢!
您可能还会对这些文章感兴趣!netty的个人使用心得 - peirenlei - ITeye技术网站
博客分类:
Netty是由JBOSS提供的一个java开源框架。Netty提供异步的、事件驱动的网络应用程序框架和工具,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 如果需要客户端和服务器端沟通 分别都需要编写一个 实现了SimpleChannelHandler接口的类,其中类中需要重写的主要方法为 channelConnected() and channelOpen()
这两个方法为
当客户端链接到服务器端得时候和 客户端 channel被创建出来的时候所调用的 channelDisconnected and
channelClosed() 对应上面的两个方法 exceptionCaught 可以获得 对应handler端(服务器或客户端)的异常信息 messageReceived 每个 客户端 发送的信息后
将调用此方法 当编写完某端得程序后(客户端或服务器端) 将编写好的handler需要配置在 实现了ChannelPipelineFactory的类里,ChannelPipelineFactory中有一个需要实现的方法getPipeline将写好的handler配置到其中,在这个 工厂里 可能要添加很多东西 比如说 编解码器,心跳等。。。。 如需要自定义编解码器需要继承:LengthFieldBasedFrameDecoder(解码),OneToOneEncoder(编码) 编解码器(encode,decode) encode为
调用messageReceived 方法之后调用的方法,则decode方法为 messageReceived 之前调用的方法 ,用于处理自定义包协议的解析于编辑 心跳: 当客户端socket在非正常情况家掉线,如: 断网,断电等特殊问题的时候, 客户端的channel对象不会自动关闭,需要一直接收到客户端的消息,从而判断是否可以和对象构成通信。。 如果 发现客户端空闲时间过长则视为掉线 服务端handler代码如下 package com.djyou. import java.util.logging.L import org.jboss.netty.buffer.ChannelB import org.jboss.netty.channel.ChannelHandlerC import org.jboss.netty.channel.ChannelStateE import org.jboss.netty.channel.ChildChannelStateE import org.jboss.netty.channel.ExceptionE import org.jboss.netty.channel.MessageE import org.jboss.netty.channel.SimpleChannelH import org.jboss.netty.channel.group.ChannelG import org.jboss.netty.channel.group.DefaultChannelG public class ChatServerHandler extends SimpleChannelHandler{ public static final ChannelGroup channelGroup = new DefaultChannelGroup();
@Override public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e)
throws Exception {
System.out.println("进来一个"); } @Override public void channelDisconnected(ChannelHandlerContext ctx,
ChannelStateEvent e) throws Exception {
super.channelDisconnected(ctx, e); } @Override public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e)
throws Exception {
Logger.getAnonymousLogger().info(e.getCause().getMessage());
ctx.getChannel().close();
// TODO Auto-generated method stub
//super.exceptionCaught(ctx, e); } @Override public void childChannelClosed(ChannelHandlerContext ctx,
ChildChannelStateEvent e) throws Exception {
super.childChannelClosed(ctx, e); } @Override public void messageReceived(ChannelHandlerContext ctx, MessageEvent e)
throws Exception {
System.out.println(this.id++);
//google protocol解码后返回为 ChannelBuffer类型
if(!(e.getMessage() instanceof ChannelBuffer))
//获得 消息对象
ChannelBuffer channelBuffer = (ChannelBuffer)e.getMessage();
//MessageInfo info = Message.MessageInfo.newBuilder().mergeFrom(channelBuffer.copy().array()).build();
//写回给客户端
e.getChannel().write(channelBuffer);
} } pieplelineFactory里的代码为 package com.djyou. import static org.jboss.netty.channel.Channels.*; import org.jboss.netty.channel.ChannelP import org.jboss.netty.channel.ChannelPipelineF import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32FrameD import org.jboss.netty.handler.codec.protobuf.ProtobufVarint32LengthFieldP import org.jboss.netty.handler.timeout.IdleStateH import org.jboss.netty.util.T public class ChatPipelineServerFactory implements ChannelPipelineFactory{ private T public ChatPipelineServerFactory(Timer timer){
this.timer = } @Override public ChannelPipeline getPipeline() throws Exception {
ChannelPipeline pipeline = pipeline();
//添加netty默认支持的 编解码器(可自动添加包头,并处理粘包问题) pipeline.addLast("frameDecoder", new ProtobufVarint32FrameDecoder());//对应 pipeline.addLast("frameEncoder", new ProtobufVarint32LengthFieldPrepender());//此对象为 netty默认支持protocolbuf的编解码器
pipeline.addLast("timeout", new IdleStateHandler(timer, 10, 10, 0));//此两项为添加心跳机制 10秒查看一次在线的客户端channel是否空闲,IdleStateHandler为netty jar包中提供的类
pipeline.addLast("hearbeat", new Heartbeat());//此类 实现了IdleStateAwareChannelHandler接口
//netty会定时扫描 空闲的channel
//pipeline.addLast("frameDecoder", new ProtobufDecoder(Message.MessageInfo.getDefaultInstance()));
//pipeline.addLast("frameEncoder", new ProtobufEncoder());//
pipeline.addLast("handler", new ChatServerHandler());//将编写好的服务器端的handler添加到这里
} } 心跳包的代码如下 import org.jboss.netty.channel.ChannelHandlerC import org.jboss.netty.handler.timeout.IdleS import org.jboss.netty.handler.timeout.IdleStateAwareChannelH import org.jboss.netty.handler.timeout.IdleStateE public class Heartbeat extends IdleStateAwareChannelHandler{ int i = 0; @Override public void channelIdle(ChannelHandlerContext ctx, IdleStateEvent e)
throws Exception {
// TODO Auto-generated method stub
super.channelIdle(ctx, e);
if(e.getState() == IdleState.WRITER_IDLE)
e.getChannel().close();
System.out.println("掉了。");
} } } 自定义解码器代码 package com.djyou. import org.jboss.netty.buffer.ChannelB import org.jboss.netty.channel.C import org.jboss.netty.channel.ChannelHandlerC import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameD public class Decode extends LengthFieldBasedFrameDecoder{ public Decode(int maxFrameLength, int lengthFieldOffset,
int lengthFieldLength) {
super(maxFrameLength, lengthFieldOffset, lengthFieldLength);
// TODO Auto-generated constructor stub } @Override protected Object decode(ChannelHandlerContext ctx, Channel channel,
ChannelBuffer buffer) throws Exception {
ChannelBuffer buffs = (ChannelBuffer)super.decode(ctx, channel, buffer);
} } 自定义编码器代码 package com.djyou. import org.jboss.netty.channel.C import org.jboss.netty.channel.ChannelHandlerC import org.jboss.netty.handler.codec.oneone.OneToOneE public class Encode extends OneToOneEncoder{ @Override protected Object encode(ChannelHandlerContext ctx, Channel channel,
Object msg) throws Exception {
// TODO Auto-generated method stub
} } 服务端启动代码 package com.djyou. import java.net.InetSocketA import java.util.concurrent.E import org.jboss.netty.bootstrap.ServerB import org.jboss.netty.channel.ChannelF import org.jboss.netty.channel.socket.nio.NioServerSocketChannelF import org.jboss.netty.util.HashedWheelT import org.jboss.netty.util.T public class ChatServer { public static void main(String[] args) {
ChannelFactory factory = new NioServerSocketChannelFactory(Executors
.newCachedThreadPool(), Executors.newCachedThreadPool(),
Runtime.getRuntime().availableProcessors() + 1);
Timer timer = new HashedWheelTimer();
ServerBootstrap bootstrap = new ServerBootstrap(factory);
bootstrap.setPipelineFactory(new ChatPipelineServerFactory(timer));
bootstrap.setOption("child.tcpNoDelay", true);
bootstrap.setOption("child.keepAlive", true);
bootstrap.setOption("reuseAddress", true);
bootstrap.bind(new InetSocketAddress(6666)); } } 客户端启动代码如下(除客户端启动代码意外 其余的东西都与服务器端一样 都需要编写对应的 编解码器,定时发送消息线程(10秒发个信息给服务端 确保channel不为空闲, 来对应心跳程序), 客户端的handler) package com.djyou. import java.io.BufferedR import java.io.InputStreamR import java.net.InetSocketA import java.util.concurrent.E import org.jboss.netty.bootstrap.ClientB import org.jboss.netty.buffer.ChannelB import org.jboss.netty.buffer.ChannelB import org.jboss.netty.channel.C import org.jboss.netty.channel.ChannelF import org.jboss.netty.channel.ChannelF import org.jboss.netty.channel.socket.nio.NioClientSocketChannelF import com.djyou.protoBufModel.M import com.djyou.protoBufModel.Message.MessageI public class ChatClient { public static void main(String[] args) throws Exception{
String host = "localhost";
int port = 6666;
// Configure the client.
ChannelFactory factory =
new NioClientSocketChannelFactory(
Executors.newCachedThreadPool(),
Executors.newCachedThreadPool());
ClientBootstrap bootstrap = new ClientBootstrap(factory);
ChatPipelineClientFactory
cpcf = new ChatPipelineClientFactory();
bootstrap.setPipelineFactory(cpcf);
// Start the connection attempt.
ChannelFuture future = bootstrap.connect(new InetSocketAddress(host, port));
// Wait until the connection attempt succeeds or fails.
Channel channel = future.awaitUninterruptibly().getChannel();
if (!future.isSuccess()) {
future.getCause().printStackTrace();
System.exit(0);
// Read commands from the stdin.
ChannelFuture lastWriteFuture =
BufferedReader in = new BufferedReader(new InputStreamReader(System.in)); //
for (;;) { //
String line = in.readLine(); //
if (line == null) { //
//创建Builder
MessageInfo.Builder builder = MessageInfo.newBuilder();
builder.addBody(Message.Body.newBuilder().setKey("message").setValue("你在干什么?" + "/n").build());
//创建 赋值结束的 Build
并生成 MessageInfo对象
MessageInfo messageInfo = builder.build();
//将messageInfo转换为字节
byte[] messageByte = messageInfo.toByteArray();
//获得此对象的长度
ChannelBuffer channelBuffer = ChannelBuffers.buffer(messageByte.length);
//将 获得到的数组写入 channelBuffer中
channelBuffer.writeBytes(messageByte);
//发送到服务器端
for(int i = 0; i & 10;i++){
lastWriteFuture = channel.write(channelBuffer);
if (lastWriteFuture != null) {
lastWriteFuture.awaitUninterruptibly();
Thread.sleep(50000);
// Wait until all messages are flushed before closing the channel.
Thread.sleep(50000);
// Close the connection.
Make sure the close operation ends because
// all I/O operations are asynchronous in Netty.
channel.close().awaitUninterruptibly();
// We should shut down all thread pools here to exit normally.
// However, it is just fine to call System.exit(0) because we are
// finished with the business.
System.exit(0); } }
浏览 15077
浏览: 680137 次
来自: 武汉
Shrio在线教程: ...
不错,赞一个
写的不错,受用
可以发下源码吗
[b][/b][flash=200,200][url][img ...}

我要回帖

更多关于 netty 服务端关闭连接 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信