Compare commits

...

1 Commits

Author SHA1 Message Date
Andrew Dunham d608fcd7ca wgengine/monitor: add monitor for link change events
Change-Id: I45100c7a5b785ad6824080fe0a38751e3d246eaa
Signed-off-by: Andrew Dunham <andrew@tailscale.com>
2022-10-24 19:29:36 -04:00
2 changed files with 141 additions and 20 deletions

View File

@ -10,13 +10,16 @@ package monitor
import ( import (
"encoding/json" "encoding/json"
"errors" "errors"
"net"
"net/netip" "net/netip"
"runtime" "runtime"
"sync" "sync"
"time" "time"
"golang.org/x/exp/slices"
"tailscale.com/net/interfaces" "tailscale.com/net/interfaces"
"tailscale.com/types/logger" "tailscale.com/types/logger"
"tailscale.com/util/mak"
) )
// pollWallTimeInterval is how often we check the time to check // pollWallTimeInterval is how often we check the time to check
@ -64,19 +67,21 @@ type Mon struct {
change chan struct{} change chan struct{}
stop chan struct{} // closed on Stop stop chan struct{} // closed on Stop
mu sync.Mutex // guards all following fields mu sync.Mutex // guards all following fields
cbs map[*callbackHandle]ChangeFunc cbs map[*callbackHandle]ChangeFunc
ruleDelCB map[*callbackHandle]RuleDeleteCallback ruleDelCB map[*callbackHandle]RuleDeleteCallback
ifState *interfaces.State linkChangedCB map[*callbackHandle]LinkChangedCallback
gwValid bool // whether gw and gwSelfIP are valid ifState *interfaces.State
gw netip.Addr // our gateway's IP ifState2 []*net.Interface
gwSelfIP netip.Addr // our own IP address (that corresponds to gw) gwValid bool // whether gw and gwSelfIP are valid
started bool gw netip.Addr // our gateway's IP
closed bool gwSelfIP netip.Addr // our own IP address (that corresponds to gw)
goroutines sync.WaitGroup started bool
wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick closed bool
lastWall time.Time goroutines sync.WaitGroup
timeJumped bool // whether we need to send a changed=true after a big time jump wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick
lastWall time.Time
timeJumped bool // whether we need to send a changed=true after a big time jump
} }
// New instantiates and starts a monitoring instance. // New instantiates and starts a monitoring instance.
@ -179,6 +184,26 @@ func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregiste
} }
} }
// LinkChangedCallback is a callback when a network link changes.
type LinkChangedCallback func(iif *net.Interface, deleted bool)
// RegisterLinkChangedCallback adds a callback to the set of parties to be
// notified (in their own goroutine) whenever a link (a.k.a. "interface") is
// changed.
//
// To remove this callback, call unregister or close the monitor.
func (m *Mon) RegisterLinkChangedCallback(callback LinkChangedCallback) (unregister func()) {
handle := new(callbackHandle)
m.mu.Lock()
defer m.mu.Unlock()
mak.Set(&m.linkChangedCB, handle, callback)
return func() {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.linkChangedCB, handle)
}
}
// Start starts the monitor. // Start starts the monitor.
// A monitor can only be started & closed once. // A monitor can only be started & closed once.
func (m *Mon) Start() { func (m *Mon) Start() {
@ -256,6 +281,7 @@ func (m *Mon) stopped() bool {
// the change channel of changes, and stopping when a stop is issued. // the change channel of changes, and stopping when a stop is issued.
func (m *Mon) pump() { func (m *Mon) pump() {
defer m.goroutines.Done() defer m.goroutines.Done()
pumpLoop:
for !m.stopped() { for !m.stopped() {
msg, err := m.om.Receive() msg, err := m.om.Receive()
if err != nil { if err != nil {
@ -265,14 +291,18 @@ func (m *Mon) pump() {
// Keep retrying while we're not closed. // Keep retrying while we're not closed.
m.logf("error from link monitor: %v", err) m.logf("error from link monitor: %v", err)
time.Sleep(time.Second) time.Sleep(time.Second)
continue continue pumpLoop
} }
if rdm, ok := msg.(ipRuleDeletedMessage); ok { switch v := msg.(type) {
m.notifyRuleDeleted(rdm) case ipRuleDeletedMessage:
continue m.notifyRuleDeleted(v)
continue pumpLoop
case newLinkMessage:
m.notifyLinkChanged(v)
continue pumpLoop
} }
if msg.ignore() { if msg.ignore() {
continue continue pumpLoop
} }
m.InjectEvent() m.InjectEvent()
} }
@ -286,6 +316,38 @@ func (m *Mon) notifyRuleDeleted(rdm ipRuleDeletedMessage) {
} }
} }
func (m *Mon) notifyLinkChanged(nlm newLinkMessage) {
m.mu.Lock()
defer m.mu.Unlock()
for _, cb := range m.linkChangedCB {
go cb(nlm.Link, nlm.Delete)
}
// Update our cached state
updated := false
for i, iif := range m.ifState2 {
if iif.Index == nlm.Link.Index {
if nlm.Delete {
m.ifState2 = slices.Delete(m.ifState2, i, i)
} else {
m.ifState2[i] = nlm.Link
}
updated = true
break
}
}
if updated {
return
}
// Need to append
// TODO(andrew): insert sorted instead of insert then sort?
m.ifState2 = append(m.ifState2, nlm.Link)
slices.SortFunc(m.ifState2, func(x, y *net.Interface) bool {
return x.Index < y.Index
})
}
// isInterestingInterface reports whether the provided interface should be // isInterestingInterface reports whether the provided interface should be
// considered when checking for network state changes. // considered when checking for network state changes.
// The ips parameter should be the IPs of the provided interface. // The ips parameter should be the IPs of the provided interface.

View File

@ -54,14 +54,18 @@ func newOSMon(logf logger.Logf, m *Mon) (osMon, error) {
// but all reachability would. // but all reachability would.
Groups: unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR | Groups: unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR |
unix.RTMGRP_IPV4_ROUTE | unix.RTMGRP_IPV6_ROUTE | unix.RTMGRP_IPV4_ROUTE | unix.RTMGRP_IPV6_ROUTE |
unix.RTMGRP_IPV4_RULE, // no IPV6_RULE in x/sys/unix unix.RTMGRP_IPV4_RULE | unix.RTMGRP_LINK, // no IPV6_RULE in x/sys/unix
}) })
if err != nil { if err != nil {
// Google Cloud Run does not implement NETLINK_ROUTE RTMGRP support // Google Cloud Run does not implement NETLINK_ROUTE RTMGRP support
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling") logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
return newPollingMon(logf, m) return newPollingMon(logf, m)
} }
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil return &nlConn{
logf: logf,
conn: conn,
addrCache: make(map[uint32]map[netip.Addr]bool),
}, nil
} }
func (c *nlConn) IsInterestingInterface(iface string) bool { return true } func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
@ -229,6 +233,53 @@ func (c *nlConn) Receive() (message, error) {
c.logf("%+v", rdm) c.logf("%+v", rdm)
} }
return rdm, nil return rdm, nil
case unix.RTM_NEWLINK, unix.RTM_DELLINK:
typeStr := "RTM_NEWLINK"
if msg.Header.Type == unix.RTM_DELLINK {
typeStr = "RTM_DELLINK"
}
var lmsg rtnetlink.LinkMessage
if err := lmsg.UnmarshalBinary(msg.Data); err != nil {
c.logf("%s: failed to parse: %v", typeStr, err)
return unspecifiedMessage{}, nil
}
// Make a *net.Interface
netif := &net.Interface{
Index: int(lmsg.Index),
}
if attrs := lmsg.Attributes; attrs != nil {
netif.HardwareAddr = attrs.Address
netif.MTU = int(attrs.MTU)
netif.Name = attrs.Name
}
// Handle flags
if lmsg.Flags&unix.IFF_UP != 0 {
netif.Flags |= net.FlagUp
}
if lmsg.Flags&unix.IFF_BROADCAST != 0 {
netif.Flags |= net.FlagBroadcast
}
if lmsg.Flags&unix.IFF_LOOPBACK != 0 {
netif.Flags |= net.FlagLoopback
}
if lmsg.Flags&unix.IFF_POINTOPOINT != 0 {
netif.Flags |= net.FlagPointToPoint
}
if lmsg.Flags&unix.IFF_MULTICAST != 0 {
netif.Flags |= net.FlagMulticast
}
nlm := &newLinkMessage{
Link: netif,
Delete: msg.Header.Type == unix.RTM_DELLINK,
}
if debugNetlinkMessages() {
c.logf("newLinkMessage{Link: %+v, Delete: %v}", nlm.Link, nlm.Delete)
}
return nlm, nil
default: default:
c.logf("unhandled netlink msg type %+v, %q", msg.Header, msg.Data) c.logf("unhandled netlink msg type %+v, %q", msg.Header, msg.Data)
return unspecifiedMessage{}, nil return unspecifiedMessage{}, nil
@ -286,3 +337,11 @@ func (m *newAddrMessage) ignore() bool {
type ignoreMessage struct{} type ignoreMessage struct{}
func (ignoreMessage) ignore() bool { return true } func (ignoreMessage) ignore() bool { return true }
// newLinkMessage is a message for a link being added.
type newLinkMessage struct {
Link *net.Interface
Delete bool
}
func (newLinkMessage) ignore() bool { return true }