企业管理系统定制开发Feign和Ribbon的重试机制

和Ribbon的重试机制

的重试机制

配置

  1. ribbon:
  2. ReadTimeout: 2000
  3. ConnectTimeout: 2000
  4. MaxAutoRetries: 1 #企业管理系统定制开发同一台实例最大重试次数,不包括首次调用
  5. MaxAutoRetriesNextServer: 3 #重试负载均衡其他的实例最大重试次数,不包括首次调用
  6. OkToRetryOnAllOperations: true #是否所有操作都重试

结果

可以看到2台机器分别被请求了四次,而且每个时间的间隔大约为2秒左右。一开始请求的是8083接口,请求的次数为8083,8083,8081,8081,8083,8083,8081,8083。

那它的计算公式为:(MaxAutoRetries+ 1)*(MaxAutoRetriesNextServer+1) =8次

源码分析

首先由于在配置文件中配置了超时时间这些信息,一开始注入bean的时候,会读取这些值,覆盖默认值,在HttpClientRibbonConfiguration中

  1. @Bean
  2. @ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
  3. @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
  4. public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
  5. IClientConfig config, ServerIntrospector serverIntrospector,
  6. ILoadBalancer loadBalancer, RetryHandler retryHandler,
  7. CloseableHttpClient httpClient) {
  8. RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
  9. httpClient, config, serverIntrospector);
  10. client.setLoadBalancer(loadBalancer);
  11. client.setRetryHandler(retryHandler);
  12. Monitors.registerObject("Client_" + this.name, client);
  13. return client;
  14. }
  15. public RibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
  16. IClientConfig config, ServerIntrospector serverIntrospector) {
  17. super(delegate, config, serverIntrospector);
  18. }
  19. protected AbstractLoadBalancingClient(D delegate, IClientConfig config,
  20. ServerIntrospector serverIntrospector) {
  21. super(null);
  22. this.delegate = delegate;
  23. this.config = config;
  24. this.serverIntrospector = serverIntrospector;
  25. this.setRetryHandler(RetryHandler.DEFAULT);
  26. initWithNiwsConfig(config);
  27. }
  28. @Override
  29. public void initWithNiwsConfig(IClientConfig clientConfig) {
  30. super.initWithNiwsConfig(clientConfig);
  31. RibbonProperties ribbon = RibbonProperties.from(clientConfig);
  32. this.connectTimeout = ribbon.connectTimeout(DEFAULT_CONNECT_TIMEOUT);
  33. this.readTimeout = ribbon.readTimeout(DEFAULT_READ_TIMEOUT);
  34. this.secure = ribbon.isSecure();
  35. this.followRedirects = ribbon.isFollowRedirects();
  36. this.okToRetryOnAllOperations = ribbon.isOkToRetryOnAllOperations();
  37. this.gzipPayload = ribbon.isGZipPayload(DEFAULT_GZIP_PAYLOAD);
  38. }

当方法开始执行的时候,由于前面已经断点过此处就直接放行,一直来到LoadBalancerFeignClient

  1. @Override
  2. public Response execute(Request request, Request.Options options) throws IOException {
  3. try {
  4. URI asUri = URI.create(request.url());
  5. String clientName = asUri.getHost();
  6. URI uriWithoutHost = cleanUrl(request.url(), clientName);
  7. FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
  8. this.delegate, request, uriWithoutHost);
  9. IClientConfig requestConfig = getClientConfig(options, clientName);
  10. return lbClient(clientName)
  11. .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
  12. }
  13. catch (ClientException e) {
  14. IOException io = findIOException(e);
  15. if (io != null) {
  16. throw io;
  17. }
  18. throw new RuntimeException(e);
  19. }
  20. }

在进入到executeWithLoadBalancer的方法中。

  1. public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
  2. LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);
  3. try {
  4. return command.submit(
  5. new ServerOperation<T>() {
  6. @Override
  7. public Observable<T> call(Server server) {
  8. URI finalUri = reconstructURIWithServer(server, request.getUri());
  9. S requestForServer = (S) request.replaceUri(finalUri);
  10. try {
  11. return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
  12. }
  13. catch (Exception e) {
  14. return Observable.error(e);
  15. }
  16. }
  17. })
  18. .toBlocking()
  19. .single();
  20. } catch (Exception e) {
  21. Throwable t = e.getCause();
  22. if (t instanceof ClientException) {
  23. throw (ClientException) t;
  24. } else {
  25. throw new ClientException(e);
  26. }
  27. }
  28. }

