您现在的位置是:首页 >技术交流 >流控 随笔 0-算法网站首页技术交流

流控 随笔 0-算法

肯尼思布赖恩埃德蒙 2024-06-04 00:00:03
简介流控 随笔 0-算法

0. 想要自然而然更多一些

流控算法
guava单机流控+流控原理

集群流控的一些思路+spring cloud gateway+redis+lua实战
sentinel 集群流控 实战


sentinel 官方教程
sentinel wiki

javadoop大佬(版本比较旧)、与sentinel-dashboard交互

Spring Cloud Hystrix原理与注意事项


sentinel 流控模式(策略)

1. 流控算法

可以直接应用于 单机流控 的场景

1.1 令牌桶算法

1个简单的令牌(token)桶的设计需要考虑的因子:

  • 添加 token的速度
  • 消耗 token的速度

令牌桶可以认为是漏桶算法在 应对突发流量 方面的升级版,并且实现起来没有漏桶那么抽象

1.1.1 guava.RateLimiter

guava的设计中,为了使流控效果变得更加"有意思",额外设计了以下因子:

  • 预留 多少个单位时间 的token (这些token将帮助我们应对突发的流量时)
  • 闲置 多少个token时, 可以认为系统处于 活跃、冷却的临界状态 (冷启动时的 预热)
package com.google.common.util.concurrent;

/**
 * Rate limiter base class that tracks the permits saved up while idle ({@code storedPermits}) and
 * the earliest time the next request can be granted ({@code nextFreeTicketMicros}). The nested
 * subclasses decide how stored permits translate into wait time ({@code storedPermitsToWaitTime})
 * and how quickly they are re-stocked ({@code coolDownIntervalMicros}).
 */
abstract class SmoothRateLimiter extends RateLimiter {

  /* ------- Throttle requests differently in the warm (active) and cold (idle) states ------- */
  /**
   * This implements the following function where coldInterval = coldFactor * stableInterval.
   *
   * <pre>
   *          ^ throttling
   *          |
   *    cold  +                  /
   * interval |                 /.
   *          |                / .
   *          |               /  .   ← "warmup period" is the area of the trapezoid between
   *          |              /   .     thresholdPermits and maxPermits
   *          |             /    .
   *          |            /     .
   *          |           /      .
   *   stable +----------/  WARM .
   * interval |          .   UP  .
   *          |          . PERIOD.
   *          |          .       .
   *        0 +----------+-------+--------------→ storedPermits
   *          0 thresholdPermits maxPermits
   * </pre>
   *
   */
  static final class SmoothWarmingUp extends SmoothRateLimiter {
    private final long warmupPeriodMicros;
    /**
     * The slope of the line from the stable interval (when permits == 0), to the cold interval
     * (when permits == maxPermits)
     */
    private double slope;

    private double thresholdPermits;
    private double coldFactor;

    // warmupPeriod is converted to microseconds (not milliseconds) and kept as the warm-up
    // duration; per the diagram above this is the area under the curve between thresholdPermits
    // and maxPermits.
    // NOTE(review): the article states that instances created through RateLimiter.create pass a
    // hard-coded coldFactor of 3; that call site is not part of this excerpt, so confirm it.
    SmoothWarmingUp(
        SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
      super(stopwatch);
      this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
      this.coldFactor = coldFactor;
    }

