企业网站定制开发从源码角度搞懂 Ribbon 的负载策略

前言

RibbonNetflix企业网站定制开发公司的一个,企业网站定制开发现已被收录到 SpringCloud ,企业网站定制开发是一个基于 HTTPTCP 企业网站定制开发的客户端器,当我们将 RibbonEureka企业网站定制开发一起使用时,Ribbon会从Eureka企业网站定制开发注册中心去获取服务端列表,企业网站定制开发通过轮询方式达到负载企业网站定制开发均衡的作用,企业网站定制开发客户端负载均衡中用心企业网站定制开发跳机制去维护服务端清企业网站定制开发单的有效性,企业网站定制开发这个过程需要配合服务注册中心一起完成。


什么是负载均衡?

负载均衡是我们处理、缓解网络压力和进行服务端扩容的重要手段之一,但是一般情况下我们所说的负载均衡通常都是指服务端负载均衡,负载均衡又分为两种,还有一种是客户端负载均衡。


RibbonNginx 的区别?

Ribbon 是客户端负载均衡器,而 Nginx 是服务端负载均衡器。

客户端负载指的是 client 有要调用的服务实例清单,比如 /nacos 存储各服务实例信息,而对于其中集成的 Ribbon 来说,从已知的服务列表通过某种策略选取一个实例负载,这就是客户端负载均衡,即在客户端进行负载均衡算法分配。

服务端负载指的是 client 不知道调用哪个 server 实例,发送请求后,通过服务端的负载均衡算法,在多个服务端之间选择一个进行访问,即在服务端进行负载均衡算法分配。

客户端与服务端负载均衡的区别实际上是服务清单所存储的位置,在客户端负载均衡中,所有client有一份要访问的服务端清单地址。


负载策略

七种负载均衡策略类图如下:

负载均衡接口 com.netflix.loadbalancer.IRule,由抽象类 AbstractLoadBalancerRule实现 IRule 接口,各个策略都是抽象类的具体实现。


轮询策略 -RoundRobinRule

下面看看 RoundRobinRule 类的是如何实现的。

