文章目录
异步调用
1.dubbo配置
provider端:
对整个provider小程序开发定制小程序开发定制端所有服务和方法进行异步 dubbo.provider.=true
小程序开发定制对某个具体服务所小程序开发定制有方法异步 @Service(async = true)
小程序开发定制对服务的某些具体方法异步 @Service(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)})
consumer端:
1.对整个consumer端所有服务和方法进行异步 dubbo.reference.async=true。小程序开发定制很少这样使用
2.小程序开发定制对某个具体引用服务所有方法异步 @Reference(async = true)。使用较少
3.小程序开发定制对引用服务的某些具体方法异步 @Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)})。dubbo常用异步配置
4.在consumer端调用内设置RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”); 优先级最高
常用使用规则:
consumer端覆盖provider端配置,方法级别优先。
通常不会在provider端设置异步,都是在consumer端进行异步设置。具体代码如下
@Reference(retries = 0, timeout = 2000, methods = {@Method(name="findProduct",async = true)})private ProductService productService;@Overridepublic Result<ProductVO> getProduct(ProductDTO dto) { Result<ProductVO> result = productService.findProduct(dto);//dubbo异步调用,此时输出result是null Future<Object> future = RpcContext.getContext().getFuture();//获取异步执行结果Future //do othder something try { result = (Result<ProductVO>) future.get();//获取具体的异步执行结果 } catch (InterruptedException | ExecutionException e) { e.printStackTrace(); } return result;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
2.dubbo异步调用分析
2.1.异步调用分析
dubbo异步是在DubboInvoker(AbstractInvoker)内实现,具体代码如下
//com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(Invocation)public Result invoke(Invocation inv) throws RpcException { // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed if (destroyed.get()) { logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, " + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer"); } RpcInvocation invocation = (RpcInvocation) inv; invocation.setInvoker(this); if (attachment != null && attachment.size() > 0) { invocation.addAttachmentsIfAbsent(attachment); } Map<String, String> contextAttachments = RpcContext.getContext().getAttachments(); if (contextAttachments != null && contextAttachments.size() != 0) { invocation.addAttachments(contextAttachments);//把RpcContext的隐式参数保存到RpcInvocation.attachments。如果在上下文设置了async=true就说明使用一部 } if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//url上有异步标识${方法名}.async=true 或者 default.async=true invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());//设置异步标识到RpcInvocation.attachments } RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation); try { return doInvoke(invocation);//DubboInvoker.doInvoke(Invocation) } catch (InvocationTargetException e) { // biz exception //其它忽略 }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
就是保存异步标识到RpcInvocation.attachments,异步标识的来源:1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”); 2. url上有${方法名}.async=true
//com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(Invocation)@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable { RpcInvocation inv = (RpcInvocation) invocation; final String methodName = RpcUtils.getMethodName(invocation); inv.setAttachment(Constants.PATH_KEY, getUrl().getPath()); inv.setAttachment(Constants.VERSION_KEY, version); ExchangeClient currentClient; if (clients.length == 1) { currentClient = clients[0]; } else { currentClient = clients[index.getAndIncrement() % clients.length]; } try { boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//url上有async=true,认为是异步 boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//url上有return=false or 隐式参数有return=false,认为是onway,不需要响应 int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); if (isOneway) { boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false); currentClient.send(inv, isSent); RpcContext.getContext().setFuture(null); return new RpcResult(); } else if (isAsync) {//dubbo异步调用 ResponseFuture future = currentClient.request(inv, timeout);//网络调用 RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));//隐式参数存放FutureAdapter,封装了ResponseFuture,因此consumer端需要RpcContext.getContext().getFuture()获取Future,然后Future.get()阻塞获取执行结果 return new RpcResult();//返回RpcResult,其value是null,这就是异步调用获取的执行结果是null的原因 } else {//dubbo同步调用 RpcContext.getContext().setFuture(null); return (Result) currentClient.request(inv, timeout).get();//阻塞获取执行结果 } } catch (TimeoutException e) { throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); } catch (RemotingException e) { throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
从上面代码发现,异步和同步区别就在于是否直接阻塞获取执行结果而已。异步调用后,业务上可以做一些其它事情,然后再获取异步执行结果。获取执行结果,异步和同步都一样,本质都是com.alibaba.dubbo.remoting.exchange.ResponseFuture.get()
。异步调用是把ResponseFuture方到了dubbo上下文,需要客户端手工显示的获取ResponseFuture,接着再get()获取执行结果。
以前时候(dubbo2.5),dubbo异步调用还有个坑,比如A->B—>C,其中A调用B是异步,B调用C是同步,但是实际上B调用C也会异步,因为异步标识被传递了。这个问题在dubbo2.6是不存在的,可以看ContextFilter内,把隐式参数async清除了。
2.2.异步调用判断和异步标识来源
接着看异步的判断:
异步的判断RpcUtils.isAsync(URL, Invocation)
//com.alibaba.dubbo.rpc.support.RpcUtils.isAsync(URL, Invocation)public static boolean isAsync(URL url, Invocation inv) { boolean isAsync; if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {//RpcInvocation.attachments有async=true isAsync = true; } else { isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);//url上有异步标识${方法名}.async=true 或者 default.async=true } return isAsync;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
异步标识的来源:
- RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”);
- url上有${方法名}.async=true
那么url上有异步标识是通过@Reference(async = true) 或 @Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)}) 配置
@Reference(async = true)
@Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)}) 配置 生成的url例子
@Method这样设置异步,具体是在com.alibaba.dubbo.config.ReferenceConfig.init()
3.dubbo异步调用图解
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bQ87inuU-1649166821546)(https://cdn.jsdelivr.net/gh/zhangyj131/mdpicture/docs/20210817211712.png)]
步骤1. consumer端业务线程调用dubbo引用方法,由IO线程发起网络通信,发送请求给provider
步骤2. netty nio网络通信,异步
步骤3. 设置ResponseFuture到RpcContext上。和步骤2基本是同时进行
步骤4. 从RpcContext获取ResponseFuture。然后consumer端进行其它业务逻辑处理
步骤5. ResponseFuture 阻塞获取provider端执行结果。其中步骤6、7可能执行结果早于步骤5,
步骤6. provider端处理完毕,响应执行结果
步骤7. 设置provider端执行结果到ResponseFuture,并唤醒ResponseFuture
4.总结
dubbo异步是基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较少。
dubbo异步调用就是请求发送出去,然后业务做一些其它处理,最后再获取异步执行结果。
dubbo异步调用使用场景:
1.用于耗时久的请求调用,且下步业务处理,不依赖这个异步调用结果
2.用于不重要的操作,比如记录用户行为等
dubbo异步在平时工作中使用较少,基本还是同步为主,真要到了dubbo同步无法提高吞吐量和解耦,通常需要改变架构设计了,通常会使用mq进行异步消峰解耦。