Netty 学习(十):ChannelPipeline源码说明作者: Grey
原文地址:
博客园:Netty 学习(十):ChannelPipeline源码说明
【十 Netty 学习:ChannelPipeline源码说明】CSDN:Netty 学习(十):ChannelPipeline源码说明
ChannelPipeline可以看作一条流水线,原料(字节流)进来,经过加工,形成一个个Java对象,然后基于这些对象进行处理,最后输出二进制字节流 。ChannelPipeline 在创建 NioSocketChannel 的时候创建,其默认实现是 DefaultChannelPipeline
final AbstractChannelHandlerContext head;final AbstractChannelHandlerContext tail;protected DefaultChannelPipeline(Channel channel) {this.channel = ObjectUtil.checkNotNull(channel, "channel");succeededFuture = new SucceededChannelFuture(channel, null);voidPromise =new VoidChannelPromise(channel, true);tail = new TailContext(this);head = new HeadContext(this);head.next = tail;tail.prev = head;}
ChannelPipeline 中保存了 Channel 的引用,且其中每个节点都是一个 ChannelHandlerContext 对象 。每个 ChannelHandlerContext 节点都保存了执行器(即:ChannelHandler) 。ChannelPipeline里有两种不同的节点,一种是 ChannelInboundHandler,处理 inbound 事件(例如:读取数据流) , 还有一种是 ChannelOutboundHandler,处理 Outbound 事件,比如调用writeAndFlush()类方法时,就会调用该 handler 。
添加 handler 的逻辑如下
@Overridepublic final ChannelPipeline addLast(EventExecutorGroup group, String name, ChannelHandler handler) {final AbstractChannelHandlerContext newCtx;synchronized (this) {// 检查是否有重复的 handlercheckMultiplicity(handler);// 创建 节点newCtx = newContext(group, filterName(name, handler), handler);// 添加节点addLast0(newCtx);// If the registered is false it means that the channel was not registered on an eventLoop yet.// In this case we add the context to the pipeline and add a task that will call// ChannelHandler.handlerAdded(...) once the channel is registered.if (!registered) {newCtx.setAddPending();callHandlerCallbackLater(newCtx, true);return this;}EventExecutor executor = newCtx.executor();if (!executor.inEventLoop()) {callHandlerAddedInEventLoop(newCtx, executor);return this;}}// 回调用户方法callHandlerAdded0(newCtx);return this;}
如上代码,整个添加过程见注释说明,其实主要就是四步:第一步:检查是否有重复的 handler,核心逻辑见
private static void checkMultiplicity(ChannelHandler handler) {if (handler instanceof ChannelHandlerAdapter) {ChannelHandlerAdapter h = (ChannelHandlerAdapter) handler;if (!h.isSharable() && h.added) {// 非共享的且添加过,就抛出异常,反之,如果一个 handler 支持共享,就可以无限次被添加到 ChannelPipeline 中throw new ChannelPipelineException(h.getClass().getName() +" is not a @Sharable handler, so can't be added or removed multiple times.");}h.added = true;}}
第二步:创建节点,即把 handler 包裹成 ChannelHandlerContext,核心逻辑如下private static final FastThreadLocal<Map<Class<?>, String>> nameCaches =new FastThreadLocal<Map<Class<?>, String>>() {@Overrideprotected Map<Class<?>, String> initialValue() {return new WeakHashMap<Class<?>, String>();}};private String generateName(ChannelHandler handler) {Map<Class<?>, String> cache = nameCaches.get();Class<?> handlerType = handler.getClass();String name = cache.get(handlerType);if (name == null) {name = generateName0(handlerType);cache.put(handlerType, name);}// It's not very likely for a user to put more than one handler of the same type, but make sure to avoid// any name conflicts.Note that we don't cache the names generated here.if (context0(name) != null) {String baseName = name.substring(0, name.length() - 1); // Strip the trailing '0'.for (int i = 1;; i ++) {String newName = baseName + i;if (context0(newName) == null) {name = newName;break;}}}return name;}
注:Netty 使用 FastThreadLocal 变量来缓存 Handler 的类和名称的映射关系,在生成 name 的时候,首先看缓存中有没有生成过默认 name,如果没有生成,就调用generateName0()
生成默认名称,加入缓存 。第三步:把 ChannelHandlerContext 作为节点添加到 pipeline 中,核心代码如下
private void addLast0(AbstractChannelHandlerContext newCtx) {AbstractChannelHandlerContext prev = tail.prev;newCtx.prev = prev;newCtx.next = tail;prev.next = newCtx;tail.prev = newCtx;}
其本质就是一个双向链表的插入节点过程,而且 , ChannelPipeline 删除 ChannelHandler 的方法,本质就是把这个双向链表的某个节点删掉!第四步:回调用户方法,核心代码如下
private void callHandlerAdded0(final AbstractChannelHandlerContext ctx) {try {ctx.callHandlerAdded();} catch (Throwable t) {boolean removed = false;try {atomicRemoveFromHandlerList(ctx);ctx.callHandlerRemoved();removed = true;} catch (Throwable t2) {if (logger.isWarnEnabled()) {logger.warn("Failed to remove a handler: " + ctx.name(), t2);}}if (removed) {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; removed.", t));} else {fireExceptionCaught(new ChannelPipelineException(ctx.handler().getClass().getName() +".handlerAdded() has thrown an exception; also failed to remove.", t));}}}final void callHandlerRemoved() throws Exception {try {// Only call handlerRemoved(...) if we called handlerAdded(...) before.if (handlerState == ADD_COMPLETE) {handler().handlerRemoved(this);}} finally {// Mark the handler as removed in any case.setRemoved();}}
推荐阅读
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
-
- vivo互联网机器学习平台的建设与实践
- 玩魔方的秘诀(十大诡异魔方)
- 世界十大顶级面料:全球十大顶级男装面料排行榜 你知道哪些?
- 三十七 Java开发学习----SpringBoot多环境配置及配置文件分类
- 【博学谷学习记录】超强总结,用心分享|MySql连接查询超详细总结
- Jupyter,Matplotlib,Pandas 【机器学习】利用 Python 进行数据分析的环境配置 Windows
- 去越南必买的十件东西,值得买的越南特产排名,越买越心动
- 一个C#开发者学习SpringCloud搭建微服务的心路历程
- 基于Netty的TCP服务框架
- 狼人杀卡牌怎么玩(狼人杀必背十句口诀)