本文共 12341 字,大约阅读时间需要 41 分钟。
检测空闲连接以及超时对于及时释放资源来说是至关重要的。由于这是一项常见的任务,Netty 特地为它提供了几个 ChannelHandler 实现。
示例:
当 使用通常的发送心跳消息到远程节点的方法时,如果在 60 秒之内没有接收或者发送任何的数据, 我们将如何得到通知;如果没有响应,则连接会被关闭。
public class IdleStateHandlerInitializer extends ChannelInitializer{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //IdleStateHandler 将 在被触发时发送一 个 IdleStateEvent 事件 pipeline.addLast(new IdleStateHandler(0, 0, 60, TimeUnit.SECONDS)); // 将一个 HeartbeatHandler 添加到ChannelPipeline 中 pipeline.addLast(new HearbeatHandler()); } //实现userEventTriggred()方法以发送心跳消息 public static final class HearbeatHandler extends ChannelInboundHandlerAdapter { private static final ByteBuf HEARTBEAT_SEQUENCE = Unpooled.unreleasableBuffer(Unpooled.copiedBuffer("HEARTBEAT", CharsetUtil.ISO_8859_1)); @Override public void userEventTriggered(ChannelHandlerContext ctx, Object evt) throws Exception { //发送心跳消息,并在发送失败时关闭该连接 if (evt instanceof IdleStateEvent) { ctx.writeAndFlush(HEARTBEAT_SEQUENCE.duplicate()) .addListener(ChannelFutureListener.CLOSE_ON_FAILURE); } else { //不是 IdleStateEvent 事件,所以将它传递 给下一个 ChannelInboundHandler super.userEventTriggered(ctx, evt); } } }}
这个示例演示了如何使用 IdleStateHandler 来测试远程节点是否仍然还活着,并且在它 失活时通过关闭连接来释放资源。
如果连接超过60秒没有接收或者发送任何的数据,那么IdleStateHandler 将会使用一个 IdleStateEvent 事件来调用 fireUserEventTriggered()方法
。HeartbeatHandler 实现 了 userEventTriggered()方法,如果这个方法检测到 IdleStateEvent 事件,它将会发送心 跳消息,并且添加一个将在发送操作失败时关闭该连接的 ChannelFutureListener 。
基于分隔符的协议:
基于分隔符的(delimited)消息协议使用定义的字符来标记的消息或者消息段(通常被称 为帧)的开头或者结尾。由RFC文档正式定义的许多协议(如SMTP、POP3、IMAP以及Telnet1) 都是这样的。此外,当然,私有组织通常也拥有他们自己的专有格式。无论你使用什么样的协 议,下表中列出的解码器都能帮助你定义可以提取由任意标记(token)序列分隔的帧的自 定义解码器。解码基于分隔符的协议和基于长度的协议:
//处理由行尾符分隔的帧public class LineBasedHandlerInitializer extends ChannelInitializer{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //该LineBasedFrameDecoder将提取的帧转发给下一个ChannelInboundHandler pipeline.addLast(new LineBasedFrameDecoder(64 * 1024)); //添加FrameHandler以接收帧 pipeline.addLast(new FrameHandle()); } public static final class FrameHandle extends SimpleChannelInboundHandler { //传入了单个帧的内容 @Override protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception { //Do something with the data exracted from the frame } }}
如果你正在使用除了行尾符之外的分隔符分隔的帧,那么你可以以类似的方式使用 DelimiterBasedFrameDecoder,只需要将特定的分隔符序列指定到其构造函数即可。
这些解码器是实现你自己的基于分隔符的协议的工具。作为示例,我们将使用下面的协议规范:
我们用于这个协议的自定义解码器将定义以下类:
public class CmdHandlerInitializer extends ChannelInitializer{ public static final byte SPACE = (byte)' '; @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加 CmdDecoder 以提取 Cmd 对象,并将它转发给下 一个 ChannelInboundHandler pipeline.addLast(new CmdDecoder(64 * 1024)); //添加CmdHandler以接收和处理Cmd对象 pipeline.addLast(new CmdHandler()); } /** * Cmd POJO */ public static final class Cmd { private final ByteBuf name; private final ByteBuf args; public Cmd(ByteBuf name, ByteBuf args) { this.name = name; this.args = args; } public ByteBuf name(){ return name; } public ByteBuf args(){ return args; } } /** * Cmd解码器 */ public static final class CmdDecoder extends LineBasedFrameDecoder { public CmdDecoder(int maxLength) { super(maxLength); } @Override protected Object decode(ChannelHandlerContext ctx, ByteBuf buffer) throws Exception { //从ByteBuf中提取由行尾符序列分隔的帧 ByteBuf frame = (ByteBuf) super.decode(ctx, buffer); //如果输入中没有帧,则返回null if (frame == null) { return null; } //查找第一个空格字符的索引。前面是命令名称,接着是参数 int index = frame.indexOf(frame.readerIndex(), frame.writerIndex(), SPACE); //使用包含有命令名称和参数的切片创建新的Cmd对象 return new Cmd(frame.slice(frame.readerIndex(), index), frame.slice(index + 1, frame.writerIndex())); } } /** * CMdHandler */ public static final class CmdHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Cmd msg) throws Exception { //处理传经ChannelPipeline的Cmd对象 //Do something with the command } }}
基于长度的协议:
基于长度的协议通过将它的长度编码到帧的头部来定义帧,而不是使用特殊的分隔符来标记 它的结束。下表列出了Netty提供的用于处理这种类型的协议的两种解码器。下图展示了 FixedLengthFrameDecoder 的功能,其在构造时已经指定了帧长度为 8 字节。
你将经常会遇到被编码到消息头部的帧大小不是固定值的协议。为了处理这种变长帧,你可以使用LengthFieldBasedFrameDecoder
,它将从头部字段确定帧长,然后从数据流中提取指定的字节数。 示例:长度字段在帧中的偏移量为 0,并且长度为 2 字节。
LengthFieldBasedFrameDecoder 提供了几个构造函数来支持各种各样的头部配置情 况。下面的代码展示了如何使用其 3 个构造参数分别为 maxFrameLength、lengthField- Offset 和 lengthFieldLength 的构造函数。在这个场景中,帧的长度被编码到了帧起始的前 8 个字节中。
//解码基于长度的协议public class LengthBasedInitializer extends ChannelInitializer{ @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); pipeline.addLast(new LengthFieldBasedFrameDecoder(64 * 1024, 0, 8)); //添加FrameHandler以处理每个帧 pipeline.addLast(new FrameHandler()); } public static final class FrameHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, ByteBuf msg) throws Exception { //处理帧数据 //Do something with the frame } }}
因为网络饱和的可能性,如何在异步框架中高效地写大块的数据是一个特殊的问题。由于写 操作是非阻塞的,所以即使没有写出所有的数据,写操作也会在完成时返回并通知 ChannelFuture。当这种情况发生时,如果仍然不停地写入,就有内存耗尽的风险。所以在写大型数据 时,需要准备好处理到远程节点的连接是慢速连接的情况,这种情况会导致内存释放的延迟。
NIO 的零拷贝特性
这种特性消除了将文件 的内容从文件系统移动到网络栈的复制过程。所有的这一切都发生在 Netty 的核心中,所以应用 程序所有需要做的就是使用一个 FileRegion 接口的实现,其在 Netty 的 API 文档中的定义是:
“通过支持零拷贝的文件传输的 Channel 来发送的文件区域。”
示例:使用FileRegion传输文件的内容
File file = new File("big.txt"); //创建一个FileInputStream FileInputStream in = new FileInputStream(file); //以该文件的完整长度创建一个DefaultFileRegion FileRegion region = new DefaultFileRegion(in.getChannel(), 0, file.length()); //发送该DefaultFileRegion,并注册一个ChannelFutureLIstener channel.writeAndFlush(region).addListener( new ChannelFutureListener() { @Override public void operationComplete(ChannelFuture future) throws Exception { if (!future.isSuccess()) { //处理失败 Throwable cause = future.cause(); cause.printStackTrace(); } } } );
这个示例只适用于文件内容的直接传输,不包括应用程序对数据的任何处理。在需要将数据 从文件系统复制到用户内存中时,可以使用 ChunkedWriteHandler,它支持异步写大型数据 流,而又不会导致大量的内存消耗。
关键是interface ChunkedInput<B>
,其中类型参数 B 是 readChunk()
方法返回的 类型。Netty 预置了该接口的 4 个实现,如下表中所列出的。每个都代表了一个将由 ChunkedWriteHandler 处理的不定长度的数据流。
示例:使用 ChunkedStream 传输文件内容
下面代码说明了 ChunkedStream 的用法,它是实践中最常用的实现。所示的类使用 了一个 File 以及一个 SslContext 进行实例化。当 initChannel()方法被调用时,它将使用所示的 ChannelHandler 链初始化该 Channel。当 Channel 的状态变为活动的时,WriteStreamHandler 将会逐块地把来自文件中的数 据作为 ChunkedStream 写入。数据在传输之前将会由 SslHandler 加密。
//使用ChunkedStream传输文件内容public class ChunkedWriteHandlerInitializer extends ChannelInitializer{ private final File file; private final SslContext sslContext; public ChunkedWriteHandlerInitializer(File file, SslContext sslContext) { this.file = file; this.sslContext = sslContext; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); //添加 ChunkedWriteHandler 以处理作为 ChunkedInput 传入的数据 pipeline.addLast(new SslHandler(sslContext.newEngine(ch.alloc()))); pipeline.addLast(new ChunkedWriteHandler()); // 一旦连接建立,WriteStreamHandler就开始写文件数据 pipeline.addLast(new WriteStreamHandler()); } public final class WriteStreamHandler extends ChannelInboundHandlerAdapter { // 当连接建立时,channelActive()方法将使用ChunkedInput写文件数据 @Override public void channelActive(ChannelHandlerContext ctx) throws Exception { super.channelActive(ctx); ctx.writeAndFlush(new ChunkedStream(new FileInputStream(file))); } }}
JDK 提供了 ObjectOutputStream 和 ObjectInputStream,用于通过网络对 POJO 的 基本数据类型和图进行序列化和反序列化。该 API 并不复杂,而且可以被应用于任何实现了 java.io.Serializable 接口的对象。但是它的性能也不是非常高效的。
JDK序列化:
如果你的应用程序必须要和使用了ObjectOutputStream和ObjectInputStream的远 程节点交互,并且兼容性也是你最关心的,那么JDK序列化将是正确的选择。下表中列出了 Netty提供的用于和JDK进行互操作的序列化类。使用JBoss Marshalling进行序列化:
如果你可以自由地使用外部依赖,那么JBoss Marshalling将是个理想的选择:它比JDK序列 化最多快 3 倍,而且也更加紧凑。在JBoss Marshalling官方网站主页 3上的概述中对它是这么定 义的:
JBoss Marshalling 是一种可选的序列化 API,它修复了在 JDK 序列化 API 中所发现 的许多问题,同时保留了与 java.io.Serializable 及其相关类的兼容性,并添加 了几个新的可调优参数以及额外的特性,所有的这些都是可以通过工厂配置(如外部序 列化器、类/实例查找表、类解析以及对象替换等)实现可插拔的。
Netty 通过下表所示的两组解码器/编码器对为 Boss Marshalling 提供了支持。
示例:使用 MarshallingDecoder 和 MarshallingEncoder。同 样,几乎只是适当地配置 ChannelPipeline 罢了。
public class MarshallingInitializer extends ChannelInitializer{ private final MarshallerProvider marshallerProvider; private final UnmarshallerProvider unmarshallerProvider; public MarshallingInitializer(MarshallerProvider marshallerProvider, UnmarshallerProvider unmarshallerProvider) { this.marshallerProvider = marshallerProvider; this.unmarshallerProvider = unmarshallerProvider; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加 MarshallingDecoder 以 将 ByteBuf 转换为 POJO pipeline.addLast(new MarshallingDecoder(unmarshallerProvider)); // 添加 MarshallingEncoder 以将 POJO 转换为 ByteBuf pipeline.addLast(new MarshallingEncoder(marshallerProvider)); // 添加 ObjectHandler, 以处理普通的实现了 Serializable 接口的 POJO pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler { @Override protected void channelRead0(ChannelHandlerContext ctx, Serializable msg) throws Exception { //Do something } }}
通过Protocol Buffers序列化:
Netty序列化的最后一个解决方案是利用Protocol Buffers 1的编解码器,它是一种由Google公 司开发的、现在已经开源的数据交换格式。可以在https://github.com/google/protobuf
找到源代码。 Protocol Buffers 以一种紧凑而高效的方式对结构化的数据进行编码以及解码。它具有许多的 编程语言绑定,使得它很适合跨语言的项目。下表展示了 Netty 为支持 protobuf 所提供的 ChannelHandler 实现。
示例:
com.google.protobuf protobuf-java 3.11.4
public class ProtoBufIntializer extends ChannelInitializer{ private final MessageLite lite; public ProtoBufIntializer(MessageLite lite) { this.lite = lite; } @Override protected void initChannel(Channel ch) throws Exception { ChannelPipeline pipeline = ch.pipeline(); // 添加 ProtobufVarint32FrameDecoder 以分隔帧 pipeline.addLast(new ProtobufVarint32FrameDecoder()); //添加 ProtobufEncoder 以处理消息的编码 pipeline.addLast(new ProtobufEncoder()); // 添加 ProtobufDecoder 以解码消息 pipeline.addLast(new ProtobufDecoder(lite)); //添加 Object- Handler 以处 理解码消息 pipeline.addLast(new ObjectHandler()); } public static final class ObjectHandler extends SimpleChannelInboundHandler
转载地址:http://rlpqb.baihongyu.com/