public class RoundRobinRule extends AbstractLoadBalancerRule {    private AtomicInteger nextServerCyclicCounter;    public Server choose(ILoadBalancer lb, Object key) {        if (lb == null) {            log.warn("no load balancer");            return null;        }        Server server = null;        // 用于计算负载均衡器尝试获取可用服务器的次数        int count = 0;        // 共尝试10次,超过则负载失败        while (server == null && count++ < 10) {            // 获取所有可达服务器            List<Server> reachableServers = lb.getReachableServers();            // 获取所有服务器            List<Server> allServers = lb.getAllServers();            int upCount = reachableServers.size();            int serverCount = allServers.size();            if ((upCount == 0) || (serverCount == 0)) {                log.warn("No up servers available from load balancer: " + lb);                return null;            }            // 自旋锁计算出下一个负载的服务器            int nextServerIndex = incrementAndGetModulo(serverCount);            // 取出下一个负载的服务器            server = allServers.get(nextServerIndex);            // 如没有此服务器,当前线程让出CPU,置为就绪状态            if (server == null) {                /* Transient. */                Thread.yield();                continue;            }            if (server.isAlive() && (server.isReadyToServe())) {                return (server);            }            // Next.            server = null;        }        // 获取负载服务器会尝试10次,超过10次警告        if (count >= 10) {            log.warn("No available alive servers after 10 tries from load balancer: "                    + lb);        }        return server;    }        // 取模运算并使用CAS机制更新下一个负载的服务器    private int incrementAndGetModulo(int modulo) {        for (;;) {            // 获取原子属性值            int current = nextServerCyclicCounter.get();            // 取模运算            int next = (current + 1) % modulo;            // CAS机制更新标识服务器循环计数器            if (nextServerCyclicCounter.compareAndSet(current, next))                return next;        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65

轮询通过模运算计算出负载机器的索引,根据索引从存放所有服务器的 list中取出作为负载服务器。其中,使用原子类 AtomicInteger + CAS 机制来记录下一个负载的服务器标识,保证了线程安全。


随机策略 - RandomRule

随机策略是指随机选择服务器实例进行负载,使用 ThreadLocalRandom 方式获取随机数,保证线程安全。

public class RandomRule extends AbstractLoadBalancerRule {    public Server choose(ILoadBalancer lb, Object key) {        if (lb == null) {            return null;        }        Server server = null;        while (server == null) {            if (Thread.interrupted()) {                return null;            }            // 获取可达服务器和所有服务器            List<Server> upList = lb.getReachableServers();            List<Server> allList = lb.getAllServers();            int serverCount = allList.size();            if (serverCount == 0) {                return null;            }            // 取随机数            int index = chooseRandomInt(serverCount);            server = upList.get(index);            if (server == null) {                Thread.yield();                continue;            }            if (server.isAlive()) {                return (server);            }            server = null;            Thread.yield();        }        return server;    }    // 取随机数    protected int chooseRandomInt(int serverCount) {        return ThreadLocalRandom.current().nextInt(serverCount);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45

重试策略 - RetryRule

先按照轮询负载策略获取服务实例,如果获取失败则在指定时间内(默认500ms)进行重试,循环调用轮询策略获取实例。

使用 InterruptTask 开启了一个 Timer 守护线程,用来延迟执行指定的任务,在重试时间范围内循环调用轮询策略获取服务器,如超过指定重试时间后仍未获取到服务器信息,则返回 null

public class RetryRule extends AbstractLoadBalancerRule {    public Server choose(ILoadBalancer lb, Object key) {       long requestTime = System.currentTimeMillis();       long deadline = requestTime + maxRetryMillis;       Server answer = null;       // 调用轮询策略       answer = subRule.choose(key);       // 如果轮询策略没获取到服务器 || 服务器未激活 && 在指定的最大重试时间内       if (((answer == null) || (!answer.isAlive()))             && (System.currentTimeMillis() < deadline)) {          // 开启守护线程,监视剩余指定的重试时间          InterruptTask task = new InterruptTask(deadline                - System.currentTimeMillis());          // 在指定的重试时间范围内,当前线程如没中断,循环调用轮询策略          while (!Thread.interrupted()) {             answer = subRule.choose(key);             if (((answer == null) || (!answer.isAlive()))                   && (System.currentTimeMillis() < deadline)) {                /* pause and retry hoping it's transient */                Thread.yield();             } else {                break;             }          }          task.cancel();       }       if ((answer == null) || (!answer.isAlive())) {          return null;       } else {          return answer;       }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42

加权响应时间 - WeightedResponseTimeRule

WeightedResponseTimeRule 类继承了轮询策略类 RandomRule

初始化时,启动一个定时器,每隔 30s 根据服务的响应时间分配一次权重,响应时间越长,权重越低,被选择到的概率也越低。响应时间越短,权重越高,实例被选中概率越高。得到权重后,生成随机权重,命中权重比随机权重大的第一个服务实例。

public class WeightedResponseTimeRule extends RoundRobinRule {        // 每隔 30s 统计各服务权重    public static final int DEFAULT_TIMER_INTERVAL = 30 * 1000;    // 记录累计权重    private volatile List<Double> accumulatedWeights = new ArrayList<Double>();        // 初始化    void initialize(ILoadBalancer lb) {                if (serverWeightTimer != null) {            serverWeightTimer.cancel();        }        serverWeightTimer = new Timer("NFLoadBalancer-serverWeightTimer-"                + name, true);        // 统计各服务权重        serverWeightTimer.schedule(new DynamicServerWeightTask(), 0,                serverWeightTaskTimerInterval);        // do a initial run        ServerWeight sw = new ServerWeight();        sw.maintainWeights();        Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {            public void run() {                logger                        .info("Stopping NFLoadBalancer-serverWeightTimer-"                                + name);                serverWeightTimer.cancel();            }        }));    }    @Override    public Server choose(ILoadBalancer lb, Object key) {        if (lb == null) {            return null;        }        Server server = null;        while (server == null) {            List<Double> currentWeights = accumulatedWeights;            // 判断线程是否中断            if (Thread.interrupted()) {                return null;            }            // 获取服务器列表            List<Server> allList = lb.getAllServers();            int serverCount = allList.size();            if (serverCount == 0) {                return null;            }            int serverIndex = 0;            // currentWeights.size() - 1 是所有权重的总和            double maxTotalWeight = currentWeights.size() == 0 ? 0 : currentWeights.get(currentWeights.size() - 1);             // 未命中任何服务器就调用轮询策略获取            if (maxTotalWeight < 0.001d || serverCount != currentWeights.size()) {                server =  super.choose(getLoadBalancer(), key);                if(server == null) {                    return server;                }            } else {                // 从 0 到 所有权重总和之间获取随机数作为随机权重                double randomWeight = random.nextDouble() * maxTotalWeight;                int n = 0;                // 命中权重比随机权重大的第一个服务实例                for (Double d : currentWeights) {                    if (d >= randomWeight) {                        serverIndex = n;                        break;                    } else {                        n++;                    }                }                server = allList.get(serverIndex);            }            if (server == null) {                /* Transient. */                Thread.yield();                continue;            }            if (server.isAlive()) {                return (server);            }            // Next.            server = null;        }        return server;    }}// 内部类class ServerWeight {    public void maintainWeights() {        ILoadBalancer lb = getLoadBalancer();        if (lb == null) {            return;        }                if (!serverWeightAssignmentInProgress.compareAndSet(false,  true))  {            return;         }                try {            logger.info("Weight adjusting job started");            AbstractLoadBalancer nlb = (AbstractLoadBalancer) lb;            LoadBalancerStats stats = nlb.getLoadBalancerStats();            if (stats == null) {                // no statistics, nothing to do                return;            }            double totalResponseTime = 0;            // 计算出所有服务实例累计的平均响应时间            for (Server server : nlb.getAllServers()) {                ServerStats ss = stats.getSingleServerStat(server);                totalResponseTime += ss.getResponseTimeAvg();            }            // 记录累计权重            Double weightSoFar = 0.0;                        // 存放所有服务的权重            List<Double> finalWeights = new ArrayList<Double>();            for (Server server : nlb.getAllServers()) {                ServerStats ss = stats.getSingleServerStat(server);                // 每个服务权重 = 所有服务的平均响应时间总和 - 当前服务的平均响应时间                // 所以服务的响应时间越大,权重越小,被选中的可能性越小                double weight = totalResponseTime - ss.getResponseTimeAvg();                weightSoFar += weight;                finalWeights.add(weightSoFar);               }            setWeights(finalWeights);        } catch (Exception e) {            logger.error("Error calculating server weights", e);        } finally {            serverWeightAssignmentInProgress.set(false);        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37
  • 38
  • 39
  • 40
  • 41
  • 42
  • 43
  • 44
  • 45
  • 46
  • 47
  • 48
  • 49
  • 50
  • 51
  • 52
  • 53
  • 54
  • 55
  • 56
  • 57
  • 58
  • 59
  • 60
  • 61
  • 62
  • 63
  • 64
  • 65
  • 66
  • 67
  • 68
  • 69
  • 70
  • 71
  • 72
  • 73
  • 74
  • 75
  • 76
  • 77
  • 78
  • 79
  • 80
  • 81
  • 82
  • 83
  • 84
  • 85
  • 86
  • 87
  • 88
  • 89
  • 90
  • 91
  • 92
  • 93
  • 94
  • 95
  • 96
  • 97
  • 98
  • 99
  • 100
  • 101
  • 102
  • 103
  • 104
  • 105
  • 106
  • 107
  • 108
  • 109
  • 110
  • 111
  • 112
  • 113
  • 114
  • 115
  • 116
  • 117
  • 118
  • 119
  • 120
  • 121
  • 122
  • 123
  • 124
  • 125
  • 126
  • 127
  • 128
  • 129
  • 130
  • 131
  • 132
  • 133
  • 134
  • 135
  • 136
  • 137
  • 138
  • 139
  • 140
  • 141
  • 142
  • 143
  • 144
  • 145

例如:现在有三个服务实例,平均响应时间分别为:

