企业管理系统定制开发Ribbon源码分析

是Netflix企业管理系统定制开发公司开源的一个客户端企业管理系统定制开发负载均衡的项目,一般配合Eureka使用。企业管理系统定制开发不过为了降低其他干扰因素,专注于Ribbon,企业管理系统定制开发这一次我们脱离Eureka讲Ribbon。

企业管理系统定制开发上一篇我们讲了,企业管理系统定制开发今天这一篇打算使用@LoadBalanced企业管理系统定制开发注解使得具有负载均衡的能力。

一、企业管理系统定制开发简单的例子

首先引入ribbon的依赖

  1. <dependency>
  2. <groupId>org.springframework.cloud</groupId>
  3. <artifactId>spring-cloud-starter-netflix-ribbon</artifactId>
  4. <version>2.1.4.RELEASE</version>
  5. </dependency>

企业管理系统定制开发接着直接为RestTemplate的Bean打上@LoadBalanced的注解

  1. @Configuration
  2. public class RestTemplateConfig {
  3. @Bean
  4. @LoadBalanced
  5. public RestTemplate getRestTemplate() {
  6. return new RestTemplate();
  7. }
  8. }

项目配置,demo企业管理系统定制开发是器访问的服务名称

企业管理系统定制开发我们接下来会在8081与8082端口启动两个服务demo1与demo2,不过他们的application.name都为demo

  1. server.port=8080
  2. demo.ribbon.listOfServers=http://localhost:8081,http://localhost:8082

在Controller中直接访问demo服务

  1. @RestController
  2. public class Controller {
  3. @Autowired
  4. RestTemplate restTemplate;
  5. @RequestMapping("/test")
  6. public String test() {
  7. String result = restTemplate.getForObject("http://demo/test", String.class);
  8. System.out.println(result);
  9. return result;
  10. }
  11. }

这个是demo1项目的controller,demo2返回8082

  1. @RestController
  2. public class Controller {
  3. @RequestMapping("/test")
  4. public String test() {
  5. return "8081";
  6. }
  7. }

demo1项目的配置,demo2的端口为8082,name依然为demo

  1. spring.application.name=demo
  2. server.port=8081

这个时候我们访问localhost:8080/test

依次返回8081与8082,证明RestTemplate具有了负载均衡的能力。

当我们关闭demo2,再进行多次调用时,发现会依次返回8081与500错误。

ps:请记住这里的现象,之后会用源码进行解释为什么会出现这样的现象。


二、@LoadBalanced注解内幕

LoadBalanced是个组合注解,点进去发现

  1. /**
  2. * Annotation to mark a RestTemplate bean to be configured to use a LoadBalancerClient.
  3. * @author Spencer Gibb
  4. */
  5. @Target({ ElementType.FIELD, ElementType.PARAMETER, ElementType.METHOD })
  6. @Retention(RetentionPolicy.RUNTIME)
  7. @Documented
  8. @Inherited
  9. @Qualifier
  10. public @interface LoadBalanced {
  11. }

通过最上面的注释,我们可以了解到:

LoadBalance标记的RestTemplate,这个RestTemplate之后将会使用LoadBalancerClient来配置自己。

在idea中双击Shift打开全局搜索,可以找到该接口。

public interface LoadBalancerClient extends ServiceInstanceChooser

其中ServiceInstanceChooser接口中只有一个抽象方法:

  1. public interface ServiceInstanceChooser {
  2. ServiceInstance choose(String serviceId);
  3. }

该方法由传入的服务id,会从负载均衡器中挑选出来一个服务实例,服务实例使用ServiceInstance封装。

ServiceInstance的方法如下:

 LoadBalancerClient有以下的抽象方法:

  1. public interface LoadBalancerClient extends ServiceInstanceChooser {
  2. //从负载均衡器中挑选出来一个服务实例,然后请求该实例
  3. <T> T execute(String serviceId, LoadBalancerRequest<T> request) throws IOException;
  4. <T> T execute(String serviceId, ServiceInstance serviceInstance,
  5. LoadBalancerRequest<T> request) throws IOException;
  6. //重构url,即将服务名称转化为ip:port的形式,instance参数决定使用哪个ip端口
  7. URI reconstructURI(ServiceInstance instance, URI original);
  8. }

