系统定制开发昨天手写了一个,系统定制开发并想着顺便写一个负载均衡器,集成一下。
系统定制开发手写的注册中心对标Spring Cloud Alibaba Nacos
,分为客户端
和服务端
。
系统定制开发手写的负载均衡器,系统定制开发分为两个版本:Web系统定制开发版本和版本
- Web:Spring Cloud Netflix
Ribbon
- WebFlux:Spring Cloud
LoadBalancer
系统定制开发想要源码的小伙伴,系统定制开发文末有链接。【系统定制开发仅供学习和参考
】
看到Ribbon系统定制开发的随机算法,系统定制开发让我产生了一些问题,系统定制开发不问不知道,系统定制开发一问吓一跳。问了很多人,上到各大厂相关领域的专家,下到一些所谓的高端人员,再到国内外网站,没有人注意到这个问题。
今天晚上得以有时间,我通宵把Ribbon这个随机算法讲的透彻一些,此处涉及到的知识点颇多,写了大概有几万字。前人栽树,后人乘凉。
我在上几篇文章:
我们这里的环境就用RestTemplate
+Ribbon
+Nacos
,所以说上面这几篇文章对下文的理解很有必要。
先来看看Ribbon的随机算法吧。
对于整体的设计思想,我们不难理解,根据服务名获取所有的服务列表,根据负载均衡器规则再选出一个服务。
我们先来看这个随机数的实现,在很久以前,我写过几篇Random随机数的源码深入解析,这里采用的ThreadLocalRandom无非是每个线程拥有不同的种子seed,这样就实现了线程安全的随机数,其底层无非也是CAS,具体可看笔者这几篇文章:
在此之前,我们一定要注意一下这类的官方介绍:
A loadbalacing strategy that randomly distributes traffic amongst existing servers.
翻译:一种随机分布在现有服务器之间的负载均衡策略。
记住这句话,这是深入RandomRule的重要思想。
接下里,我们看一下Random的核心实现方法RandomRule#choose,当前方法的官方介绍是一定要看的:
Randomly choose from all living servers
翻译:从所有活着的服务器中随机选择
看完类和方法的介绍,其实就能知道整体的设计了,为了提高我们的认知,我们来看看设计的细节。【这细节是真的细啊,贯穿了整个Ribbon生命周期和服务发现的生命周期】
public Server choose(ILoadBalancer lb, Object key) { // 如果没有负载器,直接返回空 if (lb == null) { return null; } // 初始化Server Server server = null; // 如果server为空就死循环,直到不为空 while (server == null) { // 如果线程中断,则直接返回null if (Thread.interrupted()) { return null; } // 1:获取可达的服务列表 List<Server> upList = lb.getReachableServers(); // 2:获取所有的服务列表 List<Server> allList = lb.getAllServers(); // 获取所有服务列表的个数 int serverCount = allList.size(); // 如果服务数为零,就直接返回空 if (serverCount == 0) { /* * No servers. End regardless of pass, because subsequent passes * only get more restrictive. */ // 上面注解的意思就是,连服务都没有,还玩什么呀! return null; } // 所有服务列表的个数随机选一个数 int index = chooseRandomInt(serverCount); // 用上面这个随机数作为可达服务列表的索引 server = upList.get(index); // 如果服务为空就把线程让出来 if (server == null) { /* * The only time this should happen is if the server list were * somehow trimmed. This is a transient condition. Retry after * yielding. */ Thread.yield(); continue; } // 如果服务活着,就返回该服务,否则线程继续挂起 // 这里服务默认是false,后续讲解在哪里设置的true if (server.isAlive()) { return (server); } // Shouldn't actually happen.. but must be transient or a bug. // 上面的意思就是:这种情况基本不能出现,就算出现了也是短暂的或是个bug server = null; Thread.yield(); } // 返回服务 return server; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
我在这里详细的把每一行代码都做了注释,其实昨天我就三个问题:
- 这个可达的服务列表和所有服务列表的关系是什么?如何获取的?
- 为什么要用所有服务列表数量作为随机数的取值范围?而且还用该随机数作为可达服务列表的索引?不会越界?
- 服务的存活状态在哪里设置的?
接下来就这三个问题展开论述
我先给出结论:
在BaseLoadBalancer#setServersList中会设置服务的存活状态,同时也会同步可达服务列表和所有服务列表,并且此任务是30s执行一次,同步Nacos的服务缓存数据。
那我们开始论证吧!!!
首先Spring容器初始化的时候,会加载自动装配类,我们看一下Ribbon的自动装配类
我们重点看一下这个Bean
我们在之前解析Ribbon的核心源码时就说过,这离初始化SpringClientFactory时,会加载RibbonClientConfiguration配置类
这个配置类会初始化ZoneAwareLoadBalancer
好了,这个start方法就是核心方法了,我们来看看它的实现
来,给你看看它的这几个时间:
- 延迟时间:1s
- 刷新时间间隔:30s
我们来一起看一下它的这个任务:
final Runnable wrapperRunnable = new Runnable() { @Override public void run() { // isActive默认是false,经过CAS会变成true,这里不会进去该if if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { // 核心流程 updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } }};
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
跟进去这个核心方法
关键点来了,这里分为两大点:
- 获取服务列表
- 更新Ribbon内部维护的服务列表
获取服务列表,我们放在后文说,这里先来讲解一下是如何更新服务列表的。
首先更新每个服务的存活状态【变为true】
接下来,我们就来到了最核心的方法
/** * Set the list of servers used as the server pool. This overrides existing * server list. * 设置用作服务器池的服务器列表,这将重写现有的服务器列表。 */public void setServersList(List lsrv) { // 获取锁 Lock writeLock = allServerLock.writeLock(); logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name); // 初始化新的服务列表 ArrayList<Server> newServers = new ArrayList<Server>(); // 加锁 writeLock.lock(); try { // 初始化所有服务列表 ArrayList<Server> allServers = new ArrayList<Server>(); // 循环所有服务列表 for (Object server : lsrv) { // 如果为null则跳出循环 if (server == null) { continue; } // 如果服务属于String,则初始化成Server if (server instanceof String) { server = new Server((String) server); } // 如果服务为Server,则增加进所有服务列表中 if (server instanceof Server) { logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId()); allServers.add((Server) server); } else { // 否则抛出异常 throw new IllegalArgumentException( "Type String or Server expected, instead found:" + server.getClass()); } } boolean listChanged = false; // 如果从注册中心查出来的服务列表不同 if (!allServerList.equals(allServers)) { listChanged = true; if (changeListeners != null && changeListeners.size() > 0) { List<Server> oldList = ImmutableList.copyOf(allServerList); List<Server> newList = ImmutableList.copyOf(allServers); for (ServerListChangeListener l: changeListeners) { try { l.serverListChanged(oldList, newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e); } } } } if (isEnablePrimingConnections()) { for (Server server : allServers) { if (!allServerList.contains(server)) { server.setReadyToServe(false); newServers.add((Server) server); } } if (primeConnections != null) { primeConnections.primeConnectionsAsync(newServers, this); } } // This will reset readyToServe flag to true on all servers // regardless whether // previous priming connections are success or not allServerList = allServers; if (canSkipPing()) { for (Server s : allServerList) { s.setAlive(true); } upServerList = allServerList; } else if (listChanged) { forceQuickPing(); } } finally { // 释放锁 writeLock.unlock(); }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
所有服务列表
@Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL)protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>());
- 1
- 2
- 3
更新服务列表
@Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL)protected volatile List<Server> upServerList = Collections .synchronizedList(new ArrayList<Server>());
- 1
- 2
- 3
最后看一下概览
还记得我们刚才说到的那个问题么?就是这个,讲了更新服务列表,还有如何获取服务列表呢?
我们来到顶级接口
因为用的Nacos,所以这里Nacos实现了该方法。
整体的流程大概就是这个样子,其实并不复杂,更多的设计的哲学理念。
其实,如果不触发Ping机制,可达服务列表和所有服务列表基本是同样的数据。
触发Ping机制,就会抛出异常,返回null,交给上层去处理喽,这里还涉及很多定时器、Ping机制、锁优化、缓存优化等各种机制,设计得非常优雅。
一些小可爱没深入研究过源码就说是bug和缺陷,我真是透透了~~~
当然还有太多了,有时间在写吧。。。睡觉了~