  • A:100ms
  • B:200ms
  • C:300ms

则权重分别是:

  • A:600-100 = 500
  • B:500+600-200 = 900
  • C:900+600-300 = 1200

生成的随机数若在 0-500 之间,则命中服务 A,如在 500 - 900 之间,则命中服务 B,如在 900 - 1200,则命中服务 C,如果没有命中任何服务实例,则取轮询策略的结果。


最佳可用策略 - BestAvailableRule

如未指定负载均衡器,采用轮询策略选取一个服务实例;

如指定了负载均衡器,逐个考察服务实例,过滤掉断路器跳闸状态的实例,从未过滤掉的实例中选择一个并发量最小的实例。如果未命中,则轮询策略选取一个服务实例。

public class BestAvailableRule extends ClientConfigEnabledRoundRobinRule {    @Override    public Server choose(Object key) {        // 未指定负载均衡器,调用轮询策略        if (loadBalancerStats == null) {            return super.choose(key);        }        // 获取所有服务器列表        List<Server> serverList = getLoadBalancer().getAllServers();        // 最小并发连接数        int minimalConcurrentConnections = Integer.MAX_VALUE;        long currentTime = System.currentTimeMillis();        Server chosen = null;        // 遍历服务器列表        for (Server server: serverList) {            // 获取服务器统计信息            ServerStats serverStats = loadBalancerStats.getSingleServerStat(server);            // 如果服务器断路器没有发生断路器跳闸,过滤掉断路器跳闸的实例            if (!serverStats.isCircuitBreakerTripped(currentTime)) {                int concurrentConnections = serverStats.getActiveRequestsCount(currentTime);                // 选择并发量最小的实例                if (concurrentConnections < minimalConcurrentConnections) {                    minimalConcurrentConnections = concurrentConnections;                    chosen = server;                }            }        }        // 如未命中,轮询选取一个实例        if (chosen == null) {            return super.choose(key);        } else {            return chosen;        }    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36

可用性过滤策略 - AvailabilityFilteringRule

该策略继承自抽象策略 PredicateBasedRule 类。

通过轮询的方式选取一个服务,如果不匹配过滤条件,则继续轮询10次,如果10次还未命中,就轮询选取一个实例。

过滤条件:断路器故障或者并发请求超过了设置的并发阈值

public class AvailabilityFilteringRule extends PredicateBasedRule {      @Override    public Server choose(Object key) {        int count = 0;        // 轮询策略选一个实例        Server server = roundRobinRule.choose(key);        while (count++ <= 10) {            // 判断是否符合断言条件            if (predicate.apply(new PredicateKey(server))) {                return server;            }            // 不满足断言条件再轮询选择一个实例            server = roundRobinRule.choose(key);        }        // 超过10次还不满足,使用 父类 `PredicateBasedRule`策略        return super.choose(key);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20

看看父类 PredicateBasedRule 的负载策略

public abstract class PredicateBasedRule extends ClientConfigEnabledRoundRobinRule {    @Override    public Server choose(Object key) {        ILoadBalancer lb = getLoadBalancer();        // 根据条件过滤后,采用轮询策略选取实例        Optional<Server> server = getPredicate().chooseRoundRobinAfterFiltering(lb.getAllServers(), key);        if (server.isPresent()) {            return server.get();        } else {            return null;        }           }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

来看看上述中的断言条件是什么,进入到AvailabilityPredicate类查看断言条件

public class AvailabilityPredicate extends  AbstractServerPredicate {    @Override    public boolean apply(@Nullable PredicateKey input) {        LoadBalancerStats stats = getLBStats();        if (stats == null) {            return true;        }        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));    }    private boolean shouldSkipServer(ServerStats stats) {        // 以下两个条件满足其一就过滤实例        // 1、断路器开启并且故障        // 2、实例的并发请求>=阈值        if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())                 || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {            return true;        }        return false;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

区域回避策略 - ZoneAvoidanceRule

继承自 PredicateBasedRule

public class ZoneAvoidanceRule extends PredicateBasedRule {    private static final Random random = new Random();    private CompositePredicate compositePredicate;        public ZoneAvoidanceRule() {        super();        // 两个过滤条件        ZoneAvoidancePredicate zonePredicate = new ZoneAvoidancePredicate(this);        AvailabilityPredicate availabilityPredicate = new AvailabilityPredicate(this);        compositePredicate = createCompositePredicate(zonePredicate, availabilityPredicate);    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14

两个断言条件

public class ZoneAvoidancePredicate extends  AbstractServerPredicate {    @Override    public boolean apply(@Nullable PredicateKey input) {        if (!ENABLED.get()) {            return true;        }        String serverZone = input.getServer().getZone();        if (serverZone == null) {            // there is no zone information from the server, we do not want to filter            // out this server            return true;        }        LoadBalancerStats lbStats = getLBStats();        if (lbStats == null) {            // no stats available, do not filter            return true;        }        if (lbStats.getAvailableZones().size() <= 1) {            // only one zone is available, do not filter            return true;        }        Map<String, ZoneSnapshot> zoneSnapshot = ZoneAvoidanceRule.createSnapshot(lbStats);        if (!zoneSnapshot.keySet().contains(serverZone)) {            // The server zone is unknown to the load balancer, do not filter it out             return true;        }        logger.debug("Zone snapshots: {}", zoneSnapshot);        // 获取可用区域        Set<String> availableZones = ZoneAvoidanceRule.getAvailableZones(zoneSnapshot, triggeringLoad.get(), triggeringBlackoutPercentage.get());        logger.debug("Available zones: {}", availableZones);        if (availableZones != null) {            return availableZones.contains(input.getServer().getZone());        } else {            return false;        }    }   }
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22
  • 23
  • 24
  • 25
  • 26
  • 27
  • 28
  • 29
  • 30
  • 31
  • 32
  • 33
  • 34
  • 35
  • 36
  • 37

此过滤条件就是 AvailabilityFilteringRule策略的过滤条件。

public class AvailabilityPredicate extends  AbstractServerPredicate {    @Override    public boolean apply(@Nullable PredicateKey input) {        LoadBalancerStats stats = getLBStats();        if (stats == null) {            return true;        }        return !shouldSkipServer(stats.getSingleServerStat(input.getServer()));    }    // 以下两个条件满足其一就过滤实例     // 1、断路器开启并且故障     // 2、实例的并发请求>=阈值    private boolean shouldSkipServer(ServerStats stats) {                if ((CIRCUIT_BREAKER_FILTERING.get() && stats.isCircuitBreakerTripped())                 || stats.getActiveRequestsCount() >= activeConnectionsLimit.get()) {            return true;        }        return false;    }}
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17
  • 18
  • 19
  • 20
  • 21
  • 22

小结

本文主要从源码角度分析了ribbon的七个负载均衡策略,如对 MySQLSpring 等感兴趣请继续关注。

网站建设定制开发 软件系统开发定制 定制软件开发 软件开发定制 定制app开发 app开发定制 app开发定制公司 电商商城定制开发 定制小程序开发 定制开发小程序 客户管理系统开发定制 定制网站 定制开发 crm开发定制 开发公司 小程序开发定制 定制软件 收款定制开发 企业网站定制开发 定制化开发 android系统定制开发 定制小程序开发费用 定制设计 专注app软件定制开发 软件开发定制定制 知名网站建设定制 软件定制开发供应商 应用系统定制开发 软件系统定制开发 企业管理系统定制开发 系统定制开发