0
点赞
收藏
分享

微信扫一扫

Netty实战(06)-ChannelPipeline 接口

1 概述

把ChannelPipeline看成拦截流经Channel的入、出站事件的ChannelHandler 的实例链,就易看出这些 ChannelHandler 之间的交互如何组成一个应用程序数据和事件处理逻辑的核心。

每个新建的 Channel 都会被分配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定操作,无需开发人员的任何干预。

根据事件的起源,事件将会被 ChannelInboundHandler/ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。

1.1 ChannelHandlerContext

使ChannelHandler能和其ChannelPipeline及其他的ChannelHandler交互。

ChannelHandler能通知其所属的 ChannelPipeline 中的下一ChannelHandler,甚至动态修改它所属的ChannelPipeline(指修改 ChannelPipeline 中的 ChannelHandler 的编排)。

ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。6.3 节将提供有关ChannelHandlerContext 的更多内容。

图 6-3 同时具有入、出站 ChannelHandler 的 ChannelPipeline 的布局,印证ChannelPipeline由一系列ChannelHandler组成。ChannelPipeline 还提供通过 ChannelPipeline 本身传播事件的方法。若一个入站事件被触发,它将被从 ChannelPipeline 的head开始一直被传播到 Channel Pipeline 的tail。

Netty实战(06)-ChannelPipeline 接口_数据

上图中的一个出站 I/O 事件将从 ChannelPipeline 的最右边开始,然后向左传播。

1.2 ChannelPipeline 相对论

从事件途经 ChannelPipeline 的角度,ChannelPipeline 的头尾端取决于该事件是入站还是出站。而 Netty 总将 ChannelPipeline 的入站口(图 6-3 中的左侧) 作为头,而将出站口(该图的右侧)作为尾。

当你完成了通过调用 ChannelPipeline.add*()将入站处理器(ChannelInboundHandler) 和出站处理器( ChannelOutboundHandler ) 混合添加到 ChannelPipeline 后,每一个ChannelHandler 从头到尾的顺序位置正如同我们方才所定义它们的一样。因此,若你将图 6-3 中的处理器(ChannelHandler)从左到右编号,则第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的 ChannelHandler 将是 5。

在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向匹配。不匹配,ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)

pipeline是什么?

每个Netty SocketChannel包含一个ChannelPipeline。

可将ChannelInboundHandler、ChannelOutboundHandler实例都添加到Netty ChannelPipeline。

添加了ChannelInboundHandler、ChannelOutboundHandler实例的ChannelPipeline:

Netty实战(06)-ChannelPipeline 接口_数据_02

从SocketChannel接收到数据后,该数据将传递到ChannelPipeline中的第一个ChannelInboundHandler,处理完数据,将数据传递到ChannelPipeline中的下一ChannelInboundHandler。

ChannelInboundHandler可在将接收到的数据传递到pipeline中的下一处理器之前对其进行转换。如原始的字节可转换为HTTP对象。然后,管道中的下一个处理器将看到HTTP对象,而非原始数据。

当将数据写回SocketChannel时,它以相同方式发生。数据从ChannelOutboundHandler传递到ChannelPipeline中的ChannelOutboundHandler,直到到达SocketChannel。 ChannelOutboundHandler实例也可转换流程中的数据。

尽管该图将ChannelInboundHandler、ChannelOutboundHandler实例显示为单独的列表,但它们实际上位于同一list(管道)中。

因此,若ChannelInboundHandler决定将某些内容写回SocketChannel,则数据将通过比ChannelInboundHandler写入数据更早的ChannelPipeline中位于所有ChannelOutboundHandler实例。

Netty实战(06)-ChannelPipeline 接口_数据_03

Netty具有编解码器(编码器+解码器)。Netty编解码器将字节转换为消息对象(Java对象),或将消息对象转换为字节。如编解码器可将:

  • 传入的HTTP请求的原始字节转换为HTTP对象
  • 或将HTTP响应对象转换回原始字节

Netty编解码器对象实际上只是一或两个ChannelHandler实现。编解码器通常由将请求字节转换为对象的ChannelInboundHandler实现和将响应对象转换为字节的ChannelOutboundHandler组成。

