saowu's Blog

一个简单的Netty服务

一个简单的Netty服务
2020-04-30 · 12 min read
Java

在网络编程领域,Netty是Java的卓越框架。它驾驭了Java高级API的能力,并将其隐藏在一个易于使用的API之后。Netty使你可以专注于自己真正感兴趣的——你的应用程序的独一无二的价值。---《Netty实战》

一、Java IO模型

1.同步阻塞 I/O模型(BIO)

  • BIO (Blocking I/O): 同步阻塞 I/O 模式,数据的读取写入必须阻塞在一个线程内等待其完成。在活动连接数不是特别高(小于单机 1000)的情况下,这种模型是比较不错的,可以让每一个连接专注于自己的 I/O 并且编程模型简单,也不用过多考虑系统的过载、限流等问题。线程池本身就是一个天然的漏斗,可以缓冲一些系统处理不了的连接或请求。但是,当面对十万甚至百万级连接的时候,传统的 BIO 模型是无能为力的。因此,我们需要一种更高效的 I/O 处理模型来应对更高的并发量。

2.同步非阻塞 I/O模型 (NIO)

  • NIO (Non-blocking/New I/O): NIO 是一种同步非阻塞的 I/O 模型,在 Java 1.4 中引入了 NIO 框架,对应 java.nio 包,提供了 Channel , Selector,Buffer 等抽象。NIO 中的 N 可以理解为 Non-blocking,不单纯是 New。它支持面向缓冲的,基于通道的 I/O 操作方法。 NIO 提供了与传统 BIO 模型中的 Socket 和 ServerSocket 相对应的 SocketChannel 和 ServerSocketChannel 两种不同的套接字通道实现,两种通道都支持阻塞和非阻塞两种模式。阻塞模式使用就像传统中的支持一样,比较简单,但是性能和可靠性都不好;非阻塞模式正好与之相反。对于低负载、低并发的应用程序,可以使用同步阻塞 I/O 来提升开发速率和更好的维护性;对于高负载、高并发的(网络)应用,应使用 NIO 的非阻塞模式来开发。

3.异步非阻塞 I/O模型 (AIO)

  • AIO (Asynchronous I/O): AIO 也就是 NIO 2。在 Java 7 中引入了 NIO 的改进版 NIO 2,它是异步非阻塞的 IO 模型。异步 IO 是基于事件和回调机制实现的,也就是应用操作之后会直接返回,不会堵塞在那里,当后台处理完成,操作系统会通知相应的线程进行后续的操作。AIO 是异步 IO 的缩写,虽然 NIO 在网络操作中,提供了非阻塞的方法,但是 NIO 的 IO 行为还是同步的。对于 NIO 来说,我们的业务线程是在 IO 操作准备好时,得到通知,接着就由这个线程自行进行 IO 操作,IO 操作本身是同步的。查阅网上相关资料,我发现就目前来说 AIO 的应用还不是很广泛,Netty 之前也尝试使用过 AIO,不过又放弃了。

二、 Netty简介

Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,擅长高负载下可靠和高效地处理和调度 I/O 操作。

分类 Netty 的特性
设计 统一的 API,支持多种传输类型,阻塞的和非阻塞的; 简单而强大的线程模型;真正的无连接数据报套接字支持; 链接逻辑组件以支持复用
易于使用 详实的Javadoc和大量的示例集;不需要超过JDK 1.6+的依赖(一些可选的特性可能需要Java 1.7+或额外的依赖)
性能 拥有比 Java 的核心 API 更高的吞吐量以及更低的延迟;得益于池化和复用,拥有更低的资源消耗; 最少的内存复制
健壮性 不会因为慢速、快速或者超载的连接而导致 OutOfMemoryError 消除在高速网络中 NIO 应用程序常见的不公平读/写比率
安全性 完整的 SSL/TLS 以及 StartTLS 支持; 可用于受限环境下,如 Applet 和 OSGI
社区驱动 发布快速而且频繁

