@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ // Number of invokers int length = invokers.size(); // Every invoker has the same weight? boolean sameWeight = true; // the weight of every invokers int[] weights = newint[length]; // the first invoker's weight int firstWeight = getWeight(invokers.get(0), invocation); weights[0] = firstWeight; // The sum of weights int totalWeight = firstWeight; for (int i = 1; i < length; i++) { int weight = getWeight(invokers.get(i), invocation); // save for later use weights[i] = weight; // Sum totalWeight += weight; if (sameWeight && weight != firstWeight) { sameWeight = false; } } if (totalWeight > 0 && !sameWeight) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on totalWeight. int offset = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < length; i++) { offset -= weights[i]; if (offset < 0) { return invokers.get(i); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(ThreadLocalRandom.current().nextInt(length)); } }
@Override protected <T> Invoker<T> doSelect(List<Invoker<T>> invokers, URL url, Invocation invocation){ // Number of invokers int length = invokers.size(); // The least active value of all invokers int leastActive = -1; // The number of invokers having the same least active value (leastActive) int leastCount = 0; // The index of invokers having the same least active value (leastActive) int[] leastIndexes = newint[length]; // the weight of every invokers int[] weights = newint[length]; // The sum of the warmup weights of all the least active invokes int totalWeight = 0; // The weight of the first least active invoke int firstWeight = 0; // Every least active invoker has the same weight value? boolean sameWeight = true;
// Filter out all the least active invokers for (int i = 0; i < length; i++) { Invoker<T> invoker = invokers.get(i); // Get the active number of the invoke int active = RpcStatus.getStatus(invoker.getUrl(), invocation.getMethodName()).getActive(); // Get the weight of the invoke configuration. The default value is 100. int afterWarmup = getWeight(invoker, invocation); // save for later use weights[i] = afterWarmup; // If it is the first invoker or the active number of the invoker is less than the current least active number if (leastActive == -1 || active < leastActive) { // Reset the active number of the current invoker to the least active number leastActive = active; // Reset the number of least active invokers leastCount = 1; // Put the first least active invoker first in leastIndexes leastIndexes[0] = i; // Reset totalWeight totalWeight = afterWarmup; // Record the weight the first least active invoker firstWeight = afterWarmup; // Each invoke has the same weight (only one invoker here) sameWeight = true; // If current invoker's active value equals with leaseActive, then accumulating. } elseif (active == leastActive) { // Record the index of the least active invoker in leastIndexes order leastIndexes[leastCount++] = i; // Accumulate the total weight of the least active invoker totalWeight += afterWarmup; // If every invoker has the same weight? if (sameWeight && i > 0 && afterWarmup != firstWeight) { sameWeight = false; } } } // Choose an invoker from all the least active invokers if (leastCount == 1) { // If we got exactly one invoker having the least active value, return this invoker directly. return invokers.get(leastIndexes[0]); } if (!sameWeight && totalWeight > 0) { // If (not every invoker has the same weight & at least one invoker's weight>0), select randomly based on // totalWeight. int offsetWeight = ThreadLocalRandom.current().nextInt(totalWeight); // Return a invoker based on the random value. for (int i = 0; i < leastCount; i++) { int leastIndex = leastIndexes[i]; offsetWeight -= weights[leastIndex]; if (offsetWeight < 0) { return invokers.get(leastIndex); } } } // If all invokers have the same weight value or totalWeight=0, return evenly. return invokers.get(leastIndexes[ThreadLocalRandom.current().nextInt(leastCount)]); } }
private String toKey(Object[] args){ StringBuilder buf = new StringBuilder(); for (int i : argumentIndex) { if (i >= 0 && i < args.length) { buf.append(args[i]); } } return buf.toString(); }
本节,我们来看一下 Dubbo 中加权轮询负载均衡的实现 RoundRobinLoadBalance。在详细分析源码前,我们先来了解一下什么是加权轮询。这里从最简单的轮询开始讲起,所谓轮询是指将请求轮流分配给每台服务器。举个例子,我们有三台服务器 A、B、C。我们将第一个请求分配给服务器 A,第二个请求分配给服务器 B,第三个请求分配给服务器 C,第四个请求再次分配给服务器 A。这个过程就叫做轮询。轮询是一种无状态负载均衡算法,实现简单,适用于每台服务器性能相近的场景下。但现实情况下,我们并不能保证每台服务器性能均相近。如果我们将等量的请求分配给性能较差的服务器,这显然是不合理的。因此,这个时候我们需要对轮询过程进行加权,以调控每台服务器的负载。经过加权后,每台服务器能够得到的请求数比例,接近或等于他们的权重比。比如服务器 A、B、C 权重比为 5:2:1。那么在8次请求中,服务器 A 将收到其中的5次请求,服务器 B 会收到其中的2次请求,服务器 C 则收到其中的1次请求。
Dubbo的RoundRobin参考了Nginx 的平滑加权轮询负载均衡。平滑加权轮询解决了什么问题呢?比如,服务器 A, B, C 对应权重 5, 1, 1。进行7次负载均衡后,选择出来的序列为A, A, A, A, A, B, C。前5个请求全部都落在了服务器 A上,这将会使服务器 A 短时间内接收大量的请求,压力陡增。而 B 和 C 此时无请求,处于空闲状态。而我们期望的结果是这样的 A, A, B, A, C, A, A,不同服务器可以穿插获取请求。
上面描述不是很好理解,下面还是举例进行说明。这里仍然使用服务器A, B, C对应权重5, 1, 1的例子说明,现在有7个请求依次进入负载均衡逻辑,选择过程如下: 如上,经过平滑性处理后,得到的服务器序列为A, A, B, A, C, A, A,相比之前的序列 A, A, A, A, A, B, C,分布性要好一些。初始情况下 currentWeight = 0, 0, 0,第7个请求处理完后,currentWeight 再次变为 0, 0, 0。