From 7fd0d515bcf5adb0b18b81735fb3525e93eaf7e1 Mon Sep 17 00:00:00 2001 From: Maisem Ali Date: Wed, 7 Dec 2022 13:41:48 +0500 Subject: [PATCH] syncs: add WakeGroup as a way to wake up multiple goroutines We have this functionality in a few places, this consolidates it in the syncs package for reuse. Signed-off-by: Maisem Ali --- syncs/syncs.go | 36 ++++++++++++++++++++++++++++++++++++ syncs/syncs_test.go | 38 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 74 insertions(+) diff --git a/syncs/syncs.go b/syncs/syncs.go index fd565fbbe..d5e7f7462 100644 --- a/syncs/syncs.go +++ b/syncs/syncs.go @@ -13,6 +13,36 @@ import ( "tailscale.com/util/mak" ) +// WakeGroup provides a way to wake up multiple goroutines waiting on a +// single event. It is meant be reused, so you can call Wake and then +// Get again to wait for the next event. +type WakeGroup struct { + v AtomicValue[chan struct{}] +} + +// Get returns a channel that will be closed when Wake is called. +func (g *WakeGroup) Get() chan struct{} { + c, ok := g.v.LoadOk() + if ok { + return c + } + // The WakeGroup is empty, so we need to create a new channel. + // This is only reached once per WakeGroup. + n := make(chan struct{}) + if g.v.Init(n) { + return n + } + return g.v.Load() +} + +// Wake wakes up all goroutines waiting on the WakeGroup. +func (g *WakeGroup) Wake() { + c := g.v.Swap(make(chan struct{})) + if c != nil { + close(c) + } +} + // ClosedChan returns a channel that's already closed. func ClosedChan() <-chan struct{} { return closedChan } @@ -62,6 +92,12 @@ func (v *AtomicValue[T]) Swap(x T) (old T) { return old } +// Init provides a way to initialize the value. It returns true if the +// value was initialized, false if it was already initialized. +func (v *AtomicValue[T]) Init(newV T) (swapped bool) { + return v.v.CompareAndSwap(nil, newV) +} + // CompareAndSwap executes the compare-and-swap operation for the Value. func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) { return v.v.CompareAndSwap(oldV, newV) diff --git a/syncs/syncs_test.go b/syncs/syncs_test.go index 632cae64f..b0496fed9 100644 --- a/syncs/syncs_test.go +++ b/syncs/syncs_test.go @@ -12,6 +12,44 @@ import ( "github.com/google/go-cmp/cmp" ) +func BenchmarkWakeGroup(b *testing.B) { + var kg WakeGroup + b.ResetTimer() + var wg, wg2 sync.WaitGroup + const workers = 100 + wg.Add(workers) + wg2.Add(workers) + done := make(chan struct{}) + for i := 0; i < workers; i++ { + go func() { + defer wg.Done() + wg2.Done() + for i := 0; i < b.N; i++ { + select { + case <-kg.Get(): + case <-done: + } + } + }() + } + wg2.Wait() + for i := 0; i < b.N; i++ { + kg.Wake() + } + close(done) + wg.Wait() +} + +func TestWakeGroup(t *testing.T) { + var x WakeGroup + ch := x.Get() + if ch == nil { + t.Fatal("nil chan") + } + x.Wake() + <-ch +} + func TestWaitGroupChan(t *testing.T) { wg := NewWaitGroupChan()