企业管理系统定制开发在前面的文章中已经了企业管理系统定制开发解了的基本配置,企业管理系统定制开发各种组件的作用,企业管理系统定制开发负载均衡策略以及如何企业管理系统定制开发配合注册中心实现服务企业管理系统定制开发的注册发现,企业管理系统定制开发现在还遗留一个问题,企业管理系统定制开发服务的新增和更新ribbon企业管理系统定制开发到底是如何来支持的呢?
ILoadBalancer
企业管理系统定制开发定义软件器操作接口,企业管理系统定制开发动态更新一组服务列表及企业管理系统定制开发根据指定算法企业管理系统定制开发从现有服务器列表中选企业管理系统定制开发择一个服务。
public interface ILoadBalancer { /** * Initial list of servers. * This API also serves to add additional ones at a later time * The same logical server (host:port) could essentially be added multiple times * (helpful in cases where you want to give more "weightage" perhaps ..) * * @param newServers new servers to add */ public void addServers(List<Server> newServers); /** * Choose a server from load balancer. * * @param key An object that the load balancer may use to determine which server to return. null if * the load balancer does not use this parameter. * @return server chosen */ public Server chooseServer(Object key); /** * To be called by the clients of the load balancer to notify that a Server is down * else, the LB will think its still Alive until the next Ping cycle - potentially * (assuming that the LB Impl does a ping) * * @param server Server to mark as down */ public void markServerDown(Server server); /** * @deprecated 2016-01-20 This method is deprecated in favor of the * cleaner {@link #getReachableServers} (equivalent to availableOnly=true) * and {@link #getAllServers} API (equivalent to availableOnly=false). * * Get the current list of servers. * * @param availableOnly if true, only live and available servers should be returned */ @Deprecated public List<Server> getServerList(boolean availableOnly); /** * @return Only the servers that are up and reachable. */ public List<Server> getReachableServers(); /** * @return All known servers, both reachable and unreachable. */ public List<Server> getAllServers();}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- addServers:企业管理系统定制开发服务器的初始列表。此 API 企业管理系统定制开发还可以在以后添加其他服务器。企业管理系统定制开发基本上可以多次添加相同的逻辑服务器(主机:端口)
- chooseServer:从负载均衡器中选择一个服务器
- markServerDown:将一台服务器标记为下线,否则 LB 将认为它仍然活着,直到下一个 Ping 周期 - 可能(假设 LB Impl 执行 ping)
- getReachableServers:只有已启动且可访问的服务器
- getAllServers:所有已知的服务器,包括可达的和不可达的
该接口主要做了以下的一些事情:
维护了存储服务实例Server对象的二个列表:一个用于存储所有服务实例的清单,一个用于存储正常服务(up服务)的实例清单
初始化得到可用的服务列表,启动定时任务去实时的检测服务列表中的服务的可用性,并且间断性的去更新服务列表
选择可用的服务进行调用(交给IRule去实现,不同的轮询策略)
结下来我们应该关心这些方法何时被使用,在此之前首先查看接口ILoadBalancer继承关系
AbstractLoadBalancer
AbstractLoadBalancer中定义了一个关于服务实例的分组枚举类ServerGroup,它包含以下三种不同类型来为所有的服务器进行分组。
public enum ServerGroup{ //所有 ALL, //上线 STATUS_UP, //非上线 STATUS_NOT_UP }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
定义了一个chooseServer方法调用ILoadBalancer接口中的chooseServer方法,该方法需要子类去实现用来选择服务器
public Server chooseServer() { return chooseServer(null); }
- 1
- 2
- 3
定义了一个抽象的方法来获取所有的服务器,同样需要子类来实现
public abstract List<Server> getServerList(ServerGroup serverGroup);
- 1
最后,定义抽象方法获取 LoadBalancer 相关的统计信息
public abstract LoadBalancerStats getLoadBalancerStats();
- 1
这些信息可以用来观察和理解运行时行为的LoadBalancer,用来决定负载平衡策略。简单的说,它就是作为ServerStats实例列表的容器,统一维护(当然还有zone区域的概念)。
LoadBalancerStats的相关知识可以从这里了解到:
本文之介绍一下这个类的三大缓存
// 它实现了IClientConfigAware接口,所以很方便的得到IClientConfig配置public class LoadBalancerStats implements IClientConfigAware { volatile Map<String, ZoneStats> zoneStatsMap = new ConcurrentHashMap<>(); volatile Map<String, List<? extends Server>> upServerListZoneMap = new ConcurrentHashMap<>(); // 该变量最初使用的ConcurrentHashMap缓存 // Map<Server,ServerStats> serverStatsMap = new ConcurrentHashMap<Server,ServerStats>(); private static final DynamicIntProperty SERVERSTATS_EXPIRE_MINUTES = DynamicPropertyFactory.getInstance().getIntProperty("niws.loadbalancer.serverStats.expire.minutes", 30); private final LoadingCache<Server, ServerStats> serverStatsCache = CacheBuilder.newBuilder() // 在一定时间内没有读写,会移除该key // 在30s内没有读写该Server的时候会移除对应的没有被访问的key .expireAfterAccess(SERVERSTATS_EXPIRE_MINUTES.get(), TimeUnit.MINUTES) // 移除的时候把其pulish的功能也关了(不然定时任务一直在运行) .removalListener(notification -> notification.getValue().close()); // 首次get木有的话,就会调用此方法给你新建一个新的Server实例 .build(server -> createServerStats(server)); // 为何这里默认值为何是1000和1000???和ServerStats里的常量值并不一样哦 protected ServerStats createServerStats(Server server) { ServerStats ss = new ServerStats(this); //configure custom settings ss.setBufferSize(1000); ss.setPublishInterval(1000); // 请务必调用此方法完成初始化 ss.initialize(server); return ss; }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- zoneStatsMap:每个zone对应一个ZoneStats,代表着该可用区的健康状态
- upServerListZoneMap:存储了zone和server状态ZoneStats的对应关系(一个zone内可以有多台Server)
serverStatsCache:存储了Server和ServerStats的对应关系。老版本使用的Map缓存的,新版本使用了guaua的cache(增加了过期时间,对内存更友好) - Server默认的缓存时长是30s,请尽量保持此值>=熔断最大时长的值(它默认也是30s)
有兴趣的同学们可以去了解一下guaua和Caffeine(个人安利)
BaseLoadBalancer
BaseLoadBalancer类是Ribbon负载均衡器的基础实现类,在该类中定义了很多关于负载均衡器相关的基础内容。
private final static IRule DEFAULT_RULE = new RoundRobinRule(); private final static SerialPingStrategy DEFAULT_PING_STRATEGY = new SerialPingStrategy(); private static final String DEFAULT_NAME = "default"; private static final String PREFIX = "LoadBalancer_"; protected IRule rule = DEFAULT_RULE; //检查服务实例操作的执行策略对象IPingStrategy protected IPingStrategy pingStrategy = DEFAULT_PING_STRATEGY; //检查服务实例是否正常服务的IPing对象,在BaseLoadBalancer中默认为null,需要在构造时注入它的具体实现 protected IPing ping = null; //两个存储服务实例Server对象的列表。一个用于存储所有服务实例的清单, 一个用于存储正常服务的实例清单 @Monitor(name = PREFIX + "AllServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> allServerList = Collections .synchronizedList(new ArrayList<Server>()); @Monitor(name = PREFIX + "UpServerList", type = DataSourceType.INFORMATIONAL) protected volatile List<Server> upServerList = Collections .synchronizedList(new ArrayList<Server>()); protected ReadWriteLock allServerLock = new ReentrantReadWriteLock(); protected ReadWriteLock upServerLock = new ReentrantReadWriteLock(); protected String name = DEFAULT_NAME; protected Timer lbTimer = null; protected int pingIntervalSeconds = 10; protected int maxTotalPingTimeSeconds = 5; protected Comparator<Server> serverComparator = new ServerComparator(); protected AtomicBoolean pingInProgress = new AtomicBoolean(false); protected LoadBalancerStats lbStats; private volatile Counter counter = Monitors.newCounter("LoadBalancer_ChooseServer"); private PrimeConnections primeConnections; private volatile boolean enablePrimingConnections = false; private IClientConfig config; private List<ServerListChangeListener> changeListeners = new CopyOnWriteArrayList<ServerListChangeListener>(); private List<ServerStatusChangeListener> serverStatusListeners = new CopyOnWriteArrayList<ServerStatusChangeListener>();
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- rule:默认是RoundRobinRule,可以通过set方法指定
- pingStrategy:ping的策略,默认是SerialPingStrategy -> 串行全ping
若你的实力非常之多,自己实现一个并行的策略是个很好的方案 - List allServerList:所有实例
- upServerList:up实例(STATUS_NOT_UP实例列表通过两者差集取出)
- name:负载均衡器的名称,一般同ClientName,若没指定为default
- lbTimer:启动PingTask,定时使用IPing去检查Server的isAlive状态的定时器
- pingIntervalSeconds:默认30ping一次。可通过key:NFLoadBalancerPingInterval配置
- maxTotalPingTimeSeconds:每次ping的最长时间
默认值是2s,可通过NFLoadBalancerMaxTotalPingTime这个key配置 - serverComparator:按照Server的id排序,方便轮询时候输出,并无实际作用
- pingInProgress:标志是否正在ping中,如果是就避免重复进入去ping,做无用功
- LoadBalancerStats lbStats:默认是new LoadBalancerStats(name),你也可以set进来一个现存的,当然你可以通过配置
注意注意注意:它的实现类你可以通过key NFLoadBalancerStatsClassName来配置,只不过默认值是com.netflix.loadbalancer.LoadBalancerStats。在实际应用中,你很有可能去继承然后复写它,然后配置进来就使用你自己的啦 - primeConnections:启动连接器。用于初始检测Server的readyToServe是否能够提供服务(默认是关闭的,见下面的开关)
- enablePrimingConnections:默认值是false,可通过EnablePrimeConnections这个key来开启
- ServerListChangeListener changeListeners:当allServerList里面的内容(总数or每个Server的状态属性等)发生变化时,会触发此监听器
ServerStatusChangeListener serverStatusListeners:但凡只要Server的isAlive状态发生了变化,就会触发此监听器。有如下2种情况可能会触发:
IPing的时候
显式调用markServerDown(Server)/markServerDown(String id)的时候(该方法暂无任何显式调用处)
在BaseLoadBalancer中默认使用了该类中定义的静态内部类SerialPingStrategy实现,根据源码,我们可以看到该策略默认采用线性遍历ping服务实例的方式去实现检查。该策略在当IPing的实现速度不理想,或是Server列表过大时,可能会影响系统性能,这时候需要通过实现IPingStrategy接口去重写boolean[] pingServers(IPing var1, Server[] var2);函数去扩张ping的执行策略
private static class SerialPingStrategy implements IPingStrategy { @Override public boolean[] pingServers(IPing ping, Server[] servers) { int numCandidates = servers.length; boolean[] results = new boolean[numCandidates]; logger.debug("LoadBalancer: PingTask executing [{}] servers configured", numCandidates); for (int i = 0; i < numCandidates; i++) { results[i] = false; /* Default answer is DEAD. */ try { // NOTE: IFF we were doing a real ping // assuming we had a large set of servers (say 15) // the logic below will run them serially // hence taking 15 times the amount of time it takes // to ping each server // A better method would be to put this in an executor // pool // But, at the time of this writing, we dont REALLY // use a Real Ping (its mostly in memory eureka call) // hence we can afford to simplify this design and run // this // serially if (ping != null) { results[i] = ping.isAlive(servers[i]); } } catch (Exception e) { logger.error("Exception while pinging Server: '{}'", servers[i], e); } } return results; } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
负载均衡的处理规则IRule对象,从BaseLoadBalancer中的chooseServer的实现源码中,我们可以知道,负载均衡器实际将服务实例选择任务委托给了IRule实例中的choose函数来实现。而在这里,默认初始化了RoundRobinRule为IRule的实现对象。RoundRobinRule实现了最基本且常用的线性负载均衡策略。
接下来是构造函数
public BaseLoadBalancer(String name, IRule rule, LoadBalancerStats stats, IPing ping, IPingStrategy pingStrategy) { logger.debug("LoadBalancer [{}]: initialized", name); this.name = name; this.ping = ping; this.pingStrategy = pingStrategy; setRule(rule); setupPingTask(); lbStats = stats; init(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
看一下setupPingTask方法
void setupPingTask() { if (canSkipPing()) { return; } if (lbTimer != null) { lbTimer.cancel(); } lbTimer = new ShutdownEnabledTimer("NFLoadBalancer-PingTimer-" + name, true); lbTimer.schedule(new PingTask(), 0, pingIntervalSeconds * 1000); forceQuickPing(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
启动一个用于定时检查Server是否健康的任务。该任务默认的执行间隔为10秒
addServer方法
向负载均衡器中增加新的服务实例列表,该实现将原本已经维护着的所有服务实例清单allServerList和新传入的服务实例清单newServers都加入到newList中,然后通过调用setServersList函数对newList进行处理,在BaseLoadBalancer中实现的时候,会使用新的列表覆盖旧的列表。而之后介绍的几个扩展实现类对于服务实例清单更新优化都是通过对setServersList函数重写来实现的
public void addServer(Server newServer) { if (newServer != null) { try { ArrayList<Server> newList = new ArrayList<Server>(); newList.addAll(allServerList); newList.add(newServer); setServersList(newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error adding newServer {}", name, newServer.getHost(), e); } } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
需要注意的是:因为使用List存储不会去重,并且该方法内部也不验证唯一性。所以你可以通过这种方式(比如一个Server实例多几次add操作)来增加其被选中的概率,但并不建议你这么干。
//此处的List不带泛型,是因为它要接受List<Server>和List<String>这两种集合public void setServersList(List lsrv) { Lock writeLock = allServerLock.writeLock(); logger.debug("LoadBalancer [{}]: clearing server list (SET op)", name); ArrayList<Server> newServers = new ArrayList<Server>(); writeLock.lock(); try { ArrayList<Server> allServers = new ArrayList<Server>(); for (Object server : lsrv) { if (server == null) { continue; } if (server instanceof String) { server = new Server((String) server); } if (server instanceof Server) { logger.debug("LoadBalancer [{}]: addServer [{}]", name, ((Server) server).getId()); allServers.add((Server) server); } else { throw new IllegalArgumentException( "Type String or Server expected, instead found:" + server.getClass()); } } boolean listChanged = false; 编辑列表的内容是否有变更 只要内容不一样(包括数量、属性等)就算变更了 if (!allServerList.equals(allServers)) { listChanged = true; if (changeListeners != null && changeListeners.size() > 0) { List<Server> oldList = ImmutableList.copyOf(allServerList); List<Server> newList = ImmutableList.copyOf(allServers); 若注册了监听器,就触发 for (ServerListChangeListener l: changeListeners) { try { l.serverListChanged(oldList, newList); } catch (Exception e) { logger.error("LoadBalancer [{}]: Error invoking server list change listener", name, e); } } } } if (isEnablePrimingConnections()) { for (Server server : allServers) { if (!allServerList.contains(server)) { server.setReadyToServe(false); newServers.add((Server) server); } } /// 对比老的 筛选出哪些是新的后,进行连接检测 if (primeConnections != null) { primeConnections.primeConnectionsAsync(newServers, this); } } // This will reset readyToServe flag to true on all servers // regardless whether // previous priming connections are success or not allServerList = allServers; if (canSkipPing()) { for (Server s : allServerList) { s.setAlive(true); } upServerList = allServerList; } else if (listChanged) { forceQuickPing(); } } finally { writeLock.unlock(); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
针对此方法本身而言:它是set动作,所以具有覆盖性(完全替换掉原值)。
allServerList和upServerList的值何时改变?
针对此问题,此处小总结一下:
他俩均不能通过add/addAll/remove等方式来改变其值,仅能通过全覆盖的方式
能改变它两值的地方仅有两处:
- Pinger#runPinger:基于allServerList对没台Server完成ping操作,所以它只会改变upServerList的值(isAlive=true才属于up)
- setServersList():它会用新set进来的对allServerList全覆盖,并且完成对没台Server的初始化,包括识别出upServerList(这种识别其实依赖也是上面一样的ping操作)
综上可知,upServerList的值有且仅是把allServerList经过IPing处理后,若isAlive=true就属于这个行列了。因此若你没指定IPing策略或者是默认的DummyPing,那么它哥俩就永远相等(认为所有的Server均永远可用~)。
至此我们还没有看到ribbon去获取注册中心的服务清单并且保存在本地的逻辑,那么ribbon是怎么动态刷新服务器列表呢?
DynamicServerListLoadBalancer
DynamicServerListLoadBalancer类继承了BaseLoadBalancer,它是对基础负载均衡器的扩展。在该负载均衡器中,实现了服务实例清单在运行期的动态更新能力,同时,它还具备了对服务实例清单的过滤功能,也就是说,我们可以通过过滤器来选择性的获取一批服务实例清单。下面我们具体来看看在该类中增加了一些什么内容
在DynamicServerListLoadBalancer中,明显看到多了下面的成员变量
volatile ServerList<T> serverListImpl;volatile ServerListFilter<T> filter;
- 1
- 2
- 3
首先,ServerList什么样子?做了什么?
public interface ServerList<T extends Server> { public List<T> getInitialListOfServers(); /** * Return updated list of servers. This is called say every 30 secs * (configurable) by the Loadbalancer's Ping cycle * */ public List<T> getUpdatedListOfServers(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
可以看到,提供了两个方法,获取服务器的初始列表以及获取更新的服务器列表
那么怎么知道项目中使用了哪个子类呢?
以zk为例,我们找到Spring Cloud 整合Ribbon与zk(Eureka同理)的配置类ZookeeperRibbonClientConfiguration找到如下方法
@Bean @ConditionalOnMissingBean @ConditionalOnDependenciesNotPassed public ServerList<?> ribbonServerList(IClientConfig config, ServiceDiscovery<ZookeeperInstance> serviceDiscovery) { ZookeeperServerList serverList = new ZookeeperServerList(serviceDiscovery); serverList.initWithNiwsConfig(config); log.debug(String.format("Server list for Ribbon's non-dependency based load balancing is [%s]", serverList)); return serverList; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
可以看到,zk使用的子类是ZookeeperServerList,而它对父类的两个接口实现则都是通过getServers方法来实现
@Override public List<ZookeeperServer> getInitialListOfServers() { return getServers(); } @Override public List<ZookeeperServer> getUpdatedListOfServers() { return getServers(); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
关于getServers方法则根据serviceId来获取对应的zk服务实例列表,详细内容可以了解
protected List<ZookeeperServer> getServers() { try { if (this.serviceDiscovery == null) { return Collections.EMPTY_LIST; } Collection<ServiceInstance<ZookeeperInstance>> instances = this.serviceDiscovery .queryForInstances(this.serviceId); if (instances == null || instances.isEmpty()) { return Collections.EMPTY_LIST; } List<ZookeeperServer> servers = new ArrayList<>(); for (ServiceInstance<ZookeeperInstance> instance : instances) { String instanceStatus = null; if (instance.getPayload() != null && instance.getPayload().getMetadata() != null) { instanceStatus = instance.getPayload().getMetadata().get(INSTANCE_STATUS_KEY); } if (!StringUtils.hasText(instanceStatus) // backwards compatibility || instanceStatus.equalsIgnoreCase(STATUS_UP)) { servers.add(new ZookeeperServer(instance)); } } return servers; } catch (Exception e) { rethrowRuntimeException(e); } return Collections.EMPTY_LIST; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
serverListUpdater
通过上面的分析,我们已经知道了Ribbon与Eureka整合后,如何实现从zk(Eureka) Server中获取服务实例清单。那么它又是如何触发去获取服务实例清单以及如何在获取到服务实例清单后更新本地的服务实例清单的呢?继续来看DynamicServerListLoadBalancer中的实现内容,我们可以很容易地找到下面定义的关于ServerListUpdater中内容:
protected final ServerListUpdater.UpdateAction updateAction = new ServerListUpdater.UpdateAction() { @Override public void doUpdate() { updateListOfServers(); } };protected volatile ServerListUpdater serverListUpdater;
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
我们来看下这个接口的定义:
// Server列表更新器。public interface ServerListUpdater { // 内部接口:函数式接口 实际上执行服务器列表更新的接口 也就是实际的动作 // 一般使用匿名内部类的形式实现 public interface UpdateAction { void doUpdate(); } // 使用给定的更新操作启动serverList更新程序这个调用应该是幂等的 void start(UpdateAction updateAction); // 停止服务器列表更新程序。这个调用应该是幂等的 void stop(); // ============下面是一些获取执行过程中的信息方法============== // 最后更新的时间Date的String表示形式 String getLastUpdate(); // 自上次更新以来已经过的ms数 long getDurationSinceLastUpdateMs(); //错过更新周期的数量(如果有的话) int getNumberMissedCycles(); // 使用的线程数 int getCoreThreads();}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
在看它的实现类
可以看到,只有两个实现类
PollingServerListUpdater:动态服务列表更新的默认策略,也就是说DynamicServerListLoadBalancer负载均衡器中的默认实现就是它,它通过定时任务的方式进行服务列表的更新
ZkDynamicServerListUpdater:该更新也可服务于DynamicServerListLoadBalancer负载均衡器,但是它的触发机制跟PollingServerListUpdater不同,它需要利用zk的事件监听器来驱动服务列表的更新操作
先看PollingServerListUpdater的start方法
public synchronized void start(final UpdateAction updateAction) { //cas保证原子性。如果已经启动了就啥都不做 if (isActive.compareAndSet(false, true)) { final Runnable wrapperRunnable = new Runnable() { @Override public void run() { //再次校验 if (!isActive.get()) { if (scheduledFuture != null) { scheduledFuture.cancel(true); } return; } try { //执行update updateAction.doUpdate(); lastUpdated = System.currentTimeMillis(); } catch (Exception e) { logger.warn("Failed one update cycle", e); } } }; //启动定时任务 scheduledFuture = getRefreshExecutor().scheduleWithFixedDelay( wrapperRunnable, initialDelayMs, refreshIntervalMs, TimeUnit.MILLISECONDS ); } else { logger.info("Already active, no-op"); } }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
我们可以找到用于启动定时任务的两个重要参数initialDelayMs跟refreshIntervalMs的默认定义分别为1000跟30*1000,单位为毫秒。
private static long LISTOFSERVERS_CACHE_UPDATE_DELAY = 1000; // msecs; private static int LISTOFSERVERS_CACHE_REPEAT_INTERVAL = 30 * 1000; // msecs;
- 1
- 2
也就是说,更新服务实例在初始化之后延迟1秒后开始执行,并以30秒为周期重复执行。在看一下其他方法的实现
// 停止任务 @Override public synchronized void stop() { scheduledFuture.cancel(true); } @Override public String getLastUpdate() { return new Date(lastUpdated).toString(); } // 距离上一次update更新已过去这么长时间了 @Override public long getDurationSinceLastUpdateMs() { return System.currentTimeMillis() - lastUpdated; } // 因为coreSize是动态可以配的,所以提供方法供以访问 @Override public int getCoreThreads() { if (isActive.get()) { if (getRefreshExecutor() != null) { return getRefreshExecutor().getCorePoolSize(); } } return 0; }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
鉴于以上方法,得到一个PollingServerListUpdater实例后,调用其start方法,便可实现定时的更新服务列表了,非常的方便。所以若你想要有一个定时的去更新服务列表的能力,可以使用此组件方便的实现。
@Testpublic void fun10() throws InterruptedException { ServerListUpdater serverListUpdater = new PollingServerListUpdater(); serverListUpdater.start(() -> { int coreThreads = serverListUpdater.getCoreThreads(); String lastUpdate = serverListUpdater.getLastUpdate(); int numberMissedCycles = serverListUpdater.getNumberMissedCycles(); long durationSinceLastUpdateMs = serverListUpdater.getDurationSinceLastUpdateMs(); System.out.println("===========上次的执行时间是:" + lastUpdate); System.out.println("自上次更新以来已经过的ms数:" + durationSinceLastUpdateMs); System.out.println("线程核心数:" + coreThreads); System.out.println("错过更新周期的数量:" + numberMissedCycles); // .... 执行你对Server列表的更新动作,本处略 }); TimeUnit.SECONDS.sleep(500);}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
哪里使用?
在了解了更新服务实例的定时任务是如何启动的之后,我们回到updateAction.doUpdate()调用的具体实现位置,在DynamicServerListLoadBalancer中,它的实际实现委托给了updateListOfServers函数,具体实现如下:
public void updateListOfServers() { List<T> servers = new ArrayList<T>(); if (serverListImpl != null) { //获得所有的更新列表 servers = serverListImpl.getUpdatedListOfServers(); LOGGER.debug("List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); //过滤列表 if (filter != null) { servers = filter.getFilteredListOfServers(servers); LOGGER.debug("Filtered List of Servers for {} obtained from Discovery client: {}", getIdentifier(), servers); } } //更新 updateAllServerList(servers); }
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
这里终于用到了之前提到的ServerList的getUpdatedListOfServers()方法。在获得了服务实例列表之后,这里由将引入一个新的对象filter,关于filter的作用,我们下次再讲