From 84c99fe0d9888c279e2d779040a29a60a2e903f4 Mon Sep 17 00:00:00 2001
From: Joe Tsai
Date: Thu, 11 May 2023 12:52:35 -0700
Subject: [PATCH] logtail: be less aggressive about re-uploads (#8117)

The retry logic was pathological in the following ways:

* If we restarted the logging service, any pending uploads would be
  placed in a retry-loop where it depended on backoff.Backoff, which
  was too aggressive. It would retry failures within milliseconds,
  taking at least 10 retries to hit a delay of 1 second.
* In the event where a logstream was rate limited, the aggressive
  retry logic would severely exacerbate the problem since each retry
  would also log an error message. It is by chance that the rate of
  log error spam does not happen to exceed the rate limit itself.

We modify the retry logic in the following ways:

* We now respect the "Retry-After" header sent by the logging service.
* Lacking a "Retry-After" header, we retry after a hard-coded period of
  30 to 60 seconds. This avoids the thundering-herd effect when all
  nodes try reconnecting to the logging service at the same time after
  a restart.
* We do not treat a status 400 as having been uploaded. This is simply
  not the behavior of the logging service.

Updates tailscale/corp#11213

Signed-off-by: Joe Tsai
---
 cmd/tailscaled/depaware.txt |  2 +-
 logtail/logtail.go          | 61 +++++++++++++++++++++----------------
 2 files changed, 36 insertions(+), 27 deletions(-)

diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt
index 7e51976fc..24e089651 100644
--- a/cmd/tailscaled/depaware.txt
+++ b/cmd/tailscaled/depaware.txt
@@ -283,7 +283,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de
    tailscale.com/tka from tailscale.com/ipn/ipnlocal+
 W  tailscale.com/tsconst from tailscale.com/net/interfaces
    tailscale.com/tsd from tailscale.com/cmd/tailscaled+
-   tailscale.com/tstime from tailscale.com/wgengine/magicsock
+   tailscale.com/tstime from tailscale.com/wgengine/magicsock+
 💣 tailscale.com/tstime/mono from tailscale.com/net/tstun+
    tailscale.com/tstime/rate from tailscale.com/wgengine/filter+
    tailscale.com/tsweb/varz from tailscale.com/cmd/tailscaled
diff --git a/logtail/logtail.go b/logtail/logtail.go
index 3c32df3a0..26abe0918 100644
--- a/logtail/logtail.go
+++ b/logtail/logtail.go
@@ -13,19 +13,19 @@ import (
     "fmt"
     "io"
     "log"
+    mrand "math/rand"
     "net/http"
     "os"
     "strconv"
-    "strings"
     "sync"
     "sync/atomic"
     "time"
 
     "tailscale.com/envknob"
-    "tailscale.com/logtail/backoff"
     "tailscale.com/net/interfaces"
     "tailscale.com/net/netmon"
     "tailscale.com/net/sockstats"
+    "tailscale.com/tstime"
     tslogger "tailscale.com/types/logger"
     "tailscale.com/types/logid"
     "tailscale.com/util/set"
@@ -128,9 +128,6 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
         cfg.FlushDelayFn = func() time.Duration { return 0 }
     }
-    stdLogf := func(f string, a ...any) {
-        fmt.Fprintf(cfg.Stderr, strings.TrimSuffix(f, "\n")+"\n", a...)
-    }
     var urlSuffix string
     if !cfg.CopyPrivateID.IsZero() {
         urlSuffix = "?copyId=" + cfg.CopyPrivateID.String()
     }
@@ -148,7 +145,6 @@ func NewLogger(cfg Config, logf tslogger.Logf) *Logger {
         sentinel:       make(chan int32, 16),
         flushDelayFn:   cfg.FlushDelayFn,
         timeNow:        cfg.TimeNow,
-        bo:             backoff.NewBackoff("logtail", stdLogf, 30*time.Second),
         metricsDelta:   cfg.MetricsDelta,
 
         sockstatsLabel: sockstats.LabelLogtailLogger,
@@ -186,7 +182,6 @@ type Logger struct {
     flushPending   atomic.Bool
     sentinel       chan int32
     timeNow        func() time.Time
-    bo             *backoff.Backoff
     zstdEncoder    Encoder
     uploadCancel   func()
     explainedRaw   bool
@@ -373,23 +368,38 @@ func (l *Logger) uploading(ctx context.Context) {
             }
         }
 
-        for len(body) > 0 {
-            select {
-            case <-ctx.Done():
-                return
-            default:
-            }
-            uploaded, err := l.upload(ctx, body, origlen)
+        var lastError string
+        var numFailures int
+        var firstFailure time.Time
+        for len(body) > 0 && ctx.Err() == nil {
+            retryAfter, err := l.upload(ctx, body, origlen)
             if err != nil {
+                numFailures++
+                firstFailure = time.Now()
+
                 if !l.internetUp() {
                     fmt.Fprintf(l.stderr, "logtail: internet down; waiting\n")
                     l.awaitInternetUp(ctx)
                     continue
                 }
-                fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
-            }
-            l.bo.BackOff(ctx, err)
-            if uploaded {
+
+                // Only print the same message once.
+                if currError := err.Error(); lastError != currError {
+                    fmt.Fprintf(l.stderr, "logtail: upload: %v\n", err)
+                    lastError = currError
+                }
+
+                // Sleep for the specified retryAfter period,
+                // otherwise default to some random value.
+                if retryAfter <= 0 {
+                    retryAfter = time.Duration(30+mrand.Intn(30)) * time.Second
+                }
+                tstime.Sleep(ctx, retryAfter)
+            } else {
+                // Only print a success message after recovery.
+                if numFailures > 0 {
+                    fmt.Fprintf(l.stderr, "logtail: upload succeeded after %d failures and %s\n", numFailures, time.Since(firstFailure).Round(time.Second))
+                }
                 break
             }
         }
@@ -433,7 +443,7 @@ func (l *Logger) awaitInternetUp(ctx context.Context) {
 // upload uploads body to the log server.
 // origlen indicates the pre-compression body length.
 // origlen of -1 indicates that the body is not compressed.
-func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (uploaded bool, err error) {
+func (l *Logger) upload(ctx context.Context, body []byte, origlen int) (retryAfter time.Duration, err error) {
     const maxUploadTime = 45 * time.Second
     ctx = sockstats.WithSockStats(ctx, l.sockstatsLabel, l.Logf)
     ctx, cancel := context.WithTimeout(ctx, maxUploadTime)
@@ -460,17 +470,16 @@
     l.httpDoCalls.Add(1)
     resp, err := l.httpc.Do(req)
     if err != nil {
-        return false, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
+        return 0, fmt.Errorf("log upload of %d bytes %s failed: %v", len(body), compressedNote, err)
     }
     defer resp.Body.Close()
 
-    if resp.StatusCode != 200 {
-        uploaded = resp.StatusCode == 400 // the server saved the logs anyway
-        b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<20))
-        return uploaded, fmt.Errorf("log upload of %d bytes %s failed %d: %q", len(body), compressedNote, resp.StatusCode, b)
+    if resp.StatusCode != http.StatusOK {
+        n, _ := strconv.Atoi(resp.Header.Get("Retry-After"))
+        b, _ := io.ReadAll(io.LimitReader(resp.Body, 1<<10))
+        return time.Duration(n) * time.Second, fmt.Errorf("log upload of %d bytes %s failed %d: %s", len(body), compressedNote, resp.StatusCode, bytes.TrimSpace(b))
     }
-
-    return true, nil
+    return 0, nil
 }
 
 // Flush uploads all logs to the server. It blocks until complete or there is an