    /**
     * Recomputes thresholdPermits, maxPermits and slope for the new stable interval, then rescales
     * storedPermits so that it keeps the same proportion of maxPermits (or starts full, i.e. cold,
     * when there was no previous maximum).
     */
    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = maxPermits;
      double coldIntervalMicros = stableIntervalMicros * coldFactor;
      thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
      maxPermits =
          thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
      // Slope of the climbing line between thresholdPermits and maxPermits.
      slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = 0.0;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? maxPermits // initial state is cold
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    /**
     * Returns the wait time, in microseconds, charged for taking {@code permitsToTake} permits out
     * of {@code storedPermits}: the area under the throttling curve shown in the class diagram
     * (a trapezoid above thresholdPermits plus a rectangle below it).
     */
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      // A positive value means part of the request is served from above the threshold, i.e. from
      // the climbing line, so a trapezoid has to be integrated in addition to the rectangle.
      double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
      long micros = 0;
      // measuring the integral on the right part of the function (the climbing line)
      if (availablePermitsAboveThreshold > 0.0) {
        // Number of permits actually taken from above the threshold: the trapezoid's extent along
        // the permits axis (possibly only part of the warm-up region).
        double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
        // Sum of the trapezoid's two parallel sides; permitsToTime(x) evaluates the line
        // y = stableIntervalMicros + slope * x.
        // TODO(cpovirk): Figure out a good name for this variable.
        double length =
            permitsToTime(availablePermitsAboveThreshold)
                + permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
        // Trapezoid area = (sum of the parallel sides) * extent / 2.
        micros = (long) (permitsAboveThresholdToTake * length / 2.0);
        permitsToTake -= permitsAboveThresholdToTake;
      }
      // Add the rectangle for whatever remains to be taken at or below the threshold.
      // measuring the integral on the left part of the function (the horizontal line)
      micros += (long) (stableIntervalMicros * permitsToTake);
      return micros;
    }

    /** Interval charged per permit when {@code permits} permits are stored above the threshold. */
    private double permitsToTime(double permits) {
      // slope is the gradient of the climbing line computed in doSetRate.
      return stableIntervalMicros + permits * slope;
    }

