应用系统定制开发Ribbon负载均衡的原理

依赖

    <parent>        <groupId>org.springframework.boot</groupId>        <artifactId>spring-boot-starter-parent</artifactId>        <version>2.3.12.RELEASE</version>        <relativePath/> <!-- lookup parent from repository -->    </parent>    <properties>        <java.version>1.8</java.version>        <spring-cloud.version>Hoxton.SR12</spring-cloud.version>    </properties>    <!--eureka应用系统定制开发依赖中会自带ribbon依赖-->    <dependencies>        <dependency>            <groupId>org.springframework.cloud</groupId>            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>        </dependency>    </dependencies>    <dependencyManagement>        <dependencies>            <dependency>                <groupId>org.springframework.cloud</groupId>                <artifactId>spring-cloud-dependencies</artifactId>                <version>${spring-cloud.version}</version>                <type>pom</type>                <scope>import</scope>            </dependency>        </dependencies>    </dependencyManagement></project>
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32

SpringCloud2020.0.1版本之后,去除了ribbbon依赖,使用Load balancer应用系统定制开发进行负载均衡

应用系统定制开发负载均衡配置

spring:  application:    name: ribbon   #项目名字# eureka client配置eureka:  client:    registryFetchIntervalSeconds: 5    serviceUrl:      defaultZone: http://localhost:8098/eureka/  #eureka应用系统定制开发服务端提供的注册地址 应用系统定制开发参考服务端配置的这个路径  instance:    hostname: ribbon #此实例注册到eureka服务端的唯一的实例ID    prefer-ip-address: true #是否显示IP地址    leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒)    leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒    health-check-url-path: /actuator/healthribbon:  MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试  MaxAutoRetriesNextServer: 3 #切换实例的重试次数  OkToRetryOnAllOperations: false  #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false  ConnectTimeout: 5000  #请求连接的超时时间  ReadTimeout: 6000 #请求处理的超时时间  # 调用USER-MGT微服务时使用随机策略USER-MGT:  ribbon:    NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28

负载均衡原理

以调用下面的接口为例讲解ribbon怎么进行负载均衡

@RequestMapping(value = "/strategy", method = RequestMethod.GET, produces = "application/json")public String testRibbonStrategy() {    ResponseEntity<String> forEntity =            restTemplate.getForEntity("http://USER-MGT/sequence/number/port", String.class);    return forEntity.getBody();}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6

首先,.getForEntity方法

@Overridepublic <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables)      throws RestClientException {   RequestCallback requestCallback = acceptHeaderRequestCallback(responseType);   ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType);    // execute方法中进行HTTP调用   return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8

execute方法会调用到doExecute方法,他会创建一个客户端HTTP请求,然后进行调用

protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,      @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {   Assert.notNull(url, "URI is required");   Assert.notNull(method, "HttpMethod is required");   ClientHttpResponse response = null;   try {       // 创建客户端HTTP请求      ClientHttpRequest request = createRequest(url, method);      if (requestCallback != null) {         requestCallback.doWithRequest(request);      }       // 执行客户端HTTP调用      response = request.execute();      handleResponse(url, method, response);      return (responseExtractor != null ? responseExtractor.extractData(response) : null);   }   catch (IOException ex) {      String resource = url.toString();      String query = url.getRawQuery();      resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);      throw new ResourceAccessException("I/O error on " + method.name() +            " request for \"" + resource + "\": " + ex.getMessage(), ex);   }   finally {      if (response != null) {         response.close();      }   }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30

request.execute()方法会调用到抽象类AbstractClientHttpRequest#execute

public final ClientHttpResponse execute() throws IOException {   assertNotExecuted();    // 调用到AbstractBufferingClientHttpRequest   ClientHttpResponse result = executeInternal(this.headers);   this.executed = true;   return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7

executeInternal方法会调用到AbstractBufferingClientHttpRequest#executeInternal

protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {   byte[] bytes = this.bufferedOutput.toByteArray();   if (headers.getContentLength() < 0) {      headers.setContentLength(bytes.length);   }    // 调用InterceptingClientHttpRequest#executeInternal   ClientHttpResponse result = executeInternal(headers, bytes);   this.bufferedOutput = new ByteArrayOutputStream(0);   return result;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10

该方法继续调用InterceptingClientHttpRequest#executeInternal方法

protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {   InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();   return requestExecution.execute(this, bufferedOutput);}
  • 1
  • 2
  • 3
  • 4

requestExecution.execute(this, bufferedOutput)会执行以下方法

public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {   if (this.iterator.hasNext()) { // 先处理拦截器       // 获取LoadBalancerInterceptor负载均衡拦截器      ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();       // 进行负载均衡拦截      return nextInterceptor.intercept(request, body, this);   }else { // 拦截器处理之后,会再次进入到这里      HttpMethod method = request.getMethod();      Assert.state(method != null, "No standard HTTP method");       // 产生HTTP请求执行的代理对象      ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);       // 添加请求头      request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));      if (body.length > 0) { // 添加body         if (delegate instanceof StreamingHttpOutputMessage) {            StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;            streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));         } else {            StreamUtils.copy(body, delegate.getBody());         }      }       // 代理对象执行HTTP请求      return delegate.execute();   }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

LoadBalancerInterceptor是负载均衡的关键实现,他会进行HTTP请求前的拦截,根据负载均衡策略选择合适的服务器实例,将http://USER-MGT/sequence/number/port中的USER-MGT微服务实例名替换为对应的IP和端口号,然后返回,交给HTTP层进行实际的调用

public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,      final ClientHttpRequestExecution execution) throws IOException {   final URI originalUri = request.getURI();   String serviceName = originalUri.getHost();   Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri);    // 执行负载均衡拦截   return this.loadBalancer.execute(serviceName,                                     this.requestFactory.createRequest(request, body, execution));}public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)      throws IOException {    // 获取拦截器   ILoadBalancer loadBalancer = getLoadBalancer(serviceId);    // 获取server,调用BaseLoadBalancer#chooseServer   Server server = getServer(loadBalancer, hint);   if (server == null) {      throw new IllegalStateException("No instances available for " + serviceId);   }    // ribbon server   RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId),         serverIntrospector(serviceId).getMetadata(server));    // 执行   return execute(serviceId, ribbonServer, request);}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25

getServer(loadBalancer, hint)这里会调用BaseLoadBalancer#chooseServer,他会使用设置的负载均衡策略选取服务器实例

public Server chooseServer(Object key) {    if (counter == null) {        counter = createCounter();    }    counter.increment();    if (rule == null) {        return null;    } else {        try {            // 根据负载均衡策略选择服务实例            return rule.choose(key);        } catch (Exception e) {            logger.warn("LoadBalancer [{}]:  Error choosing server for key {}", name, key, e);            return null;        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

rule.choose(key)会调用负载均衡策略的选择实例方法,这里配置的是随机策略RandomRule,即调用RandomRule#choose

public Server choose(ILoadBalancer lb, Object key) {    if (lb == null) {        return null;    }    Server server = null;    while (server == null) {        if (Thread.interrupted()) {            return null;        }        // 可达服务实例列表        List<Server> upList = lb.getReachableServers();        // 所有服务实例列表        List<Server> allList = lb.getAllServers();        int serverCount = allList.size();        if (serverCount == 0) {            /*             * No servers. End regardless of pass, because subsequent passes             * only get more restrictive.             */            return null;        }        // 根据服务实例数产生随机值        int index = chooseRandomInt(serverCount);        // 选择服务实例        server = upList.get(index);        if (server == null) {            /*             * The only time this should happen is if the server list were             * somehow trimmed. This is a transient condition. Retry after             * yielding.             */            Thread.yield();            continue;        }        // 服务可用,则返回        if (server.isAlive()) {            return (server);        }        // Shouldn't actually happen.. but must be transient or a bug.        server = null;        Thread.yield();    }    return server;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47

LoadBalancerInterceptor拦截之后执行execute(serviceId, ribbonServer, request)会调用如下方法,

public <T> T execute(String serviceId, ServiceInstance serviceInstance,      LoadBalancerRequest<T> request) throws IOException {   Server server = null;    // 获取server服务器信息   if (serviceInstance instanceof RibbonServer) {      server = ((RibbonServer) serviceInstance).getServer();   }   if (server == null) {      throw new IllegalStateException("No instances available for " + serviceId);   }   RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId);   RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server);   try {       // 进行HTTP实际调用,这里会回到InterceptingClientHttpRequest#execute拦截器之后的处理逻辑,进行真正的HTTP调用      T returnVal = request.apply(serviceInstance);      statsRecorder.recordStats(returnVal);      return returnVal;   }   // catch IOException and rethrow so RestTemplate behaves correctly   catch (IOException ex) {      statsRecorder.recordStats(ex);      throw ex;   }   catch (Exception ex) {      statsRecorder.recordStats(ex);      ReflectionUtils.rethrowRuntimeException(ex);   }   return null;}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发