Netty源码分析 (八)----- write过程 源码分析

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeAndFlush。主要内容本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的pipeline中的标准链表结构java对象编码过程write:写队列flush:刷新写队列writeAndFlush:写队列并刷新pipeline中的标准链表结构一个标准的pipeline链式结构如下...

Netty源码分析 (八)----- write过程 源码分析

上一篇文章主要讲了netty的read过程,本文主要分析一下write和writeAndFlush。

主要内容

本文分以下几个部分阐述一个java对象最后是如何转变成字节流,写到socket缓冲区中去的

  1. pipeline中的标准链表结构
  2. java对象编码过程
  3. write:写队列
  4. flush:刷新写队列
  5. writeAndFlush: 写队列并刷新

pipeline中的标准链表结构

一个标准的pipeline链式结构如下

数据从head节点流入,先拆包,然后解码成业务对象,最后经过业务Handler处理,调用write,将结果对象写出去。而写的过程先通过tail节点,然后通过encoder节点将对象编码成ByteBuf,最后将该ByteBuf对象传递到head节点,调用底层的Unsafe写到jdk底层管道

java对象编码过程

为什么我们在pipeline中添加了encoder节点,java对象就转换成netty可以处理的ByteBuf,写到管道里?

我们先看下调用write的code

BusinessHandler

protected void channelRead0(ChannelHandlerContext ctx, Request request) throws Exception { Response response = doBusiness(request); if (response != null) {  ctx.channel().write(response); }}

业务处理器接受到请求之后,做一些业务处理,返回一个Response,然后,response在pipeline中传递,落到Encoder节点,我们来跟踪一下ctx.channel().write(response);

public ChannelFuture write(Object msg) { return this.pipeline.write(msg);}

调用了Channel中的pipeline中的write方法,我们接着看

public final ChannelFuture write(Object msg) { return this.tail.write(msg);}

pipeline中有属性tail,调用tail中的write,由此我们知道write消息的时候,从tail开始,接着往下看

private void write(Object msg, boolean flush, ChannelPromise promise) { AbstractChannelHandlerContext next = this.findContextOutbound(); Object m = this.pipeline.touch(msg, next); EventExecutor executor = next.executor(); if (executor.inEventLoop()) {  if (flush) {next.invokeWriteAndFlush(m, promise);  } else {   next.invokeWrite(m, promise);  } } else {  Object task;  if (flush) {task = AbstractChannelHandlerContext.WriteAndFlushTask.newInstance(next, m, promise);  } else {task = AbstractChannelHandlerContext.WriteTask.newInstance(next, m, promise);  }  safeExecute(executor, (Runnable)task, promise, m); }}

中间我省略了几个重载的方法,我们来看看第一行代码,next =this.findContextOutbound();

private AbstractChannelHandlerContext findContextOutbound() { AbstractChannelHandlerContext ctx = this; do {  ctx = ctx.prev; } while(!ctx.outbound); return ctx;}

通过ctx = ctx.prev; 我们知道从tail开始找到pipeline中的第一个outbound的handler,然后调用invokeWrite(m, promise),此时找到的第一个outbound的handler就是我们自定义的编码器Encoder

我们接着看next.invokeWrite(m, promise);

private void invokeWrite(Object msg, ChannelPromise promise) { if (this.invokeHandler()) {  this.invokeWrite0(msg, promise); } else {  this.write(msg, promise); }}private void invokeWrite0(Object msg, ChannelPromise promise) { try {  ((ChannelOutboundHandler)this.handler()).write(this, msg, promise); } catch (Throwable var4) {  notifyOutboundHandlerException(var4, promise); }}

一路代码跟下来,我们可以知道是调用了第一个outBound类型的handler中的write方法,也就是第一个调用的是我们自定义编码器Encoder的write方法

我们来看看自定义Encoder

public class Encoder extends MessageToByteEncoder<Response> { @Override protected void encode(ChannelHandlerContext ctx, Response response, ByteBuf out) throws Exception {  out.writeByte(response.getVersion());  out.writeInt(4   response.getData().length);  out.writeBytes(response.getData()); }}

自定义Encoder继承MessageToByteEncoder ,并且重写了encode方法,这就是编码器的核心,我们先来看MessageToByteEncoder

public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {

我们看到MessageToByteEncoder 继承了ChannelOutboundHandlerAdapter,说明了Encoder 是一个Outbound的handler

我们来看看Encoder 的父类 MessageToByteEncoder中的write方法

MessageToByteEncoder

@Overridepublic void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception { ByteBuf buf = null; try {  // 判断当前Handelr是否能处理写入的消息  if (acceptOutboundMessage(msg)) {@SuppressWarnings("unchecked")// 强制换换I cast = (I) msg;// 分配一段ButeBuf   buf = allocateBuffer(ctx, cast, preferDirect);try {// 调用encode,这里就调回到  `Encoder` 这个Handelr中  encode(ctx, cast, buf);} finally { // 既然自定义java对象转换成ByteBuf了,那么这个对象就已经无用了,释放掉 // (当传入的msg类型是ByteBuf的时候,就不需要自己手动释放了) ReferenceCountUtil.release(cast);}// 如果buf中写入了数据,就把buf传到下一个节点   if (buf.isReadable()) { ctx.write(buf, promise);} else {// 否则,释放buf,将空数据传到下一个节点  buf.release(); ctx.write(Unpooled.EMPTY_BUFFER, promise);}buf = null;  } else {// 如果当前节点不能处理传入的对象,直接扔给下一个节点处理ctx.write(msg, promise);  } } catch (EncoderException e) {  throw e; } catch (Throwable e) {  throw new EncoderException(e); } finally {  // 当buf在pipeline中处理完之后,释放 
源文地址:https://www.guoxiongfei.cn/cntech/27146.html