    // warmupPeriodMicros is the warm-up duration in microseconds (it is not a rate) and
    // maxPermits is the maximum number of stored permits. Their quotient is the number of
    // microseconds needed to re-stock one permit while idle, so going from 0 to maxPermits
    // stored permits takes warmupPeriodMicros (see how resync divides elapsed time by this value).
    @Override
    double coolDownIntervalMicros() {
      return warmupPeriodMicros / maxPermits;
    }
  }

  /* ------- Save up the permits produced over a bounded period of idleness ------- */
  /**
   * This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
   * The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
   * terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
   * seconds, we can save up to 2 * 10 = 20 permits.
   */
  static final class SmoothBursty extends SmoothRateLimiter {
    /** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
    final double maxBurstSeconds;

    SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
      super(stopwatch);
      this.maxBurstSeconds = maxBurstSeconds;
    }

    /**
     * Sets maxPermits to {@code maxBurstSeconds * permitsPerSecond} and rescales storedPermits
     * proportionally (starting at 0 when there was no previous maximum).
     */
    @Override
    void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
      double oldMaxPermits = this.maxPermits;
      maxPermits = maxBurstSeconds * permitsPerSecond;
      if (oldMaxPermits == Double.POSITIVE_INFINITY) {
        // if we don't special-case this, we would get storedPermits == NaN, below
        storedPermits = maxPermits;
      } else {
        storedPermits =
            (oldMaxPermits == 0.0)
                ? 0.0 // initial state
                : storedPermits * maxPermits / oldMaxPermits;
      }
    }

    /** Stored permits are free here: taking them never adds wait time. */
    @Override
    long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
      return 0L;
    }

    /** Permits are re-stocked at the stable rate. */
    @Override
    double coolDownIntervalMicros() {
      return stableIntervalMicros;
    }
  }

  /** The currently stored permits. */
  double storedPermits;

  /** The maximum number of stored permits. */
  double maxPermits;

  /**
   * The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
   * per second has a stable interval of 200ms.
   */
  double stableIntervalMicros;

  /**
   * The time when the next request (no matter its size) will be granted. After granting a request,
   * this is pushed further in the future. Large requests push this further than small requests.
   */
  private long nextFreeTicketMicros = 0L; // could be either in the past or future

  private SmoothRateLimiter(SleepingStopwatch stopwatch) {
    super(stopwatch);
  }

  /**
   * Brings the stored state up to {@code nowMicros}, records the new stable interval
   * (one second divided by {@code permitsPerSecond}) and delegates to the subclass hook.
   */
  @Override
  final void doSetRate(double permitsPerSecond, long nowMicros) {
    resync(nowMicros);
    double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
    this.stableIntervalMicros = stableIntervalMicros;
    doSetRate(permitsPerSecond, stableIntervalMicros);
  }

  abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);

  /** Returns the stable rate in permits per second, derived from the stable interval. */
  @Override
  final double doGetRate() {
    return SECONDS.toMicros(1L) / stableIntervalMicros;
  }

  /** Returns nextFreeTicketMicros as-is; {@code nowMicros} is not consulted. */
  @Override
  final long queryEarliestAvailable(long nowMicros) {
    return nextFreeTicketMicros;
  }

  /**
   * Reserves {@code requiredPermits} and returns the time at which this request is granted. The
   * cost of this request is added to nextFreeTicketMicros only after the return value has been
   * captured, so it is paid by whichever request comes next.
   */
  @Override
  final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
    // resync re-stocks storedPermits and moves nextFreeTicketMicros up to now, but only when
    // nextFreeTicketMicros lies in the past.
    resync(nowMicros);
    // The grant time is nextFreeTicketMicros as it stands after resync.
    long returnValue = nextFreeTicketMicros;
    // Part of the request that can be served from stored permits.
    double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
    // Part of the request that exceeds the stored permits and must be freshly produced.
    double freshPermits = requiredPermits - storedPermitsToSpend;
    // Cost of the request: the subclass-specific charge for the stored permits (always 0 for
    // SmoothBursty, the warm-up integral for SmoothWarmingUp) plus the time needed to produce
    // the fresh permits at the stable rate.
    long waitMicros =
        storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
            + (long) (freshPermits * stableIntervalMicros);

    // Push the next grant time into the future by that cost (saturating add).
    this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
    // Remove the spent permits from the store.
    this.storedPermits -= storedPermitsToSpend;
    return returnValue;
  }

  /**
   * Translates a specified portion of our currently stored permits which we want to spend/acquire,
   * into a throttling time. Conceptually, this evaluates the integral of the underlying function we
   * use, for the range of [(storedPermits - permitsToTake), storedPermits].
   *
   * <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
   */
  abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);

  /**
   * Returns the number of microseconds during cool down that we have to wait to get a new permit.
   */
  abstract double coolDownIntervalMicros();

  /** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
  void resync(long nowMicros) {
    // When now is later than nextFreeTicketMicros, the elapsed time is converted into new
    // permits (capped at maxPermits) and nextFreeTicketMicros is moved up to now.
    // if nextFreeTicket is in the past, resync to now
    if (nowMicros > nextFreeTicketMicros) {
      double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
      storedPermits = min(maxPermits, storedPermits + newPermits);
      nextFreeTicketMicros = nowMicros;
    }
  }
}

-----------

  // com.google.common.math.LongMath#saturatedAdd
  /**
   * Returns {@code a + b}, unless the addition would overflow or underflow, in which case the
   * result is clamped to {@code Long.MAX_VALUE} or {@code Long.MIN_VALUE} respectively.
   */
  @Beta
  public static long saturatedAdd(long a, long b) {
    long wrappedSum = a + b;
    boolean operandSignsDiffer = (a ^ b) < 0;
    boolean sumKeepsSignOfA = (a ^ wrappedSum) >= 0;
    if (operandSignsDiffer || sumKeepsSignOfA) {
      // Operands of different signs cannot overflow, and a sum that kept the sign of a did not
      // overflow either, so the plain sum is the answer.
      return wrappedSum;
    }
    // Overflow happened. A negative wrapped sum (sign bit 1) means two positives overflowed, so
    // the result is MAX_VALUE + 0; a non-negative wrapped sum means two negatives underflowed, so
    // the result is MAX_VALUE + 1, which wraps to MIN_VALUE.
    long signBit = wrappedSum >>> (Long.SIZE - 1);
    return Long.MAX_VALUE + (signBit ^ 1);
  }

