您现在的位置是:首页 >技术交流 >流控 随笔 0-算法网站首页技术交流
流控 随笔 0-算法
简介流控 随笔 0-算法
0. 想要自然而然更多一些
集群流控的一些思路+spring cloud gateway+redis+lua实战
sentinel 集群流控 实战
javadoop大佬(版本比较旧)、与sentinel-dashboard交互
1. 流控算法
可以直接应用于 单机流控 的场景
1.1 令牌桶算法
1个简单的令牌(token)桶的设计需要考虑的因子:
- 添加 token的速度
- 消耗 token的速度
令牌桶可以认为是漏桶算法,应对突发流量 的升级版,并且实现起来没有漏桶那么的抽象
1.1.1 guava.RateLimiter
guava的设计中,为了使流控效果变得更加"有意思",额外设计了的因子:
- 预留 多少个单位时间 的token (这些token将帮助我们应对突发的流量时)
- 闲置 多个个token时, 可以认为系统处于 活跃、冷却的临界状态 (冷启动时的 预热)
package com.google.common.util.concurrent;
abstract class SmoothRateLimiter extends RateLimiter {
/* --------------- 区别控制 活跃、冷却 状态下的请求流量 --------------- */
/**
* This implements the following function where coldInterval = coldFactor * stableInterval.
*
* <pre>
* ^ throttling
* |
* cold + /
* interval | /.
* | / .
* | / . ← "warmup period" is the area of the trapezoid between
* | / . thresholdPermits and maxPermits
* | / .
* | / .
* | / .
* stable +----------/ WARM .
* interval | . UP .
* | . PERIOD.
* | . .
* 0 +----------+-------+--------------→ storedPermits
* 0 thresholdPermits maxPermits
* </pre>
*
*/
static final class SmoothWarmingUp extends SmoothRateLimiter {
private final long warmupPeriodMicros;
/**
* The slope of the line from the stable interval (when permits == 0), to the cold interval
* (when permits == maxPermits)
*/
private double slope;
private double thresholdPermits;
private double coldFactor;
// 通过 RateLimiter.create 静态创建的 实例,coldFactor 硬编码赋值:3
// 传入的timeUnit 也被转换成 ms,作为 warmupPeriodMicros 预热时长(剩余许可storedPermits 从 maxPermits -> thresholdePermits)
SmoothWarmingUp(
SleepingStopwatch stopwatch, long warmupPeriod, TimeUnit timeUnit, double coldFactor) {
super(stopwatch);
this.warmupPeriodMicros = timeUnit.toMicros(warmupPeriod);
this.coldFactor = coldFactor;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = maxPermits;
double coldIntervalMicros = stableIntervalMicros * coldFactor;
thresholdPermits = 0.5 * warmupPeriodMicros / stableIntervalMicros;
maxPermits =
thresholdPermits + 2.0 * warmupPeriodMicros / (stableIntervalMicros + coldIntervalMicros);
// 计算斜率
slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = 0.0;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? maxPermits // initial state is cold
: storedPermits * maxPermits / oldMaxPermits;
}
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
// availablePermitsAboveThreshold > 0,表示存在梯形
// 表示完整的预热时间:矩形+直角梯形(面积即时间)
double availablePermitsAboveThreshold = storedPermits - thresholdPermits;
long micros = 0;
// measuring the integral on the right part of the function (the climbing line)
if (availablePermitsAboveThreshold > 0.0) {
// permitsAboveThresholdToTake 即 梯形的直角边长
// 表示 实际预热的时间(不完整的)
double permitsAboveThresholdToTake = min(availablePermitsAboveThreshold, permitsToTake);
// 上底边 + 下底边
// permitsToTime(x) 相当于 y=kx
// TODO(cpovirk): Figure out a good name for this variable.
double length =
permitsToTime(availablePermitsAboveThreshold)
+ permitsToTime(availablePermitsAboveThreshold - permitsAboveThresholdToTake);
// 计算梯形面积 = (上底边 + 下底边)*高/2
micros = (long) (permitsAboveThresholdToTake * length / 2.0);
permitsToTake -= permitsAboveThresholdToTake;
}
// 加上矩形的面积
// measuring the integral on the left part of the function (the horizontal line)
micros += (long) (stableIntervalMicros * permitsToTake);
return micros;
}
private double permitsToTime(double permits) {
// slope 表示 斜率
return stableIntervalMicros + permits * slope;
}
// warmupPeriodMicros:每微秒产出多少许可
// maxPermits:最大许可数
// coolDownIntervalMicros:产出1个许可,需要多少微秒(只不过是在预热的过程中的速率)
// 简化成小学题 s=vt -> 1/t = v/s
@Override
double coolDownIntervalMicros() {
return warmupPeriodMicros / maxPermits;
}
}
/* ---------------- 预留一定时间内产出的token -------------- */
/**
* This implements a "bursty" RateLimiter, where storedPermits are translated to zero throttling.
* The maximum number of permits that can be saved (when the RateLimiter is unused) is defined in
* terms of time, in this sense: if a RateLimiter is 2qps, and this time is specified as 10
* seconds, we can save up to 2 * 10 = 20 permits.
*/
static final class SmoothBursty extends SmoothRateLimiter {
/** The work (permits) of how many seconds can be saved up if this RateLimiter is unused? */
final double maxBurstSeconds;
SmoothBursty(SleepingStopwatch stopwatch, double maxBurstSeconds) {
super(stopwatch);
this.maxBurstSeconds = maxBurstSeconds;
}
@Override
void doSetRate(double permitsPerSecond, double stableIntervalMicros) {
double oldMaxPermits = this.maxPermits;
maxPermits = maxBurstSeconds * permitsPerSecond;
if (oldMaxPermits == Double.POSITIVE_INFINITY) {
// if we don't special-case this, we would get storedPermits == NaN, below
storedPermits = maxPermits;
} else {
storedPermits =
(oldMaxPermits == 0.0)
? 0.0 // initial state
: storedPermits * maxPermits / oldMaxPermits;
}
}
@Override
long storedPermitsToWaitTime(double storedPermits, double permitsToTake) {
return 0L;
}
@Override
double coolDownIntervalMicros() {
return stableIntervalMicros;
}
}
/** The currently stored permits. */
double storedPermits;
/** The maximum number of stored permits. */
double maxPermits;
/**
* The interval between two unit requests, at our stable rate. E.g., a stable rate of 5 permits
* per second has a stable interval of 200ms.
*/
double stableIntervalMicros;
/**
* The time when the next request (no matter its size) will be granted. After granting a request,
* this is pushed further in the future. Large requests push this further than small requests.
*/
private long nextFreeTicketMicros = 0L; // could be either in the past or future
private SmoothRateLimiter(SleepingStopwatch stopwatch) {
super(stopwatch);
}
@Override
final void doSetRate(double permitsPerSecond, long nowMicros) {
resync(nowMicros);
double stableIntervalMicros = SECONDS.toMicros(1L) / permitsPerSecond;
this.stableIntervalMicros = stableIntervalMicros;
doSetRate(permitsPerSecond, stableIntervalMicros);
}
abstract void doSetRate(double permitsPerSecond, double stableIntervalMicros);
@Override
final double doGetRate() {
return SECONDS.toMicros(1L) / stableIntervalMicros;
}
@Override
final long queryEarliestAvailable(long nowMicros) {
return nextFreeTicketMicros;
}
@Override
final long reserveEarliestAvailable(int requiredPermits, long nowMicros) {
// step into ...
// 在这里更新了 nextFreeTicketMicros,同时更新 storedPermits
// 如果之前没有同步的话,正常来说,会在本方法最后同步这俩属性
resync(nowMicros);
// 所以该方法返回 nextFreeTicketMicros预留时间
long returnValue = nextFreeTicketMicros;
// 准备借出的许可部分
double storedPermitsToSpend = min(requiredPermits, this.storedPermits);
// 当前许可超过请求的部分
double freshPermits = requiredPermits - storedPermitsToSpend;
// 等待时间,超出这部分许可需要消耗的时间
// 对于 SmoothWarmingUp 而言,就预热还需要的时长
long waitMicros =
storedPermitsToWaitTime(this.storedPermits, storedPermitsToSpend)
+ (long) (freshPermits * stableIntervalMicros);
// 大概就是 更新预留时间
this.nextFreeTicketMicros = LongMath.saturatedAdd(nextFreeTicketMicros, waitMicros);
// 剩余许可减去借走的许可
this.storedPermits -= storedPermitsToSpend;
return returnValue;
}
/**
* Translates a specified portion of our currently stored permits which we want to spend/acquire,
* into a throttling time. Conceptually, this evaluates the integral of the underlying function we
* use, for the range of [(storedPermits - permitsToTake), storedPermits].
*
* <p>This always holds: {@code 0 <= permitsToTake <= storedPermits}
*/
abstract long storedPermitsToWaitTime(double storedPermits, double permitsToTake);
/**
* Returns the number of microseconds during cool down that we have to wait to get a new permit.
*/
abstract double coolDownIntervalMicros();
/** Updates {@code storedPermits} and {@code nextFreeTicketMicros} based on the current time. */
void resync(long nowMicros) {
// 如果 系统时间 > nextFreeTicketMicros预留时间
// storedPermits:目前剩余的许可
// 则更新剩余许可 storedPermits
// 并且 nextFreeTicketMicros 更新为系统时间
// if nextFreeTicket is in the past, resync to now
if (nowMicros > nextFreeTicketMicros) {
double newPermits = (nowMicros - nextFreeTicketMicros) / coolDownIntervalMicros();
storedPermits = min(maxPermits, storedPermits + newPermits);
nextFreeTicketMicros = nowMicros;
}
}
}
-----------
// com.google.common.math.LongMath#saturatedAdd
@Beta
public static long saturatedAdd(long a, long b) {
long naiveSum = a + b;
if ((a ^ b) < 0 | (a ^ naiveSum) >= 0) {
// 如果 符号相同,返回 两数和
// If a and b have different signs or a has the same sign as the result then there was no
// overflow, return.
return naiveSum;
}
// we did over/under flow, if the sign is negative we should return MAX otherwise MIN
return Long.MAX_VALUE + ((naiveSum >>> (Long.SIZE - 1)) ^ 1);
}
1.1.2 sentinel.TrafficShapingController
也有类似guava的实现
1.1.2.1 应对突发流量
package com.alibaba.csp.sentinel.slots.block.flow.controller;
// 原理类似 guava.RateLimit.SmoothBursty
public class RateLimiterController implements TrafficShapingController {
private final int maxQueueingTimeMs;
private final double count;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public RateLimiterController(int timeOut, double count) {
this.maxQueueingTimeMs = timeOut;
this.count = count;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// Pass when acquire count is less or equal than 0.
if (acquireCount <= 0) {
return true;
}
// Reject when count is less or equal than 0.
// Otherwise,the costTime will be max of long and waitTime will overflow in some cases.
if (count <= 0) {
return false;
}
long currentTime = TimeUtil.currentTimeMillis();
// Calculate the interval between every two requests.
long costTime = Math.round(1.0 * (acquireCount) / count * 1000);
// 推进预设时间
// Expected pass time of this request.
long expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
// Contention may exist here, but it's okay.
latestPassedTime.set(currentTime);
return true;
} else {
// 当前时间 小于 预设时间,则阻塞,积累令牌(排队)
// Calculate the time to wait.
long waitTime = costTime + latestPassedTime.get() - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > maxQueueingTimeMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
// in race condition waitTime may <= 0
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
1.1.2.2 预热
package com.alibaba.csp.sentinel.slots.block.flow.controller;
public class WarmUpController implements TrafficShapingController {
protected double count;
private int coldFactor;
protected int warningToken = 0;
private int maxToken;
protected double slope;
// 可以猜想到与 guava.RateLimit 的实现方式的一个不同:没有使用 synchronized关键字,而是使用原子类型
protected AtomicLong storedTokens = new AtomicLong(0);
protected AtomicLong lastFilledTime = new AtomicLong(0);
public WarmUpController(double count, int warmUpPeriodInSec, int coldFactor) {
construct(count, warmUpPeriodInSec, coldFactor);
}
public WarmUpController(double count, int warmUpPeriodInSec) {
construct(count, warmUpPeriodInSec, 3);
}
private void construct(double count, int warmUpPeriodInSec, int coldFactor) {
if (coldFactor <= 1) {
throw new IllegalArgumentException("Cold factor should be larger than 1");
}
this.count = count;
this.coldFactor = coldFactor;
// 相当于 guava.RateLimite 中的 thresholdPermits,即预热的转折点
// thresholdPermits = 0.5 * warmupPeriod / stableInterval.
// warningToken = 100;
warningToken = (int)(warmUpPeriodInSec * count) / (coldFactor - 1);
// 相当于 guava.RateLimit 中的 maxPermits
// / maxPermits = thresholdPermits + 2 * warmupPeriod /
// (stableInterval + coldInterval)
// maxToken = 200
maxToken = warningToken + (int)(2 * warmUpPeriodInSec * count / (1.0 + coldFactor));
// 计算初始的斜率 slope
// slope
// slope = (coldIntervalMicros - stableIntervalMicros) / (maxPermits - thresholdPermits);
// (coldFactor - 1.0) / count 即 实时的 QPS,即 Y轴
// QPS = count / coldFactor = 1/Y, 已知Y:产出1个令牌所消耗的时间,因此 coldFactor 倍份的时间,即可产出所有的count个令牌
slope = (coldFactor - 1.0) / count / (maxToken - warningToken);
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
// 滑动窗口中的当前窗口
long passQps = (long) node.passQps();
// 之前的、最新的1个窗口
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
// 开始计算它的斜率
// 如果进入了警戒线,开始调整他的qps
long restToken = storedTokens.get();
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 消耗的速度要比warning快,但是要比慢
// current interval = restToken*slope+1/count
double warningQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
if (passQps + acquireCount <= warningQps) {
return true;
}
} else {
if (passQps + acquireCount <= count) {
return true;
}
}
return false;
}
protected void syncToken(long passQps) {
long currentTime = TimeUtil.currentTimeMillis();
currentTime = currentTime - currentTime % 1000;
long oldLastFillTime = lastFilledTime.get();
if (currentTime <= oldLastFillTime) {
return;
}
long oldValue = storedTokens.get();
// 计算出新的剩余令牌数 storedTokens
// 周期性的冷却,即新增剩余令牌数
long newValue = coolDownTokens(currentTime, passQps);
// 根据 QPS 消耗 剩余的令牌数
if (storedTokens.compareAndSet(oldValue, newValue)) {
// 减去上1QPS的令牌数
long currentValue = storedTokens.addAndGet(0 - passQps);
if (currentValue < 0) {
storedTokens.set(0L);
}
lastFilledTime.set(currentTime);
}
}
private long coolDownTokens(long currentTime, long passQps) {
long oldValue = storedTokens.get();
long newValue = oldValue;
// 添加令牌的判断前提条件:
// 当令牌的消耗程度远远低于警戒线的时候
if (oldValue < warningToken) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
// 此时存在梯形部分
} else if (oldValue > warningToken) {
// count > coldFactor * passQps,消耗速度Qps 小于 冷却速度,也需要添加令牌
// count / coldFactor 即警戒线的临界速度,同比 1 / y
if (passQps < (int)count / coldFactor) {
newValue = (long)(oldValue + (currentTime - lastFilledTime.get()) * count / 1000);
}
}
// 毕竟不能超过极值
return Math.min(newValue, maxToken);
}
}
1.1.2.3 应对突发流量 + 预热
package com.alibaba.csp.sentinel.slots.block.flow.controller;
// 缝合 RateLimitController + WarmUpController
public class WarmUpRateLimiterController extends WarmUpController {
private final int timeoutInMs;
private final AtomicLong latestPassedTime = new AtomicLong(-1);
public WarmUpRateLimiterController(double count, int warmUpPeriodSec, int timeOutMs, int coldFactor) {
super(count, warmUpPeriodSec, coldFactor);
this.timeoutInMs = timeOutMs;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
long previousQps = (long) node.previousPassQps();
syncToken(previousQps);
long currentTime = TimeUtil.currentTimeMillis();
long restToken = storedTokens.get();
long costTime = 0;
long expectedTime = 0;
// costTime的计算逻辑中,加入了预热的东西
// 剩余的令牌数 大于 预警令牌数(转折点的x轴)
if (restToken >= warningToken) {
long aboveToken = restToken - warningToken;
// 梯形面积
// current interval = restToken*slope+1/count
double warmingQps = Math.nextUp(1.0 / (aboveToken * slope + 1.0 / count));
costTime = Math.round(1.0 * (acquireCount) / warmingQps * 1000);
} else {
// 矩形面积
costTime = Math.round(1.0 * (acquireCount) / count * 1000);
}
expectedTime = costTime + latestPassedTime.get();
if (expectedTime <= currentTime) {
latestPassedTime.set(currentTime);
return true;
} else {
long waitTime = costTime + latestPassedTime.get() - currentTime;
if (waitTime > timeoutInMs) {
return false;
} else {
long oldTime = latestPassedTime.addAndGet(costTime);
try {
waitTime = oldTime - TimeUtil.currentTimeMillis();
if (waitTime > timeoutInMs) {
latestPassedTime.addAndGet(-costTime);
return false;
}
if (waitTime > 0) {
Thread.sleep(waitTime);
}
return true;
} catch (InterruptedException e) {
}
}
}
return false;
}
}
1.1.2.4 直接拒绝溢出的流量
/**
* Default throttling controller (immediately reject strategy).
*/
public class DefaultController implements TrafficShapingController {
private static final int DEFAULT_AVG_USED_TOKENS = 0;
private double count;
private int grade;
public DefaultController(double count, int grade) {
this.count = count;
this.grade = grade;
}
@Override
public boolean canPass(Node node, int acquireCount) {
return canPass(node, acquireCount, false);
}
@Override
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
int curCount = avgUsedTokens(node);
if (curCount + acquireCount > count) {
if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
long currentTime;
long waitInMs;
currentTime = TimeUtil.currentTimeMillis();
waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
node.addWaitingRequest(currentTime + waitInMs, acquireCount);
node.addOccupiedPass(acquireCount);
sleep(waitInMs);
// PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
throw new PriorityWaitException(waitInMs);
}
}
return false;
}
return true;
}
private int avgUsedTokens(Node node) {
if (node == null) {
return DEFAULT_AVG_USED_TOKENS;
}
return grade == RuleConstant.FLOW_GRADE_THREAD ? node.curThreadNum() : (int)(node.passQps());
}
private void sleep(long timeMillis) {
try {
Thread.sleep(timeMillis);
} catch (InterruptedException e) {
// Ignore.
}
}
}
1.2 滑动窗
为了对 给定时间段 的流量做特殊的 审计、管控,显然我们需要1个时间窗。
但是固定的时间窗,在编码实现的过程中,切换、重置当前时间窗(同步加锁)所带来的性能消耗、流量堆积 势必会导致局部的性能曲线不那么光滑
为此,我们在1个大的时间周期内可以预置多个桶(比如1min对应60个1s的时间窗),并根据实时的系统时间滑动起来,即 滑动窗口。
1.2.1 sentienl.LeapArray<MetricBucket>
- BucketLeapArray :重置时间窗时,直接赋值0L
- OccupiableBucketLeapArray :重置时间窗时,之前预占了未来的令牌,也会被加入(DefaultController中被调用)
package com.alibaba.csp.sentinel.slots.statistic.base;
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 单个窗口的时间长度(ms表示)
protected int sampleCount; // 整个周期可以平分多少个窗口
protected int intervalInMs; // 滑动窗口的整个周期长度(ms表示)
private double intervalInSecond; // 滑动窗口的整个周期长度(秒钟表示)
// 一整个统计周期的时间窗数组
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* Get the bucket at current timestamp.
*
* @return the bucket at current timestamp
*/
public WindowWrap<T> currentWindow() {
// step into ...
// 使用当前系统时间作为参数
return currentWindow(TimeUtil.currentTimeMillis());
}
/**
* Get bucket item at provided timestamp.
*
* @param timeMillis a valid timestamp in milliseconds
* @return current bucket item at provided timestamp if the time is valid; null if time is invalid
*/
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 通过当前时间,计算出对应统计周期中的哪一个时间窗的索引
int idx = calculateTimeIdx(timeMillis);
// 当前时间对应的 时间窗 的起始时间
// Calculate current bucket start time.
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
// 不存在时间窗实例,则初始化1个实例
if (old == null) {
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
}
// 存在且是最新的(时间窗的起始时间是一致的,表示还在统计1个时间段内),则直接返回
else if (windowStart == old.windowStart()) {
return old;
}
// 时间窗过期(时间窗的起始时间是旧的),则重置时间窗
else if (windowStart > old.windowStart()) {
// 为避免时间窗实例,重复重置,加锁
if (updateLock.tryLock()) {
try {
// 重置逻辑由子类实现
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// 并发场景下,只能让1个重置生效
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
}
// 一般不会走这里,代码容错而已
else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
// 当前时间 / 单个窗口的时间长度 % 时间窗数组的长度 = 时间窗数组中对应的索引
long timeId = timeMillis / windowLengthInMs;
// Calculate current index so we can map the timestamp to the leap array.
return (int)(timeId % array.length());
}
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
// 当前时间 - (当前时间 % 单个窗口的时间长度)= 当前时间对应的 时间窗 的起始时间
return timeMillis - timeMillis % windowLengthInMs;
}
}
----------
package com.alibaba.csp.sentinel.slots.statistic.data;
// 单个的时间窗实例,描述一段时间内的统计
/**
* Represents metrics data in a period of time span.
*/
public class MetricBucket {
private final LongAdder[] counters;
private volatile long minRt;
/**
* Reset the adders.
*
* @return new metric bucket in initial state
*/
public MetricBucket reset() {
for (MetricEvent event : MetricEvent.values()) {
// 重置 j.u.c.LongAdder
// 逻辑也很简单: j.u.c.Cell.value = 0L
counters[event.ordinal()].reset();
}
initMinRt();
return this;
}
private void initMinRt() {
this.minRt = SentinelConfig.statisticMaxRt();
}
}
---------------
package com.alibaba.csp.sentinel.slots.statistic.metric;
/**
* The fundamental data structure for metric statistics in a time span.
*/
public class BucketLeapArray extends LeapArray<MetricBucket> {
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime); // 直接赋值时间戳(WindowWrap时间窗包装类的)
w.value().reset(); // 更新 MetricBucket.LongAdder(juc).Cell.value = 0L
return w;
}
}
-------------
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;
public class OccupiableBucketLeapArray extends LeapArray<MetricBucket> {
private final FutureBucketLeapArray borrowArray;
public OccupiableBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "CombinedBucketArray".
super(sampleCount, intervalInMs);
this.borrowArray = new FutureBucketLeapArray(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long time) {
MetricBucket newBucket = new MetricBucket();
// borrow : 借出
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
newBucket.reset(borrowBucket);
}
return newBucket;
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) {
// Update the start time and reset value.
w.resetTo(time);
MetricBucket borrowBucket = borrowArray.getWindowValue(time);
if (borrowBucket != null) {
w.value().reset();
w.value().addPass((int)borrowBucket.pass());
} else {
w.value().reset();
}
return w;
}
@Override
public long currentWaiting() {
borrowArray.currentWindow();
long currentWaiting = 0;
List<MetricBucket> list = borrowArray.values();
for (MetricBucket window : list) {
currentWaiting += window.pass();
}
return currentWaiting;
}
@Override
public void addWaiting(long time, int acquireCount) {
WindowWrap<MetricBucket> window = borrowArray.currentWindow(time);
window.value().add(MetricEvent.PASS, acquireCount);
}
}
----------
package com.alibaba.csp.sentinel.slots.statistic.metric.occupy;
/**
* A kind of {@code BucketLeapArray} that only reserves for future buckets.
*/
public class FutureBucketLeapArray extends LeapArray<MetricBucket> {
public FutureBucketLeapArray(int sampleCount, int intervalInMs) {
// This class is the original "BorrowBucketArray".
super(sampleCount, intervalInMs);
}
@Override
public MetricBucket newEmptyBucket(long time) {
return new MetricBucket();
}
@Override
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long startTime) {
// Update the start time and reset value.
w.resetTo(startTime);
w.value().reset();
return w;
}
@Override
public boolean isWindowDeprecated(long time, WindowWrap<MetricBucket> windowWrap) {
// Tricky: will only calculate for future.
return time >= windowWrap.windowStart();
}
}
风语者!平时喜欢研究各种技术,目前在从事后端开发工作,热爱生活、热爱工作。