Netty提供不同协议的编解码器,如HTTP,WebSocket,SSL / TLS等。为将这些协议与Netty一起使用,须将相应的协议编解码器ChannelInboundHandler和ChannelOutboundHandler添加到要使用的SocketChannel的ChannelPipeline中协议。

2 pipeline初始化

pipeline在创建Channel时被创建

protected AbstractChannel(Channel parent) {
        this.parent = parent;
        id = newId();
        unsafe = newUnsafe();
        pipeline = newChannelPipeline();
}

protected DefaultChannelPipeline newChannelPipeline() {
        return new DefaultChannelPipeline(this);
}

// 双向链表结构
protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
}

Pipeline节点数据结构: ChannelHandlerContext

Netty实战(06)-ChannelPipeline 接口_数据_04

/**
 * Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
 *
 * Implementations must be Thread-safe.
 */
public interface AttributeMap {
    /**
     * Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
     * an {@link Attribute} which does not have a value set yet.
     */
    <T> Attribute<T> attr(AttributeKey<T> key);

    /**
     * Returns {@code true} if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
     */
    <T> boolean hasAttr(AttributeKey<T> key);
}

看其一实现类:

Netty实战(06)-ChannelPipeline 接口_编解码器_05

基本数据结构组件:

abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
    volatile AbstractChannelHandlerContext next;
    volatile AbstractChannelHandlerContext prev;
  	...
}

Pipeline中的两大哨兵: head和tail

protected DefaultChannelPipeline(Channel channel) {
        this.channel = ObjectUtil.checkNotNull(channel, "channel");
        succeededFuture = new SucceededChannelFuture(channel, null);
        voidPromise =  new VoidChannelPromise(channel, true);

        tail = new TailContext(this);
        head = new HeadContext(this);

        head.next = tail;
        tail.prev = head;
    }

// A special catch-all handler that handles both bytes and messages.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {

  TailContext(DefaultChannelPipeline pipeline) {
    super(pipeline, null, TAIL_NAME, TailContext.class);
    setAddComplete();
  }

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
	onUnhandledInboundMessage(ctx, msg);
}

protected void onUnhandledInboundMessage(Object msg) {
        try {
            logger.debug(
                    "Discarded inbound message {} that reached at the tail of the pipeline. " +
                            "Please check your pipeline configuration.", msg);
        } finally {
            ReferenceCountUtil.release(msg);
        }
    }

final class HeadContext extends AbstractChannelHandlerContext
            implements ChannelOutboundHandler, ChannelInboundHandler {

        private final Unsafe unsafe;

   			HeadContext(DefaultChannelPipeline pipeline) {
            super(pipeline, null, HEAD_NAME, true, true);
            unsafe = pipeline.channel().unsafe();
            setAddComplete();
        }
}

3 修改 ChannelPipeline

ChannelHandler 可以通过添加、删除或替换其他 ChannelHandler 来实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 ChannelHandler 最重要的能力之一,它如何做到的呢?

3.1 API

表 6-6 ChannelHandler 的用于修改 ChannelPipeline 的方法

名称

描 述

AddFirstaddBefore

addAfteraddLastChannelRegistered

将一个ChannelHandler 添加到ChannelPipeline 中

remove

Channel 已经被注册到了 EventLoop

replace

将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 Channel

代码清单 6-5 展示了这些方法的使用。

ChannelPipeline pipeline = ..;
// 创建一个 FirstHandler 的实例
FirstHandler firstHandler = new FirstHandler();
// 将该实例作为"handler1"添加到 ChannelPipeline
pipeline.addLast("handler1", firstHandler);
// 将一个SecondHandler实例作为"handler2"添加到ChannelPipeline的第一个槽。即放置在已有的"handler1"前
pipeline.addFirst("handler2", new SecondHandler());
// 将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
...
// 通过名称移除"handler3"
pipeline.remove("handler3");
// 通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
// 将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new ForthHandler());

3.2 ChannelHandler的执行和阻塞

通常 ChannelPipeline 中的每个 ChannelHandler 都是通过它的 EventLoop(I/O 线程)处理传递给它的事件的。所以重要的是别阻塞这个线程,因为对整体 I/O 处理产生负面影响。

与使用阻塞 API 的遗留代码交互