三、Netty基础组件

  • Channel:Channel 是 Java NIO 的一个基本构造。它代表一个到实体(如一个硬件设备、一个文件、一个网络套接字或者一个能够执行一个或者多个不同的I/O操作的程序组件)的开放连接,如读操作和写操作。
  • 回调:一个回调其实就是一个方法,一个指向已经被提供给另外一个方法的方法的引用。这使得后者可以在适当的时候调用前者。
  • Future:Future 提供了另一种在操作完成时通知应用程序的方式。这个对象可以看作是一个异步操 作的结果的占位符;它将在未来的某个时刻完成,并提供对其结果的访问。
  • 事件和 ChannelHandler:Netty 使用不同的事件来通知我们状态的改变或者是操作的状态。这使得我们能够基于已经发生的事件来触发适当的动作。每个事件都可以被分发给 ChannelHandler 类中的某个用户实现的方法。

四、创建第一个Netty服务

1.客户端和服务器结构

  • 服务器的主要代码组件:
    • EchoServerHandler 实现了业务逻辑;
    • EchoServer引导过程:
      • 创建一个 ServerBootstrap 的实例以引导和绑定服务器;
      • 创建并分配一个 NioEventLoopGroup 实例以进行事件的处理,如接受新连接以及读/写数据;
      • 指定服务器绑定的本地的 InetSocketAddress;
      • 使用一个 EchoServerHandler 的实例初始化每一个新的 Channel;
      • 调用 ServerBootstrap.bind()方法以绑定服务器。
  • 客户端的主要代码组件:
    • EchoClientHandler 实现了业务逻辑;
    • EchoClient引导过程:
      • 为初始化客户端,创建了一个 Bootstrap 实例;
      • 为进行事件处理分配了一个 NioEventLoopGroup 实例,其中事件处理包括创建新的连接以及处理入站和出站数据;
      • 为服务器连接创建了一个 InetSocketAddress 实例;
      • 当连接被建立时,一个 EchoClientHandler 实例会被安装到(该Channel的)ChannelPipeline 中;
      • 在一切都设置完成后,调用 Bootstrap.connect()方法连接到远程节点;

2.在pom.xml添加如下代码

<dependency>
    <groupId>io.netty</groupId>
    <artifactId>netty-all</artifactId>
    <version>4.1.42.Final</version>
</dependency>

3.创建服务端ChannelHandler实现类

package org.example.netty.server;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.util.CharsetUtil;

/**
 * 响应入站事件,实现了服务器对从客户端接收的数据的处理,即它的业务逻辑
 */

@ChannelHandler.Sharable //标示一个 ChannelHandler 可以被多 个 Channel 安全地 共享
public class EchoServerHandler extends ChannelInboundHandlerAdapter {
    /**
     * 对于每个传入的消息都要调用该方法
     *
     * @param ctx
     * @param msg
     */
    @Override
    public void channelRead(ChannelHandlerContext ctx, Object msg) {
        ByteBuf in = (ByteBuf) msg;
        System.out.println("Server received: " + in.toString(CharsetUtil.UTF_8));
        //将接收到的消息 写给发送者,而不冲刷出站消息
        ctx.write(in);
    }

    /**
     * 通知ChannelInboundHandler最后一次对channelRead()
     * 的调用是当前批量读取中的最后一条消息
     *
     * @param ctx
     */
    @Override
    public void channelReadComplete(ChannelHandlerContext ctx) {
        // 将未决消息冲刷到远程节点,并且关闭该Channel
        ctx.writeAndFlush(Unpooled.EMPTY_BUFFER).addListener(ChannelFutureListener.CLOSE);
    }

    /**
     * 在读取操作期间,有异常抛出时会调用
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        // 关闭该Channel
        ctx.close();
    }
}

4. 创建服务端引导

package org.example.netty.server;

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

import java.net.InetSocketAddress;

/**
 * 配置服务器的启动代码,将服务器绑定到它要监听连接请求的端口上
 */
public class EchoServer {
    private final int port;

    public EchoServer(int port) {
        this.port = port;
    }