1.1.2 sentinel.TrafficShapingController

也有类似guava的实现

1.1.2.1 应对突发流量

package com.alibaba.csp.sentinel.slots.block.flow.controller;

/**
 * Traffic shaping controller that spaces passed requests at a fixed interval derived from
 * {@code count}: a request for {@code acquireCount} tokens is expected to pass
 * {@code acquireCount / count * 1000} ms after the previously passed one. A request that arrives
 * early sleeps until its slot, unless that would take longer than {@code maxQueueingTimeMs}, in
 * which case it is rejected.
 *
 * <p>NOTE(review): the surrounding article likens this to Guava's {@code SmoothBursty}; no stored
 * permits are kept here (only the latest pass time), so treat that comparison as loose.
 */
public class RateLimiterController implements TrafficShapingController {

    /** Longest time, in milliseconds, a request may wait for its slot before being rejected. */
    private final int maxQueueingTimeMs;
    /** Allowed tokens per second; used to derive the per-request interval. */
    private final double count;

    /** Time slot (epoch millis) of the most recently admitted request; -1 until the first pass. */
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    /**
     * @param timeOut maximum queueing time in milliseconds
     * @param count   allowed tokens per second
     */
    public RateLimiterController(int timeOut, double count) {
        this.maxQueueingTimeMs = timeOut;
        this.count = count;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Decides whether the request may pass, sleeping until its slot when it arrives early.
     *
     * @param node         unused by this controller
     * @param acquireCount number of tokens requested
     * @param prioritized  unused by this controller
     * @return {@code true} if the request passes (possibly after sleeping); {@code false} if it
     *         is rejected because the wait would exceed the queueing limit, because
     *         {@code count <= 0}, or because the waiting thread was interrupted (in which case
     *         the thread's interrupt status is restored)
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // Pass when acquire count is less or equal than 0.
        if (acquireCount <= 0) {
            return true;
        }
        // Reject when count is less or equal than 0.
        // Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
        if (count <= 0) {
            return false;
        }

        long currentTime = TimeUtil.currentTimeMillis();
        // Calculate the interval between every two requests.
        long costTime = Math.round(1.0 * (acquireCount) / count * 1000);

        // Expected pass time of this request: one interval after the latest admitted slot.
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            // Contention may exist here, but it's okay.
            latestPassedTime.set(currentTime);
            return true;
        }

        // The request is early: it has to queue. Calculate the time to wait.
        long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
        if (waitTime > maxQueueingTimeMs) {
            return false;
        }

        // Reserve the next slot; addAndGet returns the slot assigned to this request.
        long reservedTime = latestPassedTime.addAndGet(costTime);
        try {
            waitTime = reservedTime - TimeUtil.currentTimeMillis();
            if (waitTime > maxQueueingTimeMs) {
                // Other threads reserved slots in between; give this one back and reject.
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            // in race condition waitTime may <= 0
            if (waitTime > 0) {
                Thread.sleep(waitTime);
            }
            return true;
        } catch (InterruptedException e) {
            // The request is still rejected, as before, but the interrupt status is restored so
            // code further up the stack can observe the interruption instead of losing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }

}

1.1.2.2 预热

package com.alibaba.csp.sentinel.slots.block.flow.controller;

/**
 * Warm-up traffic shaping controller. It keeps a pool of stored tokens; while the pool holds at
 * least {@code warningToken} tokens the admitted QPS is reduced along a line with gradient
 * {@code slope}, otherwise the full {@code count} is allowed.
 */
public class WarmUpController implements TrafficShapingController {

    protected double count;
    private int coldFactor;
    protected int warningToken = 0;
    private int maxToken;
    protected double slope;

    // Token state is kept in AtomicLong fields rather than behind a lock.
    protected AtomicLong storedTokens = new AtomicLong(0);
    protected AtomicLong lastFilledTime = new AtomicLong(0);

    public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
        construct(count, warmUpPeriodInSec, coldFactor);
    }

    /** Same as the three-argument constructor with a cold factor of 3. */
    public WarmUpController(double count, int warmUpPeriodInSec) {
        construct(count, warmUpPeriodInSec, 3);
    }

    /**
     * Derives the warning threshold, the token cap and the slope from the configuration.
     *
     * @throws IllegalArgumentException if {@code coldFactor <= 1}
     */
    private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
        if (coldFactor <= 1) {
            throw new IllegalArgumentException("Cold factor should be larger than 1");
        }

        this.count = count;
        this.coldFactor = coldFactor;

        // Turning point of the warm-up curve; the counterpart of Guava's
        // thresholdPermits = 0.5 * warmupPeriod / stableInterval.
        warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);

        // Token cap; the counterpart of Guava's
        // maxPermits = thresholdPermits + 2 * warmupPeriod / (stableInterval + coldInterval).
        maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));

        // Gradient of the climbing line; the counterpart of Guava's
        // slope = (coldInterval - stableInterval) / (maxPermits - thresholdPermits),
        // with the intervals expressed in seconds per token (1 / count and coldFactor / count).
        slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Admits the request when the current pass QPS plus {@code acquireCount} stays within the
     * QPS allowed for the current token level.
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        // QPS observed by the node for the current window.
        long currentQps = (long) node.passQps();

        // QPS of the previous window drives the token bookkeeping.
        long lastWindowQps = (long) node.previousPassQps();
        syncToken(lastWindowQps);

        long remaining = storedTokens.get();
        double allowedQps;
        if (remaining >= warningToken) {
            // At or above the warning line: the interval per token grows linearly with the number
            // of tokens above the line, i.e. interval = aboveToken * slope + 1 / count.
            long aboveToken = remaining - warningToken;
            allowedQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
        } else {
            allowedQps = count;
        }
        return currentQps + acquireCount <= allowedQps;
    }

    /**
     * Refills and then drains the token pool, at most once per wall-clock second.
     *
     * @param passQps QPS of the previous window; this many tokens are removed after the refill
     */
    protected void syncToken(long passQps) {
        long now = TimeUtil.currentTimeMillis();
        // Truncate to the start of the current second.
        long currentSecond = now - now % 1000;
        long lastFill = lastFilledTime.get();
        if (currentSecond <= lastFill) {
            return;
        }

        long expected = storedTokens.get();
        // Token count after the periodic cool-down (refill), capped at maxToken.
        long refilled = coolDownTokens(currentSecond, passQps);

        if (!storedTokens.compareAndSet(expected, refilled)) {
            // Lost the race; the winning thread performs the update.
            return;
        }

        // Remove the tokens consumed during the previous window, never going below zero.
        long afterConsume = storedTokens.addAndGet(-passQps);
        if (afterConsume < 0) {
            storedTokens.set(0L);
        }
        lastFilledTime.set(currentSecond);
    }

    /**
     * Computes the token count after refilling for the time elapsed since the last fill. Tokens
     * are added when the pool is below the warning line, or when it is above the line and the
     * previous QPS is below {@code (int) count / coldFactor}; the result never exceeds
     * {@code maxToken}.
     */
    private long coolDownTokens(long currentTime, long passQps) {
        long current = storedTokens.get();

        boolean belowWarning = current < warningToken;
        boolean aboveWarningAndUnderused =
            current > warningToken && passQps < (int)count / coldFactor;

        long candidate = current;
        if (belowWarning || aboveWarningAndUnderused) {
            candidate = (long)(current + (currentTime - lastFilledTime.get()) * count / 1000);
        }

        return Math.min(candidate, maxToken);
    }

}

