本为分析netty源码中的过程流转实现
netty中的处理器的流转体现在DefaultChannelPipeline类中,在添加处理器时一般调用addLast(String, ChannelHandler),下面来看看这个方法
public synchronized void addLast(String name, ChannelHandler handler) { if (name2ctx.isEmpty()) { init(name, handler); } else { checkDuplicateName(name); DefaultChannelHandlerContext oldTail = tail; DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler); callBeforeAdd(newTail); oldTail.next = newTail; tail = newTail; name2ctx.put(name, newTail); callAfterAdd(newTail); } }
在第一次调用的时候会执行init(name, handler),也就是添加第一个处理器,他会初始化DefaultChannelPipeline的全局变量head、tail,分别表示第一个和最后一个处理器,还有name2ctx,一个map,表示名称和上下文键值对,每一个处理器就有一个上下文类DefaultChannelHandlerContext。
以下是全局变量
static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class); static final ChannelSink discardingSink = new DiscardingChannelSink(); private volatile Channel channel; private volatile ChannelSink sink; private volatile DefaultChannelHandlerContext head; private volatile DefaultChannelHandlerContext tail; private final Map<String, DefaultChannelHandlerContext> name2ctx = new HashMap<String, DefaultChannelHandlerContext>(4);
当再次添加处理器时,多个处理器会构成一个双向链表,这个在构造方法中实现
DefaultChannelHandlerContext( DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next, String name, ChannelHandler handler) { if (name == null) { throw new NullPointerException("name"); } if (handler == null) { throw new NullPointerException("handler"); } canHandleUpstream = handler instanceof ChannelUpstreamHandler; canHandleDownstream = handler instanceof ChannelDownstreamHandler; if (!canHandleUpstream && !canHandleDownstream) { throw new IllegalArgumentException( "handler must be either " + ChannelUpstreamHandler.class.getName() + " or " + ChannelDownstreamHandler.class.getName() + '.'); } this.prev = prev; this.next = next; this.name = name; this.handler = handler; }
这个时候多个处理器的上下文就构成一个双向链表,同时上下文类还清晰的标明了处理器是属于上游处理器还是下游处理器。
一个上游事件流入管道,它是从head流入的,而下游处理器是从tail流入的,如代码
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext head = getActualUpstreamContext(this.head); if (head == null) { if (logger.isWarnEnabled()) { logger.warn( "The pipeline contains no upstream handlers; discarding: " + e); } return; } sendUpstream(head, e); } void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { try { ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e); } catch (Throwable t) { notifyHandlerException(e, t); } } public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail); if (tail == null) { try { getSink().eventSunk(this, e); return; } catch (Throwable t) { notifyHandlerException(e, t); return; } } sendDownstream(tail, e); } void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) { if (e instanceof UpstreamMessageEvent) { throw new IllegalArgumentException("cannot send an upstream event to downstream"); } try { ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e); } catch (Throwable t) { // Unlike an upstream event, a downstream event usually has an // incomplete future which is supposed to be updated by ChannelSink. // However, if an exception is raised before the event reaches at // ChannelSink, the future is not going to be updated, so we update // here. e.getFuture().setFailure(t); notifyHandlerException(e, t); } }
一个上游事件在执行完第一个上游处理器的代码后,会调用上下文DefaultChannelHandlerContext的sendUpstream方法,如下代码
public void sendUpstream(ChannelEvent e) { DefaultChannelHandlerContext next = getActualUpstreamContext(this.next); if (next != null) { DefaultChannelPipeline.this.sendUpstream(next, e); } }
这个方法的作用就是找到下一个上游处理器,对比下游事件的流转代码就是找到下一个下游处理器
public void sendDownstream(ChannelEvent e) { DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev); if (prev == null) { try { getSink().eventSunk(DefaultChannelPipeline.this, e); } catch (Throwable t) { notifyHandlerException(e, t); } } else { DefaultChannelPipeline.this.sendDownstream(prev, e); } }
下游事件是从tail节点开始向head方向找处理器,如果没有处理器了,那么将执行事件下沉,就是实现底层的数据传输,这个一般不用用户插手。而上游事件达到处理器末端后是什么都不做,表示一个上游事件处理完毕。
事件的过程流转,总的来说就是,处理事件时DefaultChannelPipeline找到第一个匹配的处理器上下文,然后执行处理器代码,在通过上下文类DefaultChannelHandlerContext找到下一个匹配的处理器。这里有一个值得注意的地方就是事件是否继续流转到下一个处理器是由处理器自身决定的,如ctx.sendUpstream(e),这样就可以灵活控制事件的流转了。
相关推荐
netty案例,netty4.1中级拓展篇十三《Netty基于SSL实现信息传输过程中双向加密验证》源码 ...
netty案例,netty4.1中级拓展篇十一《Netty基于ChunkedStream数据流切块传输》源码 ...
netty框架基于http socket websocket及心跳包机制的demo
JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA版基于netty的物联网高并发智能网关 JAVA...
一种基于NETTY的远程过程调用方法.pdf
基于netty 的udp字节数据接 收服务,发送服务实例 基于netty 的udp字节数据接收服务,发送服务实例
JAVA采用Netty库实现基于以DTU传输的TCP服务器 ,可以支持多端口通讯 ,同时也支持 多协议解析
基于netty框架编写的socket服务器
写了一个简单的netty server和client,传输协议是google protobuf。上传文件主要包括源码以及转换proto文件的工具.
基于Netty实现的命令行斗地主游戏,为划水摸鱼而生~ 基于Netty实现的命令行斗地主游戏,为划水摸鱼而生~ 基于Netty实现的命令行斗地主游戏,为划水摸鱼而生~ 基于Netty实现的命令行斗地主游戏,为划水摸鱼而生...
Netty4事件处理传播机制,java高级开发工程师要求(csdn)————程序
经过本人测试通过,少走弯路
一个基于netty实现web框架,或者mvc框架,实现基于netty的web框架,你说netty强不强,文中有不对的地方,欢迎大牛指正
1、基于netty+websocket+springboot的实时聊天系统项目源码.zip 2、该资源包括项目的全部源码,下载可以直接使用! 3、本项目适合作为计算机、数学、电子信息等专业的课程设计、期末大作业和毕设项目,作为参考资料...
基于Netty框架开发的Modbus源代码。支持 * READ COILS | 0x01 * READ DISCRETE INPUTS | 0x02 * READ HOLDING REGISTERS | 0x03 * READ INPUT REGISTERS | 0x04 * WRITE SINGLE COIL | 0x05 * WRITE SINGLE REGISTER...
基于Netty实现了dubbo rpc
从最简单的Socked编程,到目前为止已经有了开源的框架,那就是Netty,它Jobss开发的一个网络异步应用框架,能高快捷的实现网络客户端和服务器端的优秀框架,而本实例就是基于这个框架实现的聊天程序,希望对你有用
基于Netty手写Dubbo,该资源包含 1、生产者向zk服务的注册,消费者发现服务。 2、RPC远程调用实现。 3、netty服务调用,对象序列化和反序列化。 4、负载均衡的简单实现 详情见博客:...
基于Netty的文件上传源码、基于socket通信的源码、基于udp协议通信的源码
Netty是一个提供异步事件驱动的网络应用框架,用以快速开发高性能、高可靠性的网络服务器和客户端程序。 换句话说,Netty是一个NIO框架,使用它可以简单快速地开发网络应用程序,比如客户端和服务端的协议。Netty...