在@LoadBalanced注解、LoadBalancerClient与ServiceInstanceChooser旁边,我们还发现了LoadBalancerAutoConfiguration,他是一个负载均衡器的自动配置类。


三、LoadBalancerAutoConfiguration自动配置类分析

LoadBalancerAutoConfiguration源码如下:

  1. @Configuration
  2. @ConditionalOnClass(RestTemplate.class)
  3. @ConditionalOnBean(LoadBalancerClient.class)
  4. @EnableConfigurationProperties(LoadBalancerRetryProperties.class)
  5. public class LoadBalancerAutoConfiguration {
  6. @LoadBalanced
  7. @Autowired(required = false)
  8. private List<RestTemplate> restTemplates = Collections.emptyList();
  9. @Autowired(required = false)
  10. private List<LoadBalancerRequestTransformer> transformers = Collections.emptyList();
  11. @Bean
  12. public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
  13. final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
  14. return () -> restTemplateCustomizers.ifAvailable(customizers -> {
  15. for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
  16. for (RestTemplateCustomizer customizer : customizers) {
  17. customizer.customize(restTemplate);
  18. }
  19. }
  20. });
  21. }
  22. @Bean
  23. @ConditionalOnMissingBean
  24. public LoadBalancerRequestFactory loadBalancerRequestFactory(
  25. LoadBalancerClient loadBalancerClient) {
  26. return new LoadBalancerRequestFactory(loadBalancerClient, this.transformers);
  27. }
  28. @Configuration
  29. @ConditionalOnMissingClass("org.springframework.retry.support.RetryTemplate")
  30. static class LoadBalancerInterceptorConfig {
  31. @Bean
  32. public LoadBalancerInterceptor ribbonInterceptor(
  33. LoadBalancerClient loadBalancerClient,
  34. LoadBalancerRequestFactory requestFactory) {
  35. return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
  36. }
  37. @Bean
  38. @ConditionalOnMissingBean
  39. public RestTemplateCustomizer restTemplateCustomizer(
  40. final LoadBalancerInterceptor loadBalancerInterceptor) {
  41. return restTemplate -> {
  42. List<ClientHttpRequestInterceptor> list = new ArrayList<>(
  43. restTemplate.getInterceptors());
  44. list.add(loadBalancerInterceptor);
  45. restTemplate.setInterceptors(list);
  46. };
  47. }
  48. }
  49. @Configuration
  50. @ConditionalOnClass(RetryTemplate.class)
  51. public static class RetryAutoConfiguration {
  52. ....
  53. }
  54. @Configuration
  55. @ConditionalOnClass(RetryTemplate.class)
  56. public static class RetryInterceptorAutoConfiguration {
  57. ...
  58. }
  59. }

@Configuration代表该类是一个配置类

@ConditionalOnClass(RestTemplate.class)

@ConditionalOnBean(LoadBalancerClient.class)

这两个注解则说明如果要实现Ribbon的自动配置,则需要能加载到RestTemplate类,以及存在LoadBalancerClient的接口实现类。

RestTemplate我们在一开始就注入到容器当中了,而我们引入的netflix.ribbon依赖包中有LoadBalancerClient的接口实现类,即RibbonLoadBalancerClient,这个类我们后面再讲。

  1. @LoadBalanced
  2. @Autowired(required = false)
  3. private List<RestTemplate> restTemplates = Collections.emptyList();

使用@Autowired加@LoadBalanced注解,将会使得容器向restTemplates 集合中注入被@LoadBalanced注解修饰的RestTemplate。

我们把RetryAutoConfiguration和RetryInterceptorAutoConfiguration自动配置代码省略掉了,因为@ConditionalOnClass(RetryTemplate.class)需要当前项目存在RetryTemplate类,但是我们并没有引入。

在静态类LoadBalancerInterceptorConfig 中,就做了两件事:

(1)向容器中注入LoadBalancerInterceptor拦截器

  1. @Bean
  2. public LoadBalancerInterceptor ribbonInterceptor(
  3. LoadBalancerClient loadBalancerClient,
  4. LoadBalancerRequestFactory requestFactory) {
  5. return new LoadBalancerInterceptor(loadBalancerClient, requestFactory);
  6. }

(2)返回一个RestTemplateCustomizer的实现类,只不过使用lambda简化了代码。之后获取入参restTemplate的所有拦截器集合,并将当前loadBalancerInterceptor拦截器加入到集合中,最后使用setInterceptors(list)保存当前拦截器集合

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public RestTemplateCustomizer restTemplateCustomizer(
  4. final LoadBalancerInterceptor loadBalancerInterceptor) {
  5. return restTemplate -> {
  6. List<ClientHttpRequestInterceptor> list = new ArrayList<>(
  7. restTemplate.getInterceptors());
  8. list.add(loadBalancerInterceptor);
  9. restTemplate.setInterceptors(list);
  10. };
  11. }

loadBalancedRestTemplateInitializerDeprecated方法,入参是所有RestTemplateCustomizer的实现类(上文刚使用@Bean将返回的RestTemplateCustomizer的实现类注入到了容器中)。

遍历所有被@LoadBalanced修饰的RestTemplate,依次调用customize方法,即为RestTemplate添加LoadBalancerInterceptor拦截器。

  1. public SmartInitializingSingleton loadBalancedRestTemplateInitializerDeprecated(
  2. final ObjectProvider<List<RestTemplateCustomizer>> restTemplateCustomizers) {
  3. return () -> restTemplateCustomizers.ifAvailable(customizers -> {
  4. for (RestTemplate restTemplate : LoadBalancerAutoConfiguration.this.restTemplates) {
  5. for (RestTemplateCustomizer customizer : customizers) {
  6. customizer.customize(restTemplate);
  7. }
  8. }
  9. });
  10. }

LoadBalancerInterceptor拦截器内部到底做了什么呢?


四、LoadBalancerInterceptor拦截器

  1. public class LoadBalancerInterceptor implements ClientHttpRequestInterceptor {
  2. private LoadBalancerClient loadBalancer;
  3. private LoadBalancerRequestFactory requestFactory;
  4. public LoadBalancerInterceptor(LoadBalancerClient loadBalancer,
  5. LoadBalancerRequestFactory requestFactory) {
  6. this.loadBalancer = loadBalancer;
  7. this.requestFactory = requestFactory;
  8. }
  9. public LoadBalancerInterceptor(LoadBalancerClient loadBalancer) {
  10. // for backwards compatibility
  11. this(loadBalancer, new LoadBalancerRequestFactory(loadBalancer));
  12. }
  13. @Override
  14. public ClientHttpResponse intercept(final HttpRequest request, final byte[] body,
  15. final ClientHttpRequestExecution execution) throws IOException {
  16. //获取要访问的地址,即例子中的http://demo/test
  17. final URI originalUri = request.getURI();
  18. //获取服务名称,即demo
  19. String serviceName = originalUri.getHost();
  20. Assert.state(serviceName != null,
  21. "Request URI does not contain a valid hostname: " + originalUri);
  22. return this.loadBalancer.execute(serviceName,
  23. this.requestFactory.createRequest(request, body, execution));
  24. }
  25. }

跟进到loadBalancer.execute方法中,loadBalancer即LoadBalancerClient的唯一实现类RibbonLoadBalancerClient

  1. public <T> T execute(String serviceId, LoadBalancerRequest<T> request)
  2. throws IOException {
  3. return execute(serviceId, request, null);
  4. }

继续进入RibbonLoadBalancerClient的execute方法

  1. public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
  2. throws IOException {
  3. ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
  4. Server server = getServer(loadBalancer, hint);
  5. if (server == null) {
  6. throw new IllegalStateException("No instances available for " + serviceId);
  7. }
  8. RibbonServer ribbonServer = new RibbonServer(serviceId, server,
  9. isSecure(server, serviceId),
  10. serverIntrospector(serviceId).getMetadata(server));
  11. return execute(serviceId, ribbonServer, request);
  12. }

