0
点赞
收藏
分享

微信扫一扫

十:线程与线程池(二)

猎书客er 2022-03-20 阅读 74

接下来我们继续分析 五种线程模型的逻辑实现

在这里插入图片描述

其中关于 bossGroup 与 workerGroup 的关系, 查看此链接了解

ALL:所有消息都派发到线程池

//org.apache.dubbo.remoting.transport.dispatcher.all.AllChannelHandler
public class AllChannelHandler extends WrappedChannelHandler {

    public AllChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }

    //连接
    @Override
    public void connected(Channel channel) throws RemotingException {
        //获取线程池
        ExecutorService executor = getExecutorService();
        try {
            //ChannelEventRunnable实现了Runnable接口,此处直接放在线程池中进行执行。以下的方法都是类似
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("connect event", channel, getClass() + " error when process connected event .", t);
        }
    }

    //断开连接
    @Override
    public void disconnected(Channel channel) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.DISCONNECTED));
        } catch (Throwable t) {
            throw new ExecutionException("disconnect event", channel, getClass() + " error when process disconnected event .", t);
        }
    }

    //接收
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getPreferredExecutorService(message);
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
        } catch (Throwable t) {
        	if(message instanceof Request && t instanceof RejectedExecutionException){
                sendFeedback(channel, (Request) message, t);
                return;
        	}
            throw new ExecutionException(message, channel, getClass() + " error when process received event .", t);
        }
    }

    //异常
    @Override
    public void caught(Channel channel, Throwable exception) throws RemotingException {
        ExecutorService executor = getExecutorService();
        try {
            executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.CAUGHT, exception));
        } catch (Throwable t) {
            throw new ExecutionException("caught event", channel, getClass() + " error when process caught event .", t);
        }
    }
}

其他分别对应

Connection:在IO线程上,将连接断开事件放入队列,有序逐个执行

//org.apache.dubbo.remoting.transport.dispatcher.connection.ConnectionOrderedChannelHandler#ConnectionOrderedChannelHandler

Direct:所有请求都不派发到线程池,全部在IO线程上直接执行

//org.apache.dubbo.remoting.transport.dispatcher.direct.DirectChannelHandler

Execution:只有请求消息派发到线程池,不含响应

//org.apache.dubbo.remoting.transport.dispatcher.execution.ExecutionChannelHandler

Message:只有请求响应消息派发到线程池

//org.apache.dubbo.remoting.transport.dispatcher.message.MessageOnlyChannelHandler
举报

相关推荐

0 条评论