From 369ccfaca2ec7aa4908f85d0f75d472dd79bf03e Mon Sep 17 00:00:00 2001 From: Val Date: Tue, 2 May 2023 12:04:43 -0700 Subject: [PATCH] derp,derper: add flow tracking Track all data flows between clients of a derp server, both local and remote, and display them in /debug/flows/. Fixes #3560 Signed-off-by: Val --- cmd/derper/depaware.txt | 1 + cmd/derper/derper.go | 1 + cmd/tailscale/depaware.txt | 1 + cmd/tailscaled/depaware.txt | 2 +- derp/derp_server.go | 250 ++++++++++++++++++++++++++++++++++++ derp/derp_test.go | 6 + 6 files changed, 260 insertions(+), 1 deletion(-) diff --git a/cmd/derper/depaware.txt b/cmd/derper/depaware.txt index d6562fb75..70f9cf9cb 100644 --- a/cmd/derper/depaware.txt +++ b/cmd/derper/depaware.txt @@ -154,6 +154,7 @@ tailscale.com/cmd/derper dependencies: (generated by github.com/tailscale/depawa golang.org/x/crypto/nacl/secretbox from golang.org/x/crypto/nacl/box golang.org/x/crypto/salsa20/salsa from golang.org/x/crypto/nacl/box+ golang.org/x/exp/constraints from golang.org/x/exp/slices + golang.org/x/exp/maps from tailscale.com/derp golang.org/x/exp/slices from tailscale.com/net/tsaddr+ L golang.org/x/net/bpf from github.com/mdlayher/netlink+ golang.org/x/net/dns/dnsmessage from net+ diff --git a/cmd/derper/derper.go b/cmd/derper/derper.go index 02736b6be..326f2fac8 100644 --- a/cmd/derper/derper.go +++ b/cmd/derper/derper.go @@ -217,6 +217,7 @@ func main() { } })) debug.Handle("traffic", "Traffic check", http.HandlerFunc(s.ServeDebugTraffic)) + debug.Handle("flows", "Flows", http.HandlerFunc(s.ServeFlows)) if *runSTUN { go serveSTUN(listenHost, *stunPort) diff --git a/cmd/tailscale/depaware.txt b/cmd/tailscale/depaware.txt index 72444a2a2..0d6a9b0f2 100644 --- a/cmd/tailscale/depaware.txt +++ b/cmd/tailscale/depaware.txt @@ -145,6 +145,7 @@ tailscale.com/cmd/tailscale dependencies: (generated by github.com/tailscale/dep golang.org/x/crypto/pbkdf2 from software.sslmate.com/src/go-pkcs12 golang.org/x/crypto/salsa20/salsa from golang.org/x/crypto/nacl/box+ golang.org/x/exp/constraints from golang.org/x/exp/slices + golang.org/x/exp/maps from tailscale.com/derp golang.org/x/exp/slices from tailscale.com/net/tsaddr+ golang.org/x/net/bpf from github.com/mdlayher/netlink+ golang.org/x/net/dns/dnsmessage from net+ diff --git a/cmd/tailscaled/depaware.txt b/cmd/tailscaled/depaware.txt index 24e089651..a3e4893db 100644 --- a/cmd/tailscaled/depaware.txt +++ b/cmd/tailscaled/depaware.txt @@ -364,7 +364,7 @@ tailscale.com/cmd/tailscaled dependencies: (generated by github.com/tailscale/de golang.org/x/crypto/salsa20/salsa from golang.org/x/crypto/nacl/box+ LD golang.org/x/crypto/ssh from tailscale.com/ssh/tailssh+ golang.org/x/exp/constraints from golang.org/x/exp/slices+ - golang.org/x/exp/maps from tailscale.com/wgengine + golang.org/x/exp/maps from tailscale.com/wgengine+ golang.org/x/exp/slices from tailscale.com/ipn/ipnlocal+ golang.org/x/net/bpf from github.com/mdlayher/genetlink+ golang.org/x/net/dns/dnsmessage from net+ diff --git a/derp/derp_server.go b/derp/derp_server.go index 1ad5d25f3..6616d749d 100644 --- a/derp/derp_server.go +++ b/derp/derp_server.go @@ -26,6 +26,7 @@ import ( "net/netip" "os/exec" "runtime" + "sort" "strconv" "strings" "sync" @@ -33,6 +34,7 @@ import ( "time" "go4.org/mem" + "golang.org/x/exp/maps" "golang.org/x/sync/errgroup" "tailscale.com/client/tailscale" "tailscale.com/disco" @@ -71,6 +73,10 @@ func init() { const ( perClientSendQueueDepth = 32 // packets buffered for sending writeTimeout = 2 * time.Second + // How often to check for 
and free inactive flows + flowsCleanInterval = 10 * time.Minute + // How long a flow can be inactive before it is freed + flowsInactiveTime = 5 * time.Minute ) // dupPolicy is a temporary (2021-08-30) mechanism to change the policy @@ -88,6 +94,15 @@ const ( disableFighters ) +// flowStats tracks statistics for a client to client data flow through +// this server. The counters are written solely by run(), read by the +// debug server, and do not need to be thread-safe. +type flowStats struct { + bytes int64 + packets int64 + lastActive time.Time +} + type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment // Server is a DERP server. @@ -162,6 +177,13 @@ type Server struct { // src. sentTo map[key.NodePublic]map[key.NodePublic]int64 // src => dst => dst's latest sclient.connNum + // flows is a map of all active flowStats on the server, + // protected by s.mu. After allocation by the server, + // flowStats are updated and deleted by the sclient read loop. + flows map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats // sclient => dst => mesh peer if any => flow stats + + // flowsTimer signals when it is time to free inactive flows + flowsTimer *time.Timer // maps from netip.AddrPort to a client's public key keyOfAddr map[netip.AddrPort]key.NodePublic } @@ -281,6 +303,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool { type PacketForwarder interface { ForwardPacket(src, dst key.NodePublic, payload []byte) error String() string + ServerPublicKey() key.NodePublic } // Conn is the subset of the underlying net.Conn the DERP Server needs. @@ -317,6 +340,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { sentTo: map[key.NodePublic]map[key.NodePublic]int64{}, avgQueueDuration: new(uint64), tcpRtt: metrics.LabelMap{Label: "le"}, + flows: map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats{}, keyOfAddr: map[netip.AddrPort]key.NodePublic{}, } s.initMetacert() @@ -333,6 +357,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server { } s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco") s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other") + s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc) return s } @@ -376,6 +401,7 @@ func (s *Server) Close() error { var closedChs []chan struct{} s.mu.Lock() + s.flowsTimer.Stop() for nc, closed := range s.netConns { nc.Close() closedChs = append(closedChs, closed) @@ -528,6 +554,7 @@ func (s *Server) registerClient(c *sclient) { if _, ok := s.clientsMesh[c.key]; !ok { s.clientsMesh[c.key] = nil // just for varz of total users in cluster } + s.flows[c] = make(map[key.NodePublic]map[key.NodePublic]*flowStats) s.keyOfAddr[c.remoteIPPort] = c.key s.curClients.Add(1) s.broadcastPeerStateChangeLocked(c.key, true) @@ -585,6 +612,8 @@ func (s *Server) unregisterClient(c *sclient) { } } + delete(s.flows, c) + if c.canMesh { delete(s.watchers, c) } @@ -704,6 +733,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem peerGone: make(chan peerGoneMsg), canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey, peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3), + flows: make(map[key.NodePublic]map[key.NodePublic]*flowStats), } if c.canMesh { @@ -795,6 +825,54 @@ func (c *sclient) run(ctx context.Context) error { if err != nil { return err } + c.cleanFlows() + } +} + +// flowsTimerFunc tells all the sclients to scan for inactive flows +// and free them. 
+func (s *Server) flowsTimerFunc() { + s.mu.Lock() + defer s.mu.Unlock() + if s.closed { + return + } + for c := range s.flows { + c.flowsCleanNow = true + } + s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc) +} + +// cleanFlows scans all the flows for this sclient and frees any that +// have been inactive for more than flowsCleanInterval. +func (c *sclient) cleanFlows() { + if !c.flowsCleanNow { + return + } + c.flowsCleanNow = false + c.debugLogf("cleaning flows") + now := time.Now() + s := c.s + for dstKey, meshMap := range c.flows { + for srcOrFwd, flow := range meshMap { + if now.Sub(flow.lastActive) > flowsInactiveTime { + c.debugLogf("deleting flow %s", dstKey.ShortString()) + // Common case is only one flow for + // dstKey, in which case we should + // delete the whole submap. + if len(c.flows[dstKey]) == 1 { + delete(c.flows, dstKey) + s.mu.Lock() + delete(s.flows[c], dstKey) + s.mu.Unlock() + } else { + delete(c.flows[dstKey], srcOrFwd) + s.mu.Lock() + delete(s.flows[c][dstKey], srcOrFwd) + s.mu.Unlock() + } + } + } } } @@ -912,6 +990,8 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error { } s.mu.Unlock() + c.updateFlowStats(dstKey, srcKey, contents) + if dst == nil { reason := dropReasonUnknownDestOnFwd if dstLen > 1 { @@ -944,6 +1024,48 @@ func (s *Server) notePeerSendLocked(src key.NodePublic, dst *sclient) { m[dst.key] = dst.connNum } +// allocFlowStats allocates a flowStats to track bytes/packets sent in +// the flow identified by this dstKey and srcOrFwd. It should only be +// called when the flowStat for this connection does not yet exist. +func (c *sclient) allocFlowStats(dstKey, srcOrFwd key.NodePublic) *flowStats { + c.debugLogf("allocating flow") + s := c.s + + s.mu.Lock() + if _, ok := s.flows[c][dstKey]; !ok { + s.flows[c][dstKey] = make(map[key.NodePublic]*flowStats) + } + f := &flowStats{} + s.flows[c][dstKey][srcOrFwd] = f + s.mu.Unlock() + + if _, ok := c.flows[dstKey]; !ok { + c.flows[dstKey] = make(map[key.NodePublic]*flowStats) + } + c.flows[dstKey][srcOrFwd] = f + return f +} + +// updateFlowStats updates the flowStat tracking the data sent from +// this sclient to the destination with dstKey. If this sclient is +// forwarding the data from another mesh peer, then srcOrFwd is the +// source key. If this sclient is forwarding the data to another mesh +// peer, then srcOrFwd is the key of the mesh peer we are sending it +// to. Otherwise, srcOrFwd is the zero value. +func (c *sclient) updateFlowStats(dstKey, srcOrFwd key.NodePublic, contents []byte) { + var f *flowStats + // The common case is that the flowStat already exists + if _, ok := c.flows[dstKey]; ok { + f = c.flows[dstKey][srcOrFwd] + } + if f == nil { + f = c.allocFlowStats(dstKey, srcOrFwd) + } + f.packets += 1 + f.bytes += int64(len(contents)) + f.lastActive = time.Now() +} + // handleFrameSendPacket reads a "send packet" frame from the client. 
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { s := c.s @@ -956,6 +1078,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { var fwd PacketForwarder var dstLen int var dst *sclient + var fwdKey key.NodePublic s.mu.Lock() if set, ok := s.clients[dstKey]; ok { @@ -966,9 +1089,14 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error { s.notePeerSendLocked(c.key, dst) } else if dstLen < 1 { fwd = s.clientsMesh[dstKey] + if fwd != nil { + fwdKey = fwd.ServerPublicKey() + } } s.mu.Unlock() + c.updateFlowStats(dstKey, fwdKey, contents) + if dst == nil { if fwd != nil { s.packetsForwardedOut.Add(1) @@ -1338,6 +1466,17 @@ type sclient struct { // client that it's trying to establish a direct connection // through us with a peer we have no record of. peerGoneLim *rate.Limiter + + // flows is a cache of the subset of flows statistics for + // which this client is either the src or the mesh peer + // forwarding for the src. The flowStats fields are changed by + // only the sclient read loop, but the flowStats must be + // allocated by the server. + flows map[key.NodePublic]map[key.NodePublic]*flowStats + + // flowsCleanNow is set by the server to indicate it is time + // for this sclient to look for and free inactive flows. + flowsCleanNow bool } // peerConnState represents whether a peer is connected to the server @@ -1769,6 +1908,10 @@ func (f *multiForwarder) String() string { return fmt.Sprintf("", f.fwd.Load(), len(f.all)) } +func (f *multiForwarder) ServerPublicKey() key.NodePublic { + return f.fwd.Load().ServerPublicKey() +} + func (s *Server) expVarFunc(f func() any) expvar.Func { return expvar.Func(func() any { s.mu.Lock() @@ -1942,6 +2085,113 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) { } } +// takeFlowsSnapshot returns a map of copies of all the flowStats for +// this server. The output should be suitable for comparing the stats +// for a specific flow at multiple points in time so that we can +// calculate the bandwidth used. +func (s *Server) takeFlowsSnapshot() map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats { + s.debugLogf("taking flows snapshot") + s.mu.Lock() + defer s.mu.Unlock() + + snap := maps.Clone(s.flows) + for c := range s.flows { + snap[c] = maps.Clone(s.flows[c]) + for dstKey := range s.flows[c] { + snap[c][dstKey] = maps.Clone(s.flows[c][dstKey]) + for srcOrFwd := range s.flows[c][dstKey] { + f := flowStats{} + f = *s.flows[c][dstKey][srcOrFwd] + snap[c][dstKey][srcOrFwd] = &f + } + } + } + return snap +} + +// ServeFlows prints a list of recently active data flows through +// this server, including the bandwidth used by each over the last +// second. 
+func (s *Server) ServeFlows(w http.ResponseWriter, r *http.Request) { + fmt.Fprintf(w, "current time: %s\n", time.Now().Format("2006-01-02 15:04:05")) + + lastTime := time.Now() + lastSnap := s.takeFlowsSnapshot() + // Sleep long enough to get some kind of useful bandwidth estimate + time.Sleep(1 * time.Second) + currTime := time.Now() + currSnap := s.takeFlowsSnapshot() + dur := currTime.Sub(lastTime) + + type flowLine struct { + connNum int64 + src, dst key.NodePublic + fwdTo, recvFrom key.NodePublic + bytes int64 + packets int64 + bps int64 + pps int64 + lastActive time.Duration + } + + // Copy current flow snapshot into a sortable slice of flowLines + lines := make([]flowLine, 0) + for c := range currSnap { + for dstKey := range currSnap[c] { + for srcOrFwd := range currSnap[c][dstKey] { + f := currSnap[c][dstKey][srcOrFwd] + l := flowLine{ + connNum: c.connNum, + dst: dstKey, + bytes: f.bytes, + packets: f.packets, + lastActive: currTime.Sub(f.lastActive), + } + // Calculate bandwidth + last := flowStats{} + if _, ok := lastSnap[c]; ok { + if _, ok2 := lastSnap[c][dstKey]; ok2 { + last = *lastSnap[c][dstKey][srcOrFwd] + } + } + bytes := l.bytes - last.bytes + packets := l.packets - last.packets + l.bps = int64(float64(bytes) / dur.Seconds()) + l.pps = int64(float64(packets) / dur.Seconds()) + + // src and mesh take a lil calculation + if srcOrFwd.IsZero() { + // common case, src and dst local + l.src = c.key + } else { + // one of src or dst is remote + if c.canMesh { + // dst is local, src is remote + l.recvFrom = c.key + l.src = srcOrFwd + } else { + // src is local, dst is remote + l.fwdTo = srcOrFwd + l.src = c.key + } + } + lines = append(lines, l) + } + } + } + + fmt.Fprintf(w, "number flows: %v\n", len(lines)) + // Sort by highest bandwidth flow + sort.Slice(lines, func(i, j int) bool { return lines[i].bps > lines[j].bps }) + + headStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %-12v\n" + bodyStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %5v secs\n" + fmt.Fprintf(w, headStr, "conn#", "src", "dst", "recv from", "fwded to", "bytes", "packets", "bps", "pps", "last active") + for _, l := range lines { + fmt.Fprintf(w, bodyStr, l.connNum, l.src.ShortString(), l.dst.ShortString(), l.recvFrom.ShortString(), l.fwdTo.ShortString(), l.bytes, l.packets, l.bps, l.pps, int(l.lastActive.Seconds())) + } +} + var bufioWriterPool = &sync.Pool{ New: func() any { return bufio.NewWriterSize(io.Discard, 2<<10) diff --git a/derp/derp_test.go b/derp/derp_test.go index 62d4e0a00..801ae7cbe 100644 --- a/derp/derp_test.go +++ b/derp/derp_test.go @@ -712,6 +712,9 @@ func (testFwd) ForwardPacket(key.NodePublic, key.NodePublic, []byte) error { func (testFwd) String() string { panic("not called in tests") } +func (testFwd) ServerPublicKey() key.NodePublic { + panic("not called in tests") +} func pubAll(b byte) (ret key.NodePublic) { var bs [32]byte @@ -844,6 +847,9 @@ func (f channelFwd) ForwardPacket(_ key.NodePublic, _ key.NodePublic, packet []b f.c <- packet return nil } +func (f channelFwd) ServerPublicKey() key.NodePublic { + panic("not called in tests") +} func TestMultiForwarder(t *testing.T) { received := 0
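
For readers tracing the data model rather than the individual hunks: the patch keeps one flowStats per (destination, source-or-forwarder) pair, nested under the owning sclient, and frees a flow once it has been idle for longer than flowsInactiveTime. The sketch below is a minimal, self-contained approximation of that bookkeeping, not the patch's code: it flattens away the per-sclient level and uses plain string keys where the server uses key.NodePublic.

// flowsketch approximates the flow bookkeeping added by this patch.
// The real code keys these maps by key.NodePublic and splits ownership
// between Server (allocation, indexing) and the sclient read loop
// (updates, cleanup); a single flat map keeps the sketch short.
package main

import (
	"fmt"
	"time"
)

const flowsInactiveTime = 5 * time.Minute // mirrors the constant added above

type flowStats struct {
	bytes      int64
	packets    int64
	lastActive time.Time
}

// flows is dst => srcOrFwd => stats. srcOrFwd is the zero value ("")
// when both ends of the flow are local clients of this server.
var flows = map[string]map[string]*flowStats{}

// record accounts one packet of n bytes sent toward dst, attributed to
// srcOrFwd, allocating the flowStats on first use.
func record(dst, srcOrFwd string, n int) {
	m, ok := flows[dst]
	if !ok {
		m = map[string]*flowStats{}
		flows[dst] = m
	}
	f, ok := m[srcOrFwd]
	if !ok {
		f = &flowStats{}
		m[srcOrFwd] = f
	}
	f.packets++
	f.bytes += int64(n)
	f.lastActive = time.Now()
}

// sweep frees flows that have been idle longer than flowsInactiveTime,
// deleting the whole per-destination submap once its last flow is gone.
func sweep(now time.Time) {
	for dst, m := range flows {
		for srcOrFwd, f := range m {
			if now.Sub(f.lastActive) > flowsInactiveTime {
				delete(m, srcOrFwd)
			}
		}
		if len(m) == 0 {
			delete(flows, dst)
		}
	}
}

func main() {
	record("dst-abc", "", 1300) // hypothetical key and packet size
	record("dst-abc", "", 1300)
	sweep(time.Now().Add(10 * time.Minute)) // pretend 10 minutes of idleness passed
	fmt.Println("active flows:", len(flows))
}

Splitting the allocation (done by the server under s.mu) from the per-packet update and cleanup (done only by the owning sclient's read loop) is what lets updateFlowStats avoid taking the server lock on the hot path for every packet.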
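
The /debug/flows/ page keeps no history of its own: ServeFlows takes one snapshot of the counters, sleeps about a second, takes a second snapshot, and divides the byte and packet deltas by the elapsed time. The sketch below shows just that calculation, with hypothetical counter values and flat string keys standing in for the server's per-sclient, per-key maps.

// flowrates sketches the core of ServeFlows: given two snapshots of
// per-flow counters taken dur apart, compute per-flow bps/pps and sort
// by bandwidth, highest first, as the debug page does.
package main

import (
	"fmt"
	"sort"
	"time"
)

type flowStats struct {
	bytes, packets int64
}

type flowLine struct {
	dst      string
	bps, pps int64
}

func rates(last, curr map[string]flowStats, dur time.Duration) []flowLine {
	lines := make([]flowLine, 0, len(curr))
	for dst, c := range curr {
		l := last[dst] // zero value if the flow is new, the same default ServeFlows uses
		lines = append(lines, flowLine{
			dst: dst,
			bps: int64(float64(c.bytes-l.bytes) / dur.Seconds()),
			pps: int64(float64(c.packets-l.packets) / dur.Seconds()),
		})
	}
	sort.Slice(lines, func(i, j int) bool { return lines[i].bps > lines[j].bps })
	return lines
}

func main() {
	// Hypothetical counters sampled one second apart.
	last := map[string]flowStats{"peer-a": {bytes: 1_000, packets: 2}}
	curr := map[string]flowStats{
		"peer-a": {bytes: 131_000, packets: 102},
		"peer-b": {bytes: 52_000, packets: 40},
	}
	for _, l := range rates(last, curr, time.Second) {
		fmt.Printf("%-8s %8d bps %6d pps\n", l.dst, l.bps, l.pps)
	}
}

The real handler prints one line per flow with the columns conn#, src, dst, recv from, fwded to, bytes, packets, bps, pps, and last active, sorted by bps.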