syncs: add WakeGroup as a way to wake up multiple goroutines

We have this functionality in a few places, this consolidates it in the syncs package
for reuse.

Signed-off-by: Maisem Ali <maisem@tailscale.com>
maisem/wakegroup
Maisem Ali 2022-12-07 13:41:48 +05:00
parent a887ca7efe
commit 7fd0d515bc
2 changed files with 74 additions and 0 deletions

View File

@ -13,6 +13,36 @@ import (
"tailscale.com/util/mak"
)
// WakeGroup provides a way to wake up multiple goroutines waiting on a
// single event. It is meant be reused, so you can call Wake and then
// Get again to wait for the next event.
type WakeGroup struct {
v AtomicValue[chan struct{}]
}
// Get returns a channel that will be closed when Wake is called.
func (g *WakeGroup) Get() chan struct{} {
c, ok := g.v.LoadOk()
if ok {
return c
}
// The WakeGroup is empty, so we need to create a new channel.
// This is only reached once per WakeGroup.
n := make(chan struct{})
if g.v.Init(n) {
return n
}
return g.v.Load()
}
// Wake wakes up all goroutines waiting on the WakeGroup.
func (g *WakeGroup) Wake() {
c := g.v.Swap(make(chan struct{}))
if c != nil {
close(c)
}
}
// ClosedChan returns a channel that's already closed.
func ClosedChan() <-chan struct{} { return closedChan }
@ -62,6 +92,12 @@ func (v *AtomicValue[T]) Swap(x T) (old T) {
return old
}
// Init provides a way to initialize the value. It returns true if the
// value was initialized, false if it was already initialized.
func (v *AtomicValue[T]) Init(newV T) (swapped bool) {
return v.v.CompareAndSwap(nil, newV)
}
// CompareAndSwap executes the compare-and-swap operation for the Value.
func (v *AtomicValue[T]) CompareAndSwap(oldV, newV T) (swapped bool) {
return v.v.CompareAndSwap(oldV, newV)

View File

@ -12,6 +12,44 @@ import (
"github.com/google/go-cmp/cmp"
)
func BenchmarkWakeGroup(b *testing.B) {
var kg WakeGroup
b.ResetTimer()
var wg, wg2 sync.WaitGroup
const workers = 100
wg.Add(workers)
wg2.Add(workers)
done := make(chan struct{})
for i := 0; i < workers; i++ {
go func() {
defer wg.Done()
wg2.Done()
for i := 0; i < b.N; i++ {
select {
case <-kg.Get():
case <-done:
}
}
}()
}
wg2.Wait()
for i := 0; i < b.N; i++ {
kg.Wake()
}
close(done)
wg.Wait()
}
func TestWakeGroup(t *testing.T) {
var x WakeGroup
ch := x.Get()
if ch == nil {
t.Fatal("nil chan")
}
x.Wake()
<-ch
}
func TestWaitGroupChan(t *testing.T) {
wg := NewWaitGroupChan()