对此,ChannelPipeline 有一些接受一个 EventExecutorGroup 的add()方法。若一个事件被传递给一个自定义的 EventExecutorGroup,它将被包含在这个 EventExecutorGroup 中的某 EventExecutor 处理,从而被从该Channel 本身的 EventLoop 中移除。对这种case,Netty 提供 DefaultEventExecutorGroup 的默认实现。

除了这些操作,还有别的通过类型或者名称来访问 ChannelHandler 的方法。这些方法都在表 6-7。

表 6-7 ChannelPipeline 的用于访问 ChannelHandler 的操作:

名称

描 述

get

通过类型或者名称返回 ChannelHandler

context

返回和 ChannelHandler 绑定的 ChannelHandlerContext

names

返回 ChannelPipeline 中所有 ChannelHandler 的名称

用户代码

/**
 * @author JavaEdge
 */
public final class Server {

    public static void main(String[] args) throws Exception {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);
        EventLoopGroup workerGroup = new NioEventLoopGroup();

        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
                    .channel(NioServerSocketChannel.class)
                    .childOption(ChannelOption.TCP_NODELAY, true)
                    .childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
                    .childHandler(new ChannelInitializer<SocketChannel>() {
                        @Override
                        public void initChannel(SocketChannel ch) {
                            ch.pipeline().addLast(new OutBoundHandlerA());
                            ch.pipeline().addLast(new OutBoundHandlerC());
                            ch.pipeline().addLast(new OutBoundHandlerB());
                        }
                    });

            ChannelFuture f = b.bind(8888).sync();

            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}

判断是否重复添加

@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
  return addLast(null, handlers);
}

@Override
    public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
        ObjectUtil.checkNotNull(handlers, "handlers");

        for (ChannelHandler h: handlers) {
            if (h == null) {
                break;
            }
            addLast(executor, null, h);
        }

        return this;
    }

public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
  final AbstractChannelHandlerContext newCtx;
  synchronized (this) {
    // core
    checkMultiplicity(handler);
    ...
  }

private static void checkMultiplicity(ChannelHandler handler) {
  if (handler instanceof ChannelHandlerAdapter) {
    ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
    if (!h.isSharable() && h.added) {
      throw new ChannelPipelineException(
        h.getClass().getName() +
        " is not a @Sharable handler, so can't be added or removed multiple times.");
    }
    h.added = true;
  }
}

public boolean isSharable() {
        /**
         * Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
         * {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
         * {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
         * {@link Thread}s are quite limited anyway.
         *
         * See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
         */
        Class<?> clazz = getClass();
        Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
        Boolean sharable = cache.get(clazz);
        if (sharable == null) {
            sharable = clazz.isAnnotationPresent(Sharable.class);
            cache.put(clazz, sharable);
        }
        return sharable;
    }

创建节点并添加至链表

Netty实战(06)-ChannelPipeline 接口_HTTP_06

private String filterName(String name, ChannelHandler handler) {
  if (name == null) {
    return generateName(handler);
  }
  checkDuplicateName(name);
  return name;
}

private void checkDuplicateName(String name) {
  if (context0(name) != null) {
    throw new IllegalArgumentException("Duplicate handler name: " + name);
  }
}

private AbstractChannelHandlerContext context0(String name) {
  AbstractChannelHandlerContext context = head.next;
  while (context != tail) {
    if (context.name().equals(name)) {
      return context;
    }
    context = context.next;
  }
  return null;
}

private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
  return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}

private void addLast0(AbstractChannelHandlerContext newCtx) {
  AbstractChannelHandlerContext prev = tail.prev;
  newCtx.prev = prev;
  newCtx.next = tail;
  prev.next = newCtx;
  tail.prev = newCtx;
}

回调添加完成事件

Netty实战(06)-ChannelPipeline 接口_数据_07

private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
  newCtx.setAddPending();
  executor.execute(new Runnable() {
    @Override
    public void run() {
      callHandlerAdded0(newCtx);
    }
  });
}

private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
        try {
          	// core
            ctx.callHandlerAdded();
        } catch (Throwable t) {
            boolean removed = false;
            try {
                atomicRemoveFromHandlerList(ctx);
                ctx.callHandlerRemoved();
                removed = true;
            } catch (Throwable t2) {
                if (logger.isWarnEnabled()) {
                    logger.warn("Failed to remove a handler: " + ctx.name(), t2);
                }
            }

            if (removed) {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; removed.", t));
            } else {
                fireExceptionCaught(new ChannelPipelineException(
                        ctx.handler().getClass().getName() +
                        ".handlerAdded() has thrown an exception; also failed to remove.", t));
            }
        }
    }