1.1.2.3 应对突发流量 + 预热

package com.alibaba.csp.sentinel.slots.block.flow.controller;

/**
 * Combines the warm-up token bookkeeping inherited from {@link WarmUpController} with fixed
 * interval queueing: the per-request interval is derived from the warm-up QPS while the token
 * pool is at or above the warning line, and from {@code count} otherwise. Early requests sleep
 * until their slot unless the wait would exceed {@code timeoutInMs}.
 */
public class WarmUpRateLimiterController extends WarmUpController {

    /** Longest time, in milliseconds, a request may wait for its slot before being rejected. */
    private final int timeoutInMs;
    /** Time slot (epoch millis) of the most recently admitted request; -1 until the first pass. */
    private final AtomicLong latestPassedTime = new AtomicLong(-1);

    public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
        super(count, warmUpPeriodSec, coldFactor);
        this.timeoutInMs = timeOutMs;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * Decides whether the request may pass, sleeping until its slot when it arrives early.
     *
     * @return {@code true} if the request passes (possibly after sleeping); {@code false} if the
     *         wait would exceed the timeout or the waiting thread was interrupted (in which case
     *         the thread's interrupt status is restored)
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        long previousQps = (long) node.previousPassQps();
        syncToken(previousQps);

        long currentTime = TimeUtil.currentTimeMillis();

        long restToken = storedTokens.get();
        long costTime;

        // The interval per request takes the warm-up state into account.
        if (restToken >= warningToken) {
            // At or above the warning line: use the reduced warm-up QPS,
            // interval = aboveToken * slope + 1 / count.
            long aboveToken = restToken - warningToken;
            double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
            costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
        } else {
            // Below the warning line: use the stable rate.
            costTime = Math.round(1.0 * (acquireCount) / count * 1000);
        }
        long expectedTime = costTime + latestPassedTime.get();

        if (expectedTime <= currentTime) {
            latestPassedTime.set(currentTime);
            return true;
        }

        // The request is early: it has to queue.
        long waitTime = costTime + latestPassedTime.get() - currentTime;
        if (waitTime > timeoutInMs) {
            return false;
        }

        // Reserve the next slot; addAndGet returns the slot assigned to this request.
        long reservedTime = latestPassedTime.addAndGet(costTime);
        try {
            waitTime = reservedTime - TimeUtil.currentTimeMillis();
            if (waitTime > timeoutInMs) {
                // Other threads reserved slots in between; give this one back and reject.
                latestPassedTime.addAndGet(-costTime);
                return false;
            }
            if (waitTime > 0) {
                Thread.sleep(waitTime);
            }
            return true;
        } catch (InterruptedException e) {
            // The request is still rejected, as before, but the interrupt status is restored so
            // code further up the stack can observe the interruption instead of losing it.
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

1.1.2.4 直接拒绝溢出的流量

/**
 * Default throttling controller (immediately reject strategy).
 *
 * <p>A request is rejected as soon as the tokens already in use plus the requested amount exceed
 * {@code count}. The only exception is a prioritized request under a QPS-grade rule, which may
 * occupy capacity of an upcoming window and pass after waiting.
 */
public class DefaultController implements TrafficShapingController {

    private static final int DEFAULT_AVG_USED_TOKENS = 0;

    private double count;
    private int grade;

    public DefaultController(double count, int grade) {
        this.count = count;
        this.grade = grade;
    }

    @Override
    public boolean canPass(Node node, int acquireCount) {
        return canPass(node, acquireCount, false);
    }

    /**
     * @return {@code true} if the request fits within {@code count}; {@code false} if it is
     *         rejected
     * @throws PriorityWaitException when a prioritized request has occupied future capacity and
     *         has finished waiting for it; this signals that the request will pass
     */
    @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int used = avgUsedTokens(node);
        if (!(used + acquireCount > count)) {
            return true;
        }
        if (!prioritized || grade != RuleConstant.FLOW_GRADE_QPS) {
            return false;
        }

        long now = TimeUtil.currentTimeMillis();
        long waitInMs = node.tryOccupyNext(now, acquireCount, count);
        if (!(waitInMs < OccupyTimeoutProperty.getOccupyTimeout())) {
            return false;
        }

        node.addWaitingRequest(now + waitInMs, acquireCount);
        node.addOccupiedPass(acquireCount);
        sleep(waitInMs);

        // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
        throw new PriorityWaitException(waitInMs);
    }

    /**
     * Returns the current usage to compare against {@code count}: the thread count for
     * thread-grade rules, the truncated pass QPS otherwise, or 0 when there is no node.
     */
    private int avgUsedTokens(Node node) {
        if (node == null) {
            return DEFAULT_AVG_USED_TOKENS;
        }
        if (grade == RuleConstant.FLOW_GRADE_THREAD) {
            return node.curThreadNum();
        }
        return (int)(node.passQps());
    }

    /** Sleeps for the given time; an interruption ends the sleep early and is ignored. */
    private void sleep(long timeMillis) {
        try {
            Thread.sleep(timeMillis);
        } catch (InterruptedException ignored) {
            // Ignore.
        }
    }
}

1.2 滑动窗

为了对 给定时间段 的流量做特殊的 审计、管控,显然我们需要1个时间窗。

但是固定的时间窗,在编码实现的过程中,切换、重置当前时间窗(同步加锁)所带来的性能消耗、流量堆积 势必会导致局部的性能曲线不那么光滑

为此,我们在1个大的时间周期内可以预置多个桶(比如1min对应60个1s的时间窗),并根据实时的系统时间滑动起来,即 滑动窗口。

1.2.1 sentinel.LeapArray<MetricBucket>

  • BucketLeapArray :重置时间窗时,直接赋值0L
  • OccupiableBucketLeapArray :重置时间窗时,之前预占了未来的令牌,也会被加入(DefaultController中被调用)
package com.alibaba.csp.sentinel.slots.statistic.base;

/**
 * Sliding window implemented as a circular array of time-window buckets. A timestamp is mapped to
 * a slot by {@code (timeMillis / windowLengthInMs) % array.length()}, and a slot is re-used (reset)
 * once the window it holds is older than the window the timestamp falls into.
 *
 * @param <T> type of the statistics held by each bucket
 */
public abstract class LeapArray<T> {

    protected int windowLengthInMs;		// length of a single window, in milliseconds
    protected int sampleCount;			// number of windows the whole interval is divided into
    protected int intervalInMs;			// length of the whole sliding-window interval, in milliseconds
    private double intervalInSecond; 	// length of the whole sliding-window interval, in seconds

    // The windows that make up one full statistics interval.
    protected final AtomicReferenceArray<WindowWrap<T>> array;

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();

    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }

    /**
     * Get the bucket at current timestamp.
     *
     * @return the bucket at current timestamp
     */
    public WindowWrap<T> currentWindow() {
        // Delegates to the timestamp overload using the current system time.
        return currentWindow(TimeUtil.currentTimeMillis());
    }
	
	/**
     * Get bucket item at provided timestamp.
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        // Index of the slot in the circular array that the timestamp maps to.
        int idx = calculateTimeIdx(timeMillis);
        // Start time of the window the timestamp falls into.
        // Calculate current bucket start time.
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            WindowWrap<T> old = array.get(idx);
            // No window in this slot yet: create one and try to install it.
            if (old == null) {
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } 
            // The slot holds the window for this very time span (same start time): return it.
            else if (windowStart == old.windowStart()) {
                return old;
            } 
            // The slot holds an older window (earlier start time): it has to be reset.
            else if (windowStart > old.windowStart()) {
                // Guarded by tryLock so that only one thread resets the stale window.
                if (updateLock.tryLock()) {
                    try {
                        // The reset itself is delegated to resetWindowTo.
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Another thread holds the lock; yield and retry the loop.
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } 
            // Not normally reached; defensive handling for a timestamp behind the stored window.
            else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        // (timestamp / window length) % array length = index of the slot in the array.
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        return (int)(timeId % array.length());
    }

    protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
        // timestamp - (timestamp % window length) = start time of the window containing it.
        return timeMillis - timeMillis % windowLengthInMs;
    }
}

----------

package com.alibaba.csp.sentinel.slots.statistic.data;

// Payload of a single time window: the statistics collected during that span.
/**
 * Represents metrics data in a period of time span.
 */
public class MetricBucket {

    private final LongAdder[] counters;

    private volatile long minRt;

    /**
     * Resets the counter of every {@link MetricEvent} and re-initialises the minimum RT.
     *
     * @return this bucket, after it has been reset
     */
    public MetricBucket reset() {
        MetricEvent[] events = MetricEvent.values();
        for (int i = 0; i < events.length; i++) {
            // Each event's counter is looked up by the event's ordinal.
            counters[events[i].ordinal()].reset();
        }
        initMinRt();
        return this;
    }

    /** Sets the minimum RT back to the configured statistic maximum RT. */
    private void initMinRt() {
        this.minRt = SentinelConfig.statisticMaxRt();
    }
}

---------------

package com.alibaba.csp.sentinel.slots.statistic.metric;

/**
 * The fundamental data structure for metric statistics in a time span.
 */
public class BucketLeapArray extends LeapArray<MetricBucket> {

    /**
     * Re-uses a stale window: moves it to the new start time and clears its statistics.
     *
     * @param w         the window to recycle
     * @param startTime start time of the window that {@code w} now represents
     * @return the same window instance
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        // Stamp the wrapper with the new start time first, then zero the bucket it carries.
        w.resetTo(startTime);
        MetricBucket bucket = w.value();
        bucket.reset();
        return w;
    }
}

-------------

package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;

/**
 * Leap array whose windows start out with the pass count that was recorded in advance for them
 * in a companion {@link FutureBucketLeapArray} (the "borrow" array).
 */
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {

    private final FutureBucketLeapArray borrowArray;

    public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
        // This class is the original "CombinedBucketArray".
        super(sampleCount, intervalInMs);
        this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
    }

    /**
     * Creates a bucket for {@code time}, initialised from the borrow array's bucket for that
     * time when one exists.
     */
    @Override
    public MetricBucket newEmptyBucket(long time) {
        MetricBucket fresh = new MetricBucket();
        MetricBucket borrowed = borrowArray.getWindowValue(time);
        if (borrowed == null) {
            return fresh;
        }
        fresh.reset(borrowed);
        return fresh;
    }

    /**
     * Recycles a stale window for the window starting at {@code time}: the statistics are cleared
     * and, if the borrow array holds a bucket for that time, its pass count is carried over.
     */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
        // Update the start time and reset value.
        w.resetTo(time);
        MetricBucket borrowed = borrowArray.getWindowValue(time);

        MetricBucket bucket = w.value();
        bucket.reset();
        if (borrowed != null) {
            bucket.addPass((int)borrowed.pass());
        }
        return w;
    }

    /** Sums the pass counts of the buckets currently returned by the borrow array. */
    @Override
    public long currentWaiting() {
        borrowArray.currentWindow();
        long total = 0;
        for (MetricBucket bucket : borrowArray.values()) {
            total += bucket.pass();
        }
        return total;
    }

    /** Records {@code acquireCount} passes in the borrow array's window for {@code time}. */
    @Override
    public void addWaiting(long time, int acquireCount) {
        borrowArray.currentWindow(time).value().add(MetricEvent.PASS, acquireCount);
    }
}

----------

package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;

/**
 * A kind of {@code BucketLeapArray} that only reserves for future buckets.
 */
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {

    public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
        // This class is the original "BorrowBucketArray".
        super(sampleCount, intervalInMs);
    }

    /** Always returns a brand-new bucket; {@code time} is not used. */
    @Override
    public MetricBucket newEmptyBucket(long time) {
        return new MetricBucket();
    }

    /** Recycles a stale window: moves it to {@code startTime} and clears its statistics. */
    @Override
    protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
        w.resetTo(startTime);
        MetricBucket bucket = w.value();
        bucket.reset();
        return w;
    }

    /**
     * Treats every window that has already started at {@code time} as deprecated, so only windows
     * that start after {@code time} count.
     */
    @Override
    public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
        // Tricky: will only calculate for future.
        return windowWrap.windowStart() <= time;
    }
}
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。