Feign and Ribbon Retry Mechanisms
Ribbon's Retry Mechanism
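Configuration
```yaml
ribbon:
  ReadTimeout: 2000 # ms
  ConnectTimeout: 2000 # ms
  MaxAutoRetries: 1 # max retries on the same instance, not counting the first call
  MaxAutoRetriesNextServer: 3 # max retries on other instances picked by the load balancer, not counting the first call
  OkToRetryOnAllOperations: true # whether to retry all operations
```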
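Results
As you can see, the two machines were each called four times, with roughly 2 seconds between calls. The first request went to 8083, and the calls were made in this order: 8083, 8083, 8081, 8081, 8083, 8083, 8081, 8083.
The number of calls is therefore (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1) = (1 + 1) * (3 + 1) = 8.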
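Source Code Analysis
The timeouts and the other settings are defined in the configuration file. They are read when the beans are first created and override the default values. This happens in HttpClientRibbonConfiguration: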
```java
// HttpClientRibbonConfiguration
@Bean
@ConditionalOnMissingBean(AbstractLoadBalancerAwareClient.class)
@ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
public RibbonLoadBalancingHttpClient ribbonLoadBalancingHttpClient(
        IClientConfig config, ServerIntrospector serverIntrospector,
        ILoadBalancer loadBalancer, RetryHandler retryHandler,
        CloseableHttpClient httpClient) {
    RibbonLoadBalancingHttpClient client = new RibbonLoadBalancingHttpClient(
            httpClient, config, serverIntrospector);
    client.setLoadBalancer(loadBalancer);
    client.setRetryHandler(retryHandler);
    Monitors.registerObject("Client_" + this.name, client);
    return client;
}

// RibbonLoadBalancingHttpClient
public RibbonLoadBalancingHttpClient(CloseableHttpClient delegate,
        IClientConfig config, ServerIntrospector serverIntrospector) {
    super(delegate, config, serverIntrospector);
}

// AbstractLoadBalancingClient
protected AbstractLoadBalancingClient(D delegate, IClientConfig config,
        ServerIntrospector serverIntrospector) {
    super(null);
    this.delegate = delegate;
    this.config = config;
    this.serverIntrospector = serverIntrospector;
    this.setRetryHandler(RetryHandler.DEFAULT);
    initWithNiwsConfig(config);
}

// AbstractLoadBalancingClient: configured values override the defaults passed in
@Override
public void initWithNiwsConfig(IClientConfig clientConfig) {
    super.initWithNiwsConfig(clientConfig);
    RibbonProperties ribbon = RibbonProperties.from(clientConfig);
    this.connectTimeout = ribbon.connectTimeout(DEFAULT_CONNECT_TIMEOUT);
    this.readTimeout = ribbon.readTimeout(DEFAULT_READ_TIMEOUT);
    this.secure = ribbon.isSecure();
    this.followRedirects = ribbon.isFollowRedirects();
    this.okToRetryOnAllOperations = ribbon.isOkToRetryOnAllOperations();
    this.gzipPayload = ribbon.isGZipPayload(DEFAULT_GZIP_PAYLOAD);
}
```
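initWithNiwsConfig follows a simple rule. If a value is configured, it is used; otherwise the default passed in applies. The sketch below models only that lookup. It uses a plain Map in place of IClientConfig and RibbonProperties. The class name and the default values are placeholders for illustration, not Ribbon's.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a Map stands in for IClientConfig / RibbonProperties.
public class RibbonTimeoutSketch {

    // Placeholder defaults for this sketch only.
    static final int DEFAULT_CONNECT_TIMEOUT = 1000;
    static final int DEFAULT_READ_TIMEOUT = 1000;

    // Values bound from the "ribbon:" section of the configuration file.
    private final Map<String, Integer> configured;

    public RibbonTimeoutSketch(Map<String, Integer> configured) {
        this.configured = new HashMap<>(configured);
    }

    // Same idea as ribbon.readTimeout(DEFAULT_READ_TIMEOUT): a configured value wins, else the default.
    public int get(String key, int defaultValue) {
        return configured.getOrDefault(key, defaultValue);
    }

    public static void main(String[] args) {
        // Suppose only ReadTimeout were configured.
        RibbonTimeoutSketch config = new RibbonTimeoutSketch(Map.of("ReadTimeout", 2000));
        System.out.println(config.get("ReadTimeout", DEFAULT_READ_TIMEOUT));       // 2000
        System.out.println(config.get("ConnectTimeout", DEFAULT_CONNECT_TIMEOUT)); // 1000
    }
}
```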
When the Feign method is called, the breakpoints above have already been covered, so let execution run until it reaches LoadBalancerFeignClient:
```java
// LoadBalancerFeignClient
@Override
public Response execute(Request request, Request.Options options) throws IOException {
    try {
        URI asUri = URI.create(request.url());
        // The host part of the URL is the client (service) name
        String clientName = asUri.getHost();
        URI uriWithoutHost = cleanUrl(request.url(), clientName);
        FeignLoadBalancer.RibbonRequest ribbonRequest = new FeignLoadBalancer.RibbonRequest(
                this.delegate, request, uriWithoutHost);

        IClientConfig requestConfig = getClientConfig(options, clientName);
        return lbClient(clientName)
                .executeWithLoadBalancer(ribbonRequest, requestConfig).toResponse();
    }
    catch (ClientException e) {
        IOException io = findIOException(e);
        if (io != null) {
            throw io;
        }
        throw new RuntimeException(e);
    }
}
```
Next, step into executeWithLoadBalancer:
```java
// AbstractLoadBalancerAwareClient
public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException {
    LoadBalancerCommand<T> command = buildLoadBalancerCommand(request, requestConfig);

    try {
        return command.submit(
            new ServerOperation<T>() {
                @Override
                public Observable<T> call(Server server) {
                    // Build the real URI from the server chosen by the load balancer
                    URI finalUri = reconstructURIWithServer(server, request.getUri());
                    S requestForServer = (S) request.replaceUri(finalUri);
                    try {
                        return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig));
                    }
                    catch (Exception e) {
                        return Observable.error(e);
                    }
                }
            })
            .toBlocking()
            .single();
    } catch (Exception e) {
        Throwable t = e.getCause();
        if (t instanceof ClientException) {
            throw (ClientException) t;
        } else {
            throw new ClientException(e);
        }
    }
}
```
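The URI handling works in two steps. First, the host of a Feign URL serves as the logical client name. Then, for each server the load balancer picks, reconstructURIWithServer builds a URI that points at that server. The sketch below approximates both steps with java.net.URI. ServerUriSketch and its methods are hypothetical, and the sketch ignores details such as the host stripping done by cleanUrl.

```java
import java.net.URI;
import java.net.URISyntaxException;

// Hypothetical helpers; not Ribbon's or Feign's implementation.
public class ServerUriSketch {

    // In a Feign URL such as http://ServiceA/hello the host is the logical client name,
    // which LoadBalancerFeignClient.execute reads with asUri.getHost().
    public static String clientName(URI logicalUri) {
        return logicalUri.getHost();
    }

    // Rough equivalent of rebuilding the request URI for the chosen server:
    // keep scheme, path, query and fragment, but use the real host and port.
    public static URI withServer(URI logicalUri, String host, int port) {
        try {
            return new URI(logicalUri.getScheme(), logicalUri.getUserInfo(), host, port,
                    logicalUri.getPath(), logicalUri.getQuery(), logicalUri.getFragment());
        } catch (URISyntaxException e) {
            throw new IllegalArgumentException("cannot build URI for " + host + ":" + port, e);
        }
    }

    public static void main(String[] args) {
        URI logical = URI.create("http://ServiceA/hello?name=tom");
        System.out.println(clientName(logical));                    // ServiceA
        System.out.println(withServer(logical, "localhost", 8083)); // http://localhost:8083/hello?name=tom
    }
}
```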
executeWithLoadBalancer creates a LoadBalancerCommand. It then calls the command's submit method with the request operation (a ServerOperation). The source of submit is as follows (abridged):
```java
public Observable<T> submit(final ServerOperation<T> operation) {
    // .......
    // Read the retry counts
    final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
    final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
    // Use the load balancer
    Observable<T> o = (server == null ? selectServer() : Observable.just(server))
            .concatMap(new Func1<Server, Observable<T>>() {
                @Override
                public Observable<T> call(Server server) {
                    //.......
                    // Retries on the same server
                    if (maxRetrysSame > 0)
                        o = o.retry(retryPolicy(maxRetrysSame, true));
                    return o;
                }
            });
    // Retries on other servers
    if (maxRetrysNext > 0 && server == null)
        o = o.retry(retryPolicy(maxRetrysNext, false));
    return o.onErrorResumeNext(...);
}
```
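submit calls the retryHandler's getMaxRetriesOnSameServer and getMaxRetriesOnNextServer methods to obtain maxRetrysSame and maxRetrysNext. maxRetrysSame is the number of retries on the same server (default 0), and maxRetrysNext is the number of retries on other servers (default 1). The two retries are nested:
- The same-server retry is applied inside concatMap, to the call on the selected server.
- The next-server retry wraps the whole Observable, and only applies when no server was fixed in advance (server == null).

Each next-server retry therefore runs selectServer() again, and each new server gets its own round of same-server retries. That nesting is where the product (MaxAutoRetries + 1) * (MaxAutoRetriesNextServer + 1) comes from.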
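retryPolicy returns an RxJava Func2 that wraps the RetryHandler. The RetryHandler makes the final decision on whether to retry, for example whether the thrown exception is retriable at all. The check against the maximum retry count is done in the Func2 itself. The source of retryPolicy is: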
```java
private Func2<Integer, Throwable, Boolean> retryPolicy(final int maxRetrys, final boolean same) {
    return new Func2<Integer, Throwable, Boolean>() {
        @Override
        public Boolean call(Integer tryCount, Throwable e) {
            if (e instanceof AbortExecutionException) {
                return false;
            }
            // Exceeded the maximum number of retries
            if (tryCount > maxRetrys) {
                return false;
            }
            if (e.getCause() != null && e instanceof RuntimeException) {
                e = e.getCause();
            }
            // Let the RetryHandler decide whether to retry
            return retryHandler.isRetriableException(e, same);
        }
    };
}
```
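The interaction between submit and retryPolicy can be modeled without RxJava. In this sketch, which is for illustration only, retry re-runs a call while a predicate shaped like retryPolicy allows it. The inner retry plays the same-server role, and the outer one selects a new server first. RetriableFailure and the round-robin cursor are hypothetical stand-ins for the RetryHandler's decision and for selectServer().

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.BiPredicate;
import java.util.function.Supplier;

public class RibbonRetrySketch {

    // Hypothetical marker for a failure the RetryHandler would accept (e.g. a read timeout).
    public static class RetriableFailure extends RuntimeException {
        public RetriableFailure(String message) { super(message); }
    }

    // Plain-Java stand-in for RxJava's retry(Func2<Integer, Throwable, Boolean>):
    // after each error, ask the policy (with a 1-based retry count) whether to run the call again.
    static String retry(Supplier<String> call, BiPredicate<Integer, RuntimeException> policy) {
        int tryCount = 0;
        while (true) {
            try {
                return call.get();
            } catch (RuntimeException e) {
                tryCount++;
                if (!policy.test(tryCount, e)) {
                    throw e;
                }
            }
        }
    }

    // Same shape as retryPolicy(): stop once tryCount > maxRetrys, otherwise let the
    // "retry handler" decide; here only RetriableFailure counts as retriable.
    static BiPredicate<Integer, RuntimeException> retryPolicy(int maxRetrys) {
        return (tryCount, e) -> tryCount <= maxRetrys && e instanceof RetriableFailure;
    }

    // Mirrors the nesting in submit(): every request fails with `failure`,
    // and the servers that were actually called are recorded in order.
    public static List<String> callLog(List<String> servers, int maxRetrysSame, int maxRetrysNext,
                                       RuntimeException failure) {
        List<String> calls = new ArrayList<>();
        int[] next = {0}; // hypothetical round-robin cursor standing in for selectServer()
        try {
            retry(() -> {
                String server = servers.get(next[0]++ % servers.size()); // pick a server
                return retry(() -> {                                      // same-server retries
                    calls.add(server);
                    throw failure;
                }, retryPolicy(maxRetrysSame));
            }, retryPolicy(maxRetrysNext));                               // next-server retries
        } catch (RuntimeException giveUp) {
            // Retries exhausted or error not retriable: the caller would receive this exception.
        }
        return calls;
    }

    public static void main(String[] args) {
        List<String> servers = List.of("instance-1", "instance-2");
        System.out.println(callLog(servers, 1, 3, new RetriableFailure("read timeout")).size());      // 8
        System.out.println(callLog(servers, 1, 3, new IllegalStateException("not retriable")).size()); // 1
    }
}
```

With maxRetrysSame = 1 and maxRetrysNext = 3, a request that keeps failing produces (1 + 1) * (3 + 1) = 8 calls, alternating in pairs between the two instances. An error the policy rejects stops after the first call.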
So where does this retryHandler come from?
It is created when FeignLoadBalancer's executeWithLoadBalancer method (inherited from AbstractLoadBalancerAwareClient) calls buildLoadBalancerCommand to construct the LoadBalancerCommand. The source of buildLoadBalancerCommand is:
```java
protected LoadBalancerCommand<T> buildLoadBalancerCommand(final S request, final IClientConfig config) {
    // Get the RetryHandler
    RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config);
    // Build the LoadBalancerCommand with the Builder pattern
    LoadBalancerCommand.Builder<T> builder = LoadBalancerCommand.<T>builder()
            .withLoadBalancerContext(this)
            // Pass in the RetryHandler
            .withRetryHandler(handler)
            .withLoadBalancerURI(request.getUri());
    return builder.build();
}
```
As the source shows, the RetryHandler that Ribbon uses here is a RequestSpecificRetryHandler, and the command is built with the Builder pattern. FeignLoadBalancer's getRequestSpecificRetryHandler method looks like this:
```java
@Override
public RequestSpecificRetryHandler getRequestSpecificRetryHandler(
        RibbonRequest request, IClientConfig requestConfig) {
    //.....
    if (!request.toRequest().httpMethod().name().equals("GET")) {
        // Non-GET: okToRetryOnConnectErrors = true, okToRetryOnAllErrors = false;
        // the base RetryHandler is obtained from this.getRetryHandler()
        return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(),
                requestConfig);
    }
    else {
        // GET: okToRetryOnConnectErrors = true, okToRetryOnAllErrors = true
        return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(),
                requestConfig);
    }
}
```
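The two boolean arguments above are okToRetryOnConnectErrors and okToRetryOnAllErrors. The following is a simplified, hypothetical model of how they affect the retry decision. It ignores the elided part of the method and the base RetryHandler. It also assumes that a connection error means a SocketException somewhere in the cause chain.

```java
import java.net.ConnectException;
import java.net.SocketException;
import java.net.SocketTimeoutException;

// Hypothetical sketch; not RequestSpecificRetryHandler's actual implementation.
public class RetryDecisionSketch {

    // Flags as chosen in getRequestSpecificRetryHandler above:
    // GET -> (true, true), any other method -> (true, false).
    public static boolean isRetriable(String httpMethod, Throwable error) {
        boolean okToRetryOnConnectErrors = true;
        boolean okToRetryOnAllErrors = "GET".equals(httpMethod);
        if (okToRetryOnAllErrors) {
            return true;
        }
        // Assumption: a connection error is a SocketException in the cause chain.
        return okToRetryOnConnectErrors && hasCause(error, SocketException.class);
    }

    // Walk the cause chain looking for an exception of the given type.
    static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        for (Throwable t = error; t != null; t = t.getCause()) {
            if (type.isInstance(t)) {
                return true;
            }
        }
        return false;
    }

    public static void main(String[] args) {
        System.out.println(isRetriable("GET", new SocketTimeoutException("Read timed out")));  // true
        System.out.println(isRetriable("POST", new SocketTimeoutException("Read timed out"))); // false
        System.out.println(isRetriable("POST", new ConnectException("Connection refused")));   // true
    }
}
```

Under these assumptions, a POST that fails with a read timeout is not retried, because SocketTimeoutException is not a SocketException. A GET that fails the same way is retried, and so is a POST whose connection was refused.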
Feign's Retry Mechanism
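Ribbon's retry mechanism conflicts with Feign's, so Spring Cloud disables Feign's own retries by default. Unless you define your own Retryer bean, Feign uses Retryer.NEVER_RETRY. To enable Feign retries and set Feign-level timeouts, use a configuration like this: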
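Configuration
```yaml
feign:
  client:
    config:
      ServiceA:
        connect-timeout: 3000 # ms
        read-timeout: 3000 # ms
```
```java
import static java.util.concurrent.TimeUnit.SECONDS;

import feign.Retryer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class FeignRetryConfig { // class name is illustrative
    // period 100 ms, maxPeriod 1 s, maxAttempts 7 (the first call counts as an attempt)
    @Bean
    public Retryer feignRetryer() {
        return new Retryer.Default(100, SECONDS.toMillis(1), 7);
    }
}
```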
Results
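You will see that the two machines are called in turn (round-robin). The interval between calls is a little over 3 seconds, because Feign sleeps between retries. Feign's timeouts are also separate from Ribbon's: if both are configured, only Feign's take effect. The analysis below shows why.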
Source Code Analysis
The invoke method of SynchronousMethodHandler, which Feign's dynamic proxy calls for each interface method:
```java
@Override
public Object invoke(Object[] argv) throws Throwable {
    RequestTemplate template = buildTemplateFromArgs.create(argv);
    Options options = findOptions(argv);
    // Each invocation works on its own copy of the configured Retryer
    Retryer retryer = this.retryer.clone();
    while (true) {
        try {
            return executeAndDecode(template, options);
        } catch (RetryableException e) {
            try {
                // Returns normally to retry, or rethrows once retries are exhausted
                retryer.continueOrPropagate(e);
            } catch (RetryableException th) {
                Throwable cause = th.getCause();
                if (propagationPolicy == UNWRAP && cause != null) {
                    throw cause;
                } else {
                    throw th;
                }
            }
            if (logLevel != Logger.Level.NONE) {
                logger.logRetry(metadata.configKey(), logLevel);
            }
            continue;
        }
    }
}
```
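The catch block is the core of the retry mechanism, and it sits inside a while (true) loop. At this point, retryer already holds the configured retry parameters.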
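Retryer.Default keeps per-call state, such as its attempt counter. That is why invoke works on this.retryer.clone() rather than on the shared instance. The sketch below mimics the loop without sleeping. It uses hypothetical, trimmed-down RetryableException and CountingRetryer classes, not Feign's own types.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

public class FeignRetryLoopSketch {

    // Hypothetical stand-in for feign.RetryableException.
    public static class RetryableException extends RuntimeException {
        public RetryableException(String message) { super(message); }
    }

    // Hypothetical stateful retryer: it counts attempts, so each invocation needs a fresh copy.
    public static class CountingRetryer implements Cloneable {
        private final int maxAttempts;
        private int attempt = 1;

        public CountingRetryer(int maxAttempts) { this.maxAttempts = maxAttempts; }

        // Rethrow to stop; return normally to let the loop try again (no sleep in this sketch).
        public void continueOrPropagate(RetryableException e) {
            if (attempt++ >= maxAttempts) {
                throw e;
            }
        }

        @Override
        public CountingRetryer clone() {
            return new CountingRetryer(maxAttempts); // fresh counter, same settings
        }
    }

    private final CountingRetryer prototype;

    public FeignRetryLoopSketch(CountingRetryer prototype) { this.prototype = prototype; }

    // Same structure as SynchronousMethodHandler.invoke: loop until success or the retryer rethrows.
    public <T> T invoke(Supplier<T> executeAndDecode) {
        CountingRetryer retryer = prototype.clone();
        while (true) {
            try {
                return executeAndDecode.get();
            } catch (RetryableException e) {
                retryer.continueOrPropagate(e);
            }
        }
    }

    public static void main(String[] args) {
        FeignRetryLoopSketch handler = new FeignRetryLoopSketch(new CountingRetryer(7));
        AtomicInteger calls = new AtomicInteger();
        for (int invocation = 0; invocation < 2; invocation++) {
            calls.set(0);
            try {
                handler.invoke(() -> {
                    calls.incrementAndGet();
                    throw new RetryableException("Read timed out");
                });
            } catch (RetryableException e) {
                System.out.println("invocation " + invocation + ": " + calls.get() + " calls"); // 7 calls each time
            }
        }
    }
}
```

Both invocations make 7 calls. Without the clone, the second invocation would start with an exhausted counter and give up after a single call.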
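The key step comes in LoadBalancerFeignClient.getClientConfig(options, clientName), which is called from the execute method shown earlier. The IClientConfig it returns is later used to create the RequestSpecificRetryHandler. The Options passed in are not the default ones, so getClientConfig creates a FeignOptionsClientConfig, and the Ribbon settings configured earlier no longer take effect. As a result, both retry counts (same server and next server) are -1, which becomes 0 when read as an int. Ribbon therefore does not retry, and Feign's retry mechanism takes over.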
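When a call times out, control returns to invoke and the RetryableException is caught there, which calls retryer.continueOrPropagate. The core code of Retryer.Default.continueOrPropagate is: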
```java
// Retryer.Default
public void continueOrPropagate(RetryableException e) {
    // Once the attempt count reaches maxAttempts, rethrow and stop retrying
    if (attempt++ >= maxAttempts) {
        throw e;
    }

    long interval;
    if (e.retryAfter() != null) {
        // Wait until the time given by retryAfter(), capped at maxPeriod
        interval = e.retryAfter().getTime() - currentTimeMillis();
        if (interval > maxPeriod) {
            interval = maxPeriod;
        }
        if (interval < 0) {
            return;
        }
    } else {
        // Otherwise use the backoff computed by nextMaxInterval()
        interval = nextMaxInterval();
    }
    try {
        Thread.sleep(interval);
    } catch (InterruptedException ignored) {
        Thread.currentThread().interrupt();
        throw e;
    }
    sleptForMillis += interval;
}
```