@@ -26,6 +26,7 @@ import (
	"net/netip"
	"os/exec"
	"runtime"
	"sort"
	"strconv"
	"strings"
	"sync"
@@ -33,6 +34,7 @@ import (
	"time"

	"go4.org/mem"
	"golang.org/x/exp/maps"
	"golang.org/x/sync/errgroup"
	"tailscale.com/client/tailscale"
	"tailscale.com/disco"
@@ -71,6 +73,10 @@ func init() {
const (
	perClientSendQueueDepth = 32 // packets buffered for sending
	writeTimeout = 2 * time.Second
	// How often to check for and free inactive flows
	flowsCleanInterval = 10 * time.Minute
	// How long a flow can be inactive before it is freed
	flowsInactiveTime = 5 * time.Minute
)

// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
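
Taken together, the two new constants bound how long a dead flow's stats can linger: a flow must sit inactive for flowsInactiveTime (5 minutes) before a scan will free it, and scans are only triggered every flowsCleanInterval (10 minutes), so an abandoned flow survives just under 15 minutes in the worst case; longer still if its client's read loop handles no further frames, since (as the later hunks show) the actual cleaning runs from that loop.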
@@ -88,6 +94,15 @@ const (
	disableFighters
)

// flowStats tracks statistics for a client-to-client data flow through
// this server. The counters are written solely by run(), read by the
// debug server, and do not need to be thread-safe.
type flowStats struct {
	bytes      int64
	packets    int64
	lastActive time.Time
}

type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment

// Server is a DERP server.
@@ -162,6 +177,13 @@ type Server struct {
	// src.
	sentTo map[key.NodePublic]map[key.NodePublic]int64 // src => dst => dst's latest sclient.connNum

	// flows is a map of all active flowStats on the server,
	// protected by s.mu. After allocation by the server,
	// flowStats are updated and deleted by the sclient read loop.
	flows map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats // sclient => dst => mesh peer if any => flow stats

	// flowsTimer signals when it is time to free inactive flows.
	flowsTimer *time.Timer
	// keyOfAddr maps from a netip.AddrPort to a client's public key.
	keyOfAddr map[netip.AddrPort]key.NodePublic
}
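
The flows map is keyed three levels deep: the owning sclient, then the destination's public key, then the mesh peer's key when one end of the flow is remote, with the zero key.NodePublic standing in when both endpoints are local. A minimal, runnable sketch of addressing one flow in that shape (string keys stand in for key.NodePublic; all names here are illustrative, not from the patch):

	package main

	import "fmt"

	// flowStats mirrors the shape of the patch's per-flow counters.
	type flowStats struct {
		bytes, packets int64
	}

	func main() {
		// client => dst => mesh peer (or zero value) => stats
		flows := map[string]map[string]map[string]*flowStats{}

		client, dst, srcOrFwd := "client1", "dst1", "" // "" plays the zero NodePublic
		if flows[client] == nil {
			flows[client] = map[string]map[string]*flowStats{}
		}
		if flows[client][dst] == nil {
			flows[client][dst] = map[string]*flowStats{}
		}
		f := flows[client][dst][srcOrFwd]
		if f == nil {
			f = &flowStats{}
			flows[client][dst][srcOrFwd] = f
		}
		f.packets++
		f.bytes += 1200
		fmt.Printf("%+v\n", *f) // {bytes:1200 packets:1}
	}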
@@ -281,6 +303,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool {
type PacketForwarder interface {
	ForwardPacket(src, dst key.NodePublic, payload []byte) error
	String() string
	ServerPublicKey() key.NodePublic
}

// Conn is the subset of the underlying net.Conn the DERP Server needs.
@@ -317,6 +340,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
		sentTo:           map[key.NodePublic]map[key.NodePublic]int64{},
		avgQueueDuration: new(uint64),
		tcpRtt:           metrics.LabelMap{Label: "le"},
		flows:            map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats{},
		keyOfAddr:        map[netip.AddrPort]key.NodePublic{},
	}
	s.initMetacert()
@@ -333,6 +357,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
	}
	s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
	s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
	s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc)
	return s
}

@@ -376,6 +401,7 @@ func (s *Server) Close() error {
	var closedChs []chan struct{}

	s.mu.Lock()
	s.flowsTimer.Stop()
	for nc, closed := range s.netConns {
		nc.Close()
		closedChs = append(closedChs, closed)
@@ -528,6 +554,7 @@ func (s *Server) registerClient(c *sclient) {
	if _, ok := s.clientsMesh[c.key]; !ok {
		s.clientsMesh[c.key] = nil // just for varz of total users in cluster
	}
	s.flows[c] = make(map[key.NodePublic]map[key.NodePublic]*flowStats)
	s.keyOfAddr[c.remoteIPPort] = c.key
	s.curClients.Add(1)
	s.broadcastPeerStateChangeLocked(c.key, true)
@@ -585,6 +612,8 @@ func (s *Server) unregisterClient(c *sclient) {
		}
	}

	delete(s.flows, c)

	if c.canMesh {
		delete(s.watchers, c)
	}
@@ -704,6 +733,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
		peerGone:    make(chan peerGoneMsg),
		canMesh:     clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
		peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
		flows:       make(map[key.NodePublic]map[key.NodePublic]*flowStats),
	}

	if c.canMesh {
@@ -795,6 +825,54 @@ func (c *sclient) run(ctx context.Context) error {
		if err != nil {
			return err
		}
		c.cleanFlows()
	}
}

// flowsTimerFunc tells all the sclients to scan for inactive flows
// and free them.
func (s *Server) flowsTimerFunc() {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.closed {
		return
	}
	for c := range s.flows {
		c.flowsCleanNow = true
	}
	s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc)
}

// cleanFlows scans all the flows for this sclient and frees any that
// have been inactive for more than flowsInactiveTime.
func (c *sclient) cleanFlows() {
	if !c.flowsCleanNow {
		return
	}
	c.flowsCleanNow = false
	c.debugLogf("cleaning flows")
	now := time.Now()
	s := c.s
	for dstKey, meshMap := range c.flows {
		for srcOrFwd, flow := range meshMap {
			if now.Sub(flow.lastActive) > flowsInactiveTime {
				c.debugLogf("deleting flow %s", dstKey.ShortString())
				// Common case is only one flow for
				// dstKey, in which case we should
				// delete the whole submap.
				if len(c.flows[dstKey]) == 1 {
					delete(c.flows, dstKey)
					s.mu.Lock()
					delete(s.flows[c], dstKey)
					s.mu.Unlock()
				} else {
					delete(c.flows[dstKey], srcOrFwd)
					s.mu.Lock()
					delete(s.flows[c][dstKey], srcOrFwd)
					s.mu.Unlock()
				}
			}
		}
	}
}
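
The cleanup split keeps flowStats effectively single-writer: the server's timer callback only flips a flag, and each client's own read loop performs the deletions the next time it finishes a frame, so the counters are never touched from two goroutines. A reduced sketch of that timer-sets-flag, owner-does-the-work pattern, with hypothetical names and timings (unlike the patch, the sketch also takes the lock on the reader side):

	package main

	import (
		"fmt"
		"sync"
		"time"
	)

	// worker mimics an sclient: others may set the flag, but only the
	// worker's own loop touches its data.
	type worker struct {
		mu       sync.Mutex // stands in for Server.mu
		cleanNow bool
	}

	func main() {
		w := &worker{}
		// Like flowsTimerFunc: flip the flag, never touch the data.
		t := time.AfterFunc(30*time.Millisecond, func() {
			w.mu.Lock()
			w.cleanNow = true
			w.mu.Unlock()
		})
		defer t.Stop()

		// Like run(): after each frame, clean if asked to.
		for i := 0; i < 5; i++ {
			time.Sleep(20 * time.Millisecond) // "handle a frame"
			w.mu.Lock()
			doClean := w.cleanNow
			w.cleanNow = false
			w.mu.Unlock()
			if doClean {
				fmt.Println("scanning for inactive flows")
			}
		}
	}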
@@ -912,6 +990,8 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
	}
	s.mu.Unlock()

	c.updateFlowStats(dstKey, srcKey, contents)

	if dst == nil {
		reason := dropReasonUnknownDestOnFwd
		if dstLen > 1 {
@@ -944,6 +1024,48 @@ func (s *Server) notePeerSendLocked(src key.NodePublic, dst *sclient) {
	m[dst.key] = dst.connNum
}

// allocFlowStats allocates a flowStats to track bytes/packets sent in
// the flow identified by this dstKey and srcOrFwd. It should only be
// called when the flowStats for this connection does not yet exist.
func (c *sclient) allocFlowStats(dstKey, srcOrFwd key.NodePublic) *flowStats {
	c.debugLogf("allocating flow")
	s := c.s

	s.mu.Lock()
	if _, ok := s.flows[c][dstKey]; !ok {
		s.flows[c][dstKey] = make(map[key.NodePublic]*flowStats)
	}
	f := &flowStats{}
	s.flows[c][dstKey][srcOrFwd] = f
	s.mu.Unlock()

	if _, ok := c.flows[dstKey]; !ok {
		c.flows[dstKey] = make(map[key.NodePublic]*flowStats)
	}
	c.flows[dstKey][srcOrFwd] = f
	return f
}

// updateFlowStats updates the flowStats tracking the data sent from
// this sclient to the destination with dstKey. If this sclient is
// forwarding the data from another mesh peer, then srcOrFwd is the
// source key. If this sclient is forwarding the data to another mesh
// peer, then srcOrFwd is the key of the mesh peer we are sending it
// to. Otherwise, srcOrFwd is the zero value.
func (c *sclient) updateFlowStats(dstKey, srcOrFwd key.NodePublic, contents []byte) {
	var f *flowStats
	// The common case is that the flowStats already exists.
	if _, ok := c.flows[dstKey]; ok {
		f = c.flows[dstKey][srcOrFwd]
	}
	if f == nil {
		f = c.allocFlowStats(dstKey, srcOrFwd)
	}
	f.packets++
	f.bytes += int64(len(contents))
	f.lastActive = time.Now()
}

// handleFrameSendPacket reads a "send packet" frame from the client.
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
	s := c.s
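
allocFlowStats and updateFlowStats split the work so the hot path never takes s.mu: the per-client cache is only ever read and written by that client's read loop, and the lock is taken just once per flow, to publish the same *flowStats pointer in the server-wide map for the debug handlers. The same shared-registry-plus-private-cache shape in isolation (registry, owner, and counter are hypothetical names, not the patch's API):

	package main

	import (
		"fmt"
		"sync"
	)

	// registry stands in for Server.flows: shared, mutex-guarded.
	type registry struct {
		mu     sync.Mutex
		shared map[string]*int64
	}

	// owner stands in for an sclient: its cache is used by a single
	// goroutine, so lookups need no lock.
	type owner struct {
		reg   *registry
		cache map[string]*int64
	}

	func (o *owner) counter(key string) *int64 {
		if c, ok := o.cache[key]; ok {
			return c // hot path: no lock
		}
		// Slow path: allocate once, publishing the same pointer in
		// the shared registry so other readers can find it.
		o.reg.mu.Lock()
		c := new(int64)
		o.reg.shared[key] = c
		o.reg.mu.Unlock()
		o.cache[key] = c
		return c
	}

	func main() {
		r := &registry{shared: map[string]*int64{}}
		o := &owner{reg: r, cache: map[string]*int64{}}
		*o.counter("flow-a") += 1300 // allocates, then counts
		*o.counter("flow-a") += 700  // cached: lock-free
		fmt.Println(*r.shared["flow-a"]) // 2000
	}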
@@ -956,6 +1078,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
	var fwd PacketForwarder
	var dstLen int
	var dst *sclient
	var fwdKey key.NodePublic

	s.mu.Lock()
	if set, ok := s.clients[dstKey]; ok {
@@ -966,9 +1089,14 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
		s.notePeerSendLocked(c.key, dst)
	} else if dstLen < 1 {
		fwd = s.clientsMesh[dstKey]
		if fwd != nil {
			fwdKey = fwd.ServerPublicKey()
		}
	}
	s.mu.Unlock()

	c.updateFlowStats(dstKey, fwdKey, contents)

	if dst == nil {
		if fwd != nil {
			s.packetsForwardedOut.Add(1)
@@ -1338,6 +1466,17 @@ type sclient struct {
	// client that it's trying to establish a direct connection
	// through us with a peer we have no record of.
	peerGoneLim *rate.Limiter

	// flows is a cache of the subset of flow statistics for
	// which this client is either the src or the mesh peer
	// forwarding for the src. The flowStats fields are changed
	// only by the sclient read loop, but the flowStats must be
	// allocated by the server.
	flows map[key.NodePublic]map[key.NodePublic]*flowStats

	// flowsCleanNow is set by the server to indicate it is time
	// for this sclient to look for and free inactive flows.
	flowsCleanNow bool
}

// peerConnState represents whether a peer is connected to the server
@@ -1769,6 +1908,10 @@ func (f *multiForwarder) String() string {
	return fmt.Sprintf("<MultiForwarder fwd=%s total=%d>", f.fwd.Load(), len(f.all))
}

func (f *multiForwarder) ServerPublicKey() key.NodePublic {
	return f.fwd.Load().ServerPublicKey()
}

func (s *Server) expVarFunc(f func() any) expvar.Func {
	return expvar.Func(func() any {
		s.mu.Lock()
@@ -1942,6 +2085,113 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
	}
}

// takeFlowsSnapshot returns a map of copies of all the flowStats for
// this server. The output should be suitable for comparing the stats
// for a specific flow at multiple points in time so that we can
// calculate the bandwidth used.
func (s *Server) takeFlowsSnapshot() map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats {
	s.debugLogf("taking flows snapshot")
	s.mu.Lock()
	defer s.mu.Unlock()

	snap := maps.Clone(s.flows)
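	// Note that maps.Clone is shallow: it copies only the single map
	// level it is given, so at this point snap still shares the inner
	// maps and *flowStats pointers with the live structure. The loops
	// below re-clone each level and copy the leaf structs by value so
	// the snapshot stops changing once s.mu is released.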
	for c := range s.flows {
		snap[c] = maps.Clone(s.flows[c])
		for dstKey := range s.flows[c] {
			snap[c][dstKey] = maps.Clone(s.flows[c][dstKey])
			for srcOrFwd := range s.flows[c][dstKey] {
				f := *s.flows[c][dstKey][srcOrFwd]
				snap[c][dstKey][srcOrFwd] = &f
			}
		}
	}
	return snap
}

// ServeFlows prints a list of recently active data flows through
// this server, including the bandwidth used by each over the last
// second.
func (s *Server) ServeFlows(w http.ResponseWriter, r *http.Request) {
	fmt.Fprintf(w, "current time: %s\n", time.Now().Format("2006-01-02 15:04:05"))

	lastTime := time.Now()
	lastSnap := s.takeFlowsSnapshot()
	// Sleep long enough to get some kind of useful bandwidth estimate.
	time.Sleep(1 * time.Second)
	currTime := time.Now()
	currSnap := s.takeFlowsSnapshot()
	dur := currTime.Sub(lastTime)

	type flowLine struct {
		connNum         int64
		src, dst        key.NodePublic
		fwdTo, recvFrom key.NodePublic
		bytes           int64
		packets         int64
		bps             int64
		pps             int64
		lastActive      time.Duration
	}

	// Copy the current flow snapshot into a sortable slice of flowLines.
	lines := make([]flowLine, 0)
	for c := range currSnap {
		for dstKey := range currSnap[c] {
			for srcOrFwd := range currSnap[c][dstKey] {
				f := currSnap[c][dstKey][srcOrFwd]
				l := flowLine{
					connNum:    c.connNum,
					dst:        dstKey,
					bytes:      f.bytes,
					packets:    f.packets,
					lastActive: currTime.Sub(f.lastActive),
				}
				// Calculate bandwidth as the delta since the
				// previous snapshot; a flow allocated in the
				// meantime has no previous entry.
				last := flowStats{}
				if lf, ok := lastSnap[c][dstKey][srcOrFwd]; ok {
					last = *lf
				}
				bytes := l.bytes - last.bytes
				packets := l.packets - last.packets
				l.bps = int64(float64(bytes) / dur.Seconds())
				l.pps = int64(float64(packets) / dur.Seconds())

				// src and the mesh columns take a little calculation.
				if srcOrFwd.IsZero() {
					// Common case: src and dst are both local.
					l.src = c.key
				} else {
					// One of src or dst is remote.
					if c.canMesh {
						// dst is local, src is remote.
						l.recvFrom = c.key
						l.src = srcOrFwd
					} else {
						// src is local, dst is remote.
						l.fwdTo = srcOrFwd
						l.src = c.key
					}
				}
				lines = append(lines, l)
			}
		}
	}

	fmt.Fprintf(w, "number of flows: %v\n", len(lines))
	// Sort by highest-bandwidth flow first.
	sort.Slice(lines, func(i, j int) bool { return lines[i].bps > lines[j].bps })

	headStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %-12v\n"
	bodyStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %5v secs\n"
	fmt.Fprintf(w, headStr, "conn#", "src", "dst", "recv from", "fwded to", "bytes", "packets", "bps", "pps", "last active")
	for _, l := range lines {
		fmt.Fprintf(w, bodyStr, l.connNum, l.src.ShortString(), l.dst.ShortString(), l.recvFrom.ShortString(), l.fwdTo.ShortString(), l.bytes, l.packets, l.bps, l.pps, int(l.lastActive.Seconds()))
	}
}

var bufioWriterPool = &sync.Pool{
	New: func() any {
		return bufio.NewWriterSize(io.Discard, 2<<10)
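
The bandwidth columns in ServeFlows are plain counter deltas over the sample window. The arithmetic in isolation, with made-up counter values (not measurements from any real flow):

	package main

	import (
		"fmt"
		"time"
	)

	type flowStats struct {
		bytes, packets int64
	}

	func main() {
		// Two hypothetical snapshots of one flow, one second apart.
		last := flowStats{bytes: 10_000, packets: 12}
		curr := flowStats{bytes: 130_000, packets: 95}
		dur := 1 * time.Second

		bps := int64(float64(curr.bytes-last.bytes) / dur.Seconds())
		pps := int64(float64(curr.packets-last.packets) / dur.Seconds())
		fmt.Printf("%d bytes/s, %d packets/s\n", bps, pps) // 120000 bytes/s, 83 packets/s
	}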