小程序开发定制dubbo系列十、dubbo异步调用

文章目录

异步调用

1.dubbo配置

provider端:

对整个provider小程序开发定制小程序开发定制端所有服务和方法进行异步 dubbo.provider.=true

小程序开发定制对某个具体服务所小程序开发定制有方法异步 @Service(async = true)

小程序开发定制对服务的某些具体方法异步 @Service(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)})

consumer端:

1.对整个consumer端所有服务和方法进行异步 dubbo.reference.async=true。小程序开发定制很少这样使用

2.小程序开发定制对某个具体引用服务所有方法异步 @Reference(async = true)。使用较少

3.小程序开发定制对引用服务的某些具体方法异步 @Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)})。dubbo常用异步配置

4.在consumer端调用内设置RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”); 优先级最高

常用使用规则:

consumer端覆盖provider端配置,方法级别优先。

通常不会在provider端设置异步,都是在consumer端进行异步设置。具体代码如下

@Reference(retries = 0, timeout = 2000, methods = {@Method(name="findProduct",async = true)})private ProductService productService;@Overridepublic Result<ProductVO> getProduct(ProductDTO dto) {    Result<ProductVO> result = productService.findProduct(dto);//dubbo异步调用,此时输出result是null    Future<Object> future = RpcContext.getContext().getFuture();//获取异步执行结果Future    //do othder something    try {        result = (Result<ProductVO>) future.get();//获取具体的异步执行结果    } catch (InterruptedException | ExecutionException e) {        e.printStackTrace();    }    return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16

2.dubbo异步调用分析

2.1.异步调用分析

dubbo异步是在DubboInvoker(AbstractInvoker)内实现,具体代码如下

//com.alibaba.dubbo.rpc.protocol.AbstractInvoker.invoke(Invocation)public Result invoke(Invocation inv) throws RpcException {    // if invoker is destroyed due to address refresh from registry, let's allow the current invoke to proceed    if (destroyed.get()) {        logger.warn("Invoker for service " + this + " on consumer " + NetUtils.getLocalHost() + " is destroyed, "                    + ", dubbo version is " + Version.getVersion() + ", this invoker should not be used any longer");    }    RpcInvocation invocation = (RpcInvocation) inv;    invocation.setInvoker(this);    if (attachment != null && attachment.size() > 0) {        invocation.addAttachmentsIfAbsent(attachment);    }    Map<String, String> contextAttachments = RpcContext.getContext().getAttachments();    if (contextAttachments != null && contextAttachments.size() != 0) {                invocation.addAttachments(contextAttachments);//把RpcContext的隐式参数保存到RpcInvocation.attachments。如果在上下文设置了async=true就说明使用一部    }    if (getUrl().getMethodParameter(invocation.getMethodName(), Constants.ASYNC_KEY, false)) {//url上有异步标识${方法名}.async=true  或者 default.async=true        invocation.setAttachment(Constants.ASYNC_KEY, Boolean.TRUE.toString());//设置异步标识到RpcInvocation.attachments    }    RpcUtils.attachInvocationIdIfAsync(getUrl(), invocation);    try {        return doInvoke(invocation);//DubboInvoker.doInvoke(Invocation)    } catch (InvocationTargetException e) { // biz exception        //其它忽略    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

就是保存异步标识到RpcInvocation.attachments,异步标识的来源:1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”); 2. url上有${方法名}.async=true

//com.alibaba.dubbo.rpc.protocol.dubbo.DubboInvoker.doInvoke(Invocation)@Overrideprotected Result doInvoke(final Invocation invocation) throws Throwable {    RpcInvocation inv = (RpcInvocation) invocation;    final String methodName = RpcUtils.getMethodName(invocation);    inv.setAttachment(Constants.PATH_KEY, getUrl().getPath());    inv.setAttachment(Constants.VERSION_KEY, version);    ExchangeClient currentClient;    if (clients.length == 1) {        currentClient = clients[0];    } else {        currentClient = clients[index.getAndIncrement() % clients.length];    }    try {        boolean isAsync = RpcUtils.isAsync(getUrl(), invocation);//url上有async=true,认为是异步        boolean isOneway = RpcUtils.isOneway(getUrl(), invocation);//url上有return=false or 隐式参数有return=false,认为是onway,不需要响应        int timeout = getUrl().getMethodParameter(methodName, Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT);        if (isOneway) {            boolean isSent = getUrl().getMethodParameter(methodName, Constants.SENT_KEY, false);            currentClient.send(inv, isSent);            RpcContext.getContext().setFuture(null);            return new RpcResult();        } else if (isAsync) {//dubbo异步调用            ResponseFuture future = currentClient.request(inv, timeout);//网络调用            RpcContext.getContext().setFuture(new FutureAdapter<Object>(future));//隐式参数存放FutureAdapter,封装了ResponseFuture,因此consumer端需要RpcContext.getContext().getFuture()获取Future,然后Future.get()阻塞获取执行结果            return new RpcResult();//返回RpcResult,其value是null,这就是异步调用获取的执行结果是null的原因        } else {//dubbo同步调用            RpcContext.getContext().setFuture(null);            return (Result) currentClient.request(inv, timeout).get();//阻塞获取执行结果        }    } catch (TimeoutException e) {        throw new RpcException(RpcException.TIMEOUT_EXCEPTION, "Invoke remote method timeout. method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);    } catch (RemotingException e) {        throw new RpcException(RpcException.NETWORK_EXCEPTION, "Failed to invoke remote method: " + invocation.getMethodName() + ", provider: " + getUrl() + ", cause: " + e.getMessage(), e);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38

