1 概述
把ChannelPipeline看成拦截流经Channel的入、出站事件的ChannelHandler 的实例链,就易看出这些 ChannelHandler 之间的交互如何组成一个应用程序数据和事件处理逻辑的核心。
每个新建的 Channel 都会被分配一个新的 ChannelPipeline。这项关联是永久性的;Channel 既不能附加另外一个 ChannelPipeline,也不能分离其当前的。在 Netty 组件的生命周期中,这是一项固定操作,无需开发人员的任何干预。
根据事件的起源,事件将会被 ChannelInboundHandler/ChannelOutboundHandler处理。随后,通过调用 ChannelHandlerContext 实现,它将被转发给同一超类型的下一个ChannelHandler。
1.1 ChannelHandlerContext
使ChannelHandler能和其ChannelPipeline及其他的ChannelHandler交互。
ChannelHandler能通知其所属的 ChannelPipeline 中的下一ChannelHandler,甚至动态修改它所属的ChannelPipeline(指修改 ChannelPipeline 中的 ChannelHandler 的编排)。
ChannelHandlerContext 具有丰富的用于处理事件和执行 I/O 操作的 API。6.3 节将提供有关ChannelHandlerContext 的更多内容。
图 6-3 同时具有入、出站 ChannelHandler 的 ChannelPipeline 的布局,印证ChannelPipeline由一系列ChannelHandler组成。ChannelPipeline 还提供通过 ChannelPipeline 本身传播事件的方法。若一个入站事件被触发,它将被从 ChannelPipeline 的head开始一直被传播到 Channel Pipeline 的tail。
上图中的一个出站 I/O 事件将从 ChannelPipeline 的最右边开始,然后向左传播。
1.2 ChannelPipeline 相对论
从事件途经 ChannelPipeline 的角度,ChannelPipeline 的头尾端取决于该事件是入站还是出站。而 Netty 总将 ChannelPipeline 的入站口(图 6-3 中的左侧) 作为头,而将出站口(该图的右侧)作为尾。
当你完成了通过调用 ChannelPipeline.add*()将入站处理器(ChannelInboundHandler) 和出站处理器( ChannelOutboundHandler ) 混合添加到 ChannelPipeline 后,每一个ChannelHandler 从头到尾的顺序位置正如同我们方才所定义它们的一样。因此,若你将图 6-3 中的处理器(ChannelHandler)从左到右编号,则第一个被入站事件看到的 ChannelHandler 将是1,而第一个被出站事件看到的 ChannelHandler 将是 5。
在 ChannelPipeline 传播事件时,它会测试 ChannelPipeline 中的下一个 ChannelHandler 的类型是否和事件的运动方向匹配。不匹配,ChannelPipeline 将跳过该 ChannelHandler 并前进到下一个,直到它找到和该事件所期望的方向相匹配的为止。(当然,ChannelHandler 也可以同时实现 ChannelInboundHandler 接口和 ChannelOutboundHandler 接口。)
pipeline是什么?
每个Netty SocketChannel包含一个ChannelPipeline。
可将ChannelInboundHandler、ChannelOutboundHandler实例都添加到Netty ChannelPipeline。
添加了ChannelInboundHandler、ChannelOutboundHandler实例的ChannelPipeline:
从SocketChannel接收到数据后,该数据将传递到ChannelPipeline中的第一个ChannelInboundHandler,处理完数据,将数据传递到ChannelPipeline中的下一ChannelInboundHandler。
ChannelInboundHandler可在将接收到的数据传递到pipeline中的下一处理器之前对其进行转换。如原始的字节可转换为HTTP对象。然后,管道中的下一个处理器将看到HTTP对象,而非原始数据。
当将数据写回SocketChannel时,它以相同方式发生。数据从ChannelOutboundHandler传递到ChannelPipeline中的ChannelOutboundHandler,直到到达SocketChannel。 ChannelOutboundHandler实例也可转换流程中的数据。
尽管该图将ChannelInboundHandler、ChannelOutboundHandler实例显示为单独的列表,但它们实际上位于同一list(管道)中。
因此,若ChannelInboundHandler决定将某些内容写回SocketChannel,则数据将通过比ChannelInboundHandler写入数据更早的ChannelPipeline中位于所有ChannelOutboundHandler实例。
Netty具有编解码器(编码器+解码器)。Netty编解码器将字节转换为消息对象(Java对象),或将消息对象转换为字节。如编解码器可将:
- 传入的HTTP请求的原始字节转换为HTTP对象
- 或将HTTP响应对象转换回原始字节
Netty编解码器对象实际上只是一或两个ChannelHandler实现。编解码器通常由将请求字节转换为对象的ChannelInboundHandler实现和将响应对象转换为字节的ChannelOutboundHandler组成。
Netty提供不同协议的编解码器,如HTTP,WebSocket,SSL / TLS等。为将这些协议与Netty一起使用,须将相应的协议编解码器ChannelInboundHandler和ChannelOutboundHandler添加到要使用的SocketChannel的ChannelPipeline中协议。
2 pipeline初始化
pipeline在创建Channel时被创建
protected AbstractChannel(Channel parent) {
this.parent = parent;
id = newId();
unsafe = newUnsafe();
pipeline = newChannelPipeline();
}
protected DefaultChannelPipeline newChannelPipeline() {
return new DefaultChannelPipeline(this);
}
// 双向链表结构
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
Pipeline节点数据结构: ChannelHandlerContext
/**
* Holds {@link Attribute}s which can be accessed via {@link AttributeKey}.
*
* Implementations must be Thread-safe.
*/
public interface AttributeMap {
/**
* Get the {@link Attribute} for the given {@link AttributeKey}. This method will never return null, but may return
* an {@link Attribute} which does not have a value set yet.
*/
<T> Attribute<T> attr(AttributeKey<T> key);
/**
* Returns {@code true} if and only if the given {@link Attribute} exists in this {@link AttributeMap}.
*/
<T> boolean hasAttr(AttributeKey<T> key);
}
看其一实现类:
基本数据结构组件:
abstract class AbstractChannelHandlerContext implements ChannelHandlerContext, ResourceLeakHint {
volatile AbstractChannelHandlerContext next;
volatile AbstractChannelHandlerContext prev;
...
}
Pipeline中的两大哨兵: head和tail
protected DefaultChannelPipeline(Channel channel) {
this.channel = ObjectUtil.checkNotNull(channel, "channel");
succeededFuture = new SucceededChannelFuture(channel, null);
voidPromise = new VoidChannelPromise(channel, true);
tail = new TailContext(this);
head = new HeadContext(this);
head.next = tail;
tail.prev = head;
}
// A special catch-all handler that handles both bytes and messages.
final class TailContext extends AbstractChannelHandlerContext implements ChannelInboundHandler {
TailContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, TAIL_NAME, TailContext.class);
setAddComplete();
}
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) {
onUnhandledInboundMessage(ctx, msg);
}
protected void onUnhandledInboundMessage(Object msg) {
try {
logger.debug(
"Discarded inbound message {} that reached at the tail of the pipeline. " +
"Please check your pipeline configuration.", msg);
} finally {
ReferenceCountUtil.release(msg);
}
}
final class HeadContext extends AbstractChannelHandlerContext
implements ChannelOutboundHandler, ChannelInboundHandler {
private final Unsafe unsafe;
HeadContext(DefaultChannelPipeline pipeline) {
super(pipeline, null, HEAD_NAME, true, true);
unsafe = pipeline.channel().unsafe();
setAddComplete();
}
}
3 修改 ChannelPipeline
ChannelHandler 可以通过添加、删除或替换其他 ChannelHandler 来实时地修改 ChannelPipeline 的布局。(它也可以将它自己从 ChannelPipeline 中移除。)这是 ChannelHandler 最重要的能力之一,它如何做到的呢?
3.1 API
表 6-6 ChannelHandler 的用于修改 ChannelPipeline 的方法
名称 | 描 述 |
AddFirstaddBefore addAfteraddLastChannelRegistered | 将一个ChannelHandler 添加到ChannelPipeline 中 |
remove | Channel 已经被注册到了 EventLoop |
replace | 将 ChannelPipeline 中的一个 ChannelHandler 替换为另一个 Channel |
代码清单 6-5 展示了这些方法的使用。
ChannelPipeline pipeline = ..;
// 创建一个 FirstHandler 的实例
FirstHandler firstHandler = new FirstHandler();
// 将该实例作为"handler1"添加到 ChannelPipeline
pipeline.addLast("handler1", firstHandler);
// 将一个SecondHandler实例作为"handler2"添加到ChannelPipeline的第一个槽。即放置在已有的"handler1"前
pipeline.addFirst("handler2", new SecondHandler());
// 将一个 ThirdHandler 的实例作为"handler3"添加到 ChannelPipeline 的最后一个槽中
pipeline.addLast("handler3", new ThirdHandler());
...
// 通过名称移除"handler3"
pipeline.remove("handler3");
// 通过引用移除FirstHandler(它是唯一的,所以不需要它的名称)
pipeline.remove(firstHandler);
// 将 SecondHandler("handler2")替换为 FourthHandler:"handler4"
pipeline.replace("handler2", "handler4", new ForthHandler());
3.2 ChannelHandler的执行和阻塞
通常 ChannelPipeline 中的每个 ChannelHandler 都是通过它的 EventLoop(I/O 线程)处理传递给它的事件的。所以重要的是别阻塞这个线程,因为对整体 I/O 处理产生负面影响。
与使用阻塞 API 的遗留代码交互
对此,ChannelPipeline 有一些接受一个 EventExecutorGroup 的add()方法。若一个事件被传递给一个自定义的 EventExecutorGroup,它将被包含在这个 EventExecutorGroup 中的某 EventExecutor 处理,从而被从该Channel 本身的 EventLoop 中移除。对这种case,Netty 提供 DefaultEventExecutorGroup 的默认实现。
除了这些操作,还有别的通过类型或者名称来访问 ChannelHandler 的方法。这些方法都在表 6-7。
表 6-7 ChannelPipeline 的用于访问 ChannelHandler 的操作:
名称 | 描 述 |
get | 通过类型或者名称返回 ChannelHandler |
context | 返回和 ChannelHandler 绑定的 ChannelHandlerContext |
names | 返回 ChannelPipeline 中所有 ChannelHandler 的名称 |
用户代码
/**
* @author JavaEdge
*/
public final class Server {
public static void main(String[] args) throws Exception {
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childAttr(AttributeKey.newInstance("childAttr"), "childAttrValue")
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) {
ch.pipeline().addLast(new OutBoundHandlerA());
ch.pipeline().addLast(new OutBoundHandlerC());
ch.pipeline().addLast(new OutBoundHandlerB());
}
});
ChannelFuture f = b.bind(8888).sync();
f.channel().closeFuture().sync();
} finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}
}
}
判断是否重复添加
@Override
public final ChannelPipeline addLast(ChannelHandler... handlers) {
return addLast(null, handlers);
}
@Override
public final ChannelPipeline addLast(EventExecutorGroup executor, ChannelHandler... handlers) {
ObjectUtil.checkNotNull(handlers, "handlers");
for (ChannelHandler h: handlers) {
if (h == null) {
break;
}
addLast(executor, null, h);
}
return this;
}
public final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {
final AbstractChannelHandlerContext newCtx;
synchronized (this) {
// core
checkMultiplicity(handler);
...
}
private static void checkMultiplicity(ChannelHandler handler) {
if (handler instanceof ChannelHandlerAdapter) {
ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;
if (!h.isSharable() && h.added) {
throw new ChannelPipelineException(
h.getClass().getName() +
" is not a @Sharable handler, so can't be added or removed multiple times.");
}
h.added = true;
}
}
public boolean isSharable() {
/**
* Cache the result of {@link Sharable} annotation detection to workaround a condition. We use a
* {@link ThreadLocal} and {@link WeakHashMap} to eliminate the volatile write/reads. Using different
* {@link WeakHashMap} instances per {@link Thread} is good enough for us and the number of
* {@link Thread}s are quite limited anyway.
*
* See <a href="https://github.com/netty/netty/issues/2289">#2289</a>.
*/
Class<?> clazz = getClass();
Map<Class<?>, Boolean> cache = InternalThreadLocalMap.get().handlerSharableCache();
Boolean sharable = cache.get(clazz);
if (sharable == null) {
sharable = clazz.isAnnotationPresent(Sharable.class);
cache.put(clazz, sharable);
}
return sharable;
}
创建节点并添加至链表
private String filterName(String name, ChannelHandler handler) {
if (name == null) {
return generateName(handler);
}
checkDuplicateName(name);
return name;
}
private void checkDuplicateName(String name) {
if (context0(name) != null) {
throw new IllegalArgumentException("Duplicate handler name: " + name);
}
}
private AbstractChannelHandlerContext context0(String name) {
AbstractChannelHandlerContext context = head.next;
while (context != tail) {
if (context.name().equals(name)) {
return context;
}
context = context.next;
}
return null;
}
private AbstractChannelHandlerContext newContext(EventExecutorGroup group, String name, ChannelHandler handler) {
return new DefaultChannelHandlerContext(this, childExecutor(group), name, handler);
}
private void addLast0(AbstractChannelHandlerContext newCtx) {
AbstractChannelHandlerContext prev = tail.prev;
newCtx.prev = prev;
newCtx.next = tail;
prev.next = newCtx;
tail.prev = newCtx;
}
回调添加完成事件
private void callHandlerAddedInEventLoop(final AbstractChannelHandlerContext newCtx, EventExecutor executor) {
newCtx.setAddPending();
executor.execute(new Runnable() {
@Override
public void run() {
callHandlerAdded0(newCtx);
}
});
}
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {
try {
// core
ctx.callHandlerAdded();
} catch (Throwable t) {
boolean removed = false;
try {
atomicRemoveFromHandlerList(ctx);
ctx.callHandlerRemoved();
removed = true;
} catch (Throwable t2) {
if (logger.isWarnEnabled()) {
logger.warn("Failed to remove a handler: " + ctx.name(), t2);
}
}
if (removed) {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; removed.", t));
} else {
fireExceptionCaught(new ChannelPipelineException(
ctx.handler().getClass().getName() +
".handlerAdded() has thrown an exception; also failed to remove.", t));
}
}
}
final void callHandlerAdded() throws Exception {
// We must call setAddComplete before calling handlerAdded. Otherwise if the handlerAdded method generates
// any pipeline events ctx.handler() will miss them because the state will not allow it.
if (setAddComplete()) {
handler().handlerAdded(this);
}
}
final boolean setAddComplete() {
for (;;) {
int oldState = handlerState;
if (oldState == REMOVE_COMPLETE) {
return false;
}
// Ensure we never update when the handlerState is REMOVE_COMPLETE already.
// oldState is usually ADD_PENDING but can also be REMOVE_COMPLETE when an EventExecutor is used that is not
// exposing ordering guarantees.
if (HANDLER_STATE_UPDATER.compareAndSet(this, oldState, ADD_COMPLETE)) {
return true;
}
}
}
用户方自定义实现即可:
4 触发事件
ChannelPipeline 的 API 公开了用于调用入站和出站操作的附加方法。
4.1 入站
表 6-8 ChannelPipeline 的入站操作,用于通知 ChannelInboundHandler 在 ChannelPipeline 中所发生的事件:
在出站这边,处理事件将会导致底层的套接字上发生一系列的动作。
4.2 出站
表 6-9 ChannelPipeline 的出站操作:
4.3 小结
- ChannelPipeline 保存了与 Channel 相关联的 ChannelHandler
- ChannelPipeline 可根据需要,通过添加或删除 ChannelHandler 动态修改
- ChannelPipeline 有着丰富的 API 用以被调用,以响应入站和出站事件
6 outBound事件的传播
同理以后的过程
7 异常的传播
最佳实践
8 pipeline总结
netty如何判断ChannelHandler类型?
调用 pipeline 添加节点时,netty 会使用 instanceof 关键字判断当前节点是 inboound 还是 outbound 类型,分别用不同的 boolean 类型变量标识。
对于ChannelHandler的添加,应遵循何顺序?
- inbound 事件类型顺序正相关
- outbound 逆相关
用户手动触发事件传播,不同触发方式有何区别?
异常处理器要么从 head 或者 tail 节点开始传播inbound事件则从当前节点开始传递到最后节点 outbound事件则从当前节点开始传递 到第一个 outbound节点