ILoadBalancer loadBalancer = getLoadBalancer(serviceId);

从字面意思来看,应该是获取负载均衡器...(不会有人看不出来吧),也就是接口ILoadBalancer的实现类。

深入getLoadBalancer内部,其实是从容器中寻找ILoadBalancer 的实现类。

以下是ILoadBalancer及依赖类的类图:

 

 那到底是哪个实现类呢?

其实ILoadBalancer 的实现类在自动配置类RibbonClientConfiguration中完成了注入:

  1. @Bean
  2. @ConditionalOnMissingBean
  3. public ILoadBalancer ribbonLoadBalancer(IClientConfig config,
  4. ServerList<Server> serverList, ServerListFilter<Server> serverListFilter,
  5. IRule rule, IPing ping, ServerListUpdater serverListUpdater) {
  6. if (this.propertiesFactory.isSet(ILoadBalancer.class, name)) {
  7. return this.propertiesFactory.get(ILoadBalancer.class, config, name);
  8. }
  9. return new ZoneAwareLoadBalancer<>(config, rule, ping, serverList,
  10. serverListFilter, serverListUpdater);
  11. }

是ZoneAwareLoadBalancer类!


五、ZoneAwareLoadBalancer是怎么构造的

构造该类需要以下参数(这些参数在RibbonClientConfiguration中全部提前注入到容器中了)

(1)IClientConfig config,是对客户端负载均衡器的配置,包括默认的读取超时时间(5s)、连接超时(2s)与最大连接数(200)等等,默认的实现类是DefaultClientConfigImpl。

(2)ServerList<Server> serverList,获取服务列表。目前我们是在yaml中写死了服务地址集合,因此serverList的类型实际上是ConfigurationBasedServerList,即从配置文件中获取服务地址的集合。

(3)ServerListFilter<Server> serverListFilter,服务列表过滤器,默认类型是ZonePreferenceServerListFilter,将不和客户端在同一个zone的服务给过滤掉。我们也没配置过什么zone,因此该过滤器在这次的例子中,其实是没什么作用的。

(4)IRule rule,是负载均衡策略接口,常见的策略有RoundRobinRule(轮询),RandomRule(随机),默认采用ZoneAvoidanceRule,即按照zone筛选,再进行轮询。(本例中,表现出来的效果就是一直轮询,所以接口依次返回8081、8082)

IRule及其实现类的类图关系如下:

 (5)IPing ping,判断服务实例是否存活的接口,常见的实现类有DummyPing(直接返回true,永远认为服务正常)、PingUrl(真实去ping某个url,得到存活与否)等等。默认探活策略是DummyPing,所以就造成了我们即使关闭demo2服务,ribbon也依然会选择到demo2服务。

IPing及其实现类的类图如下:

 (6)ServerListUpdater serverListUpdater,用于执行对服务列表的更新操作,默认的实现是PollingServerListUpdater,会启动一个ScheduledThreadPoolExecutor,周期性的执行IPing策略。(对线程池不熟悉的同学,可以参考我的这篇文章)


六、execute方法

我把RibbonLoadBalancerClient的execute方法再贴一遍

  1. public <T> T execute(String serviceId, LoadBalancerRequest<T> request, Object hint)
  2. throws IOException {
  3. ILoadBalancer loadBalancer = getLoadBalancer(serviceId);
  4. Server server = getServer(loadBalancer, hint);
  5. if (server == null) {
  6. throw new IllegalStateException("No instances available for " + serviceId);
  7. }
  8. RibbonServer ribbonServer = new RibbonServer(serviceId, server,
  9. isSecure(server, serviceId),
  10. serverIntrospector(serviceId).getMetadata(server));
  11. return execute(serviceId, ribbonServer, request);
  12. }

从上文,可以了解到:

(1)getLoadBalancer(serviceId):可以理解为,再第一次请求到来时,创建好IClientConfig(客户端配置)、ServerList<Server>(从配置文件中加载的服务列表)、IRule(负载均衡策略)与IPing (探活策略)等Bean,是一种懒加载的模式。

(2)getServer(loadBalancer, hint):则是通过以上的负载均衡策略与探活策略,从服务列表中选择合适的服务实例(详细代码在ZoneAwareLoadBalancer的chooseServer方法中)。Server对象包含ip、端口与协议等信息。

进入到execute(serviceId, ribbonServer, request)方法中:

其核心代码是apply方法:

T returnVal = request.apply(serviceInstance)

其实接下来的代码,已经和Ribbon没有太大的关系了。

LoadBalancerInterceptor的intercept方法已经全部走完了,接下会在InterceptingClientHttpRequest中的execute方法内遍历其他拦截器,走下一个拦截器的intercept方法。

如果此时没有其他拦截器,最终会走RestTemplate的执行流程。此时RestTemplate已经拿到了负载均衡后的地址,利用封装的HttpURLConnection直接进行请求。


七、RestTemplate是怎么利用到该拦截器的?

上一篇我们讲了,举的例子是没有用到任何拦截器的。

那么用到拦截器呢,具体代码我们从RestTemplate的主流程方法doExecute说起

  1. protected <T> T doExecute(URI url, @Nullable HttpMethod method, @Nullable RequestCallback requestCallback,
  2. @Nullable ResponseExtractor<T> responseExtractor) throws RestClientException {
  3. Assert.notNull(url, "URI is required");
  4. Assert.notNull(method, "HttpMethod is required");
  5. ClientHttpResponse response = null;
  6. try {
  7. //创建文章开头所说的ClientHttpRequest
  8. ClientHttpRequest request = createRequest(url, method);
  9. if (requestCallback != null) {
  10. //执行请求回调
  11. requestCallback.doWithRequest(request);
  12. }
  13. //执行请求,U获取响应结果
  14. response = request.execute();
  15. //处理响应结果
  16. handleResponse(url, method, response);
  17. //利用响应抽取器抽取data返回预先定义的java对象,例如例子中的String
  18. return (responseExtractor != null ? responseExtractor.extractData(response) : null);
  19. }
  20. catch (IOException ex) {
  21. String resource = url.toString();
  22. String query = url.getRawQuery();
  23. resource = (query != null ? resource.substring(0, resource.indexOf('?')) : resource);
  24. throw new ResourceAccessException("I/O error on " + method.name() +
  25. " request for \"" + resource + "\": " + ex.getMessage(), ex);
  26. }
  27. finally {
  28. if (response != null) {
  29. response.close();
  30. }
  31. }
  32. }

进入到ClientHttpRequest request = createRequest(url, method)方法中

此处调用的是HttpAccessor中的createRequest方法

  1. protected ClientHttpRequest createRequest(URI url, HttpMethod method) throws IOException {
  2. ClientHttpRequest request = this.getRequestFactory().createRequest(url, method);
  3. this.initialize(request);
  4. if (this.logger.isDebugEnabled()) {
  5. this.logger.debug("HTTP " + method.name() + " " + url);
  6. }
  7. return request;
  8. }

其中getRequestFactory方法被InterceptingHttpAccessor重写了

  1. public ClientHttpRequestFactory getRequestFactory() {
  2. List<ClientHttpRequestInterceptor> interceptors = getInterceptors();
  3. if (!CollectionUtils.isEmpty(interceptors)) {
  4. ClientHttpRequestFactory factory = this.interceptingRequestFactory;
  5. if (factory == null) {
  6. factory = new InterceptingClientHttpRequestFactory(super.getRequestFactory(), interceptors);
  7. this.interceptingRequestFactory = factory;
  8. }
  9. return factory;
  10. }
  11. else {
  12. return super.getRequestFactory();
  13. }
  14. }

因为此时的getInterceptors返回的拦截器不为空,则此时获取到请求工厂为InterceptingClientHttpRequestFactory,而不是无拦截器时的SimpleClientHttpRequestFactory。