从上面代码发现,异步和同步区别就在于是否直接阻塞获取执行结果而已。异步调用后,业务上可以做一些其它事情,然后再获取异步执行结果。获取执行结果,异步和同步都一样,本质都是com.alibaba.dubbo.remoting.exchange.ResponseFuture.get()。异步调用是把ResponseFuture方到了dubbo上下文,需要客户端手工显示的获取ResponseFuture,接着再get()获取执行结果。

以前时候(dubbo2.5),dubbo异步调用还有个坑,比如A->B—>C,其中A调用B是异步,B调用C是同步,但是实际上B调用C也会异步,因为异步标识被传递了。这个问题在dubbo2.6是不存在的,可以看ContextFilter内,把隐式参数async清除了。

2.2.异步调用判断和异步标识来源

接着看异步的判断:

异步的判断RpcUtils.isAsync(URL, Invocation)

//com.alibaba.dubbo.rpc.support.RpcUtils.isAsync(URL, Invocation)public static boolean isAsync(URL url, Invocation inv) {    boolean isAsync;    if (Boolean.TRUE.toString().equals(inv.getAttachment(Constants.ASYNC_KEY))) {//RpcInvocation.attachments有async=true        isAsync = true;    } else {        isAsync = url.getMethodParameter(getMethodName(inv), Constants.ASYNC_KEY, false);//url上有异步标识${方法名}.async=true  或者 default.async=true    }    return isAsync;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

异步标识的来源:

  1. RpcContext.getContext().setAttachment(Constants.ASYNC_KEY, “true”);
  2. url上有${方法名}.async=true

那么url上有异步标识是通过@Reference(async = true) 或 @Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)}) 配置

@Reference(async = true)

@Reference(methods={@Method(name=“方法名1”,async = true), @Method(name=“方法名2”,async = true)}) 配置 生成的url例子

@Method这样设置异步,具体是在com.alibaba.dubbo.config.ReferenceConfig.init()

3.dubbo异步调用图解

[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bQ87inuU-1649166821546)(https://cdn.jsdelivr.net/gh/zhangyj131/mdpicture/docs/20210817211712.png)]

步骤1. consumer端业务线程调用dubbo引用方法,由IO线程发起网络通信,发送请求给provider

步骤2. netty nio网络通信,异步

步骤3. 设置ResponseFuture到RpcContext上。和步骤2基本是同时进行

步骤4. 从RpcContext获取ResponseFuture。然后consumer端进行其它业务逻辑处理

步骤5. ResponseFuture 阻塞获取provider端执行结果。其中步骤6、7可能执行结果早于步骤5,

步骤6. provider端处理完毕,响应执行结果

步骤7. 设置provider端执行结果到ResponseFuture,并唤醒ResponseFuture

4.总结

dubbo异步是基于NIO的非阻塞实现并行调用,客户端不需要启动多线程即可完成并行调用多个远程服务,相对多线程开销较少。

dubbo异步调用就是请求发送出去,然后业务做一些其它处理,最后再获取异步执行结果。

dubbo异步调用使用场景:

1.用于耗时久的请求调用,且下步业务处理,不依赖这个异步调用结果

2.用于不重要的操作,比如记录用户行为等

dubbo异步在平时工作中使用较少,基本还是同步为主,真要到了dubbo同步无法提高吞吐量和解耦,通常需要改变架构设计了,通常会使用mq进行异步消峰解耦。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发