executeWithLoadBalancer方法中会创建一个LoadBalancerCommand,然后调用LoadBalancerCommandsubmit方法提交请求Operation,submit方法源码如下(有删减):

  1. public Observable<T> submit(final ServerOperation<T> operation) {
  2. // .......
  3. // &emsp;获取重试次数
  4. final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
  5. final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
  6. // Use the load balancer
  7. Observable<T> o = (server == null ? selectServer() : Observable.just(server))
  8. .concatMap(new Func1<Server, Observable<T>>() {
  9. @Override
  10. public Observable<T> call(Server server) {
  11. //.......
  12. // 相同节点的重试
  13. if (maxRetrysSame > 0)
  14. o = o.retry(retryPolicy(maxRetrysSame, true));
  15. return o;
  16. }
  17. });
  18. // 不同节点的重试
  19. if (maxRetrysNext > 0 && server == null)
  20. o = o.retry(retryPolicy(maxRetrysNext, false));
  21. return o.onErrorResumeNext(...);

方法中调用retryHandler的getMaxRetriesOnSameServer方法和getMaxRetriesOnNextServer方法分别获取配置maxRetrysSame、maxRetrysNext。maxRetrysSame表示调用相同节点的重试次数,默认为0;maxRetrysNext表示调用不同节点的重试次数,默认为1。

retryPolicy方法返回的是一个包装RetryHandler重试决策者的RxJava API的对象,最终由该RetryHandler决定是否需要重试,如抛出的异常是否允许重试。而是否达到最大重试次数则是在retryPolicy返回的Func2中完成,这是RxJava的API,retryPolicy方法的源码如下。

  1. private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
  2. return new Func2<Integer, Throwable, Boolean>() {
  3. @Override
  4. public Boolean call(Integer tryCount, Throwable e) {
  5. if (e instanceof AbortExecutionException) {
  6. return false;
  7. }
  8. // 大于最大重试次数
  9. if (tryCount > maxRetrys) {
  10. return false;
  11. }
  12. if (e.getCause() != null && e instanceof RuntimeException) {
  13. e = e.getCause();
  14. }
  15. // 调用RetryHandler判断是否重试
  16. return retryHandler.isRetriableException(e, same);
  17. }
  18. };

那么这个retryHandler是怎么来的呢?

FeignLoadBalancer的executeWithLoadBalancer方法中调用buildLoadBalancerCommand方法构造LoadBalancerCommand对象时创建的,buildLoadBalancerCommand方法源码如下。

  1. protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
  2. // 获取RetryHandler
  3. RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
  4. // 使用Builder构造者模式构造LoadBalancerCommand
  5. LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
  6. .withLoadBalancerContext(this)
  7. // 传入RetryHandler
  8. .withRetryHandler(handler)
  9. .withLoadBalancerURI(request.getUri());
  10. return builder.build();
  11. }

从源码中可以看出,Ribbon使用的RetryHandlerRequestSpecificRetryHandler。这里还用到了Builder构造者模式。FeignLoadBalancergetRequestSpecificRetryHandler方法源码如下:

  1. @Override
  2. public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
  3. RibbonRequest request, IClientConfig requestConfig) {
  4. //.....
  5. if (!request.toRequest().httpMethod().name().equals("GET")) {
  6. // 调用this.getRetryHandler()方法获取一次RetryHandler
  7. return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
  8. requestConfig);
  9. }
  10. else {
  11. // 调用this.getRetryHandler()方法获取一次RetryHandler
  12. return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
  13. requestConfig);
  14. }

Feign的重试机制

因为ribbon的重试机制和Feign的重试机制有冲突,所以源码中默认关闭Feign的重试机制,源码如下

配置

  1. feign:
  2. client:
  3. config:
  4. ServiceA:
  5. connect-timeout: 3000
  6. read-timeout: 3000
  7. @Bean
  8. public Retryer feignRetryer() {
  9. return new Retryer.Default(100, SECONDS.toMillis(1), 7);
  10. }

结果

会发现2台机器是轮休的访问,时间超过3秒多一点,是因为期间有一个休眠,而且feign的超时和ribbon的超时是不一样的,而且如果同时都配置的话,生效的只是Feign,那下面就分析一下看看。

源码分析

SynchronousMethodHandler的反射方法

  1. @Override
  2. public Object invoke(Object[] argv) throws Throwable {
  3. RequestTemplate template = buildTemplateFromArgs.create(argv);
  4. Options options = findOptions(argv);
  5. Retryer retryer = this.retryer.clone();
  6. while (true) {
  7. try {
  8. return executeAndDecode(template, options);
  9. } catch (RetryableException e) {
  10. try {
  11. retryer.continueOrPropagate(e);
  12. } catch (RetryableException th) {
  13. Throwable cause = th.getCause();
  14. if (propagationPolicy == UNWRAP && cause != null) {
  15. throw cause;
  16. } else {
  17. throw th;
  18. }
  19. }
  20. if (logLevel != Logger.Level.NONE) {
  21. logger.logRetry(metadata.configKey(), logLevel);
  22. }
  23. continue;
  24. }
  25. }
  26. }

在它的方法里面下面就是核心的尝试机制,前面嵌套一层循环。此处的retryer已经拿到了配置的重试参数。

后面会进入到下面的这个方法中,这里就是核心,会创建一个handler,由于创建的类型不是默认的,会创建FeignOptionsClientConfig,导致之前配置的ribbon的会失效,这样它重试的次数和最大的次数都是-1,int值的时候会变成0,就会走feign的重试机制

如果超时的会就会来到一开始的方法,被catch中,核心代码如下:

  1. public void continueOrPropagate(RetryableException e) {
  2. //如果重试机制大于最大的尝试次数,则抛出异常
  3. if (attempt++ >= maxAttempts) {
  4. throw e;
  5. }
  6. long interval;
  7. if (e.retryAfter() != null) {
  8. //根据上次重试的时间减去当前时间来决定重试的间隔
  9. interval = e.retryAfter().getTime() - currentTimeMillis();
  10. if (interval > maxPeriod) {
  11. interval = maxPeriod;
  12. }
  13. if (interval < 0) {
  14. return;
  15. }
  16. } else {
  17. interval = nextMaxInterval();
  18. }
  19. try {
  20. Thread.sleep(interval);
  21. } catch (InterruptedException ignored) {
  22. Thread.currentThread().interrupt();
  23. throw e;
  24. }
  25. sleptForMillis += interval;
  26. }

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发