InterceptingClientHttpRequestFactory的createRequest方法会创建出InterceptingClientHttpRequest,而不是默认的SimpleBufferingClientHttpRequest

两者的类图关系如下:

 接下来走RestTemplate主流程中的response = request.execute()方法

这一块和SimpleBufferingClientHttpRequest一样,进入到父类AbstractClientHttpRequest中

  1. public final ClientHttpResponse execute() throws IOException {
  2. assertNotExecuted();
  3. ClientHttpResponse result = executeInternal(this.headers);
  4. this.executed = true;
  5. return result;
  6. }

还是和SimpleBufferingClientHttpRequest一样,进入executeInternal方法中,位于AbstractBufferingClientHttpRequest中

  1. protected ClientHttpResponse executeInternal(HttpHeaders headers) throws IOException {
  2. byte[] bytes = this.bufferedOutput.toByteArray();
  3. if (headers.getContentLength() < 0) {
  4. headers.setContentLength(bytes.length);
  5. }
  6. ClientHttpResponse result = executeInternal(headers, bytes);
  7. this.bufferedOutput = new ByteArrayOutputStream(0);
  8. return result;
  9. }

其中核心的是executeInternal(headers, bytes)方法,位于InterceptingClientHttpRequest中

  1. protected final ClientHttpResponse executeInternal(HttpHeaders headers, byte[] bufferedOutput) throws IOException {
  2. InterceptingRequestExecution requestExecution = new InterceptingRequestExecution();
  3. return requestExecution.execute(this, bufferedOutput);
  4. }

进入InterceptingRequestExecution(其实就是InterceptingClientHttpRequest的私有内部类)的execute方法中

  1. public ClientHttpResponse execute(HttpRequest request, byte[] body) throws IOException {
  2. if (this.iterator.hasNext()) {
  3. ClientHttpRequestInterceptor nextInterceptor = this.iterator.next();
  4. return nextInterceptor.intercept(request, body, this);
  5. }
  6. else {
  7. HttpMethod method = request.getMethod();
  8. Assert.state(method != null, "No standard HTTP method");
  9. ClientHttpRequest delegate = requestFactory.createRequest(request.getURI(), method);
  10. request.getHeaders().forEach((key, value) -> delegate.getHeaders().addAll(key, value));
  11. if (body.length > 0) {
  12. if (delegate instanceof StreamingHttpOutputMessage) {
  13. StreamingHttpOutputMessage streamingOutputMessage = (StreamingHttpOutputMessage) delegate;
  14. streamingOutputMessage.setBody(outputStream -> StreamUtils.copy(body, outputStream));
  15. }
  16. else {
  17. StreamUtils.copy(body, delegate.getBody());
  18. }
  19. }
  20. return delegate.execute();
  21. }
  22. }

this.iterator是当前拦截器的迭代器,如果当前有拦截器的话,直接先执行拦截器intercept方法,而此时的拦截器类型就是LoadBalancerInterceptor,这样的话,接下来的内容又回到了第四章节中。

LoadBalancerInterceptor拦截器执行完之后,又会回到该execute方法中。接下来走else逻辑,此时delegate类型是SimpleBufferingClientHttpRequest,真是熟悉的类啊,此时场景就变成了没有拦截器的场景了。

如果要接着跟进的话,可以从RestTemplate源码分析这个环节开始:

到这里,RestTemplate如何利用到Ribbon提供的负载均衡能力的过程已经结束了。


八、Ribbon的大致流程总结

(1)Ribbon的自动配置类拿到所有被@LoadBalanced注解修饰的RestTemplate实例

(2)将LoadBalancerInterceptor拦截器添加到每一个RestTemplate的拦截器列表中

(3)RestTemplate在执行请求前,会先执行每一个拦截器的intercept方法

(4)LoadBalancerInterceptor的intercept方法中,首先会从配置文件中读取服务实例集合,接着创建负载均衡策略、探活策略与服务列表更新策略等

(5)接着intercept方法会根据以上的策略选取一个服务实例

(6)RestTemplate拿到该服务实例后,内部利用封装的HttpURLConnection进行请求

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发