final void callHandlerAdded() throws Exception {
  // We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
  // any pipeline events ctx.handler() will miss them because the state will not allow it.
  if (setAddComplete()) {
    handler().handlerAdded(this);
  }
}

final boolean setAddComplete() {
  for (;;) {
    int oldState = handlerState;
    if (oldState == REMOVE_COMPLETE) {
      return false;
    }
    // Ensure we never update when the handlerState is REMOVE_COMPLETE already.
    // oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
    // exposing ordering guarantees.
    if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
      return true;
    }
  }
}

Netty实战(06)-ChannelPipeline 接口_HTTP_08

Netty实战(06)-ChannelPipeline 接口_编解码器_09

Netty实战(06)-ChannelPipeline 接口_HTTP_10

Netty实战(06)-ChannelPipeline 接口_编解码器_11

用户方自定义实现即可:

Netty实战(06)-ChannelPipeline 接口_HTTP_12

4 触发事件

ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。

4.1 入站

表 6-8 ChannelPipeline 的入站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件:

Netty实战(06)-ChannelPipeline 接口_HTTP_13

在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。

4.2 出站

表 6-9 ChannelPipeline 的出站操作:

Netty实战(06)-ChannelPipeline 接口_编解码器_14

4.3 小结

  • ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler
  • ChannelPipeline 可根据需要,通过添加或删除 ChannelHandler 动态修改
  • ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件

6 outBound事件的传播

Netty实战(06)-ChannelPipeline 接口_编解码器_15

Netty实战(06)-ChannelPipeline 接口_编解码器_16

Netty实战(06)-ChannelPipeline 接口_HTTP_17

Netty实战(06)-ChannelPipeline 接口_数据_18

Netty实战(06)-ChannelPipeline 接口_HTTP_19

Netty实战(06)-ChannelPipeline 接口_数据_20

Netty实战(06)-ChannelPipeline 接口_编解码器_21

Netty实战(06)-ChannelPipeline 接口_编解码器_22

Netty实战(06)-ChannelPipeline 接口_HTTP_23

Netty实战(06)-ChannelPipeline 接口_编解码器_24

Netty实战(06)-ChannelPipeline 接口_编解码器_25

Netty实战(06)-ChannelPipeline 接口_编解码器_26

Netty实战(06)-ChannelPipeline 接口_编解码器_27

Netty实战(06)-ChannelPipeline 接口_HTTP_28

Netty实战(06)-ChannelPipeline 接口_数据_29

Netty实战(06)-ChannelPipeline 接口_HTTP_30

同理以后的过程

Netty实战(06)-ChannelPipeline 接口_HTTP_31

7 异常的传播

Netty实战(06)-ChannelPipeline 接口_编解码器_32

Netty实战(06)-ChannelPipeline 接口_HTTP_33

Netty实战(06)-ChannelPipeline 接口_编解码器_34

Netty实战(06)-ChannelPipeline 接口_HTTP_35

Netty实战(06)-ChannelPipeline 接口_HTTP_36

Netty实战(06)-ChannelPipeline 接口_数据_37

Netty实战(06)-ChannelPipeline 接口_编解码器_38

Netty实战(06)-ChannelPipeline 接口_数据_39

Netty实战(06)-ChannelPipeline 接口_数据_40

最佳实践

Netty实战(06)-ChannelPipeline 接口_HTTP_41

Netty实战(06)-ChannelPipeline 接口_数据_42

8 pipeline总结

netty如何判断ChannelHandler类型?

调用 pipeline 添加节点时,netty 会使用 instanceof 关键字判断当前节点是 inboound 还是 outbound 类型,分别用不同的 boolean 类型变量标识。

对于ChannelHandler的添加,应遵循何顺序?

  • inbound 事件类型顺序正相关
  • outbound 逆相关

用户手动触发事件传播,不同触发方式有何区别?

异常处理器要么从 head 或者 tail 节点开始传播inbound事件则从当前节点开始传递到最后节点 outbound事件则从当前节点开始传递 到第一个 outbound节点

举报

相关推荐

0 条评论