`
rxin2009
  • 浏览: 16652 次
  • 性别: Icon_minigender_1
  • 来自: 杭州
社区版块
存档分类
最新评论

netty基于事件的过程流转

 
阅读更多

本为分析netty源码中的过程流转实现

 

netty中的处理器的流转体现在DefaultChannelPipeline类中,在添加处理器时一般调用addLast(String, ChannelHandler),下面来看看这个方法

public synchronized void addLast(String name, ChannelHandler handler) {
        if (name2ctx.isEmpty()) {
            init(name, handler);
        } else {
            checkDuplicateName(name);
            DefaultChannelHandlerContext oldTail = tail;
            DefaultChannelHandlerContext newTail = new DefaultChannelHandlerContext(oldTail, null, name, handler);

            callBeforeAdd(newTail);

            oldTail.next = newTail;
            tail = newTail;
            name2ctx.put(name, newTail);

            callAfterAdd(newTail);
        }
    }

 

在第一次调用的时候会执行init(name, handler),也就是添加第一个处理器,他会初始化DefaultChannelPipeline的全局变量head、tail,分别表示第一个和最后一个处理器,还有name2ctx,一个map,表示名称和上下文键值对,每一个处理器就有一个上下文类DefaultChannelHandlerContext。

以下是全局变量

static final InternalLogger logger = InternalLoggerFactory.getInstance(DefaultChannelPipeline.class);
    static final ChannelSink discardingSink = new DiscardingChannelSink();

    private volatile Channel channel;
    private volatile ChannelSink sink;
    private volatile DefaultChannelHandlerContext head;
    private volatile DefaultChannelHandlerContext tail;
    private final Map<String, DefaultChannelHandlerContext> name2ctx =
        new HashMap<String, DefaultChannelHandlerContext>(4);

 当再次添加处理器时,多个处理器会构成一个双向链表,这个在构造方法中实现

DefaultChannelHandlerContext(
                DefaultChannelHandlerContext prev, DefaultChannelHandlerContext next,
                String name, ChannelHandler handler) {

            if (name == null) {
                throw new NullPointerException("name");
            }
            if (handler == null) {
                throw new NullPointerException("handler");
            }
            canHandleUpstream = handler instanceof ChannelUpstreamHandler;
            canHandleDownstream = handler instanceof ChannelDownstreamHandler;


            if (!canHandleUpstream && !canHandleDownstream) {
                throw new IllegalArgumentException(
                        "handler must be either " +
                        ChannelUpstreamHandler.class.getName() + " or " +
                        ChannelDownstreamHandler.class.getName() + '.');
            }

            this.prev = prev;
            this.next = next;
            this.name = name;
            this.handler = handler;
        }

 

这个时候多个处理器的上下文就构成一个双向链表,同时上下文类还清晰的标明了处理器是属于上游处理器还是下游处理器。

 

一个上游事件流入管道,它是从head流入的,而下游处理器是从tail流入的,如代码

public void sendUpstream(ChannelEvent e) {
        DefaultChannelHandlerContext head = getActualUpstreamContext(this.head);
        if (head == null) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "The pipeline contains no upstream handlers; discarding: " + e);
            }

            return;
        }

        sendUpstream(head, e);
    }

    void sendUpstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        try {
            ((ChannelUpstreamHandler) ctx.getHandler()).handleUpstream(ctx, e);
        } catch (Throwable t) {
            notifyHandlerException(e, t);
        }
    }

    public void sendDownstream(ChannelEvent e) {
        DefaultChannelHandlerContext tail = getActualDownstreamContext(this.tail);
        if (tail == null) {
            try {
                getSink().eventSunk(this, e);
                return;
            } catch (Throwable t) {
                notifyHandlerException(e, t);
                return;
            }
        }

        sendDownstream(tail, e);
    }

    void sendDownstream(DefaultChannelHandlerContext ctx, ChannelEvent e) {
        if (e instanceof UpstreamMessageEvent) {
            throw new IllegalArgumentException("cannot send an upstream event to downstream");
        }

        try {
            ((ChannelDownstreamHandler) ctx.getHandler()).handleDownstream(ctx, e);
        } catch (Throwable t) {
            // Unlike an upstream event, a downstream event usually has an
            // incomplete future which is supposed to be updated by ChannelSink.
            // However, if an exception is raised before the event reaches at
            // ChannelSink, the future is not going to be updated, so we update
            // here.
            e.getFuture().setFailure(t);
            notifyHandlerException(e, t);
        }
    }

 

 

一个上游事件在执行完第一个上游处理器的代码后,会调用上下文DefaultChannelHandlerContext的sendUpstream方法,如下代码

public void sendUpstream(ChannelEvent e) {
            DefaultChannelHandlerContext next = getActualUpstreamContext(this.next);
            if (next != null) {
                DefaultChannelPipeline.this.sendUpstream(next, e);
            }
        }

 这个方法的作用就是找到下一个上游处理器,对比下游事件的流转代码就是找到下一个下游处理器

public void sendDownstream(ChannelEvent e) {
            DefaultChannelHandlerContext prev = getActualDownstreamContext(this.prev);
            if (prev == null) {
                try {
                    getSink().eventSunk(DefaultChannelPipeline.this, e);
                } catch (Throwable t) {
                    notifyHandlerException(e, t);
                }
            } else {
                DefaultChannelPipeline.this.sendDownstream(prev, e);
            }
        }

 下游事件是从tail节点开始向head方向找处理器,如果没有处理器了,那么将执行事件下沉,就是实现底层的数据传输,这个一般不用用户插手。而上游事件达到处理器末端后是什么都不做,表示一个上游事件处理完毕。

 

事件的过程流转,总的来说就是,处理事件时DefaultChannelPipeline找到第一个匹配的处理器上下文,然后执行处理器代码,在通过上下文类DefaultChannelHandlerContext找到下一个匹配的处理器。这里有一个值得注意的地方就是事件是否继续流转到下一个处理器是由处理器自身决定的,如ctx.sendUpstream(e),这样就可以灵活控制事件的流转了。

分享到:
评论

相关推荐

Global site tag (gtag.js) - Google Analytics