control/controlclient: improve handling of concurrent lite map requests
This reverts commit 6eca47b16c and fixes forward.

Previously, the first ever streaming MapRequest that a client sent would also set ReadOnly to true, as it didn't have any endpoints yet and expected/relied on the map poll restarting as soon as it learned its endpoints. However, with 48f6c1eba4 we no longer restart MapRequests as frequently as we used to, so control would only ever see that first streaming MapRequest with ReadOnly=true. Control would treat it as an uninteresting request and never send it any further netmaps, while the client would happily stay in the map poll forever as lite map updates happened in parallel.

This makes it so that we never set `ReadOnly=true` when we are doing a streaming MapRequest. That is no longer necessary anyway, as most endpoint discovery happens over disco.

Co-authored-by: Andrew Dunham <andrew@du.nham.ca>
Signed-off-by: Maisem Ali <maisem@tailscale.com>
pull/7518/head
parent 87b4bbb94f
commit be027a9899
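For readers skimming the diff below, here is a minimal, self-contained sketch of the cancellation bookkeeping this commit adds (not the actual tailscale code: liteUpdater, requestLiteUpdate, and the send callback are hypothetical stand-ins for Auto, sendNewMapRequest, and Direct.SendLiteMapUpdate). A new lite map update cancels any in-flight one, a counter tracks consecutive cancellations, and once ten lite updates in a row have been canceled without a success the caller falls back to restarting the full streaming map poll.

package main

import (
	"context"
	"errors"
	"fmt"
	"sync"
	"time"
)

// Matches the constant introduced in auto.go.
const maxLiteMapUpdateAttempts = 10

type liteUpdater struct {
	mu      sync.Mutex
	cancel  context.CancelFunc          // cancels the in-flight lite update, may be nil
	cancels int                         // consecutive canceled lite updates
	send    func(context.Context) error // stand-in for Direct.SendLiteMapUpdate
}

// requestLiteUpdate cancels any in-flight lite update and starts a new one.
// It reports whether the caller should give up on lite updates and restart
// the full streaming map poll instead (the "not making progress" case).
func (u *liteUpdater) requestLiteUpdate() (restartStream bool) {
	u.mu.Lock()
	if u.cancel != nil {
		// Always cancel the in-flight lite update before starting a new one.
		u.cancel()
		u.cancel = nil
		if u.cancels >= maxLiteMapUpdateAttempts {
			u.mu.Unlock()
			return true
		}
		u.cancels++
	}

	ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
	u.cancel = cancel
	u.mu.Unlock()

	go func() {
		defer cancel()
		err := u.send(ctx)

		u.mu.Lock()
		u.cancel = nil
		if err == nil {
			u.cancels = 0 // a successful update resets the counter
		}
		u.mu.Unlock()

		if err != nil && !errors.Is(ctx.Err(), context.Canceled) {
			// The real client calls cancelMapSafely() here to fall back to
			// the heavy long-polling map request.
			fmt.Println("lite update failed:", err)
		}
	}()
	return false
}

func main() {
	u := &liteUpdater{send: func(ctx context.Context) error {
		select {
		case <-time.After(50 * time.Millisecond):
			return nil
		case <-ctx.Done():
			return ctx.Err()
		}
	}}
	for i := 0; i < 3; i++ {
		if u.requestLiteUpdate() {
			fmt.Println("would restart streaming map poll")
		}
	}
	time.Sleep(200 * time.Millisecond)
}

In the real change the counter is also reset in cancelMapSafely, so a full restart of the streaming poll re-enables lite updates afterwards.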
cmd/tailscaled/depaware.txt

@@ -414,7 +414,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
 encoding/xml from github.com/tailscale/goupnp+
 errors from bufio+
 expvar from tailscale.com/derp+
-flag from tailscale.com/control/controlclient+
+flag from net/http/httptest+
 fmt from compress/flate+
 hash from crypto+
 hash/adler32 from tailscale.com/ipn/ipnlocal
control/controlclient/auto.go

@@ -59,15 +59,17 @@ type Auto struct {

 	mu sync.Mutex // mutex guards the following fields

 	paused          bool // whether we should stop making HTTP requests
 	unpauseWaiters  []chan struct{}
 	loggedIn        bool       // true if currently logged in
 	loginGoal       *LoginGoal // non-nil if some login activity is desired
 	synced          bool       // true if our netmap is up-to-date
 	inPollNetMap    bool       // true if currently running a PollNetMap
 	inLiteMapUpdate bool       // true if a lite (non-streaming) map request is outstanding
+	liteMapUpdateCancel  context.CancelFunc // cancels a lite map update, may be nil
+	liteMapUpdateCancels int                // how many times we've canceled a lite map update
 	inSendStatus    int        // number of sendStatus calls currently in progress
 	state           State

 	authCtx context.Context // context used for auth requests
 	mapCtx  context.Context // context used for netmap requests
@@ -168,28 +170,56 @@ func (c *Auto) Start() {
 func (c *Auto) sendNewMapRequest() {
 	c.mu.Lock()

-	// If we're not already streaming a netmap, or if we're already stuck
-	// in a lite update, then tear down everything and start a new stream
-	// (which starts by sending a new map request)
-	if !c.inPollNetMap || c.inLiteMapUpdate || !c.loggedIn {
+	// If we're not already streaming a netmap, then tear down everything
+	// and start a new stream (which starts by sending a new map request)
+	if !c.inPollNetMap || !c.loggedIn {
 		c.mu.Unlock()
 		c.cancelMapSafely()
 		return
 	}

+	// If we are already in process of doing a LiteMapUpdate, cancel it and
+	// try a new one. If this is the 10th time we have done this
+	// cancelation, tear down everything and start again.
+	const maxLiteMapUpdateAttempts = 10
+	if c.inLiteMapUpdate {
+		// Always cancel the in-flight lite map update, regardless of
+		// whether we cancel the streaming map request or not.
+		c.liteMapUpdateCancel()
+		c.inLiteMapUpdate = false
+
+		if c.liteMapUpdateCancels >= maxLiteMapUpdateAttempts {
+			// Not making progress
+			c.mu.Unlock()
+			c.cancelMapSafely()
+			return
+		}
+
+		// Increment our cancel counter and continue below to start a
+		// new lite update.
+		c.liteMapUpdateCancels++
+	}
+
 	// Otherwise, send a lite update that doesn't keep a
 	// long-running stream response.
 	defer c.mu.Unlock()
 	c.inLiteMapUpdate = true
 	ctx, cancel := context.WithTimeout(c.mapCtx, 10*time.Second)
+	c.liteMapUpdateCancel = cancel
 	go func() {
 		defer cancel()
 		t0 := time.Now()
 		err := c.direct.SendLiteMapUpdate(ctx)
 		d := time.Since(t0).Round(time.Millisecond)

 		c.mu.Lock()
 		c.inLiteMapUpdate = false
+		c.liteMapUpdateCancel = nil
+		if err == nil {
+			c.liteMapUpdateCancels = 0
+		}
 		c.mu.Unlock()

 		if err == nil {
 			c.logf("[v1] successful lite map update in %v", d)
 			return
@@ -197,10 +227,13 @@ func (c *Auto) sendNewMapRequest() {
 		if ctx.Err() == nil {
 			c.logf("lite map update after %v: %v", d, err)
 		}
-		// Fall back to restarting the long-polling map
-		// request (the old heavy way) if the lite update
-		// failed for any reason.
-		c.cancelMapSafely()
+		if !errors.Is(ctx.Err(), context.Canceled) {
+			// Fall back to restarting the long-polling map
+			// request (the old heavy way) if the lite update
+			// failed for reasons other than the context being
+			// canceled.
+			c.cancelMapSafely()
+		}
 	}()
 }

@@ -237,6 +270,12 @@ func (c *Auto) cancelMapSafely() {
 	c.mu.Lock()
 	defer c.mu.Unlock()

+	// Always reset our lite map cancels counter if we're canceling
+	// everything, since we're about to restart with a new map update; this
+	// allows future calls to sendNewMapRequest to retry sending lite
+	// updates.
+	c.liteMapUpdateCancels = 0
+
 	c.logf("[v1] cancelMapSafely: synced=%v", c.synced)

 	if c.inPollNetMap {
control/controlclient/direct.go

@@ -12,7 +12,6 @@ import (
 	"encoding/binary"
 	"encoding/json"
 	"errors"
-	"flag"
 	"fmt"
 	"io"
 	"log"
@@ -89,16 +88,15 @@ type Direct struct {
 	sfGroup     singleflight.Group[struct{}, *NoiseClient] // protects noiseClient creation.
 	noiseClient *NoiseClient

 	persist      persist.PersistView
 	authKey      string
 	tryingNewKey key.NodePrivate
 	expiry       *time.Time
 	hostinfo     *tailcfg.Hostinfo // always non-nil
 	netinfo      *tailcfg.NetInfo
 	endpoints    []tailcfg.Endpoint
 	tkaHead      string
-	everEndpoints bool   // whether we've ever had non-empty endpoints
-	lastPingURL   string // last PingRequest.URL received, for dup suppression
+	lastPingURL  string // last PingRequest.URL received, for dup suppression
 }

 type Options struct {
@@ -753,9 +751,6 @@ func (c *Direct) newEndpoints(endpoints []tailcfg.Endpoint) (changed bool) {
 	}
 	c.logf("[v2] client.newEndpoints(%v)", epStrs)
 	c.endpoints = append(c.endpoints[:0], endpoints...)
-	if len(endpoints) > 0 {
-		c.everEndpoints = true
-	}
 	return true // changed
 }

@@ -768,8 +763,6 @@ func (c *Direct) SetEndpoints(endpoints []tailcfg.Endpoint) (changed bool) {
 	return c.newEndpoints(endpoints)
 }

-func inTest() bool { return flag.Lookup("test.v") != nil }
-
 // PollNetMap makes a /map request to download the network map, calling cb with
 // each new netmap.
 func (c *Direct) PollNetMap(ctx context.Context, cb func(*netmap.NetworkMap)) error {
@@ -824,7 +817,6 @@ func (c *Direct) sendMapRequest(ctx context.Context, maxPolls int, readOnly bool
 		epStrs = append(epStrs, ep.Addr.String())
 		epTypes = append(epTypes, ep.Type)
 	}
-	everEndpoints := c.everEndpoints
 	c.mu.Unlock()

 	machinePrivKey, err := c.getMachinePrivKey()
@@ -865,15 +857,17 @@ func (c *Direct) sendMapRequest(ctx context.Context, maxPolls int, readOnly bool
 		OmitPeers: cb == nil,
 		TKAHead:   c.tkaHead,

-		// On initial startup before we know our endpoints, set the ReadOnly flag
-		// to tell the control server not to distribute out our (empty) endpoints to peers.
-		// Presumably we'll learn our endpoints in a half second and do another post
-		// with useful results. The first POST just gets us the DERP map which we
-		// need to do the STUN queries to discover our endpoints.
-		// TODO(bradfitz): we skip this optimization in tests, though,
-		// because the e2e tests are currently hyper-specific about the
-		// ordering of things. The e2e tests need love.
-		ReadOnly: readOnly || (len(epStrs) == 0 && !everEndpoints && !inTest()),
+		// Previously we'd set ReadOnly to true if we didn't have any endpoints
+		// yet as we expected to learn them in a half second and restart the full
+		// streaming map poll, however as we are trying to reduce the number of
+		// times we restart the full streaming map poll we now just set ReadOnly
+		// false when we're doing a full streaming map poll.
+		//
+		// TODO(maisem/bradfitz): really ReadOnly should be set to true if for
+		// all streams and we should only do writes via lite map updates.
+		// However that requires an audit and a bunch of testing to make sure we
+		// don't break anything.
+		ReadOnly: readOnly && !allowStream,
 	}
 	var extraDebugFlags []string
 	if hi != nil && c.linkMon != nil && !c.skipIPForwardingCheck &&
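For completeness, a tiny illustration of the new ReadOnly semantics shown in the hunk above (readOnlyFlag is a hypothetical helper that mirrors the `readOnly && !allowStream` expression, not a function in the codebase): a streaming map poll now never sets ReadOnly, while non-streaming requests still honor the caller's readOnly argument.

package main

import "fmt"

// readOnlyFlag mirrors the new expression: ReadOnly is only set when the
// caller asked for a read-only request and no long-running stream is opened.
func readOnlyFlag(readOnly, allowStream bool) bool {
	return readOnly && !allowStream
}

func main() {
	fmt.Println(readOnlyFlag(true, true))  // false: streaming polls are never read-only anymore
	fmt.Println(readOnlyFlag(false, true)) // false
	fmt.Println(readOnlyFlag(true, false)) // true: one-shot requests can still be read-only
}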