在网络编程领域,Netty是Java的卓越框架。它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。Netty使你可以专注于自己真正感兴趣的——你的应用程序的独一无二的价值。---《Netty实战》
BIO (Blocking I/O)
: 同步阻塞 I/O 模式,数据的读取写入必须阻塞在一个线程内等待其完成。在活动连接数不是特别高(小于单机 1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。NIO (Non-blocking/New I/O)
: NIO 是一种同步非阻塞的 I/O 模型,在 Java 1.4 中引入了 NIO 框架,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞 I/O 来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。AIO (Asynchronous I/O)
: AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的 IO 模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO 是异步 IO 的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO 操作本身是同步的。查阅网上相关资料,我发现就目前来说 AIO 的应用还不是很广泛,Netty 之前也尝试使用过 AIO,不过又放弃了。Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,擅长高负载下可靠和高效地处理和调度 I/O 操作。
分类 | Netty 的特性 |
---|---|
设计 | 统一的 API,支持多种传输类型,阻塞的和非阻塞的; 简单而强大的线程模型;真正的无连接数据报套接字支持; 链接逻辑组件以支持复用 |
易于使用 | 详实的Javadoc和大量的示例集;不需要超过JDK 1.6+的依赖(一些可选的特性可能需要Java 1.7+或额外的依赖) |
性能 | 拥有比 Java 的核心 API 更高的吞吐量以及更低的延迟;得益于池化和复用,拥有更低的资源消耗; 最少的内存复制 |
健壮性 | 不会因为慢速、快速或者超载的连接而导致 OutOfMemoryError 消除在高速网络中 NIO 应用程序常见的不公平读/写比率 |
安全性 | 完整的 SSL/TLS 以及 StartTLS 支持; 可用于受限环境下,如 Applet 和 OSGI |
社区驱动 | 发布快速而且频繁 |
Channel
:Channel 是 Java NIO 的一个基本构造。它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。回调
:一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后者可以在适当的时候调用前者。Future
:Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。事件和 ChannelHandler
:Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-all</artifactId>
<version>4.1.42.Final</version>
</dependency>
package org.example.netty.server;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;
/**
* 响应入站事件,实现了服务器对从客户端接收的数据的处理,即它的业务逻辑
*/
@ChannelHandler.Sharable //标示一个 ChannelHandler 可以被多 个 Channel 安全地 共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
/**
* 对于每个传入的消息都要调用该方法
*
* @param ctx
* @param msg
*/
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
ByteBuf in = (ByteBuf) msg;
System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
//将接收到的消息 写给发送者,而不冲刷出站消息
ctx.write(in);
}
/**
* 通知ChannelInboundHandler最后一次对channelRead()
* 的调用是当前批量读取中的最后一条消息
*
* @param ctx
*/
@Override
public void channelReadComplete(ChannelHandlerContext ctx) {
// 将未决消息冲刷到远程节点,并且关闭该Channel
ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
}
/**
* 在读取操作期间,有异常抛出时会调用
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
// 关闭该Channel
ctx.close();
}
}
package org.example.netty.server;
import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;
import java.net.InetSocketAddress;
/**
* 配置服务器的启动代码,将服务器绑定到它要监听连接请求的端口上
*/
public class EchoServer {
private final int port;
public EchoServer(int port) {
this.port = port;
}
public void start() throws Exception {
final EchoServerHandler serverHandler = new EchoServerHandler();
//创建 EventLoopGroup
EventLoopGroup group = new NioEventLoopGroup();
try {
// 创建 ServerBootstrap
ServerBootstrap serverBootstrap = new ServerBootstrap();
serverBootstrap.group(group)
.channel(NioServerSocketChannel.class)//指定所使用的 NIO 传输 Channel
.localAddress(new InetSocketAddress(port))//使用指定的 端口设置套 接字地址
.childHandler(new ChannelInitializer<SocketChannel>() {// 添加一个 EchoServerHandler 到子 Channel 的 ChannelPipeline
@Override
public void initChannel(SocketChannel ch) {
// EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
ch.pipeline().addLast(serverHandler);
}
});
//异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成
ChannelFuture future = serverBootstrap.bind().sync();
//获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
future.channel().closeFuture().sync();
} finally {
// 关闭 EventLoopGroup, 释放所有的资源
group.shutdownGracefully().sync();
}
}
}
package org.example.netty.client;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;
/**
* 用来处理数据的ClientHandler
*/
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
/**
* 在到服务器的连接已经建立之后将被调用
*
* @param ctx
*/
@Override
public void channelActive(ChannelHandlerContext ctx) {
//当被通知 Channel 是活跃的时候,发送一条消息
ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty!", CharsetUtil.UTF_8));
}
/**
* 当从服务器接收到一条消息时被调用
*
* @param ctx
* @param in
*/
@Override
public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
}
/**
* 在处理过程中引发异常时被调用
* 在发生异常时, 记录错误并关闭 Channel
*
* @param ctx
* @param cause
*/
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
cause.printStackTrace();
ctx.close();
}
}
package org.example.netty.client;
import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;
import java.net.InetSocketAddress;
public class EchoClient {
private final String host;
private final int port;
public EchoClient(String host, int port) {
this.host = host;
this.port = port;
}
public void start() throws Exception {
EventLoopGroup group = new NioEventLoopGroup();
try {
Bootstrap bootstrap = new Bootstrap();
bootstrap.group(group)
.channel(NioSocketChannel.class)
.remoteAddress(new InetSocketAddress(host, port))
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel channel) {
channel.pipeline()
.addLast(new EchoClientHandler());
}
});
ChannelFuture future = bootstrap.connect().sync();
future.channel().closeFuture().sync();
} finally {
group.shutdownGracefully().sync();
}
}
}
package org.example;
import org.example.netty.client.EchoClient;
import org.example.netty.server.EchoServer;
/**
* ServerRun
*/
public class ServerRun {
public static void main(String[] args) throws Exception {
System.err.println("Usage: " + EchoServer.class.getSimpleName() + " 9000");
new EchoServer(9000).start();
}
}
package org.example;
import org.example.netty.client.EchoClient;
import org.example.netty.server.EchoServer;
/**
* ClientRun
*/
public class ClientRun {
public static void main(String[] args) throws Exception {
System.err.println("Usage: " + EchoClient.class.getSimpleName() + " 127.0.0.1:9000");
new EchoClient("127.0.0.1", 9000).start();
}
}
Usage: EchoClient 127.0.0.1:9000
Client received: Hello Netty!
Usage: EchoServer 9000
Server received: Hello Netty!