diff --git a/tstime/rate/rate.go b/tstime/rate/rate.go
index a3f516077..f0473862a 100644
--- a/tstime/rate/rate.go
+++ b/tstime/rate/rate.go
@@ -31,12 +31,14 @@ func Every(interval time.Duration) Limit {
 }
 
 // A Limiter controls how frequently events are allowed to happen.
-// It implements a "token bucket" of size b, initially full and refilled
-// at rate r tokens per second.
-// Informally, in any large enough time interval, the Limiter limits the
-// rate to r tokens per second, with a maximum burst size of b events.
-// See https://en.wikipedia.org/wiki/Token_bucket for more about token buckets.
+// It implements a [token bucket] of a particular size b,
+// initially full and refilled at rate r tokens per second.
+// Informally, in any large enough time interval,
+// the Limiter limits the rate to r tokens per second,
+// with a maximum burst size of b events.
 // Use NewLimiter to create non-zero Limiters.
+//
+// [token bucket]: https://en.wikipedia.org/wiki/Token_bucket
 type Limiter struct {
 	limit Limit
 	burst float64
@@ -54,7 +56,7 @@ func NewLimiter(r Limit, b int) *Limiter {
 	return &Limiter{limit: r, burst: float64(b)}
 }
 
-// AllowN reports whether an event may happen now.
+// Allow reports whether an event may happen now.
 func (lim *Limiter) Allow() bool {
 	return lim.allow(mono.Now())
 }
diff --git a/tstime/rate/value.go b/tstime/rate/value.go
new file mode 100644
index 000000000..55206f267
--- /dev/null
+++ b/tstime/rate/value.go
@@ -0,0 +1,183 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package rate
+
+import (
+	"fmt"
+	"math"
+	"sync"
+	"time"
+
+	"tailscale.com/tstime/mono"
+)
+
+// Value measures the rate at which events occur,
+// exponentially weighted towards recent activity.
+// It is guaranteed to occupy O(1) memory, operate in O(1) runtime,
+// and is safe for concurrent use.
+// The zero value is safe for immediate use.
+//
+// The algorithm is based on and semantically equivalent to
+// [exponentially weighted moving averages (EWMAs)],
+// but modified to avoid assuming that event samples are gathered
+// at fixed and discrete time-step intervals.
+//
+// In EWMA literature, the average is typically tuned with a λ parameter
+// that determines how much weight to give to recent event samples.
+// A high λ value reacts quickly to new events, favoring recent history,
+// while a low λ value reacts more slowly to new events.
+// The EWMA is computed as:
+//
+//	zᵢ = λxᵢ + (1-λ)zᵢ₋₁
+//
+// where:
+//   - λ is the weight parameter, where 0 ≤ λ ≤ 1
+//   - xᵢ is the number of events that have since occurred
+//   - zᵢ is the newly computed moving average
+//   - zᵢ₋₁ is the previous moving average one time-step ago
+//
+// As mentioned, this implementation does not assume that the average
+// is updated periodically on a fixed time-step interval,
+// but allows the application to indicate that events occurred
+// at any point in time by simply calling Value.Add.
+// Thus, every time Value.Add is called, it takes into consideration
+// the amount of time elapsed since the last call to Value.Add,
+// as opposed to assuming that every call to Value.Add is evenly spaced
+// some fixed time-step interval apart.
+//
+// Since time is critical to this measurement, we tune the metric not
+// with the weight parameter λ (a unit-less constant between 0 and 1),
+// but rather as a half-life period t½. The half-life period is
+// mathematically equivalent but easier for humans to reason about.
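+// For example, a half-life period of t½ = 1s means that an event
+// that occurred 1s ago carries exactly half the weight of an event
+// occurring now.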
+// The parameters λ and t½ are directly related in the following way:
+//
+//	t½ = -(ln(2) · ΔT) / ln(1 - λ)
+//
+//	λ = 1 - 2^-(ΔT / t½)
+//
+// where:
+//   - t½ is the half-life commonly used with exponential decay
+//   - λ is the unit-less weight parameter commonly used with EWMAs
+//   - ΔT is the discrete time-step interval used with EWMAs
+//
+// The internal algorithm does not use the EWMA formula,
+// but is rather based on [half-life decay].
+// The formula for half-life decay is mathematically related
+// to the formula for computing the EWMA.
+// The calculation of an EWMA is a geometric progression [[1]] and
+// is essentially a discrete version of an exponential function [[2]],
+// for which half-life decay is one particular expression.
+// Given sufficiently small time-steps, the EWMA and half-life
+// algorithms provide equivalent results.
+//
+// The Value type does not take ΔT as a parameter since it relies
+// on a timer with nanosecond resolution. In a way, one could treat
+// this algorithm as operating on a ΔT of 1ns. Practically speaking,
+// the computation operates on non-discrete time intervals.
+//
+// [exponentially weighted moving averages (EWMAs)]: https://en.wikipedia.org/wiki/EWMA_chart
+// [half-life decay]: https://en.wikipedia.org/wiki/Half-life
+// [1]: https://en.wikipedia.org/wiki/Exponential_smoothing#%22Exponential%22_naming
+// [2]: https://en.wikipedia.org/wiki/Exponential_decay
+type Value struct {
+	// HalfLife specifies how quickly the rate reacts to rate changes.
+	//
+	// Specifically, if there is currently a steady-state rate of
+	// 0 events per second, and then the rate immediately jumps to
+	// N events per second, then it will take HalfLife seconds until
+	// the Value represents a rate of N/2 events per second and
+	// 2*HalfLife seconds until the Value represents a rate of 3*N/4
+	// events per second, and so forth. The rate represented by Value
+	// will asymptotically approach N events per second over time.
+	//
+	// In order for Value to stably represent a steady-state rate,
+	// the HalfLife should be larger than the average period between
+	// calls to Value.Add.
+	//
+	// A zero or negative HalfLife defaults to 1 second.
+	HalfLife time.Duration
+
+	mu      sync.Mutex
+	updated mono.Time
+	value   float64 // adjusted count of events
+}
+
+// halfLife returns the half-life period in seconds.
+func (r *Value) halfLife() float64 {
+	if r.HalfLife <= 0 {
+		return time.Second.Seconds()
+	}
+	return r.HalfLife.Seconds()
+}
+
+// Add records that n events just occurred,
+// where n must be a finite and non-negative number.
+func (r *Value) Add(n float64) {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	r.addNow(mono.Now(), n)
+}
+func (r *Value) addNow(now mono.Time, n float64) {
+	if n < 0 || math.IsInf(n, 0) || math.IsNaN(n) {
+		panic(fmt.Sprintf("invalid count %f; must be a finite, non-negative number", n))
+	}
+	r.value = r.valueNow(now) + n
+	r.updated = now
+}
+
+// valueNow computes the number of events after some elapsed time.
+// The total count of events decays exponentially so that
+// the computed rate is biased towards recent history.
+func (r *Value) valueNow(now mono.Time) float64 {
+	// This uses the half-life formula:
+	//	N(t) = N₀ · 2^-(t / t½)
+	// where:
+	//	N(t) is the amount remaining after time t,
+	//	N₀ is the initial quantity, and
+	//	t½ is the half-life of the decaying quantity.
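+	// For example, with t½ = 1s, a count of 8 events decays to
+	// 4 events after 1s, to 2 events after 2s, and to 1 event after 3s.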
+	//
+	// See https://en.wikipedia.org/wiki/Half-life
+	age := now.Sub(r.updated).Seconds()
+	return r.value * math.Exp2(-age/r.halfLife())
+}
+
+// Rate computes the rate as events per second.
+func (r *Value) Rate() float64 {
+	r.mu.Lock()
+	defer r.mu.Unlock()
+	return r.rateNow(mono.Now())
+}
+func (r *Value) rateNow(now mono.Time) float64 {
+	// The stored value carries the units "events"
+	// while we want to compute "events / second".
+	//
+	// In the trivial case where the events never decay,
+	// the average rate can be computed by dividing the total events
+	// by the total elapsed time since the start of the Value.
+	// This works because the weight distribution is uniform such that
+	// the weight of an event in the distant past is equal to
+	// the weight of a recent event. This is not the case with
+	// exponentially decaying weights, which complicates computation.
+	//
+	// Since our events are decaying, we can divide the number of events
+	// by the total possible accumulated value, which we determine
+	// by integrating the half-life formula from t=0 until t=∞,
+	// assuming that N₀ is 1:
+	//	∫ N(t) dt = t½ / ln(2)
+	//
+	// Recall that the integral of a curve is the area under a curve,
+	// which carries the units of the X-axis multiplied by the Y-axis.
+	// In our case this would be the units "events · seconds".
+	// By normalizing N₀ to 1, the Y-axis becomes a unit-less quantity,
+	// resulting in an integral unit of just "seconds".
+	// Dividing the events by the integral quantity correctly produces
+	// the units of "events / second".
+	return r.valueNow(now) / r.normalizedIntegral()
+}
+
+// normalizedIntegral computes the quantity t½ / ln(2).
+// It carries the units of "seconds".
+func (r *Value) normalizedIntegral() float64 {
+	return r.halfLife() / math.Ln2
+}
diff --git a/tstime/rate/value_test.go b/tstime/rate/value_test.go
new file mode 100644
index 000000000..4a776b9d2
--- /dev/null
+++ b/tstime/rate/value_test.go
@@ -0,0 +1,236 @@
+// Copyright (c) Tailscale Inc & AUTHORS
+// SPDX-License-Identifier: BSD-3-Clause
+
+package rate
+
+import (
+	"flag"
+	"math"
+	"testing"
+	"time"
+
+	qt "github.com/frankban/quicktest"
+	"github.com/google/go-cmp/cmp/cmpopts"
+	"tailscale.com/tstime/mono"
+)
+
+const (
+	min  = mono.Time(time.Minute)
+	sec  = mono.Time(time.Second)
+	msec = mono.Time(time.Millisecond)
+	usec = mono.Time(time.Microsecond)
+	nsec = mono.Time(time.Nanosecond)
+
+	val = 1.0e6
+)
+
+var longNumericalStabilityTest = flag.Bool("long-numerical-stability-test", false, "")
+
+func TestValue(t *testing.T) {
+	// When performing many small calculations, the accuracy of the
+	// result can drift due to accumulated errors in the calculation.
+	// Verify that the result is correct even with many small updates.
+	// See https://en.wikipedia.org/wiki/Numerical_stability.
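+	//
+	// The subtest below adds events at a constant rate of 1e9 events
+	// per second for exactly one second, starting from an empty Value
+	// with the default 1s half-life. After exactly one half-life,
+	// the measured rate should be half of the true rate (see the
+	// HalfLife documentation), hence the expected value of 1e9/2.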
+	t.Run("NumericalStability", func(t *testing.T) {
+		step := usec
+		if *longNumericalStabilityTest {
+			step = nsec
+		}
+		numStep := int(sec / step)
+
+		c := qt.New(t)
+		var v Value
+		var now mono.Time
+		for i := 0; i < numStep; i++ {
+			v.addNow(now, float64(step))
+			now += step
+		}
+		c.Assert(v.rateNow(now), qt.CmpEquals(cmpopts.EquateApprox(1e-6, 0)), 1e9/2)
+	})
+
+	halfLives := []struct {
+		name   string
+		period time.Duration
+	}{
+		{"½s", time.Second / 2},
+		{"1s", time.Second},
+		{"2s", 2 * time.Second},
+	}
+	for _, halfLife := range halfLives {
+		t.Run(halfLife.name+"/SpikeDecay", func(t *testing.T) {
+			testValueSpikeDecay(t, halfLife.period, false)
+		})
+		t.Run(halfLife.name+"/SpikeDecayAddZero", func(t *testing.T) {
+			testValueSpikeDecay(t, halfLife.period, true)
+		})
+		t.Run(halfLife.name+"/HighThenLow", func(t *testing.T) {
+			testValueHighThenLow(t, halfLife.period)
+		})
+		t.Run(halfLife.name+"/LowFrequency", func(t *testing.T) {
+			testLowFrequency(t, halfLife.period)
+		})
+	}
+}
+
+// testValueSpikeDecay starts with a target rate and ensures that it
+// exponentially decays according to the half-life formula.
+func testValueSpikeDecay(t *testing.T, halfLife time.Duration, addZero bool) {
+	c := qt.New(t)
+	v := Value{HalfLife: halfLife}
+	v.addNow(0, val*v.normalizedIntegral())
+
+	var now mono.Time
+	var prevRate float64
+	step := 100 * msec
+	wantHalfRate := float64(val)
+	for now < 10*sec {
+		// Adding zero for every time-step will repeatedly trigger the
+		// computation to decay the value, which may cause the result
+		// to become more numerically unstable.
+		if addZero {
+			v.addNow(now, 0)
+		}
+		currRate := v.rateNow(now)
+		t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
+
+		// At every multiple of a half-life period,
+		// the current rate should be half the value of what
+		// it was at the last half-life period.
+		if time.Duration(now)%halfLife == 0 {
+			c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(1e-12, 0)), wantHalfRate)
+			wantHalfRate = currRate / 2
+		}
+
+		// Without any newly added events,
+		// the rate should be decaying over time.
+		if now > 0 && prevRate < currRate {
+			t.Errorf("%v: rate is not decaying: %0.1f < %0.1f", time.Duration(now), prevRate, currRate)
+		}
+		if currRate < 0 {
+			t.Errorf("%v: rate too low: %0.1f < %0.1f", time.Duration(now), currRate, 0.0)
+		}
+
+		prevRate = currRate
+		now += step
+	}
+}
+
+// testValueHighThenLow targets a steady-state rate that is high,
+// then switches to a target steady-state rate that is low.
+func testValueHighThenLow(t *testing.T, halfLife time.Duration) {
+	c := qt.New(t)
+	v := Value{HalfLife: halfLife}
+
+	var now mono.Time
+	var prevRate float64
+	var wantRate float64
+	const step = 10 * msec
+	const stepsPerSecond = int(sec / step)
+
+	// Target a higher steady-state rate.
+	wantRate = 2 * val
+	wantHalfRate := float64(0.0)
+	eventsPerStep := wantRate / float64(stepsPerSecond)
+	for now < 10*sec {
+		currRate := v.rateNow(now)
+		v.addNow(now, eventsPerStep)
+		t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
+
+		// At every multiple of a half-life period,
+		// the current rate should be halfway closer to the
+		// target rate than it was at the previous half-life.
+		if time.Duration(now)%halfLife == 0 {
+			c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(0.1, 0)), wantHalfRate)
+			wantHalfRate += (wantRate - currRate) / 2
+		}
+
+		// Rate should approach wantRate from below,
+		// and never significantly exceed it.
+		if now > 0 && prevRate > currRate {
+			t.Errorf("%v: rate is not growing: %0.1f > %0.1f", time.Duration(now), prevRate, currRate)
+		}
+		if currRate > 1.01*wantRate {
+			t.Errorf("%v: rate too high: %0.1f > %0.1f", time.Duration(now), currRate, wantRate)
+		}
+
+		prevRate = currRate
+		now += step
+	}
+	c.Assert(prevRate, qt.CmpEquals(cmpopts.EquateApprox(0.05, 0)), wantRate)
+
+	// Target a lower steady-state rate.
+	wantRate = val / 3
+	wantHalfRate = prevRate
+	eventsPerStep = wantRate / float64(stepsPerSecond)
+	for now < 20*sec {
+		currRate := v.rateNow(now)
+		v.addNow(now, eventsPerStep)
+		t.Logf("%0.1fs:\t%0.3f", time.Duration(now).Seconds(), currRate)
+
+		// At every multiple of a half-life period,
+		// the current rate should be halfway closer to the
+		// target rate than it was at the previous half-life.
+		if time.Duration(now)%halfLife == 0 {
+			c.Assert(currRate, qt.CmpEquals(cmpopts.EquateApprox(0.1, 0)), wantHalfRate)
+			wantHalfRate += (wantRate - currRate) / 2
+		}
+
+		// Rate should approach wantRate from above,
+		// and never drop significantly below it.
+		if now > 10*sec && prevRate < currRate {
+			t.Errorf("%v: rate is not decaying: %0.1f < %0.1f", time.Duration(now), prevRate, currRate)
+		}
+		if currRate < 0.99*wantRate {
+			t.Errorf("%v: rate too low: %0.1f < %0.1f", time.Duration(now), currRate, wantRate)
+		}
+
+		prevRate = currRate
+		now += step
+	}
+	c.Assert(prevRate, qt.CmpEquals(cmpopts.EquateApprox(0.15, 0)), wantRate)
+}
+
+// testLowFrequency fires events at a frequency much slower than
+// the specified half-life period. While the average rate over time
+// remains accurate, the standard deviation of the rate gets worse.
+func testLowFrequency(t *testing.T, halfLife time.Duration) {
+	v := Value{HalfLife: halfLife}
+
+	var now mono.Time
+	var rates []float64
+	for now < 20*min {
+		if now%(10*sec) == 0 {
+			v.addNow(now, 1) // 1 event every 10 seconds
+		}
+		now += 50 * msec
+		rates = append(rates, v.rateNow(now))
+		now += 50 * msec
+	}
+
+	mean, stddev := stats(rates)
+	c := qt.New(t)
+	c.Assert(mean, qt.CmpEquals(cmpopts.EquateApprox(0.001, 0)), 0.1)
+	t.Logf("mean:%v stddev:%v", mean, stddev)
+}
+
+func stats(fs []float64) (mean, stddev float64) {
+	for _, rate := range fs {
+		mean += rate
+	}
+	mean /= float64(len(fs))
+	for _, rate := range fs {
+		stddev += (rate - mean) * (rate - mean)
+	}
+	stddev = math.Sqrt(stddev / float64(len(fs)))
+	return mean, stddev
+}
+
+// BenchmarkValue benchmarks the cost of Value.Add,
+// which is called often and makes extensive use of floating-point math.
+func BenchmarkValue(b *testing.B) {
+	b.ReportAllocs()
+	v := Value{HalfLife: time.Second}
+	for i := 0; i < b.N; i++ {
+		v.Add(1)
+	}
+}
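
For illustration, here is a minimal sketch of how a caller might use the new
Value type to measure a request rate. The handleRequest and reportRate helpers
and the 5-second half-life are hypothetical choices for this example, not part
of the change above:

	package main

	import (
		"log"
		"time"

		"tailscale.com/tstime/rate"
	)

	// requestRate measures how many requests arrive per second,
	// exponentially weighted towards the last few seconds of activity.
	// The zero value would also work, defaulting to a 1s half-life.
	var requestRate = rate.Value{HalfLife: 5 * time.Second}

	func handleRequest() {
		requestRate.Add(1) // record that one event occurred now
	}

	func reportRate() {
		log.Printf("requests/sec: %0.1f", requestRate.Rate())
	}

	func main() {
		for i := 0; i < 3; i++ {
			handleRequest()
		}
		reportRate()
	}

Because both Add and Rate acquire the Value's internal mutex, the sketch above
is safe to call from concurrent goroutines, matching the documented guarantee.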