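This article looks at what ExecutionDispatcher does in dubbo by walking through the relevant source code in dubbo 2.7.3: the dispatcher itself, the channel handler it returns, and a test that uses it as the default.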

ExecutionDispatcher
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionDispatcher.java
public class ExecutionDispatcher implements Dispatcher {
    public static final String NAME = "execution";
    @Override
    public ChannelHandler dispatch(ChannelHandler handler, URL url) {
        return new ExecutionChannelHandler(handler, url);
    }
}
- ExecutionDispatcher implements the Dispatcher interface; its dispatch method returns an ExecutionChannelHandler that wraps the given handler and url.
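The wrapping step can be pictured with a small stand-alone model. The sketch below does not use Dubbo's classes: MiniHandler, MiniDispatcher, and DispatcherSketch are hypothetical names, and the name-to-dispatcher map is only an assumed stand-in for Dubbo's extension lookup. It illustrates the one thing a dispatcher does, which is to return a handler that decorates the original one.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in for a channel handler: it only receives messages.
interface MiniHandler {
    void received(Object message);
}

// Hypothetical stand-in for a dispatcher: it wraps a handler in another handler.
interface MiniDispatcher {
    MiniHandler dispatch(MiniHandler handler);
}

class DispatcherSketch {
    // Records what happened, in order, so the wrapping is observable.
    static final List<String> LOG = new ArrayList<>();

    // Name-to-dispatcher table; an assumption standing in for a real extension lookup.
    static final Map<String, MiniDispatcher> DISPATCHERS = new HashMap<>();

    static {
        // "execution" returns a decorator that acts first, then delegates.
        DISPATCHERS.put("execution", handler -> message -> {
            LOG.add("wrapped:" + message);
            handler.received(message);
        });
    }

    // Look up the dispatcher by name and let it wrap the business handler.
    static MiniHandler wrap(String name, MiniHandler handler) {
        MiniDispatcher dispatcher = DISPATCHERS.get(name);
        if (dispatcher == null) {
            throw new IllegalArgumentException("unknown dispatcher: " + name);
        }
        return dispatcher.dispatch(handler);
    }

    public static void main(String[] args) {
        MiniHandler business = message -> LOG.add("handled:" + message);
        wrap("execution", business).received("ping");
        System.out.println(LOG); // prints [wrapped:ping, handled:ping]
    }
}
```

The caller only ever sees a MiniHandler, so swapping the dispatcher name changes the threading behaviour without touching the business handler.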
ExecutionChannelHandler
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/main/java/org/apache/dubbo/remoting/transport/dispatcher/execution/ExecutionChannelHandler.java
public class ExecutionChannelHandler extends WrappedChannelHandler {
    public ExecutionChannelHandler(ChannelHandler handler, URL url) {
        super(handler, url);
    }
    @Override
    public void received(Channel channel, Object message) throws RemotingException {
        ExecutorService executor = getExecutorService();
        if (message instanceof Request) {
            try {
                executor.execute(new ChannelEventRunnable(channel, handler, ChannelState.RECEIVED, message));
            } catch (Throwable t) {
                // FIXME: when the thread pool is full, SERVER_THREADPOOL_EXHAUSTED_ERROR cannot return properly,
                // therefore the consumer side has to wait until gets timeout. This is a temporary solution to prevent
                // this scenario from happening, but a better solution should be considered later.
                if (t instanceof RejectedExecutionException) {
                    Request request = (Request) message;
                    if (request.isTwoWay()) {
                        String msg = "Server side(" + url.getIp() + "," + url.getPort()
                                + ") thread pool is exhausted, detail msg:" + t.getMessage();
                        Response response = new Response(request.getId(), request.getVersion());
                        response.setStatus(Response.SERVER_THREADPOOL_EXHAUSTED_ERROR);
                        response.setErrorMessage(msg);
                        channel.send(response);
                        return;
                    }
                }
                throw new ExecutionException(message, channel, getClass() + " error when process received event.", t);
            }
        } else {
            handler.received(channel, message);
        }
    }
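}
- ExecutionChannelHandler extends WrappedChannelHandler. Its received method checks whether message is a Request: if so, it wraps the message in a ChannelEventRunnable and submits it to the thread pool; otherwise it calls handler.received directly on the current thread. If the pool rejects a two-way Request with a RejectedExecutionException, the handler sends back a Response with status SERVER_THREADPOOL_EXHAUSTED_ERROR so the consumer does not have to wait for a timeout; any other failure is rethrown as an ExecutionException.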
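The branching in received can be reproduced with plain java.util.concurrent types. The following is a minimal sketch under stated assumptions, not Dubbo's implementation: ExecutionSketch and Req are hypothetical names, the events list replaces real handlers and channels, and a rejected one-way request is simply rethrown rather than wrapped in Dubbo's ExecutionException.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class ExecutionSketch {
    // Hypothetical request type: only the fields the branch logic needs.
    static final class Req {
        final long id;
        final boolean twoWay;
        Req(long id, boolean twoWay) { this.id = id; this.twoWay = twoWay; }
    }

    final Executor executor;
    // Thread-safe log of what happened to each message.
    final List<String> events = new CopyOnWriteArrayList<>();

    ExecutionSketch(Executor executor) { this.executor = executor; }

    void received(Object message) {
        if (!(message instanceof Req)) {
            // Anything that is not a request is handled on the calling thread.
            events.add("inline:" + message);
            return;
        }
        Req req = (Req) message;
        try {
            // Requests are handed to the executor.
            executor.execute(() -> events.add("pooled:" + req.id));
        } catch (RejectedExecutionException e) {
            if (req.twoWay) {
                // Stand-in for sending a "thread pool exhausted" response.
                events.add("exhausted:" + req.id);
                return;
            }
            throw e; // simplification: the original wraps this in ExecutionException
        }
    }

    static List<String> demo() {
        // One worker and no queue capacity, so a second task is rejected.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        // Occupy the only worker until the demo is done.
        pool.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        ExecutionSketch handler = new ExecutionSketch(pool);
        handler.received("heartbeat");       // not a Req: handled inline
        handler.received(new Req(7, true));  // pool is full: exhaustion reply
        release.countDown();
        pool.shutdown();
        return handler.events;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [inline:heartbeat, exhausted:7]
    }
}
```

The demo saturates a single-thread pool on purpose to make the rejection path deterministic; with a free worker the same two-way request would be recorded as pooled:7 instead.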
PerformanceServerTest
dubbo-2.7.3/dubbo-remoting/dubbo-remoting-api/src/test/java/org/apache/dubbo/remoting/PerformanceServerTest.java
public class PerformanceServerTest  {
    private static final Logger logger = LoggerFactory.getLogger(PerformanceServerTest.class);
    private static ExchangeServer server = null;
    private static void restartServer(int times, int alive, int sleep) throws Exception {
        if (server != null && !server.isClosed()) {
            server.close();
            Thread.sleep(100);
        }
        for (int i = 0; i < times; i++) {
            logger.info("restart times:" + i);
            server = statServer();
            if (alive > 0) Thread.sleep(alive);
            server.close();
            if (sleep > 0) Thread.sleep(sleep);
        }
        server = statServer();
    }
    private static ExchangeServer statServer() throws Exception {
        final int port = PerformanceUtils.getIntProperty("port", 9911);
        final String transporter = PerformanceUtils.getProperty(Constants.TRANSPORTER_KEY, Constants.DEFAULT_TRANSPORTER);
        final String serialization = PerformanceUtils.getProperty(Constants.SERIALIZATION_KEY, Constants.DEFAULT_REMOTING_SERIALIZATION);
        final String threadpool = PerformanceUtils.getProperty(THREADPOOL_KEY, DEFAULT_THREADPOOL);
        final int threads = PerformanceUtils.getIntProperty(THREADS_KEY, DEFAULT_THREADS);
        final int iothreads = PerformanceUtils.getIntProperty(IO_THREADS_KEY, Constants.DEFAULT_IO_THREADS);
        final int buffer = PerformanceUtils.getIntProperty(BUFFER_KEY, DEFAULT_BUFFER_SIZE);
        final String channelHandler = PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME);
        // Start server
        ExchangeServer server = Exchangers.bind("exchange://0.0.0.0:" + port + "?transporter="
                + transporter + "&serialization="
                + serialization + "&threadpool=" + threadpool
                + "&threads=" + threads + "&iothreads=" + iothreads + "&buffer=" + buffer + "&channel.handler=" + channelHandler, new ExchangeHandlerAdapter() {
            public String telnet(Channel channel, String message) throws RemotingException {
                return "echo: " + message + "\r\ntelnet> ";
            }
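            public CompletableFuture ...
- The statServer method of PerformanceServerTest reads the dispatcher name with PerformanceUtils.getProperty(Constants.DISPATCHER_KEY, ExecutionDispatcher.NAME), falling back to ExecutionDispatcher.NAME ("execution") when the property is not set, and passes the result to Exchangers.bind as the channel.handler URL parameter.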
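The fallback lookup can be sketched in a few lines. This is an illustration only: DispatcherProperty.getProperty is a hypothetical helper written for this example, not the body of PerformanceUtils.getProperty, and the property key "dispatcher" is assumed here to be the value of Constants.DISPATCHER_KEY.

```java
class DispatcherProperty {
    // Hypothetical helper: return the system property for the key,
    // or the default when the property is unset or blank.
    static String getProperty(String key, String defaultValue) {
        String value = System.getProperty(key);
        if (value == null || value.trim().isEmpty()) {
            return defaultValue;
        }
        return value.trim();
    }

    public static void main(String[] args) {
        // "dispatcher" is an assumed key; "execution" is ExecutionDispatcher.NAME.
        System.out.println(getProperty("dispatcher", "execution"));
    }
}
```

Under these assumptions, running the JVM with a flag such as -Ddispatcher=all would select a different dispatcher name, while running without it yields execution.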