    public void start() throws Exception {
        final EchoServerHandler serverHandler = new EchoServerHandler();
        //创建 EventLoopGroup
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            // 创建 ServerBootstrap
            ServerBootstrap serverBootstrap = new ServerBootstrap();
            serverBootstrap.group(group)
                    .channel(NioServerSocketChannel.class)//指定所使用的 NIO 传输 Channel
                    .localAddress(new InetSocketAddress(port))//使用指定的 端口设置套 接字地址
                    .childHandler(new ChannelInitializer<SocketChannel>() {// 添加一个 EchoServerHandler 到子 Channel 的 ChannelPipeline
                        @Override
                        public void initChannel(SocketChannel ch) {
                            // EchoServerHandler 被标注为@Shareable,所以我们可以总是使用同样的实例
                            ch.pipeline().addLast(serverHandler);
                        }
                    });
            //异步地绑定服务器; 调用 sync()方法阻塞 等待直到绑定完成
            ChannelFuture future = serverBootstrap.bind().sync();
            //获取 Channel 的 CloseFuture,并且阻塞当前线程直到它完成
            future.channel().closeFuture().sync();
        } finally {
            // 关闭 EventLoopGroup, 释放所有的资源
            group.shutdownGracefully().sync();
        }
    }

}

5.创建客户端ChannelHandler实现类

package org.example.netty.client;

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
import io.netty.util.CharsetUtil;

/**
 * 用来处理数据的ClientHandler
 */
@ChannelHandler.Sharable
public class EchoClientHandler extends SimpleChannelInboundHandler<ByteBuf> {
    /**
     * 在到服务器的连接已经建立之后将被调用
     *
     * @param ctx
     */
    @Override
    public void channelActive(ChannelHandlerContext ctx) {
        //当被通知 Channel 是活跃的时候,发送一条消息
        ctx.writeAndFlush(Unpooled.copiedBuffer("Hello Netty!", CharsetUtil.UTF_8));
    }

    /**
     * 当从服务器接收到一条消息时被调用
     *
     * @param ctx
     * @param in
     */
    @Override
    public void channelRead0(ChannelHandlerContext ctx, ByteBuf in) {
        System.out.println("Client received: " + in.toString(CharsetUtil.UTF_8));
    }

    /**
     * 在处理过程中引发异常时被调用
     * 在发生异常时, 记录错误并关闭 Channel
     *
     * @param ctx
     * @param cause
     */
    @Override
    public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
        cause.printStackTrace();
        ctx.close();
    }
}

6.创建客户端引导

package org.example.netty.client;

import io.netty.bootstrap.Bootstrap;
import io.netty.channel.*;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioSocketChannel;

import java.net.InetSocketAddress;

public class EchoClient {
    private final String host;
    private final int port;

    public EchoClient(String host, int port) {
        this.host = host;
        this.port = port;
    }

    public void start() throws Exception {
        EventLoopGroup group = new NioEventLoopGroup();
        try {
            Bootstrap bootstrap = new Bootstrap();
            bootstrap.group(group)
                    .channel(NioSocketChannel.class)
                    .remoteAddress(new InetSocketAddress(host, port))
                    .handler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        protected void initChannel(SocketChannel channel) {
                            channel.pipeline()
                                    .addLast(new EchoClientHandler());
                        }
                    });
            ChannelFuture future = bootstrap.connect().sync();
            future.channel().closeFuture().sync();
        } finally {
            group.shutdownGracefully().sync();
        }
    }
}

7.创建C/S启动类

package org.example;

import org.example.netty.client.EchoClient;
import org.example.netty.server.EchoServer;

/**
 * ServerRun
 */
public class ServerRun {
    public static void main(String[] args) throws Exception {
        System.err.println("Usage: " + EchoServer.class.getSimpleName() + " 9000");
        new EchoServer(9000).start();
    }
}

package org.example;

import org.example.netty.client.EchoClient;
import org.example.netty.server.EchoServer;

/**
 * ClientRun
 */
public class ClientRun {
    public static void main(String[] args) throws Exception {
        System.err.println("Usage: " + EchoClient.class.getSimpleName() + " 127.0.0.1:9000");
        new EchoClient("127.0.0.1", 9000).start();
    }
}

五、执行Success!

Usage: EchoClient 127.0.0.1:9000
Client received: Hello Netty!

Usage: EchoServer 9000
Server received: Hello Netty!

Copyright © 2020 - 2024 saowu. All Right Reserved
Powered by Gridea