依赖
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.3.12.RELEASE</version> <relativePath/> <!-- lookup parent from repository --> </parent> <properties> <java.version>1.8</java.version> <spring-cloud.version>Hoxton.SR12</spring-cloud.version> </properties> <!--eureka应用系统定制开发依赖中会自带ribbon依赖--> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId> </dependency> </dependencies> <dependencyManagement> <dependencies> <dependency> <groupId>org.springframework.cloud</groupId> <artifactId>spring-cloud-dependencies</artifactId> <version>${spring-cloud.version}</version> <type>pom</type> <scope>import</scope> </dependency> </dependencies> </dependencyManagement></project>
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
SpringCloud2020.0.1版本之后,去除了ribbbon依赖,使用Load balancer应用系统定制开发进行负载均衡
应用系统定制开发负载均衡配置
spring: application: name: ribbon #项目名字# eureka client配置eureka: client: registryFetchIntervalSeconds: 5 serviceUrl: defaultZone: http://localhost:8098/eureka/ #eureka应用系统定制开发服务端提供的注册地址 应用系统定制开发参考服务端配置的这个路径 instance: hostname: ribbon #此实例注册到eureka服务端的唯一的实例ID prefer-ip-address: true #是否显示IP地址 leaseRenewalIntervalInSeconds: 10 #eureka客户需要多长时间发送心跳给eureka服务器,表明它仍然活着,默认为30 秒 (与下面配置的单位都是秒) leaseExpirationDurationInSeconds: 30 #Eureka服务器在接收到实例的最后一次发出的心跳后,需要等待多久才可以将此实例删除,默认为90秒 health-check-url-path: /actuator/healthribbon: MaxAutoRetries: 2 #最大重试次数,当Eureka中可以找到服务,但是服务连不上时将会重试 MaxAutoRetriesNextServer: 3 #切换实例的重试次数 OkToRetryOnAllOperations: false #对所有操作请求都进行重试,如果是get则可以,如果是post,put等操作没有实现幂等的情况下是很危险的,所以设置为false ConnectTimeout: 5000 #请求连接的超时时间 ReadTimeout: 6000 #请求处理的超时时间 # 调用USER-MGT微服务时使用随机策略USER-MGT: ribbon: NFLoadBalancerRuleClassName: com.netflix.loadbalancer.RandomRule
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
负载均衡原理
以调用下面的接口为例讲解ribbon怎么进行负载均衡
@RequestMapping(value = "/strategy", method = RequestMethod.GET, produces = "application/json")public String testRibbonStrategy() { ResponseEntity<String> forEntity = restTemplate.getForEntity("http://USER-MGT/sequence/number/port", String.class); return forEntity.getBody();}
- 1
- 2
- 3
- 4
- 5
- 6
首先,.getForEntity方法
@Overridepublic <T> ResponseEntity<T> getForEntity(String url, Class<T> responseType, Object... uriVariables) throws RestClientException { RequestCallback requestCallback = acceptHeaderRequestCallback(responseType); ResponseExtractor<ResponseEntity<T>> responseExtractor = responseEntityExtractor(responseType); // execute方法中进行HTTP调用 return nonNull(execute(url, HttpMethod.GET, requestCallback, responseExtractor, uriVariables));}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
execute方法会调用到doExecute方法,他会创建一个客户端HTTP请求,然后进行调用
protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback, @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException { Assert.notNull(url, "URI is required"); Assert.notNull(method, "HttpMethod is required"); ClientHttpResponse response = null; try { // 创建客户端HTTP请求 ClientHttpRequest request = createRequest(url, method); if (requestCallback != null) { requestCallback.doWithRequest(request); } // 执行客户端HTTP调用 response = request.execute(); handleResponse(url, method, response); return (responseExtractor != null ? responseExtractor.extractData(response) : null); } catch (IOException ex) { String resource = url.toString(); String query = url.getRawQuery(); resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource); throw new ResourceAccessException("I/O error on " + method.name() + " request for \"" + resource + "\": " + ex.getMessage(), ex); } finally { if (response != null) { response.close(); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
request.execute()方法会调用到抽象类AbstractClientHttpRequest#execute
public final ClientHttpResponse execute() throws IOException { assertNotExecuted(); // 调用到AbstractBufferingClientHttpRequest ClientHttpResponse result = executeInternal(this.headers); this.executed = true; return result;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
executeInternal方法会调用到AbstractBufferingClientHttpRequest#executeInternal
protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException { byte[] bytes = this.bufferedOutput.toByteArray(); if (headers.getContentLength() < 0) { headers.setContentLength(bytes.length); } // 调用InterceptingClientHttpRequest#executeInternal ClientHttpResponse result = executeInternal(headers, bytes); this.bufferedOutput = new ByteArrayOutputStream(0); return result;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
该方法继续调用InterceptingClientHttpRequest#executeInternal方法
protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException { InterceptingRequestExecution requestExecution = new InterceptingRequestExecution(); return requestExecution.execute(this, bufferedOutput);}
- 1
- 2
- 3
- 4
requestExecution.execute(this, bufferedOutput)会执行以下方法
public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException { if (this.iterator.hasNext()) { // 先处理拦截器 // 获取LoadBalancerInterceptor负载均衡拦截器 ClientHttpRequestInterceptor nextInterceptor = this.iterator.next(); // 进行负载均衡拦截 return nextInterceptor.intercept(request, body, this); }else { // 拦截器处理之后,会再次进入到这里 HttpMethod method = request.getMethod(); Assert.state(method != null, "No standard HTTP method"); // 产生HTTP请求执行的代理对象 ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method); // 添加请求头 request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value)); if (body.length > 0) { // 添加body if (delegate instanceof StreamingHttpOutputMessage) { StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate; streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream)); } else { StreamUtils.copy(body, delegate.getBody()); } } // 代理对象执行HTTP请求 return delegate.execute(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
LoadBalancerInterceptor是负载均衡的关键实现,他会进行HTTP请求前的拦截,根据负载均衡策略选择合适的服务器实例,将http://USER-MGT/sequence/number/port中的USER-MGT微服务实例名替换为对应的IP和端口号,然后返回,交给HTTP层进行实际的调用
public ClientHttpResponse intercept(final HttpRequest request, final byte[] body, final ClientHttpRequestExecution execution) throws IOException { final URI originalUri = request.getURI(); String serviceName = originalUri.getHost(); Assert.state(serviceName != null, "Request URI does not contain a valid hostname: " + originalUri); // 执行负载均衡拦截 return this.loadBalancer.execute(serviceName, this.requestFactory.createRequest(request, body, execution));}public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint) throws IOException { // 获取拦截器 ILoadBalancer loadBalancer = getLoadBalancer(serviceId); // 获取server,调用BaseLoadBalancer#chooseServer Server server = getServer(loadBalancer, hint); if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } // ribbon server RibbonServer ribbonServer = new RibbonServer(serviceId, server, isSecure(server, serviceId), serverIntrospector(serviceId).getMetadata(server)); // 执行 return execute(serviceId, ribbonServer, request);}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
getServer(loadBalancer, hint)这里会调用BaseLoadBalancer#chooseServer,他会使用设置的负载均衡策略选取服务器实例
public Server chooseServer(Object key) { if (counter == null) { counter = createCounter(); } counter.increment(); if (rule == null) { return null; } else { try { // 根据负载均衡策略选择服务实例 return rule.choose(key); } catch (Exception e) { logger.warn("LoadBalancer [{}]: Error choosing server for key {}", name, key, e); return null; } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
rule.choose(key)会调用负载均衡策略的选择实例方法,这里配置的是随机策略RandomRule,即调用RandomRule#choose
public Server choose(ILoadBalancer lb, Object key) { if (lb == null) { return null; } Server server = null; while (server == null) { if (Thread.interrupted()) { return null; } // 可达服务实例列表 List<Server> upList = lb.getReachableServers(); // 所有服务实例列表 List<Server> allList = lb.getAllServers(); int serverCount = allList.size(); if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ return null; } // 根据服务实例数产生随机值 int index = chooseRandomInt(serverCount); // 选择服务实例 server = upList.get(index); if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } // 服务可用,则返回 if (server.isAlive()) { return (server); } // Shouldn't actually happen.. but must be transient or a bug. server = null; Thread.yield(); } return server;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
LoadBalancerInterceptor拦截之后执行execute(serviceId, ribbonServer, request)会调用如下方法,
public <T> T execute(String serviceId, ServiceInstance serviceInstance, LoadBalancerRequest<T> request) throws IOException { Server server = null; // 获取server服务器信息 if (serviceInstance instanceof RibbonServer) { server = ((RibbonServer) serviceInstance).getServer(); } if (server == null) { throw new IllegalStateException("No instances available for " + serviceId); } RibbonLoadBalancerContext context = this.clientFactory.getLoadBalancerContext(serviceId); RibbonStatsRecorder statsRecorder = new RibbonStatsRecorder(context, server); try { // 进行HTTP实际调用,这里会回到InterceptingClientHttpRequest#execute拦截器之后的处理逻辑,进行真正的HTTP调用 T returnVal = request.apply(serviceInstance); statsRecorder.recordStats(returnVal); return returnVal; } // catch IOException and rethrow so RestTemplate behaves correctly catch (IOException ex) { statsRecorder.recordStats(ex); throw ex; } catch (Exception ex) { statsRecorder.recordStats(ex); ReflectionUtils.rethrowRuntimeException(ex); } return null;}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31