Compare commits
1 Commits
main
...
andrew/mon
Author | SHA1 | Date |
---|---|---|
![]() |
d608fcd7ca |
|
@ -10,13 +10,16 @@ package monitor
|
|||
import (
|
||||
"encoding/json"
|
||||
"errors"
|
||||
"net"
|
||||
"net/netip"
|
||||
"runtime"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
"tailscale.com/net/interfaces"
|
||||
"tailscale.com/types/logger"
|
||||
"tailscale.com/util/mak"
|
||||
)
|
||||
|
||||
// pollWallTimeInterval is how often we check the time to check
|
||||
|
@ -64,19 +67,21 @@ type Mon struct {
|
|||
change chan struct{}
|
||||
stop chan struct{} // closed on Stop
|
||||
|
||||
mu sync.Mutex // guards all following fields
|
||||
cbs map[*callbackHandle]ChangeFunc
|
||||
ruleDelCB map[*callbackHandle]RuleDeleteCallback
|
||||
ifState *interfaces.State
|
||||
gwValid bool // whether gw and gwSelfIP are valid
|
||||
gw netip.Addr // our gateway's IP
|
||||
gwSelfIP netip.Addr // our own IP address (that corresponds to gw)
|
||||
started bool
|
||||
closed bool
|
||||
goroutines sync.WaitGroup
|
||||
wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick
|
||||
lastWall time.Time
|
||||
timeJumped bool // whether we need to send a changed=true after a big time jump
|
||||
mu sync.Mutex // guards all following fields
|
||||
cbs map[*callbackHandle]ChangeFunc
|
||||
ruleDelCB map[*callbackHandle]RuleDeleteCallback
|
||||
linkChangedCB map[*callbackHandle]LinkChangedCallback
|
||||
ifState *interfaces.State
|
||||
ifState2 []*net.Interface
|
||||
gwValid bool // whether gw and gwSelfIP are valid
|
||||
gw netip.Addr // our gateway's IP
|
||||
gwSelfIP netip.Addr // our own IP address (that corresponds to gw)
|
||||
started bool
|
||||
closed bool
|
||||
goroutines sync.WaitGroup
|
||||
wallTimer *time.Timer // nil until Started; re-armed AfterFunc per tick
|
||||
lastWall time.Time
|
||||
timeJumped bool // whether we need to send a changed=true after a big time jump
|
||||
}
|
||||
|
||||
// New instantiates and starts a monitoring instance.
|
||||
|
@ -179,6 +184,26 @@ func (m *Mon) RegisterRuleDeleteCallback(callback RuleDeleteCallback) (unregiste
|
|||
}
|
||||
}
|
||||
|
||||
// LinkChangedCallback is a callback when a network link changes.
|
||||
type LinkChangedCallback func(iif *net.Interface, deleted bool)
|
||||
|
||||
// RegisterLinkChangedCallback adds a callback to the set of parties to be
|
||||
// notified (in their own goroutine) whenever a link (a.k.a. "interface") is
|
||||
// changed.
|
||||
//
|
||||
// To remove this callback, call unregister or close the monitor.
|
||||
func (m *Mon) RegisterLinkChangedCallback(callback LinkChangedCallback) (unregister func()) {
|
||||
handle := new(callbackHandle)
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
mak.Set(&m.linkChangedCB, handle, callback)
|
||||
return func() {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
delete(m.linkChangedCB, handle)
|
||||
}
|
||||
}
|
||||
|
||||
// Start starts the monitor.
|
||||
// A monitor can only be started & closed once.
|
||||
func (m *Mon) Start() {
|
||||
|
@ -256,6 +281,7 @@ func (m *Mon) stopped() bool {
|
|||
// the change channel of changes, and stopping when a stop is issued.
|
||||
func (m *Mon) pump() {
|
||||
defer m.goroutines.Done()
|
||||
pumpLoop:
|
||||
for !m.stopped() {
|
||||
msg, err := m.om.Receive()
|
||||
if err != nil {
|
||||
|
@ -265,14 +291,18 @@ func (m *Mon) pump() {
|
|||
// Keep retrying while we're not closed.
|
||||
m.logf("error from link monitor: %v", err)
|
||||
time.Sleep(time.Second)
|
||||
continue
|
||||
continue pumpLoop
|
||||
}
|
||||
if rdm, ok := msg.(ipRuleDeletedMessage); ok {
|
||||
m.notifyRuleDeleted(rdm)
|
||||
continue
|
||||
switch v := msg.(type) {
|
||||
case ipRuleDeletedMessage:
|
||||
m.notifyRuleDeleted(v)
|
||||
continue pumpLoop
|
||||
case newLinkMessage:
|
||||
m.notifyLinkChanged(v)
|
||||
continue pumpLoop
|
||||
}
|
||||
if msg.ignore() {
|
||||
continue
|
||||
continue pumpLoop
|
||||
}
|
||||
m.InjectEvent()
|
||||
}
|
||||
|
@ -286,6 +316,38 @@ func (m *Mon) notifyRuleDeleted(rdm ipRuleDeletedMessage) {
|
|||
}
|
||||
}
|
||||
|
||||
func (m *Mon) notifyLinkChanged(nlm newLinkMessage) {
|
||||
m.mu.Lock()
|
||||
defer m.mu.Unlock()
|
||||
for _, cb := range m.linkChangedCB {
|
||||
go cb(nlm.Link, nlm.Delete)
|
||||
}
|
||||
|
||||
// Update our cached state
|
||||
updated := false
|
||||
for i, iif := range m.ifState2 {
|
||||
if iif.Index == nlm.Link.Index {
|
||||
if nlm.Delete {
|
||||
m.ifState2 = slices.Delete(m.ifState2, i, i)
|
||||
} else {
|
||||
m.ifState2[i] = nlm.Link
|
||||
}
|
||||
updated = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if updated {
|
||||
return
|
||||
}
|
||||
|
||||
// Need to append
|
||||
// TODO(andrew): insert sorted instead of insert then sort?
|
||||
m.ifState2 = append(m.ifState2, nlm.Link)
|
||||
slices.SortFunc(m.ifState2, func(x, y *net.Interface) bool {
|
||||
return x.Index < y.Index
|
||||
})
|
||||
}
|
||||
|
||||
// isInterestingInterface reports whether the provided interface should be
|
||||
// considered when checking for network state changes.
|
||||
// The ips parameter should be the IPs of the provided interface.
|
||||
|
|
|
@ -54,14 +54,18 @@ func newOSMon(logf logger.Logf, m *Mon) (osMon, error) {
|
|||
// but all reachability would.
|
||||
Groups: unix.RTMGRP_IPV4_IFADDR | unix.RTMGRP_IPV6_IFADDR |
|
||||
unix.RTMGRP_IPV4_ROUTE | unix.RTMGRP_IPV6_ROUTE |
|
||||
unix.RTMGRP_IPV4_RULE, // no IPV6_RULE in x/sys/unix
|
||||
unix.RTMGRP_IPV4_RULE | unix.RTMGRP_LINK, // no IPV6_RULE in x/sys/unix
|
||||
})
|
||||
if err != nil {
|
||||
// Google Cloud Run does not implement NETLINK_ROUTE RTMGRP support
|
||||
logf("monitor_linux: AF_NETLINK RTMGRP failed, falling back to polling")
|
||||
return newPollingMon(logf, m)
|
||||
}
|
||||
return &nlConn{logf: logf, conn: conn, addrCache: make(map[uint32]map[netip.Addr]bool)}, nil
|
||||
return &nlConn{
|
||||
logf: logf,
|
||||
conn: conn,
|
||||
addrCache: make(map[uint32]map[netip.Addr]bool),
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (c *nlConn) IsInterestingInterface(iface string) bool { return true }
|
||||
|
@ -229,6 +233,53 @@ func (c *nlConn) Receive() (message, error) {
|
|||
c.logf("%+v", rdm)
|
||||
}
|
||||
return rdm, nil
|
||||
case unix.RTM_NEWLINK, unix.RTM_DELLINK:
|
||||
typeStr := "RTM_NEWLINK"
|
||||
if msg.Header.Type == unix.RTM_DELLINK {
|
||||
typeStr = "RTM_DELLINK"
|
||||
}
|
||||
|
||||
var lmsg rtnetlink.LinkMessage
|
||||
if err := lmsg.UnmarshalBinary(msg.Data); err != nil {
|
||||
c.logf("%s: failed to parse: %v", typeStr, err)
|
||||
return unspecifiedMessage{}, nil
|
||||
}
|
||||
|
||||
// Make a *net.Interface
|
||||
netif := &net.Interface{
|
||||
Index: int(lmsg.Index),
|
||||
}
|
||||
if attrs := lmsg.Attributes; attrs != nil {
|
||||
netif.HardwareAddr = attrs.Address
|
||||
netif.MTU = int(attrs.MTU)
|
||||
netif.Name = attrs.Name
|
||||
}
|
||||
|
||||
// Handle flags
|
||||
if lmsg.Flags&unix.IFF_UP != 0 {
|
||||
netif.Flags |= net.FlagUp
|
||||
}
|
||||
if lmsg.Flags&unix.IFF_BROADCAST != 0 {
|
||||
netif.Flags |= net.FlagBroadcast
|
||||
}
|
||||
if lmsg.Flags&unix.IFF_LOOPBACK != 0 {
|
||||
netif.Flags |= net.FlagLoopback
|
||||
}
|
||||
if lmsg.Flags&unix.IFF_POINTOPOINT != 0 {
|
||||
netif.Flags |= net.FlagPointToPoint
|
||||
}
|
||||
if lmsg.Flags&unix.IFF_MULTICAST != 0 {
|
||||
netif.Flags |= net.FlagMulticast
|
||||
}
|
||||
|
||||
nlm := &newLinkMessage{
|
||||
Link: netif,
|
||||
Delete: msg.Header.Type == unix.RTM_DELLINK,
|
||||
}
|
||||
if debugNetlinkMessages() {
|
||||
c.logf("newLinkMessage{Link: %+v, Delete: %v}", nlm.Link, nlm.Delete)
|
||||
}
|
||||
return nlm, nil
|
||||
default:
|
||||
c.logf("unhandled netlink msg type %+v, %q", msg.Header, msg.Data)
|
||||
return unspecifiedMessage{}, nil
|
||||
|
@ -286,3 +337,11 @@ func (m *newAddrMessage) ignore() bool {
|
|||
type ignoreMessage struct{}
|
||||
|
||||
func (ignoreMessage) ignore() bool { return true }
|
||||
|
||||
// newLinkMessage is a message for a link being added.
|
||||
type newLinkMessage struct {
|
||||
Link *net.Interface
|
||||
Delete bool
|
||||
}
|
||||
|
||||
func (newLinkMessage) ignore() bool { return true }
|
||||
|
|
Loading…
Reference in New Issue