Dubbo Provider 函数执行过程

您所在的位置:网站首页 dubbo开发流程 Dubbo Provider 函数执行过程

Dubbo Provider 函数执行过程

2024-07-04 10:02| 来源: 网络整理| 查看: 265

引言

在 Dubbo 系列文章的最后,我们回过头来看一下整个 RPC 过程是如何运作起来的,本文着重介绍整个调用链路中 Provider 的函数执行过程,其他 Dubbo 相关文章均收录于 。

服务调用过程

在前面的文章中,我们分析了 Dubbo SPI、服务导出与引入、以及集群容错方面的代码。经过前文的铺垫,我们终于可以分析服务调用过程了。Dubbo 服务调用过程比较复杂,包含众多步骤,比如发送请求、编解码、服务降级、过滤器链处理、序列化、线程派发以及响应请求等步骤。接下来我们将会重点分析请求的发送与接收、编解码、线程派发以及响应的发送与接收等过程。dubbo-thread-model首先服务消费者通过代理对象 Proxy 发起远程调用,接着通过网络客户端 Client 将编码后的请求发送给服务提供方的网络层上,也就是 Server。Server 在收到请求后,首先要做的事情是对数据包进行解码。然后将解码后的请求发送至分发器 Dispatcher,再由分发器将请求派发到指定的线程池上,最后由线程池调用具体的服务。这就是一个远程调用请求的发送与接收过程。至于响应的发送与接收过程,这张图中没有表现出来。对于这两个过程,我们也会进行详细分析。

服务调用方式

Dubbo 支持同步和异步两种调用方式,其中异步调用还可细分为“有返回值”的异步调用和“无返回值”的异步调用。所谓“无返回值”异步调用是指服务消费方只管调用,但不关心调用结果,此时 Dubbo 会直接返回一个空的 RpcResult。若要使用异步特性,需要服务消费方手动进行配置。默认情况下,Dubbo 使用同步调用方式。

本节以及其他章节将会使用 Dubbo 官方提供的 Demo 分析整个调用过程,下面我们从 DemoService 接口的代理类开始进行分析。Dubbo 默认使用 Javassist 框架为服务接口生成动态代理类,因此我们需要先将代理类进行反编译才能看到源码。这里使用阿里开源 Java 应用诊断工具 Arthas 反编译代理类,结果如下:

/** * Arthas 反编译步骤: * 1. 启动 Arthas * java -jar arthas-boot.jar * * 2. 输入编号选择进程 * Arthas 启动后,会打印 Java 应用进程列表,如下: * [1]: 11232 org.jetbrains.jps.cmdline.Launcher * [2]: 22370 org.jetbrains.jps.cmdline.Launcher * [3]: 22371 com.alibaba.dubbo.demo.consumer.Consumer * [4]: 22362 com.alibaba.dubbo.demo.provider.Provider * [5]: 2074 org.apache.zookeeper.server.quorum.QuorumPeerMain * 这里输入编号 3,让 Arthas 关联到启动类为 com.....Consumer 的 Java 进程上 * * 3. 由于 Demo 项目中只有一个服务接口,因此此接口的代理类类名为 proxy0,此时使用 sc 命令搜索这个类名。 * $ sc *.proxy0 * com.alibaba.dubbo.common.bytecode.proxy0 * * 4. 使用 jad 命令反编译 com.alibaba.dubbo.common.bytecode.proxy0 * $ jad com.alibaba.dubbo.common.bytecode.proxy0 * * 更多使用方法请参考 Arthas 官方文档: * https://alibaba.github.io/arthas/quick-start.html */ public class proxy0 implements ClassGenerator.DC, EchoService, DemoService { // 方法数组 public static Method[] methods; private InvocationHandler handler; public proxy0(InvocationHandler invocationHandler) { this.handler = invocationHandler; } public proxy0() { } public String sayHello(String string) { // 将参数存储到 Object 数组中 Object[] arrobject = new Object[]{string}; // 调用 InvocationHandler 实现类的 invoke 方法得到调用结果 Object object = this.handler.invoke(this, methods[0], arrobject); // 返回调用结果 return (String)object; } /** 回声测试方法 */ public Object $echo(Object object) { Object[] arrobject = new Object[]{object}; Object object2 = this.handler.invoke(this, methods[1], arrobject); return object2; } }

如上,代理类的逻辑比较简单。首先将运行时参数存储到数组中,然后调用 InvocationHandler 接口实现类的 invoke 方法,得到调用结果,最后将结果转型并返回给调用方。关于代理类的逻辑就说这么多,继续向下分析。

public class InvokerInvocationHandler implements InvocationHandler { private final Invoker invoker; public InvokerInvocationHandler(Invoker handler) { this.invoker = handler; } @Override public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { String methodName = method.getName(); Class[] parameterTypes = method.getParameterTypes(); // 拦截定义在 Object 类中的方法(未被子类重写),比如 wait/notify if (method.getDeclaringClass() == Object.class) { return method.invoke(invoker, args); } // 如果 toString、hashCode 和 equals 等方法被子类重写了,这里也直接调用 if ("toString".equals(methodName) && parameterTypes.length == 0) { return invoker.toString(); } if ("hashCode".equals(methodName) && parameterTypes.length == 0) { return invoker.hashCode(); } if ("equals".equals(methodName) && parameterTypes.length == 1) { return invoker.equals(args[0]); } // 将 method 和 args 封装到 RpcInvocation 中,并执行后续的调用 return invoker.invoke(new RpcInvocation(method, args)).recreate(); } }

InvokerInvocationHandler 中的 invoker 成员变量类型为 MockClusterInvoker,MockClusterInvoker 内部封装了服务降级逻辑。下面简单看一下:

public class MockClusterInvoker implements Invoker { private final Invoker invoker; public Result invoke(Invocation invocation) throws RpcException { Result result = null; // 获取 mock 配置值 String value = directory.getUrl().getMethodParameter(invocation.getMethodName(), Constants.MOCK_KEY, Boolean.FALSE.toString()).trim(); if (value.length() == 0 || value.equalsIgnoreCase("false")) { // 无 mock 逻辑,直接调用其他 Invoker 对象的 invoke 方法, // 比如 FailoverClusterInvoker result = this.invoker.invoke(invocation); } else if (value.startsWith("force")) { // force:xxx 直接执行 mock 逻辑,不发起远程调用 result = doMockInvoke(invocation, null); } else { // fail:xxx 表示消费方对调用服务失败后,再执行 mock 逻辑,不抛出异常 try { // 调用其他 Invoker 对象的 invoke 方法 result = this.invoker.invoke(invocation); } catch (RpcException e) { if (e.isBiz()) { throw e; } else { // 调用失败,执行 mock 逻辑 result = doMockInvoke(invocation, e); } } } return result; } // 省略其他方法 }

服务降级不是本文重点,因此这里就不分析 doMockInvoke 方法了。考虑到前文已经详细分析过 FailoverClusterInvoker,因此本节略过 FailoverClusterInvoker,直接分析 DubboInvoker。

public abstract class AbstractInvoker implements Invoker { public Result invoke(Invocation inv) throws RpcException { if (destroyed.get()) { throw new RpcException("Rpc invoker for service ..."); } RpcInvocation invocation = (RpcInvocation) inv; // 设置 Invoker invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { // 设置 attachment invocation.addAttachmentsIfAbsent(attachment); } Map contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { // 添加 contextAttachments 到 RpcInvocation#attachment 变量中 invocation.addAttachments(contextAttachments); } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) { // 设置异步信息到 RpcInvocation#attachment 中 invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString()); } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { // 抽象方法,由子类实现 return doInvoke(invocation); } catch (InvocationTargetException e) { // ... } catch (RpcException e) { // ... } catch (Throwable e) { return new RpcResult(e); } } protected abstract Result doInvoke(Invocation invocation) throws Throwable; // 省略其他方法 }

上面的代码来自 AbstractInvoker 类,其中大部分代码用于添加信息到 RpcInvocation#attachment 变量中,添加完毕后,调用 doInvoke 执行后续的调用。doInvoke 是一个抽象方法,需要由子类实现,下面到 DubboInvoker 中看一下。

public class DubboInvoker extends AbstractInvoker { private final ExchangeClient[] clients; protected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); // 设置 path 和 version 到 attachment 中 inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { // 从 clients 数组中获取 ExchangeClient currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { // 获取异步配置 boolean isAsync = RpcUtils.isAsync(getUrl(), invocation); // isOneway 为 true,表示“单向”通信 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation); int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 异步无返回值 if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); // 发送请求 currentClient.send(inv, isSent); // 设置上下文中的 future 字段为 null RpcContext.getContext().setFuture(null); // 返回一个空的 RpcResult return new RpcResult(); } // 异步有返回值 else if (isAsync) { // 发送请求,并得到一个 ResponseFuture 实例 ResponseFuture future = currentClient.request(inv, timeout); // 设置 future 到上下文中 RpcContext.getContext().setFuture(new FutureAdapter(future)); // 暂时返回一个空结果 return new RpcResult(); } // 同步调用 else { RpcContext.getContext().setFuture(null); // 发送请求,得到一个 ResponseFuture 实例,并调用该实例的 get 方法进行等待 return (Result) currentClient.request(inv, timeout).get(); } } catch (TimeoutException e) { throw new RpcException(..., "Invoke remote method timeout...."); } catch (RemotingException e) { throw new RpcException(..., "Failed to invoke remote method: ..."); } } // 省略其他方法 }

上面的代码包含了 Dubbo 对同步和异步调用的处理逻辑,搞懂了上面的代码,会对 Dubbo 的同步和异步调用方式有更深入的了解。Dubbo 实现同步和异步调用比较关键的一点就在于由谁调用 ResponseFuture 的 get 方法。同步调用模式下,由框架自身调用 ResponseFuture 的 get 方法。异步调用模式下,则由用户调用该方法。ResponseFuture 是一个接口,下面我们来看一下它的默认实现类 DefaultFuture 的源码。

public class DefaultFuture implements ResponseFuture { private static final Map CHANNELS = new ConcurrentHashMap(); private static final Map FUTURES = new ConcurrentHashMap(); private final long id; private final Channel channel; private final Request request; private final int timeout; private final Lock lock = new ReentrantLock(); private final Condition done = lock.newCondition(); private volatile Response response; public DefaultFuture(Channel channel, Request request, int timeout) { this.channel = channel; this.request = request; // 获取请求 id,这个 id 很重要,后面还会见到 this.id = request.getId(); this.timeout = timeout > 0 ? timeout : channel.getUrl().getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); // 存储 映射关系到 FUTURES 中 FUTURES.put(id, this); CHANNELS.put(id, channel); } @Override public Object get() throws RemotingException { return get(timeout); } @Override public Object get(int timeout) throws RemotingException { if (timeout timeout) { break; } } } catch (InterruptedException e) { throw new RuntimeException(e); } finally { lock.unlock(); } // 如果调用结果仍未返回,则抛出超时异常 if (!isDone()) { throw new TimeoutException(sent > 0, channel, getTimeoutMessage(false)); } } // 返回调用结果 return returnFromResponse(); } @Override public boolean isDone() { // 通过检测 response 字段为空与否,判断是否收到了调用结果 return response != null; } private Object returnFromResponse() throws RemotingException { Response res = response; if (res == null) { throw new IllegalStateException("response cannot be null"); } // 如果调用结果的状态为 Response.OK,则表示调用过程正常,服务提供方成功返回了调用结果 if (res.getStatus() == Response.OK) { return res.getResult(); } // 抛出异常 if (res.getStatus() == Response.CLIENT_TIMEOUT || res.getStatus() == Response.SERVER_TIMEOUT) { throw new TimeoutException(res.getStatus() == Response.SERVER_TIMEOUT, channel, res.getErrorMessage()); } throw new RemotingException(channel, res.getErrorMessage()); } // 省略其他方法 }

如上,当服务消费者还未接收到调用结果时,用户线程调用 get 方法会被阻塞住。同步调用模式下,框架获得 DefaultFuture 对象后,会立即调用 get 方法进行等待。而异步模式下则是将该对象封装到 FutureAdapter 实例中,并将 FutureAdapter 实例设置到 RpcContext 中,供用户使用。FutureAdapter 是一个适配器,用于将 Dubbo 中的 ResponseFuture 与 JDK 中的 Future 进行适配。这样当用户线程调用 Future 的 get 方法时,经过 FutureAdapter 适配,最终会调用 ResponseFuture 实现类对象的 get 方法,也就是 DefaultFuture 的 get 方法。

到这里关于 Dubbo 几种调用方式的代码逻辑就分析完了,下面来分析请求数据的发送与接收,以及响应数据的发送与接收过程。

文章说明

更多有价值的文章均收录于贝贝猫的文章目录

stun

版权声明: 本博客所有文章除特别声明外,均采用 BY-NC-SA 许可协议。转载请注明出处!

创作声明: 本文基于下列所有参考内容进行创作,其中可能涉及复制、修改或者转换,图片均来自网络,如有侵权请联系我,我会第一时间进行删除。

参考内容

[1]《深入理解Apache Dubbo与实战》[2] dubbo 官方文档



【本文地址】


今日新闻


推荐新闻


CopyRight 2018-2019 办公设备维修网 版权所有 豫ICP备15022753号-3