背景
远程调用一般都会用 Feign。尽管使用的是 Feign,底层还是用 Ribbon 来做负载均衡和远程调用。但是 Ribbon 默认每隔 30s 才刷新一次注册表信息,这样就会导致:如果服务下线了,由于本地注册表没有及时更新,远程调用就会报错。我们先看下 Ribbon 的默认实现。
- com.netflix.loadbalancer.PollingServerListUpdater 这个类是ribbon更新注册表的核心类。
动态更新地址原理
Nacos客户端中有一个HostReactor类,它的功能是实现服务的动态更新,基本原理是:
- 客户端发起事件订阅后,在HostReactor中有一个UpdateTask线程,每10s发送一次Pull请求,获得服务端最新的地址列表
- 对于服务端,它和服务提供者的实例之间维持了心跳检测,一旦服务提供者出现异常,则会发送一个Push消息给Nacos客户端,也就是服务消费者
- 服务消费者收到推送消息之后,使用HostReactor中提供的processServiceJSON解析消息,并更新本地服务地址列表
改造ribbon默认更新策略
基于上面的理论,是不是可以监听Nacos的服务变更事件,一旦有更新,就立即重新刷新一下Ribbon的注册表。
- 覆盖 `ServerListUpdater`,自己定义一个 `NacosDynamicServerListUpdater`,代码如下:
public class NacosDynamicServerListUpdater implements ServerListUpdater { private static Logger log = LoggerFactory.getLogger(NacosDynamicServerListUpdater.class); private CopyOnWriteArrayList<LBUpdater> lbUpdaters = new CopyOnWriteArrayList(); private NamingService namingService; @Value("${ribbon.PollingServerListUpdater.initial-delay:1000}") private long initialDelay; @Value("${ribbon.PollingServerListUpdater.refresh-interval:30000}") private long refreshInterval; private NacosDiscoveryProperties properties; public NacosDynamicServerListUpdater(NamingService namingService, NacosDiscoveryProperties properties) { this.namingService = namingService; this.properties = properties; } @Override public void start(UpdateAction updateAction) { NacosDynamicServerListUpdater.LBUpdater lbUpdater = new NacosDynamicServerListUpdater.LBUpdater(updateAction); this.lbUpdaters.add(lbUpdater); lbUpdater.start(); log.info("nacos dslu-started: {}, lbUpdater: {}, PollingServerListUpdater: initialDelay={}, refreshInterval={}", new Object[]{this, lbUpdater.getIdentity(), this.initialDelay, this.refreshInterval}); } @Override public void stop() { log.info("nacos dslu-stopped: {}", this); Iterator iterator = this.lbUpdaters.iterator(); while (iterator.hasNext()) { NacosDynamicServerListUpdater.LBUpdater lbUpdater = (NacosDynamicServerListUpdater.LBUpdater) iterator.next(); try { lbUpdater.stop(); } catch (Exception var4) { log.error("nacos dslu-stop-lbUpdater: " + lbUpdater.getIdentity(), var4); } } } @Override public String getLastUpdate() { return null; } @Override public long getDurationSinceLastUpdateMs() { return 0; } @Override public int getNumberMissedCycles() { return 0; } @Override public int getCoreThreads() { return 0; } class LBUpdater { Logger log = LoggerFactory.getLogger(NacosDynamicServerListUpdater.LBUpdater.class); private String serviceId; private volatile UpdateAction updateAction; private volatile BaseLoadBalancer lb; private NacosDynamicServerListWatcher nacosWatcher; 
private PollingServerListUpdater pollingServerListUpdater; private String identity; public LBUpdater(UpdateAction updateAction) { this.updateAction = updateAction; this.lb = this.getLoadBalancer(updateAction); this.serviceId = this.lb.getClientConfig().getClientName(); this.pollingServerListUpdater = new PollingServerListUpdater(NacosDynamicServerListUpdater.this.initialDelay, NacosDynamicServerListUpdater.this.refreshInterval); this.nacosWatcher = new NacosDynamicServerListWatcher(NacosDynamicServerListUpdater.this.namingService, NacosDynamicServerListUpdater.this.properties, this); } public void start() { this.pollingServerListUpdater.start(this.updateAction); this.nacosWatcher.startWatch(); } private BaseLoadBalancer getLoadBalancer(UpdateAction updateAction) { try { Class<?> bc = updateAction.getClass(); Field field = bc.getDeclaredField("this$0"); field.setAccessible(true); return (BaseLoadBalancer) field.get(updateAction); } catch (Exception var4) { this.log.error("nacos dslu-getlb", var4); throw new IllegalStateException("Not supported LB used", var4); } } public String getServiceId() { return this.serviceId; } public UpdateAction getUpdateAction() { return this.updateAction; } public BaseLoadBalancer getLb() { return this.lb; } public PollingServerListUpdater getPollingServerListUpdater() { return this.pollingServerListUpdater; } public NacosDynamicServerListWatcher getNacosWatcher() { return nacosWatcher; } public String getIdentity() { if (this.identity == null) { this.identity = String.format("{serviceId: %s, lb: %s, updateAction: %s}", this.getServiceId(), this.getLb().hashCode(), this.getUpdateAction().hashCode()); } return this.identity; } public void stop() { try { this.nacosWatcher.stop(); } catch (Exception e) { this.log.error("nacos dslu-stop-watcher: " + this.getIdentity(), e); } this.pollingServerListUpdater.stop(); } public void doUpdate() { this.getUpdateAction().doUpdate(); this.log.info("nacos dslu-doUpdate: {}", this.getIdentity()); 
this.serviceLog(); } private void serviceLog() { List<Server> backwardList = this.getLb().getAllServers(); StringBuilder serviceLog = new StringBuilder(""); Iterator iterator = backwardList.iterator(); while (iterator.hasNext()) { Server service = (Server) iterator.next(); serviceLog.append(service.getHost()); serviceLog.append(":"); serviceLog.append(service.getPort()); serviceLog.append(","); } this.log.info("[nacos dslu-LbServerList] [{}].{}", serviceLog, this.getIdentity()); } }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
- 81
- 82
- 83
- 84
- 85
- 86
- 87
- 88
- 89
- 90
- 91
- 92
- 93
- 94
- 95
- 96
- 97
- 98
- 99
- 100
- 101
- 102
- 103
- 104
- 105
- 106
- 107
- 108
- 109
- 110
- 111
- 112
- 113
- 114
- 115
- 116
- 117
- 118
- 119
- 120
- 121
- 122
- 123
- 124
- 125
- 126
- 127
- 128
- 129
- 130
- 131
- 132
- 133
- 134
- 135
- 136
- 137
- 138
- 139
- 140
- 141
- 142
- 143
- 144
- 145
- 146
- 147
- 148
- 149
- 150
- 151
- 152
- 153
- 154
- 155
- 156
- 157
- 158
- 159
- 160
- 定义一个Nacos的监听类 `NacosDynamicServerListWatcher`,用于监听服务更新事件
public class NacosDynamicServerListWatcher { private static Logger log = LoggerFactory.getLogger(NacosDynamicServerListWatcher.class); private NamingService namingService; private final NacosDiscoveryProperties properties; private NacosDynamicServerListUpdater.LBUpdater lbUpdater; public NacosDynamicServerListWatcher(NamingService namingService, NacosDiscoveryProperties properties, NacosDynamicServerListUpdater.LBUpdater lbUpdater) { this.namingService = namingService; this.properties = properties; this.lbUpdater = lbUpdater; } public void startWatch() { log.info("nacos dslw-start: {}, serviceName: {}", this.lbUpdater.getIdentity(), lbUpdater.getServiceId()); try { namingService.subscribe(lbUpdater.getServiceId(), Arrays.asList(properties.getClusterName()), event -> { if (event instanceof NamingEvent) { NamingEvent namingEvent = (NamingEvent) event; System.out.println("服务名:" + namingEvent.getServiceName()); System.out.println("实例:" + namingEvent.getInstances()); lbUpdater.doUpdate(); } }); } catch (NacosException e) { e.printStackTrace(); } } public void stop() { }}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 定义配置类,让springboot启动时自动加载
/**
 * Spring configuration that exposes a {@link NacosDynamicServerListUpdater} bean.
 * It is guarded by {@code @ConditionalOnRibbonNacos}.
 */
@Configuration
@ConditionalOnRibbonNacos
public class NacosDynamicServerListConfiguration {

    /**
     * Builds the updater from a naming service created for the configured server address.
     *
     * <p>NOTE(review): only {@code properties.getServerAddr()} is handed to {@code NacosFactory};
     * confirm whether other discovery settings (for example a non-default namespace) also need
     * to be forwarded.
     *
     * @param properties Nacos discovery properties
     * @return the updater bean
     * @throws NacosException if the naming service cannot be created
     */
    @Bean
    public NacosDynamicServerListUpdater dynamicServerListUpdater(NacosDiscoveryProperties properties)
            throws NacosException {
        final String serverAddr = properties.getServerAddr();
        return new NacosDynamicServerListUpdater(NacosFactory.createNamingService(serverAddr), properties);
    }
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
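- 修改 `spring.factories`,把上面的配置类注册为自动配置类(包名请按实际项目填写),例如:`org.springframework.boot.autoconfigure.EnableAutoConfiguration=com.example.NacosDynamicServerListConfiguration`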
测试
- 启动 `service-feign`、`service-hi` 服务
- 关闭 `service-hi` 服务,可以看到如下日志: