Compare commits

...

1 Commits

Author SHA1 Message Date
Andrew Dunham d608fcd7ca wgengine/monitor: add monitor for link change events
Change-Id: I45100c7a5b785ad6824080fe0a38751e3d246eaa
Signed-off-by: Andrew Dunham <andrew@tailscale.com>
2022-10-24 19:29:36 -04:00
2 changed files with 141 additions and 20 deletions

View File

@ -10,13 +10,16 @@ package monitor
import (
"encoding/json"
"errors"
"net"
"net/netip"
"runtime"
"sync"
"time"
"golang.org/x/exp/slices"
"tailscale.com/net/interfaces"
"tailscale.com/types/logger"
"tailscale.com/util/mak"
)
// pollWallTimeInterval is how often we check the time to check
@ -64,19 +67,21 @@ type Mon struct {
change chan struct{}
stop chan struct{} // closed on Stop
mu sync.Mutex // guards all following fields
cbs map[*callbackHandle]ChangeFunc
ruleDelCB map[*callbackHandle]RuleDeleteCallback
ifState *interfaces.State
gwValid bool // whether gw and gwSelfIP are valid
gw netip.Addr // our gateway's IP
gwSelfIP netip.Addr // our own IP address (that corresponds to gw)
started bool
closed bool
goroutines sync.WaitGroup
wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick
lastWall time.Time
timeJumped bool // whether we need to send a changed=true after a big time jump
mu sync.Mutex // guards all following fields
cbs map[*callbackHandle]ChangeFunc
ruleDelCB map[*callbackHandle]RuleDeleteCallback
linkChangedCB map[*callbackHandle]LinkChangedCallback
ifState *interfaces.State
ifState2 []*net.Interface
gwValid bool // whether gw and gwSelfIP are valid
gw netip.Addr // our gateway's IP
gwSelfIP netip.Addr // our own IP address (that corresponds to gw)
started bool
closed bool
goroutines sync.WaitGroup
wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick
lastWall time.Time
timeJumped bool // whether we need to send a changed=true after a big time jump
}
// New instantiates and starts a monitoring instance.
@ -179,6 +184,26 @@ func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregiste
}
}
// LinkChangedCallback is a callback when a network link changes.
type LinkChangedCallback func(iif *net.Interface, deleted bool)
// RegisterLinkChangedCallback adds a callback to the set of parties to be
// notified (in their own goroutine) whenever a link (a.k.a. "interface") is
// changed.
//
// To remove this callback, call unregister or close the monitor.
func (m *Mon) RegisterLinkChangedCallback(callback LinkChangedCallback) (unregister func()) {
handle := new(callbackHandle)
m.mu.Lock()
defer m.mu.Unlock()
mak.Set(&m.linkChangedCB, handle, callback)
return func() {
m.mu.Lock()
defer m.mu.Unlock()
delete(m.linkChangedCB, handle)
}
}
// Start starts the monitor.
// A monitor can only be started & closed once.
func (m *Mon) Start() {
@ -256,6 +281,7 @@ func (m *Mon) stopped() bool {
// the change channel of changes, and stopping when a stop is issued.
func (m *Mon) pump() {
defer m.goroutines.Done()
pumpLoop:
for !m.stopped() {
msg, err := m.om.Receive()
if err != nil {
@ -265,14 +291,18 @@ func (m *Mon) pump() {
// Keep retrying while we're not closed.
m.logf("error from link monitor: %v", err)
time.Sleep(time.Second)
continue
continue pumpLoop
}
if rdm, ok := msg.(ipRuleDeletedMessage); ok {
m.notifyRuleDeleted(rdm)
continue
switch v := msg.(type) {
case ipRuleDeletedMessage:
m.notifyRuleDeleted(v)
continue pumpLoop
case newLinkMessage:
m.notifyLinkChanged(v)
continue pumpLoop
}
if msg.ignore() {
continue
continue pumpLoop
}
m.InjectEvent()
}
@ -286,6 +316,38 @@ func (m *Mon) notifyRuleDeleted(rdm ipRuleDeletedMessage) {
}
}
func (m *Mon) notifyLinkChanged(nlm newLinkMessage) {
m.mu.Lock()
defer m.mu.Unlock()
for _, cb := range m.linkChangedCB {
go cb(nlm.Link, nlm.Delete)
}
// Update our cached state
updated := false
for i, iif := range m.ifState2 {
if iif.Index == nlm.Link.Index {
if nlm.Delete {
m.ifState2 = slices.Delete(m.ifState2, i, i)
} else {
m.ifState2[i] = nlm.Link
}
updated = true
break
}
}
if updated {
return
}
// Need to append
// TODO(andrew): insert sorted instead of insert then sort?
m.ifState2 = append(m.ifState2, nlm.Link)
slices.SortFunc(m.ifState2, func(x, y *net.Interface) bool {
return x.Index < y.Index
})
}
// isInterestingInterface reports whether the provided interface should be
// considered when checking for network state changes.
// The ips parameter should be the IPs of the provided interface.

View File

@ -54,14 +54,18 @@ func newOSMon(logf logger.Logf, m *Mon) (osMon, error) {
// but all reachability would.
Groups: unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR |
unix.RTMGRP_IPV4_ROUTE | unix.RTMGRP_IPV6_ROUTE |
unix.RTMGRP_IPV4_RULE, // no IPV6_RULE in x/sys/unix
unix.RTMGRP_IPV4_RULE | unix.RTMGRP_LINK, // no IPV6_RULE in x/sys/unix
})
if err != nil {
// Google Cloud Run does not implement NETLINK_ROUTE RTMGRP support
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
return newPollingMon(logf, m)
}
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil
return &nlConn{
logf: logf,
conn: conn,
addrCache: make(map[uint32]map[netip.Addr]bool),
}, nil
}
func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
@ -229,6 +233,53 @@ func (c *nlConn) Receive() (message, error) {
c.logf("%+v", rdm)
}
return rdm, nil
case unix.RTM_NEWLINK, unix.RTM_DELLINK:
typeStr := "RTM_NEWLINK"
if msg.Header.Type == unix.RTM_DELLINK {
typeStr = "RTM_DELLINK"
}
var lmsg rtnetlink.LinkMessage
if err := lmsg.UnmarshalBinary(msg.Data); err != nil {
c.logf("%s: failed to parse: %v", typeStr, err)
return unspecifiedMessage{}, nil
}
// Make a *net.Interface
netif := &net.Interface{
Index: int(lmsg.Index),
}
if attrs := lmsg.Attributes; attrs != nil {
netif.HardwareAddr = attrs.Address
netif.MTU = int(attrs.MTU)
netif.Name = attrs.Name
}
// Handle flags
if lmsg.Flags&unix.IFF_UP != 0 {
netif.Flags |= net.FlagUp
}
if lmsg.Flags&unix.IFF_BROADCAST != 0 {
netif.Flags |= net.FlagBroadcast
}
if lmsg.Flags&unix.IFF_LOOPBACK != 0 {
netif.Flags |= net.FlagLoopback
}
if lmsg.Flags&unix.IFF_POINTOPOINT != 0 {
netif.Flags |= net.FlagPointToPoint
}
if lmsg.Flags&unix.IFF_MULTICAST != 0 {
netif.Flags |= net.FlagMulticast
}
nlm := &newLinkMessage{
Link: netif,
Delete: msg.Header.Type == unix.RTM_DELLINK,
}
if debugNetlinkMessages() {
c.logf("newLinkMessage{Link: %+v, Delete: %v}", nlm.Link, nlm.Delete)
}
return nlm, nil
default:
c.logf("unhandled netlink msg type %+v, %q", msg.Header, msg.Data)
return unspecifiedMessage{}, nil
@ -286,3 +337,11 @@ func (m *newAddrMessage) ignore() bool {
type ignoreMessage struct{}
func (ignoreMessage) ignore() bool { return true }
// newLinkMessage is a message for a link being added.
type newLinkMessage struct {
Link *net.Interface
Delete bool
}
func (newLinkMessage) ignore() bool { return true }