ftpnio socket clientt 用的是nio还是io

11:21 提问
FTPClient上传storeFile报错,求救
今天要写一个很简单的ftp上传的代码,结果总是报错,请教各位大神来帮忙解决
这是我的代码
public class UpLoadJSToFtp {
* @param url ftp服务器地址
* @param port ftp服务器端口号
* @param userName 登录账号
* @param password 登录密码
* @param path 服务器保存路径
* @param fileName 文件名
* @param input 输入流
* @return 上传成功返回true ,失败返回false
public static boolean uploadFile(String url,
String userName,
String password,
String path,
String fileName,
InputStream input){
boolean isSuccess =
FTPClient ftp = new FTPClient();
ftp.setControlEncoding("UTF-8");
ftp.connect(url);// 连接FTP服务器
// 如果采用默认端口,可以使用ftp.connect(url)的方式直接连接FTP服务器
boolean loginResualt = ftp.login(userName, password);
reply = ftp.getReplyCode();
if(!loginResualt && !FTPReply.isPositiveCompletion(reply)){
ftp.disconnect();
return isS
ftp.setFileType(FTPClient.BINARY_FILE_TYPE);
ftp.makeDirectory(path);
ftp.changeWorkingDirectory(path);
ftp.enterLocalPassiveMode();
ftp.storeFile(fileName, input);
input.close();
ftp.logout();
isSuccess =
} catch (SocketException e) {
// TODO Auto-generated catch block
e.printStackTrace();
} catch (IOException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}finally {
if(ftp.isConnected()) {
ftp.disconnect();
} catch(IOException ioe) {
return isS
* 将本地文件上传到FTP服务器上 *
public static void upLoadFromProduction(String url,// FTP服务器hostname
int port,// FTP服务器端口
String username, // FTP登录账号
String password, // FTP登录密码
String path, // FTP服务器保存目录
String filename, // 上传到FTP服务器上的文件名
String pathName // 输入流文件路径
FileInputStream in = new FileInputStream(new File(pathName));
boolean flag = uploadFile(url, port, username, password, path,filename, in);
System.out.println(flag);
} catch (Exception e) {
e.printStackTrace();
public static void main(String[] args) {
String url = "172.25.5.229";
int port = 22;
String username = "etdftp";
String password = "123";
String path = "/caoyang";
String filename = "caoyangFTPTEST.js";
String pathName = "D:/FndData_1021.js";
upLoadFromProduction(url, port, username, password, path, filename, pathName);
这是报的错误,求解
java.net.SocketException: Connection reset
at java.net.SocketInputStream.read(SocketInputStream.java:189)
at java.net.SocketInputStream.read(SocketInputStream.java:121)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:283)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:325)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:177)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:154)
at java.io.BufferedReader.read(BufferedReader.java:175)
at mons.net.io.CRLFLineReader.readLine(CRLFLineReader.java:58)
at mons.net.ftp.FTP.__getReply(FTP.java:310)
at mons.net.ftp.FTP.__getReply(FTP.java:290)
at mons.net.ftp.FTP.sendCommand(FTP.java:479)
at mons.net.ftp.FTP.sendCommand(FTP.java:552)
at mons.net.ftp.FTP.sendCommand(FTP.java:601)
at mons.net.ftp.FTP.pasv(FTP.java:952)
at mons.net.ftp.FTPClient._openDataConnection_(FTPClient.java:755)
at mons.net.ftp.FTPClient._storeFile(FTPClient.java:565)
at mons.net.ftp.FTPClient.__storeFile(FTPClient.java:557)
at mons.net.ftp.FTPClient.storeFile(FTPClient.java:1795)
at stag.utils.UpLoadJSToFtp.uploadFile(UpLoadJSToFtp.java:54)
at stag.utils.UpLoadJSToFtp.upLoadFromProduction(UpLoadJSToFtp.java:89)
at stag.utils.UpLoadJSToFtp.main(UpLoadJSToFtp.java:104)
按赞数排序
没人帮忙吗!求助啊!为什么FTPClient在jdk1.6以上就不能正常运行了。求解
相关参考资料【总结】两种 NIO 实现:Selector 与 Epoll - 推酷
【总结】两种 NIO 实现:Selector 与 Epoll
我想用这个话题小结下最近这一阶段的各种测试和开发。其实文章的内容主要还是想总结一下
NIO Socket
,以及两种不同操作系统实现
问题应该从服务器端开始说起。我们都写过net包下的socket,用socket的accept方法来等待客户端的请求,请求来了则处理,没有则一直等待,然后反复循环。这样的方式,类似于重用进程,要说线程也可以,始终就在这一条路上堵着。这样没有并发可言,我们想到了可以用多线程,用线程池的方式来解决这个问题。这样一般的小问题能解决了,100个初始化的线程,解决几千个连接应该没什么问题。可是,如果我做的是传输的项目呢,这100个线程还是阻塞的,第101个线程连接的时候,如果前面的100个都还在传输,那这第101个人还是在空等,而且他连个回音都不能收到。而且这样的方式实现起来并不怎么容易,虽然线程池有Exectors这样的类帮你生成,可是遇到共享变量和协同等问题还是很头疼。一个更好的做法是,模仿FTP一样,将指令与传输分开进行,一个端口负责简短的指令,尽可能的端连接,而其他端口处理业务。这样至少服务器能返回消息了。
第三种解决方案,多数情况下,也是最快的一种被提出来了,选择器selector。用一个线程来查询一组socket,找出已经准备好读写的,然后按顺序处理socket,当然这种实现方式的前提是IO必须使用通道和缓冲区而不是流。
用java来开发NIO socket的程序,最先要理解的还是各种概念。
通道channel,在NIO socket中使用到的通道有三个,SocketChannel、ServerSocketChannel、DatagramChannel。前两种基于TCP,最后一种一种用来实现UDP的通信。ServerSocketChannel不做任何数据上的处理,只是提供通道,负责连接。SocketChannel职责和net包下的socket类似,只不过这里是以通道的形势来对接。
缓冲区Buffer,这里最常用的还是ByteBuffer,在mina中使用的IoBuffer也是基于ByteBuffer实现的。它的好处是可以自己拼装去想到的数据。当然利用通道后数据切换的速度也会更快了。
要实现非阻塞IO最重要的还是选择器:
The Selector class manages information about a set of registered channels and their readiness states. Channels are registered with selectors, and a selector can be asked to update the readiness states of the channels currently registered with it. When doing so, the invoking thread can optionally indicate that it would prefer to be suspended until one of the registered channels is ready.
SelectableChannel&
This abstract class provides the common methods needed to implement channel selectability. It's the superclass of all channel classes that support readiness selection. FileChannel objects are not selectable because they don't extend from SelectableChannel. All the socket channel classes are selectable, as well as the channels obtained from a Pipe object. SelectableChannel objects can be registered with Selector objects, along with an indication of which operations on that channel are of interest for that selector. A channel can be registered with multiple selectors, but only once per selector.
SelectionKey
A SelectionKey encapsulates the registration relationship between a specific channel and a specific selector. A SelectionKey object is returned from SelectableChannel.register( ) and serves as a token representing the registration. SelectionKey objects contain two bit sets (encoded as integers) indicating which channel operations the registrant has an interest in and which operations the channel is ready to perform.
管理被注册的通道的集合的信息和其就绪状态,同时也更新通道的就绪状态。并且一个通道可以被注册到多个选择器上,而对于同一个选择器则只能被注册一次。
import java.io.IOE
import java.net.InetSocketA
import java.net.ServerS
import java.nio.ByteB
import java.nio.channels.SelectableC
import java.nio.channels.SelectionK
import java.nio.channels.S
import java.nio.channels.ServerSocketC
import java.nio.channels.SocketC
import java.util.I
public class SelectSockets {
private static final int PORT = 8082;
private ByteBuffer buffer = ByteBuffer.allocate(1024);
public static void main(String[] args) throws IOException {
SelectSockets ss = new SelectSockets();
public void go() throws IOException {
System.out.println(&listening on port:& + PORT);
ServerSocketChannel ssc = ServerSocketChannel.open();
ServerSocket ss = ssc.socket();
Selector selector = Selector.open();
ss.bind(new InetSocketAddress(PORT));
ssc.configureBlocking(false);
ssc.register(selector, SelectionKey.OP_ACCEPT);
while (true) {
int n = selector.select();
if (n == 0) {
Iterator&SelectionKey& iter = selector.selectedKeys().iterator();
while (iter.hasNext()) {
SelectionKey key = iter.next();
if (key.isAcceptable()) {
ServerSocketChannel server = (ServerSocketChannel) key
.channel();
SocketChannel client = server.accept();
register(selector, client, SelectionKey.OP_READ);
System.out.println(&Accept client:& + client);
acceptClient(client);
if (key.isReadable()) {
readData(key);
iter.remove();
protected void register(Selector selector, SelectableChannel channel,
int ops) throws IOException {
if (channel == null) {
channel.configureBlocking(false);
channel.register(selector, ops);
protected void readData(SelectionKey key) throws IOException {
SocketChannel socketChannel = (SocketChannel) key.channel();
while ((count = socketChannel.read(buffer)) & 0) {
buffer.flip();
while (buffer.hasRemaining()) {
socketChannel.write(buffer);
buffer.clear();
if (count & 0) {
socketChannel.close();
private void acceptClient(SocketChannel channel) throws IOException {
buffer.clear();
buffer.put(&you have already connected server!&.getBytes());
buffer.flip();
channel.write(buffer);
上面的代码就是一般的
服务器端实现的过程。
当然对于一个企业级的应用这样的代码肯定是太单薄了,仅仅就
我们是不是能共通过用多线程的方式来增强他的处理能力?是不是只有一个线程在跑
,让这一个线程处理那么多的连接有点儿过意不去。答案并不是和你想的一样
For the first scenario, in which you want to bring more threads into play to service channels,
resist the urge to use multiple selectors
. Performing readiness selection on large numbers of chann
most of the work is done by the underlying operating system
. Maintaining multiple selectors and randomly assigning channels to one of them is not a satisfactory solution to this problem. It simply makes smaller versions of the same scenario.
如果您想要将更多的线程来为通道提供服务,请抵抗住使用多个选择器的欲望。在大量通道上执行就绪选择并不会有很大的开销,大多数工作是由底层操作系统完成的。管理多个选择器并随机地将通道分派给它们当中的一个并不是这个问题的合理的解决方案。这只会形成这个场景的一个更小的版本。
一个更好的方案就是,让一个线程处理selector,让其他线程去处理就绪通道的业务。
模式就和上图上描述的一样,但是我的疑问来了,这样和线程池下的服务端连接好像看起来并没有多少大的优势。同样还是要启那么多的线程去处理这些业务。这也是我最近一直想从
源码中找到的答案,可是还是没有发现我想要的,它也是通过
用原型模型的方式来实现并发的。不过我估计
应该是有异步的实现,这样也会对性能上有影响。具体还有待研究。
最后要说的就是客户端了,最近其实也的客户端比较多,版本一个接一个,写了不下七八个,各种方式,各种测试,其实有几点心得可以分享:
l& 如果是做大文件的传输,切分的性价比其实比连续传的性价比高不了多少,虽然像迅雷这样可以分好多块传输,但那毕竟是
的结构,文件本来就松散的。考虑到切分再校验再重组,这样还不如切大块,然后顺序传。
l& 尽量将指令和传输分开,指令可以加密,然后更具协议,返回端口和地址让服务器端做到分布式的处理。
l& 还有就是客户端是否要用非阻塞模式,客户端如果不是做出
模式的,而且能用多线程解决问题的,就没必要用非阻塞的模式,因为非阻塞模式的发送和接收的时机很难控制,特别是用原生的
l& 在做两端通信的时候,特别是不同语言写的程序和不同操作系统下,要注意字节序(高有效和低有效)和进制的问题。
这样的框架很好,如果再配上
这样的多平台序列化工具,可以很好的实现自定义协议的通信。自己订协议的好处就是安全,而且能做应答机制。
Epoll是Linux下多路复用IO接口select/poll的增强版本,它能显著提高程序在大量并发连接中只有少量活跃的情况下的系统CPU利用率,因为它会复用文件描述符集合来传递结果而不用迫使开发者每次等待事件之前都必须重新准备要被侦听的文件描述符集合,另一点原因就是获取事件的时候,它无须遍历整个被侦听的描述符集,只要遍历那些被内核IO事件异步唤醒而加入Ready队列的描述符集合就行了。epoll除了提供select/poll那种IO事件的电平触发(Level Triggered)外,还提供了边沿触发(Edge Triggered),这就使得用户空间程序有可能缓存IO状态,减少epoll_wait/epoll_pwait的调用,提高应用程序效率。
在linux下,NIO可以采用epoll来实现非阻塞,注意,是在linux下:
-Djava.nio.channels.spi.SelectorProvider=sun.nio.ch.EPollSelectorProvider
--------------------------------------------------------------------
下周就要开始弄kafka了,其实网络方面的测试还没有找到最好的解决方案,虽然测了很多东西,但是真的是没有时间去做更多更复杂的东西。接下来赶紧把mina的源码看完,再看看netty就要把重心放在kafka上了。其实网络编程还是很有必要的,好多分布式的通信方案都可以建立在一套比较完整的消息机制上。
已发表评论数()
已收藏到推刊!
请填写推刊名
描述不能大于100个字符!
权限设置: 公开
仅自己可见
正文不准确
排版有问题
没有分页内容
视频无法显示
图片无法显示posts - 341,&
comments - 60,&
trackbacks - 0
public static void main(String[] args) throws Exception {
ConnectingIOReactor ioReactor = new DefaultConnectingIOReactor();
PoolingNHttpClientConnectionManager cm = new PoolingNHttpClientConnectionManager(ioReactor);
cm.setMaxTotal(100);
CloseableHttpAsyncClient httpAsyncClient = HttpAsyncClients.custom().setConnectionManager(cm).build();
httpAsyncClient.start();
String[] urisToGet = {
final CountDownLatch latch = new CountDownLatch(urisToGet.length);
for (final String uri: urisToGet) {
final HttpGet httpget = new HttpGet(uri);
httpAsyncClient.execute(httpget, new FutureCallback&HttpResponse&() {
public void completed(final HttpResponse response) {
latch.countDown();
System.out.println(httpget.getRequestLine() + "-&" + response.getStatusLine());
public void failed(final Exception ex) {
latch.countDown();
System.out.println(httpget.getRequestLine() + "-&" + ex);
public void cancelled() {
latch.countDown();
System.out.println(httpget.getRequestLine() + " cancelled");
latch.await();
简要流程总结如下:
HttpAsyncClient有一个AbstractMultiworkerIOReactor和AbstractIOReactor, 前者和后者类似于netty的bossGroup和workerGroup,&AbstractMultiworkerIOReactor负责channel的连接,&AbstractIOReactor负责channel的读写
先要明白连接池的结构
AbstractNIOConnPool 下各个变量的含义
// 一个可复用的ioreactor, 负责生成SessionRequest并唤醒selector去做连接到目标网站的操作
private final ConnectingIOR
// 用来构造连接池的entry的工厂
private final NIOConnFactory&T, C& connF
// 验证并生成目标连接socketAddress的类
private final SocketAddressResolver&T& addressR
// 一个可复用的callBack类, 里面提供了一个调用SessionRequest的complete的方法
private final SessionRequestCallback sessionRequestC
// 用域名区分的连接池
private final Map&T, RouteSpecificPool&T, C, E&& routeToP
// 没有成功拿到连接的请求列表
private final LinkedList&LeaseRequest&T, C, E&& leasingR
// 已经拿到连接权利, 但是还没连接成功的连接集合
private final Set&SessionRequest&
// 已经连接成功, 并被租借出去的连接集合
private final Set&E&
// 当前连接池可用的连接集合
private final LinkedList&E&
// 已经连接完成, 但是不可用的连接集合, 例如因为异常连接失败等待, 他们会在队列中等待被调用回调方法做后续处理
private final ConcurrentLinkedQueue&LeaseRequest&T, C, E&& completedR
// 每个route的最大连接数
private final Map&T, Integer& maxPerR
private final L
// 是否关闭
private final AtomicBoolean isShutD
// 每个route最大连接数默认值
private volatile int defaultMaxPerR
// 整个连接池最大连接数
private volatile int maxT
1. 发起请求
&a. 根据请求route查看连接池, 如果连接池不为空, 直接返回跟池中connection绑定的future, 并把该conn放入leased列表
&b. 如果因为某些原因导致当前请求无法取得连接, 但是没有发生致命错误的, 请求将被放入一个 leasing 列表, 这个列表会在后续动作中被取出来做连接重试
&c. 如果实在连接过程中出现了移除等不可恢复的错误, 则将request标记为completed, 退出方法后调用fireCallBack, 进行回调清理, 这次请求就算是失败结束了
&d. 如果是因为连接池没有可用连接, 但是可以新建连接的情况, 则会将request 加入pending列表, 并调用 selector的wakeup()方法, selector在wakeup以后会使用AbstractMultiworkerIOReactor(bossGroup)来进行连接操作, 并注册到selector中, 后续的connectable事件监听和channel连接成功注册也是由他完成的
2.&AbstractIOReactor监听读写事件
3. 通过decoder检测response已经完成, 最后将连接release到连接池中, 此时将连接从leased列表除去, 并加入到available中
Future&HttpResponse& execute(
HttpUriRequest request,
FutureCallback&org.apache.http.HttpResponse& callback)
请求开始, 里面会调用 execute(request, new BasicHttpContext(), callback)
Future&HttpResponse& execute(
final HttpUriRequest request,
final HttpContext context,
final FutureCallback&HttpResponse& callback)
context 代表了一次请求的上下文, 里面实际上就是一个用来存储 attribute 的结构, 默认的实现 BasicHttpContext 实际上就是一个 ConcurrentHashMap
context 是可以嵌套的, 代码如下
@ThreadSafe
public class BasicHttpContext implements HttpContext {
private final HttpContext parentC
private final Map&String, Object&
public BasicHttpContext() {
this(null);
public BasicHttpContext(final HttpContext parentContext) {
this.map = new ConcurrentHashMap&String, Object&();
this.parentContext = parentC
public Object getAttribute(final String id) {
Args.notNull(id, "Id");
Object obj = this.map.get(id);
if (obj == null && this.parentContext != null) {
obj = this.parentContext.getAttribute(id);
public void setAttribute(final String id, final Object obj) {
Args.notNull(id, "Id");
if (obj != null) {
this.map.put(id, obj);
this.map.remove(id);
public Object removeAttribute(final String id) {
Args.notNull(id, "Id");
return this.map.remove(id);
* @since 4.2
public void clear() {
this.map.clear();
public String toString() {
return this.map.toString();
接着看执行流程
public Future&HttpResponse& execute(
final HttpUriRequest request,
final HttpContext context,
final FutureCallback&HttpResponse& callback) {
final HttpH
target = determineTarget(request); // 这一步是取出目标host
} catch (final ClientProtocolException ex) {
final BasicFuture&HttpResponse& future = new BasicFuture&HttpResponse&(callback);
future.failed(ex);
return execute(target, request, context, callback);
public Future&HttpResponse& execute(
final HttpHost target, final HttpRequest request, final HttpContext context,
final FutureCallback&HttpResponse& callback) {
return execute(
HttpAsyncMethods.create(target, request),
HttpAsyncMethods.createConsumer(),
context, callback);
位于HttpAsyncClient接口下的
* Initiates asynchronous HTTP request execution using the given context.
* The request producer passed to this method will be used to generate
* a request message and stream out its content without buffering it
* in memory. The response consumer passed to this method will be used
* to process a response message without buffering its content in memory.
* Please note it may be unsafe to interact with the context instance
* while the request is still being executed.
* @param &T& the result type of request execution.
* @param requestProducer request producer callback.
* @param responseConsumer response consumer callaback.
* @param context HTTP context
* @param callback future callback.
* @return future representing pending completion of the operation.
&T& Future&T& execute(
HttpAsyncRequestProducer requestProducer,
HttpAsyncResponseConsumer&T& responseConsumer,
HttpContext context,
FutureCallback&T& callback);
这里会通过原来的请求信息生成一个requestProducer跟responseConsumer, 默认会调用HttpAsyncClient的InternalHttpAsyncClient的实现, 如下
public &T& Future&T& execute(
final HttpAsyncRequestProducer requestProducer,
final HttpAsyncResponseConsumer&T& responseConsumer,
final HttpContext context,
final FutureCallback&T& callback) {
final Status status = getStatus();
Asserts.check(status == Status.ACTIVE, "Reques " +
"I/O reactor status: %s", status);
final BasicFuture&T& future = new BasicFuture&T&(callback);
final HttpClientContext localcontext = HttpClientContext.adapt(
context != null ? context : new BasicHttpContext());
setupContext(localcontext);
@SuppressWarnings("resource")
final DefaultClientExchangeHandlerImpl&T& handler = new DefaultClientExchangeHandlerImpl&T&(
requestProducer,
responseConsumer,
localcontext,
this.connmgr,
this.exec);
handler.start(); // 请求开始
} catch (final Exception ex) {
handler.failed(ex);
&这里通过生成一个ExchangeHandler来实现请求开始, 查看 handler.start()
public void start() throws HttpException, IOException {
final HttpHost target = this.requestProducer.getTarget();
final HttpRequest original = this.requestProducer.generateRequest();
if (original instanceof HttpExecutionAware) {
((HttpExecutionAware) original).setCancellable(this);
this.exec.prepare(this.state, target, original); // 准备动作, 往state里设置各种状态
requestConnection(); // 实际发送请求的地方
private void requestConnection() {
if (this.log.isDebugEnabled()) {
this.log.debug("[exchange: " + this.state.getId() + "] Request connection for " +
this.state.getRoute());
discardConnection();
this.state.setValidDuration(0);
this.state.setNonReusable();
this.state.setRouteEstablished(false);
this.state.setRouteTracker(null);
final HttpRoute route = this.state.getRoute();
final Object userToken = this.localContext.getUserToken();
final RequestConfig config = this.localContext.getRequestConfig();
this.connmgr.requestConnection( // 此处调用ConenctionManager的requestConnection方法
userToken,
config.getConnectTimeout(),
config.getConnectionRequestTimeout(),
TimeUnit.MILLISECONDS,
new FutureCallback&NHttpClientConnection&() {
public void completed(final NHttpClientConnection managedConn) {
connectionAllocated(managedConn);
public void failed(final Exception ex) {
connectionRequestFailed(ex);
public void cancelled() {
connectionRequestCancelled();
再看NHttpClientConnectionManager下的
* Returns a {@link Future} for a {@link NHttpClientConnection}.
* Please note that the consumer of that connection is responsible
* for fully establishing the route the to the connection target
* by calling {@link #startRoute(org.apache.http.nio.NHttpClientConnection,
org.apache.http.conn.routing.HttpRoute,
org.apache.http.protocol.HttpContext) startRoute} in order to start
* the process of connection initialization, optionally calling
* {@link #upgrade(org.apache.http.nio.NHttpClientConnection,
org.apache.http.conn.routing.HttpRoute,
org.apache.http.protocol.HttpContext) upgrade} method to upgrade
* the connection after having executed &code&CONNECT&/code& method to
* all intermediate proxy hops and and finally calling
* {@link #routeComplete(org.apache.http.nio.NHttpClientConnection,
org.apache.http.conn.routing.HttpRoute,
org.apache.http.protocol.HttpContext) routeComplete} to mark the route
* as fully completed.
* @param route HTTP route of the requested connection.
* @param state expected state of the connection or &code&null&/code&
if the connection is not expected to carry any state.
* @param connectTimeout connect timeout.
* @param connectionRequestTimeout
connection request timeout.
* @param timeUnit time unit of the previous two timeout values.
* @param callback future callback.
Future&NHttpClientConnection& requestConnection(
HttpRoute route,
Object state,
long connectTimeout,
long connectionRequestTimeout,
TimeUnit timeUnit,
FutureCallback&NHttpClientConnection& callback);
它调用了PoolingNHttpClientConnectionManager的实现
public Future&NHttpClientConnection& requestConnection(
final HttpRoute route,
final Object state,
final long connectTimeout,
final long leaseTimeout,
final TimeUnit tunit,
final FutureCallback&NHttpClientConnection& callback) {
Args.notNull(route, "HTTP route");
if (this.log.isDebugEnabled()) {
this.log.debug("Connection request: " + format(route, state) + formatStats(route));
final BasicFuture&NHttpClientConnection& future = new BasicFuture&NHttpClientConnection&(callback);
final HttpH
if (route.getProxyHost() != null) {
host = route.getProxyHost();
host = route.getTargetHost();
final SchemeIOSessionStrategy sf = this.iosessionFactoryRegistry.lookup(
host.getSchemeName());
if (sf == null) {
future.failed(new UnsupportedSchemeException(host.getSchemeName() +
" protocol is not supported"));
this.pool.lease(route, state,
connectTimeout, leaseTimeout, tunit != null ? tunit : TimeUnit.MILLISECONDS,
new InternalPoolEntryCallback(future)); // 这里就是实际运用连接池的地方
看 AbstractNIOConnPool 的 lease 方法
public Future&E& lease(
final T route, final Object state,
final long connectTimeout, final long leaseTimeout, final TimeUnit tunit,
final FutureCallback&E& callback) {
Args.notNull(route, "Route");
Args.notNull(tunit, "Time unit");
Asserts.check(!this.isShutDown.get(), "Connection pool shut down");
final BasicFuture&E& future = new BasicFuture&E&(callback);
this.lock.lock(); // 同步
final long timeout = connectTimeout & 0 ? tunit.toMillis(connectTimeout) : 0;
final LeaseRequest&T, C, E& request = new LeaseRequest&T, C, E&(route, state, timeout, leaseTimeout, future);
final boolean completed = processPendingRequest(request); // 1) 获取连接的方法
if (!request.isDone() && !completed) {
// 2) 因为连接池满而不能马上获得连接的的, 加入到一个leasing的LinkedList中, 他会在后续的某些操作中被取出来重新尝试连接发送请求
this.leasingRequests.add(request);
if (request.isDone()) {
// 3) 已经完成连接动作(注意是连接动作完成, 不是请求完成获得响应, 这里的连接完成包括从连接池获取到连接, 或者是因为异常request被设置为fail)的请求, 加入到一个ConcurrentLinkedQueue中, 这个队列的唯一作用就是标记连接完成以后, 调用fireCallBack方法会从里面把这些连接完成的request做一遍回调处理
this.completedRequests.add(request);
} finally {
this.lock.unlock();
fireCallbacks();
这里主要涉及到连接池&AbstractNIOConnPool 以及连接池下得实际存储连接的&RouteSpecificPool,&
然后开始分析连接流程
private boolean processPendingRequest(final LeaseRequest&T, C, E& request) {
final T route = request.getRoute();
final Object state = request.getState();
final long deadline = request.getDeadline();
final long now = System.currentTimeMillis();
if (now & deadline) {
request.failed(new TimeoutException());
return false;
final RouteSpecificPool&T, C, E& pool = getPool(route);
for (;;) { // 租借连接池连接
entry = pool.getFree(state); // getFree即是从available中获取一个state匹配的连接
if (entry == null) { // 没有可用连接退出循环
// 清除不可用连接
if (entry.isClosed() || entry.isExpired(System.currentTimeMillis())) {
entry.close();
this.available.remove(entry);
pool.free(entry, false);
if (entry != null) { // 找到连接退出
this.available.remove(entry);
this.leased.add(entry);
pleted(entry);
onLease(entry);
return true;
// 需要新连接的情况
// New connection is needed
final int maxPerRoute = getMax(route);
// 已经分配的连接超出可分配限制
// Shrink the pool prior to allocating a new connection
final int excess = Math.max(0, pool.getAllocatedCount() + 1 - maxPerRoute);
// 对连接池进行缩减, 将上次使用的连接关闭并删除, 直到超出的连接全被清除
if (excess & 0) {
for (int i = 0; i & i++) {
final E lastUsed = pool.getLastUsed(); // 这个方法是取到 available 里的最后一个连接, 也就是说会出现所有连接都被租借出去了的情况, 这样的话就相当于连接池满, 到下一步的 if (pool.getAllocatedCount() & maxPerRoute) 即会 false, 最后导致request进入 leasingRequest 列表
if (lastUsed == null) {
lastUsed.close();
this.available.remove(lastUsed);
pool.remove(lastUsed);
// 已分配连接数 & 最大连接数限制, 开始新建
if (pool.getAllocatedCount() & maxPerRoute) {
// 总共被使用的数量等于 正在等待连接数 + 已经租借出去的连接数
final int totalUsed = this.pending.size() + this.leased.size();
final int freeCapacity = Math.max(this.maxTotal - totalUsed, 0);
if (freeCapacity == 0) {
return false;
// 需要注意的是pool里available不为空, 也有可能拿不到可用连接, 因为state不匹配
final int totalAvailable = this.available.size();
// 总的available & 连接空位时, 会随机选择最后一次使用的连接, 并把它关掉.. 没搞明白这一步是干嘛用的
if (totalAvailable & freeCapacity - 1) {
if (!this.available.isEmpty()) {
final E lastUsed = this.available.removeLast();
lastUsed.close();
final RouteSpecificPool&T, C, E& otherpool = getPool(lastUsed.getRoute());
otherpool.remove(lastUsed);
// 创建连接监视器阶段, 创建了一个监时此次请求的监视对象 SessionRequest, 并调用selector的wakeup(), 出发实际的连接操作
final SocketAddress localA
final SocketAddress remoteA
remoteAddress = this.addressResolver.resolveRemoteAddress(route);
localAddress = this.addressResolver.resolveLocalAddress(route);
} catch (final IOException ex) {
request.failed(ex);
return false;
       // 重点关注一下这个connect方法
final SessionRequest sessionRequest = this.ioreactor.connect(
remoteAddress, localAddress, route, this.sessionRequestCallback);
final int timout = request.getConnectTimeout() & Integer.MAX_VALUE ?
(int) request.getConnectTimeout() : Integer.MAX_VALUE;
sessionRequest.setConnectTimeout(timout);
// 加入到总pending集合
this.pending.add(sessionRequest);
// 加入到route连接池pending集合
pool.addPending(sessionRequest, request.getFuture());
return true;
return false;
// 检查最后一个完成的request的结果, 并设置future的状态
private void fireCallbacks() {
LeaseRequest&T, C, E&
while ((request = this.completedRequests.poll()) != null) {
final BasicFuture&E& future = request.getFuture();
final Exception ex = request.getException();
final E result = request.getResult();
if (ex != null) {
future.failed(ex);
} else if (result != null) {
pleted(result);
future.cancel();
看看DefaultConnectingIOReactor的connect方法
public SessionRequest connect(
final SocketAddress remoteAddress,
final SocketAddress localAddress,
final Object attachment,
final SessionRequestCallback callback) {
Asserts.check(this.pareTo(IOReactorStatus.ACTIVE) &= 0,
"I/O reactor has been shut down");
final SessionRequestImpl sessionRequest = new SessionRequestImpl(
remoteAddress, localAddress, attachment, callback);
sessionRequest.setConnectTimeout(this.config.getConnectTimeout());
this.requestQueue.add(sessionRequest);
this.selector.wakeup(); // 去看看wakeup()以后会发生什么事情
return sessionR
在AbstractMultiworkerIOReactor中有一个execute()方法
* Activates the main I/O reactor as well as all worker I/O reactors.
* The I/O main reactor will start reacting to I/O events and triggering
* notification methods. The worker I/O reactor in their turn will start
* reacting to I/O events and dispatch I/O event notifications to the given
* {@link IOEventDispatch} interface.
* This method will enter the infinite I/O select loop on
* the {@link Selector} instance associated with this I/O reactor and used
* to manage creation of new I/O channels. Once a new I/O channel has been
* created the processing of I/O events on that channel will be delegated
* to one of the worker I/O reactors.
* The method will remain blocked unto the I/O reactor is shut down or the
* execution thread is interrupted.
* @see #processEvents(int)
* @see #cancelRequests()
* @throws InterruptedIOException if the dispatch thread is interrupted.
* @throws IOReactorException in case if a non-recoverable I/O error.
public void execute(
final IOEventDispatch eventDispatch) throws InterruptedIOException, IOReactorException {
Args.notNull(eventDispatch, "Event dispatcher");
synchronized (this.statusLock) {
if (this.pareTo(IOReactorStatus.SHUTDOWN_REQUEST) &= 0) {
this.status = IOReactorStatus.SHUT_DOWN;
this.statusLock.notifyAll();
Asserts.check(this.pareTo(IOReactorStatus.INACTIVE) == 0,
"Illegal state %s", this.status);
this.status = IOReactorStatus.ACTIVE;
// Start I/O dispatchers
for (int i = 0; i & this.dispatchers. i++) {
final BaseIOReactor dispatcher = new BaseIOReactor(this.selectTimeout, this.interestOpsQueueing);
dispatcher.setExceptionHandler(exceptionHandler);
this.dispatchers[i] =
for (int i = 0; i & this.workerC i++) {
final BaseIOReactor dispatcher = this.dispatchers[i];
this.workers[i] = new Worker(dispatcher, eventDispatch);
this.threads[i] = this.threadFactory.newThread(this.workers[i]);
// 启动所有worker线程, 连接的事情是交给 AbstractMultiworkerIOReactor 来做的, 但是连接成功后的事情则是交给 AbstractIOReactor Worker 线程来处理, 前者类似于 bossGroup, 后者类似于 workerGroup
for (int i = 0; i & this.workerC i++) {
if (this.status != IOReactorStatus.ACTIVE) {
this.threads[i].start();
// 使用无限循环监听事件
for (;;) {
final int readyC
// 阻塞, 直到超时或者调用 wakeup()
readyCount = this.selector.select(this.selectTimeout);
} catch (final InterruptedIOException ex) {
} catch (final IOException ex) {
throw new IOReactorException("Unexpected selector failure", ex);
// 如果有需要处理的事件, 则进入processEvents流程, 实际的连接过程就在这里
if (this.pareTo(IOReactorStatus.ACTIVE) == 0) {
processEvents(readyCount);
// Verify I/O dispatchers
for (int i = 0; i & this.workerC i++) {
final Worker worker = this.workers[i];
final Exception ex = worker.getException();
if (ex != null) {
throw new IOReactorException(
"I/O dispatch worker terminated abnormally", ex);
if (this.pareTo(IOReactorStatus.ACTIVE) & 0) {
} catch (final ClosedSelectorException ex) {
addExceptionEvent(ex);
} catch (final IOReactorException ex) {
if (ex.getCause() != null) {
addExceptionEvent(ex.getCause());
} finally {
doShutdown();
synchronized (this.statusLock) {
this.status = IOReactorStatus.SHUT_DOWN;
this.statusLock.notifyAll();
首次连接的时候, 触发的是 DefaultConnectingIOReactor 的 processEvents 方法
protected void processEvents(final int readyCount) throws IOReactorException {
processSessionRequests(); // 这里就是实际连接的地方
if (readyCount & 0) {
final Set&SelectionKey& selectedKeys = this.selector.selectedKeys();
for (final SelectionKey key : selectedKeys) {
processEvent(key);
selectedKeys.clear();
final long currentTime = System.currentTimeMillis();
if ((currentTime - this.lastTimeoutCheck) &= this.selectTimeout) {
this.lastTimeoutCheck = currentT
final Set&SelectionKey& keys = this.selector.keys();
processTimeouts(keys);
private void processSessionRequests() throws IOReactorException {
SessionRequestI
// wakeup 一次将队列的所有request处理(发起连接)掉
while ((request = this.requestQueue.poll()) != null) {
if (request.isCompleted()) {
final SocketChannel socketC
socketChannel = SocketChannel.open();
} catch (final IOException ex) {
throw new IOReactorException("Failure opening socket", ex);
socketChannel.configureBlocking(false);
validateAddress(request.getLocalAddress());
validateAddress(request.getRemoteAddress());
if (request.getLocalAddress() != null) {
final Socket sock = socketChannel.socket();
sock.setReuseAddress(this.config.isSoReuseAddress());
sock.bind(request.getLocalAddress());
prepareSocket(socketChannel.socket());
final boolean connected = socketChannel.connect(request.getRemoteAddress());
if (connected) { // 马上连接成功, 处理下一个
final ChannelEntry entry = new ChannelEntry(socketChannel, request);
addChannel(entry);
} catch (final IOException ex) {
closeChannel(socketChannel);
request.failed(ex);
// 还未连接成功, 则注册到selector, 等待connect事件的触发, 再用processEvent来处理
final SessionRequestHandle requestHandle = new SessionRequestHandle(request);
final SelectionKey key = socketChannel.register(this.selector, SelectionKey.OP_CONNECT,
requestHandle);
request.setKey(key);
} catch (final IOException ex) {
closeChannel(socketChannel);
throw new IOReactorException("Failure registering channel " +
"with the selector", ex);
// 这个方法是连接成功以后注册channel的方法
private void processEvent(final SelectionKey key) {
if (key.isConnectable()) {
final SocketChannel channel = (SocketChannel) key.channel();
// Get request handle
final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
// Finish connection process
channel.finishConnect();
} catch (final IOException ex) {
sessionRequest.failed(ex);
key.cancel();
key.attach(null);
if (!sessionRequest.isCompleted()) {
// 注册新channel, 这些channel后来会被worker线程处理, 他们来进行io读写
addChannel(new ChannelEntry(channel, sessionRequest));
channel.close();
} catch (IOException ignore) {
} catch (final CancelledKeyException ex) {
final SessionRequestHandle requestHandle = (SessionRequestHandle) key.attachment();
key.attach(null);
if (requestHandle != null) {
final SessionRequestImpl sessionRequest = requestHandle.getSessionRequest();
if (sessionRequest != null) {
sessionRequest.cancel();
接下来看连接成功后的IOReactor如何处理, 如下&BaseIOReactor 的 execute 方法
* Activates the I/O reactor. The I/O reactor will start reacting to
* I/O events and triggering notification methods.
* This method will enter the infinite I/O select loop on
* the {@link Selector} instance associated with this I/O reactor.
* The method will remain blocked unto the I/O reactor is shut down or the
* execution thread is interrupted.
* @see #acceptable(SelectionKey)
* @see #connectable(SelectionKey)
* @see #readable(SelectionKey)
* @see #writable(SelectionKey)
* @see #timeoutCheck(SelectionKey, long)
* @see #validate(Set)
* @see #sessionCreated(SelectionKey, IOSession)
* @see #sessionClosed(IOSession)
* @throws InterruptedIOException if the dispatch thread is interrupted.
* @throws IOReactorException in case if a non-recoverable I/O error.
protected void execute() throws InterruptedIOException, IOReactorException {
this.status = IOReactorStatus.ACTIVE;
for (;;) {
final int readyC
readyCount = this.selector.select(this.selectTimeout);
} catch (final InterruptedIOException ex) {
} catch (final IOException ex) {
throw new IOReactorException("Unexpected selector failure", ex);
if (this.status == IOReactorStatus.SHUT_DOWN) {
// Hard shut down. Exit select loop immediately
if (this.status == IOReactorStatus.SHUTTING_DOWN) {
// Graceful shutdown in process
// Try to close things out nicely
closeSessions();
closeNewChannels();
// Process selected I/O events
if (readyCount & 0) {
processEvents(this.selector.selectedKeys());
// Validate active channels
validate(this.selector.keys());
// Process closed sessions
processClosedSessions();
// If active process new channels
if (this.status == IOReactorStatus.ACTIVE) {
processNewChannels();
// Exit select loop if graceful shutdown has been completed
if (this.pareTo(IOReactorStatus.ACTIVE) & 0
&& this.sessions.isEmpty()) {
if (this.interestOpsQueueing) {
// process all pending interestOps() operations
processPendingInterestOps();
} catch (final ClosedSelectorException ignore) {
} finally {
hardShutdown();
synchronized (this.statusMutex) {
this.statusMutex.notifyAll();
private void processEvents(final Set&SelectionKey& selectedKeys) {
for (final SelectionKey key : selectedKeys) {
processEvent(key);
selectedKeys.clear();
* Processes new event on the given selection key.
* @param key the selection key that triggered an event.
protected void processEvent(final SelectionKey key) {
final IOSessionImpl session = (IOSessionImpl) key.attachment();
if (key.isAcceptable()) {
acceptable(key);
if (key.isConnectable()) {
connectable(key);
if (key.isReadable()) {
session.resetLastRead();
readable(key);
if (key.isWritable()) {
session.resetLastWrite();
writable(key);
} catch (final CancelledKeyException ex) {
queueClosedSession(session);
key.attach(null);
这个就跟AbstractMultiworkerIOReactor类似, 只不过两个人的兴趣集事件不太一样, 看看 AbstractIOReactor 的实现类 BaseIOReactor就知道了
* This I/O reactor implementation does not react to the
* {@link SelectionKey#OP_ACCEPT} event.
* Super-classes can override this method to react to the event.
protected void acceptable(final SelectionKey key) {
* This I/O reactor implementation does not react to the
* {@link SelectionKey#OP_CONNECT} event.
* Super-classes can override this method to react to the event.
protected void connectable(final SelectionKey key) {
* Processes {@link SelectionKey#OP_READ} event on the given selection key.
* This method dispatches the event notification to the
* {@link IOEventDispatch#inputReady(IOSession)} method.
protected void readable(final SelectionKey key) {
final IOSession session = getSession(key);
this.eventDispatch.inputReady(session);
if (session.hasBufferedInput()) {
this.bufferingSessions.add(session);
} catch (final CancelledKeyException ex) {
queueClosedSession(session);
key.attach(null);
} catch (final RuntimeException ex) {
handleRuntimeException(ex);
* Processes {@link SelectionKey#OP_WRITE} event on the given selection key.
* This method dispatches the event notification to the
* {@link IOEventDispatch#outputReady(IOSession)} method.
protected void writable(final SelectionKey key) {
final IOSession session = getSession(key);
this.eventDispatch.outputReady(session);
} catch (final CancelledKeyException ex) {
queueClosedSession(session);
key.attach(null);
} catch (final RuntimeException ex) {
handleRuntimeException(ex);
它实际上只会处理 read 跟 write 事件
这里特别注意一下 AbstractNIOConnPool requestComplete 并不是整个请求结束, 而是连接成功的意思, 看看他的调用的地方和触发的东西
最开始他是从 AbstractIOReactor 的processNewChannels中来的, 这个方法在execute里被触发
rivate void processNewChannels() throws IOReactorException {
while ((entry = this.newChannels.poll()) != null) { // 记得上面连接成功后调用addChannel加到的这个队列, 现在取出来
final SocketC
final SelectionK
channel = entry.getChannel();
channel.configureBlocking(false);
key = channel.register(this.selector, SelectionKey.OP_READ);
} catch (final ClosedChannelException ex) {
final SessionRequestImpl sessionRequest = entry.getSessionRequest();
if (sessionRequest != null) {
sessionRequest.failed(ex);
} catch (final IOException ex) {
throw new IOReactorException("Failure registering channel " +
"with the selector", ex);
final SessionClosedCallback sessionClosedCallback = new SessionClosedCallback() {
public void sessionClosed(final IOSession session) {
queueClosedSession(session);
InterestOpsCallback interestOpsCallback = null;
if (this.interestOpsQueueing) {
interestOpsCallback = new InterestOpsCallback() {
public void addInterestOps(final InterestOpEntry entry) {
queueInterestOps(entry);
session = new IOSessionImpl(key, interestOpsCallback, sessionClosedCallback);
int timeout = 0;
timeout = channel.socket().getSoTimeout();
} catch (final IOException ex) {
// Very unlikely to happen and is not fatal
// as the protocol layer is expected to overwrite
// this value anyways
session.setAttribute(IOSession.ATTACHMENT_KEY, entry.getAttachment());
session.setSocketTimeout(timeout);
} catch (final CancelledKeyException ex) {
this.sessions.add(session);
final SessionRequestImpl sessionRequest = entry.getSessionRequest();
if (sessionRequest != null) {
// 就是在这里调用了completed, 最后进入连接池的completed方法
pleted(session);
key.attach(session);
sessionCreated(key, session);
} catch (final CancelledKeyException ex) {
queueClosedSession(session);
key.attach(null);
看看连接池 AbstractNIOConnPool 的completed 方法做了什么
protected void requestCompleted(final SessionRequest request) {
if (this.isShutDown.get()) {
@SuppressWarnings("unchecked")
T route = (T) request.getAttachment();
this.lock.lock();
this.pending.remove(request); // 从peding列表中去掉这个连接
final RouteSpecificPool&T, C, E& pool = getPool(route);
final IOSession session = request.getSession();
final C conn = this.connFactory.create(route, session);
final E entry = pool.createEntry(request, conn);
this.leased.add(entry); // 连接加入到被租借集合
pleted(request, entry); // 调用 perRoute 连接池的complte
onLease(entry); // 这个 onLease 就是设置了一下超时时间
} catch (final IOException ex) {
pool.failed(request, ex);
} finally {
this.lock.unlock();
fireCallbacks();
主要做的事情就是设置了一下连接的sotimeout, 还有就是将连接从 pending列表移到了 lease 集合
最后, 来看看 连接是如何归还的
通过对AbstractNIOConnPool的release方法的跟踪, 最后找了还是在 BaseIOReactor 监听到 readable 时间的时候, 调用了HttpAsyncRequestExecutor的inputReady
public void inputReady(
final NHttpClientConnection conn,
final ContentDecoder decoder) throws IOException, HttpException {
final State state = ensureNotNull(getState(conn));
final HttpAsyncClientExchangeHandler handler = ensureNotNull(getHandler(conn));
handler.consumeContent(decoder, conn);
state.setResponseState(MessageState.BODY_STREAM);
if (decoder.isCompleted()) { // 检测到内容已结束, 进入complete流程
processResponse(conn, state, handler);
请求完成阶段
当检测到response已经完了, 就会进入complete流程, 最后回到releaseConnection流程, 最后到达连接池的release
public void release(final E entry, final boolean reusable) {
if (entry == null) {
if (this.isShutDown.get()) {
this.lock.lock();
if (this.leased.remove(entry)) { // 从租借集合中删除
final RouteSpecificPool&T, C, E& pool = getPool(entry.getRoute());
pool.free(entry, reusable); // 重新加入到pool的available中
if (reusable) {
this.available.addFirst(entry); // 加入到available
onRelease(entry); // 重新这是soTimeout
entry.close();
processNextPendingRequest(); // 处理下一个在leasing队列中等待的请求
} finally {
this.lock.unlock();
fireCallbacks();
阅读(...) 评论()}

我要回帖

更多关于 nio io 区别 的文章

更多推荐

版权声明:文章内容来源于网络,版权归原作者所有,如有侵权请点击这里与我们联系,我们将及时删除。

点击添加站长微信