@ -26,6 +26,7 @@ import (
@ -33,6 +34,7 @@ import (
@ -71,6 +73,10 @@ func init() {
const (
perClientSendQueueDepth = 32 // packets buffered for sending
writeTimeout = 2 * time.Second
// How often to check for and free inactive flows
flowsCleanInterval = 10 * time.Minute
// How long a flow can be inactive before it is freed
flowsInactiveTime = 5 * time.Minute
// dupPolicy is a temporary (2021-08-30) mechanism to change the policy
@ -88,6 +94,15 @@ const (
// flowStats tracks statistics for a client to client data flow through
// this server. The counters are written solely by run(), read by the
// debug server, and do not need to be thread-safe.
type flowStats struct {
bytes int64
packets int64
lastActive time.Time
type align64 [0]atomic.Int64 // for side effect of its 64-bit alignment
// Server is a DERP server.
@ -162,6 +177,13 @@ type Server struct {
// src.
sentTo map[key.NodePublic]map[key.NodePublic]int64 // src => dst => dst's latest sclient.connNum
// flows is a map of all active flowStats on the server,
// protected by s.mu. After allocation by the server,
// flowStats are updated and deleted by the sclient read loop.
flows map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats // sclient => dst => mesh peer if any => flow stats
// flowsTimer signals when it is time to free inactive flows
flowsTimer *time.Timer
// maps from netip.AddrPort to a client's public key
keyOfAddr map[netip.AddrPort]key.NodePublic
@ -281,6 +303,7 @@ func (s *dupClientSet) removeClient(c *sclient) bool {
type PacketForwarder interface {
ForwardPacket(src, dst key.NodePublic, payload []byte) error
String() string
ServerPublicKey() key.NodePublic
// Conn is the subset of the underlying net.Conn the DERP Server needs.
@ -317,6 +340,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
sentTo: map[key.NodePublic]map[key.NodePublic]int64{},
avgQueueDuration: new(uint64),
tcpRtt: metrics.LabelMap{Label: "le"},
flows: map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats{},
keyOfAddr: map[netip.AddrPort]key.NodePublic{},
@ -333,6 +357,7 @@ func NewServer(privateKey key.NodePrivate, logf logger.Logf) *Server {
s.packetsDroppedTypeDisco = s.packetsDroppedType.Get("disco")
s.packetsDroppedTypeOther = s.packetsDroppedType.Get("other")
s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc)
return s
@ -376,6 +401,7 @@ func (s *Server) Close() error {
var closedChs []chan struct{}
for nc, closed := range s.netConns {
closedChs = append(closedChs, closed)
@ -528,6 +554,7 @@ func (s *Server) registerClient(c *sclient) {
if _, ok := s.clientsMesh[c.key]; !ok {
s.clientsMesh[c.key] = nil // just for varz of total users in cluster
s.flows[c] = make(map[key.NodePublic]map[key.NodePublic]*flowStats)
s.keyOfAddr[c.remoteIPPort] = c.key
s.broadcastPeerStateChangeLocked(c.key, true)
@ -585,6 +612,8 @@ func (s *Server) unregisterClient(c *sclient) {
delete(s.flows, c)
if c.canMesh {
delete(s.watchers, c)
@ -704,6 +733,7 @@ func (s *Server) accept(ctx context.Context, nc Conn, brw *bufio.ReadWriter, rem
peerGone: make(chan peerGoneMsg),
canMesh: clientInfo.MeshKey != "" && clientInfo.MeshKey == s.meshKey,
peerGoneLim: rate.NewLimiter(rate.Every(time.Second), 3),
flows: make(map[key.NodePublic]map[key.NodePublic]*flowStats),
if c.canMesh {
@ -795,6 +825,54 @@ func (c *sclient) run(ctx context.Context) error {
if err != nil {
return err
// flowsTimerFunc tells all the sclients to scan for inactive flows
// and free them.
func (s *Server) flowsTimerFunc() {
defer s.mu.Unlock()
if s.closed {
for c := range s.flows {
c.flowsCleanNow = true
s.flowsTimer = time.AfterFunc(flowsCleanInterval, s.flowsTimerFunc)
// cleanFlows scans all the flows for this sclient and frees any that
// have been inactive for more than flowsCleanInterval.
func (c *sclient) cleanFlows() {
if !c.flowsCleanNow {
c.flowsCleanNow = false
c.debugLogf("cleaning flows")
now := time.Now()
s := c.s
for dstKey, meshMap := range c.flows {
for srcOrFwd, flow := range meshMap {
if now.Sub(flow.lastActive) > flowsInactiveTime {
c.debugLogf("deleting flow %s", dstKey.ShortString())
// Common case is only one flow for
// dstKey, in which case we should
// delete the whole submap.
if len(c.flows[dstKey]) == 1 {
delete(c.flows, dstKey)
delete(s.flows[c], dstKey)
} else {
delete(c.flows[dstKey], srcOrFwd)
delete(s.flows[c][dstKey], srcOrFwd)
@ -912,6 +990,8 @@ func (c *sclient) handleFrameForwardPacket(ft frameType, fl uint32) error {
c.updateFlowStats(dstKey, srcKey, contents)
if dst == nil {
reason := dropReasonUnknownDestOnFwd
if dstLen > 1 {
@ -944,6 +1024,48 @@ func (s *Server) notePeerSendLocked(src key.NodePublic, dst *sclient) {
m[dst.key] = dst.connNum
// allocFlowStats allocates a flowStats to track bytes/packets sent in
// the flow identified by this dstKey and srcOrFwd. It should only be
// called when the flowStat for this connection does not yet exist.
func (c *sclient) allocFlowStats(dstKey, srcOrFwd key.NodePublic) *flowStats {
c.debugLogf("allocating flow")
s := c.s
if _, ok := s.flows[c][dstKey]; !ok {
s.flows[c][dstKey] = make(map[key.NodePublic]*flowStats)
f := &flowStats{}
s.flows[c][dstKey][srcOrFwd] = f
if _, ok := c.flows[dstKey]; !ok {
c.flows[dstKey] = make(map[key.NodePublic]*flowStats)
c.flows[dstKey][srcOrFwd] = f
return f
// updateFlowStats updates the flowStat tracking the data sent from
// this sclient to the destination with dstKey. If this sclient is
// forwarding the data from another mesh peer, then srcOrFwd is the
// source key. If this sclient is forwarding the data to another mesh
// peer, then srcOrFwd is the key of the mesh peer we are sending it
// to. Otherwise, srcOrFwd is the zero value.
func (c *sclient) updateFlowStats(dstKey, srcOrFwd key.NodePublic, contents []byte) {
var f *flowStats
// The common case is that the flowStat already exists
if _, ok := c.flows[dstKey]; ok {
f = c.flows[dstKey][srcOrFwd]
if f == nil {
f = c.allocFlowStats(dstKey, srcOrFwd)
f.packets += 1
f.bytes += int64(len(contents))
f.lastActive = time.Now()
// handleFrameSendPacket reads a "send packet" frame from the client.
func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
s := c.s
@ -956,6 +1078,7 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
var fwd PacketForwarder
var dstLen int
var dst *sclient
var fwdKey key.NodePublic
if set, ok := s.clients[dstKey]; ok {
@ -966,9 +1089,14 @@ func (c *sclient) handleFrameSendPacket(ft frameType, fl uint32) error {
s.notePeerSendLocked(c.key, dst)
} else if dstLen < 1 {
fwd = s.clientsMesh[dstKey]
if fwd != nil {
fwdKey = fwd.ServerPublicKey()
c.updateFlowStats(dstKey, fwdKey, contents)
if dst == nil {
if fwd != nil {
@ -1338,6 +1466,17 @@ type sclient struct {
// client that it's trying to establish a direct connection
// through us with a peer we have no record of.
peerGoneLim *rate.Limiter
// flows is a cache of the subset of flows statistics for
// which this client is either the src or the mesh peer
// forwarding for the src. The flowStats fields are changed by
// only the sclient read loop, but the flowStats must be
// allocated by the server.
flows map[key.NodePublic]map[key.NodePublic]*flowStats
// flowsCleanNow is set by the server to indicate it is time
// for this sclient to look for and free inactive flows.
flowsCleanNow bool
// peerConnState represents whether a peer is connected to the server
@ -1769,6 +1908,10 @@ func (f *multiForwarder) String() string {
return fmt.Sprintf("<MultiForwarder fwd=%s total=%d>", f.fwd.Load(), len(f.all))
func (f *multiForwarder) ServerPublicKey() key.NodePublic {
return f.fwd.Load().ServerPublicKey()
func (s *Server) expVarFunc(f func() any) expvar.Func {
return expvar.Func(func() any {
@ -1942,6 +2085,113 @@ func (s *Server) ServeDebugTraffic(w http.ResponseWriter, r *http.Request) {
// takeFlowsSnapshot returns a map of copies of all the flowStats for
// this server. The output should be suitable for comparing the stats
// for a specific flow at multiple points in time so that we can
// calculate the bandwidth used.
func (s *Server) takeFlowsSnapshot() map[*sclient]map[key.NodePublic]map[key.NodePublic]*flowStats {
s.debugLogf("taking flows snapshot")
defer s.mu.Unlock()
snap := maps.Clone(s.flows)
for c := range s.flows {
snap[c] = maps.Clone(s.flows[c])
for dstKey := range s.flows[c] {
snap[c][dstKey] = maps.Clone(s.flows[c][dstKey])
for srcOrFwd := range s.flows[c][dstKey] {
f := flowStats{}
f = *s.flows[c][dstKey][srcOrFwd]
snap[c][dstKey][srcOrFwd] = &f
return snap
// ServeFlows prints a list of recently active data flows through
// this server, including the bandwidth used by each over the last
// second.
func (s *Server) ServeFlows(w http.ResponseWriter, r *http.Request) {
fmt.Fprintf(w, "current time: %s\n", time.Now().Format("2006-01-02 15:04:05"))
lastTime := time.Now()
lastSnap := s.takeFlowsSnapshot()
// Sleep long enough to get some kind of useful bandwidth estimate
time.Sleep(1 * time.Second)
currTime := time.Now()
currSnap := s.takeFlowsSnapshot()
dur := currTime.Sub(lastTime)
type flowLine struct {
connNum int64
src, dst key.NodePublic
fwdTo, recvFrom key.NodePublic
bytes int64
packets int64
bps int64
pps int64
lastActive time.Duration
// Copy current flow snapshot into a sortable slice of flowLines
lines := make([]flowLine, 0)
for c := range currSnap {
for dstKey := range currSnap[c] {
for srcOrFwd := range currSnap[c][dstKey] {
f := currSnap[c][dstKey][srcOrFwd]
l := flowLine{
connNum: c.connNum,
dst: dstKey,
bytes: f.bytes,
packets: f.packets,
lastActive: currTime.Sub(f.lastActive),
// Calculate bandwidth
last := flowStats{}
if _, ok := lastSnap[c]; ok {
if _, ok2 := lastSnap[c][dstKey]; ok2 {
last = *lastSnap[c][dstKey][srcOrFwd]
bytes := l.bytes - last.bytes
packets := l.packets - last.packets
l.bps = int64(float64(bytes) / dur.Seconds())
l.pps = int64(float64(packets) / dur.Seconds())
// src and mesh take a lil calculation
if srcOrFwd.IsZero() {
// common case, src and dst local
l.src = c.key
} else {
// one of src or dst is remote
if c.canMesh {
// dst is local, src is remote
l.recvFrom = c.key
l.src = srcOrFwd
} else {
// src is local, dst is remote
l.fwdTo = srcOrFwd
l.src = c.key
lines = append(lines, l)
fmt.Fprintf(w, "number flows: %v\n", len(lines))
// Sort by highest bandwidth flow
sort.Slice(lines, func(i, j int) bool { return lines[i].bps > lines[j].bps })
headStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %-12v\n"
bodyStr := "%-10v %-10v %-10v %-10v %-10v %10v %10v %10v %10v %5v secs\n"
fmt.Fprintf(w, headStr, "conn#", "src", "dst", "recv from", "fwded to", "bytes", "packets", "bps", "pps", "last active")
for _, l := range lines {
fmt.Fprintf(w, bodyStr, l.connNum, l.src.ShortString(), l.dst.ShortString(), l.recvFrom.ShortString(), l.fwdTo.ShortString(), l.bytes, l.packets, l.bps, l.pps, int(l.lastActive.Seconds()))
var bufioWriterPool = &sync.Pool{
New: func() any {
return bufio.NewWriterSize(io.Discard, 2<<10)