From 83c8653eed4298b6b4f95e8b618e815fa6373bc5 Mon Sep 17 00:00:00 2001 From: Sonia Appasamy Date: Thu, 29 Jun 2023 12:06:14 -0400 Subject: [PATCH] cmd/tailscale/cli, ipn/ipnlocal: [funnel] add stream mode Adds ability to start Funnel in the foreground and stream incoming connections. When foreground process is stopped, Funnel is turned back off for the port. Exampe usage: ``` TS_DEBUG_FUNNEL_STREAM=on tailscale funnel 443 stream ``` Updates #8489 Signed-off-by: Sonia Appasamy --- client/tailscale/localclient.go | 40 +++++++++++--- cmd/tailscale/cli/funnel.go | 56 ++++++++++++++++--- cmd/tailscale/cli/serve.go | 1 + cmd/tailscale/cli/serve_test.go | 6 +++ ipn/ipnlocal/local.go | 5 +- ipn/ipnlocal/serve.go | 95 +++++++++++++++++++++++++++++++++ ipn/localapi/localapi.go | 25 +++++++++ ipn/serve.go | 35 ++++++++++++ 8 files changed, 245 insertions(+), 18 deletions(-) diff --git a/client/tailscale/localclient.go b/client/tailscale/localclient.go index 611a49b29..d47c3d813 100644 --- a/client/tailscale/localclient.go +++ b/client/tailscale/localclient.go @@ -946,6 +946,14 @@ func (lc *LocalClient) NetworkLockForceLocalDisable(ctx context.Context) error { return nil } +// NetworkLockDisable shuts down network-lock across the tailnet. +func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error { + if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil { + return fmt.Errorf("error: %w", err) + } + return nil +} + // SetServeConfig sets or replaces the serving settings. // If config is nil, settings are cleared and serving is disabled. func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConfig) error { @@ -956,14 +964,6 @@ func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConf return nil } -// NetworkLockDisable shuts down network-lock across the tailnet. -func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error { - if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil { - return fmt.Errorf("error: %w", err) - } - return nil -} - // GetServeConfig return the current serve config. // // If the serve config is empty, it returns (nil, nil). @@ -982,6 +982,30 @@ func getServeConfigFromJSON(body []byte) (sc *ipn.ServeConfig, err error) { return sc, nil } +// StreamFunnel opens funnel on the provided HostPort and keeps the +// connection to the local backend open, streaming connections made +// to the funnel URL. +// +// When the context is cancelled, the funnel is closed on HostPort. +// +// During the duration that the connection is open, the client's +// ServeConfig will show that funnel is streaming for this HostPort. +func (lc *LocalClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) { + req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/stream-funnel", jsonBody(hp)) + if err != nil { + return nil, err + } + res, err := lc.doLocalRequestNiceError(req) + if err != nil { + return nil, err + } + if res.StatusCode != 200 { + res.Body.Close() + return nil, errors.New(res.Status) + } + return res.Body, nil +} + // tailscaledConnectHint gives a little thing about why tailscaled (or // platform equivalent) is not answering localapi connections. // diff --git a/cmd/tailscale/cli/funnel.go b/cmd/tailscale/cli/funnel.go index f8c15e19d..fb9f5b41c 100644 --- a/cmd/tailscale/cli/funnel.go +++ b/cmd/tailscale/cli/funnel.go @@ -7,6 +7,7 @@ import ( "context" "flag" "fmt" + "io" "net" "os" "strconv" @@ -66,20 +67,20 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error { return flag.ErrHelp } - var on bool + var stream, on bool switch args[1] { + case "stream": + if s := os.Getenv("TS_DEBUG_FUNNEL_STREAM"); s == "on" { + stream = true + } else { + return flag.ErrHelp + } case "on", "off": on = args[1] == "on" default: return flag.ErrHelp } - sc, err := e.lc.GetServeConfig(ctx) - if err != nil { - return err - } - if sc == nil { - sc = new(ipn.ServeConfig) - } + st, err := e.getLocalClientStatus(ctx) if err != nil { return fmt.Errorf("getting client status: %w", err) @@ -96,6 +97,18 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error { } dnsName := strings.TrimSuffix(st.Self.DNSName, ".") hp := ipn.HostPort(dnsName + ":" + strconv.Itoa(int(port))) + + if stream { + return e.streamFunnel(ctx, hp) + } + + sc, err := e.lc.GetServeConfig(ctx) + if err != nil { + return err + } + if sc == nil { + sc = new(ipn.ServeConfig) + } if on == sc.AllowFunnel[hp] { printFunnelWarning(sc) // Nothing to do. @@ -117,6 +130,33 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error { return nil } +func (e *serveEnv) streamFunnel(ctx context.Context, hp ipn.HostPort) error { + sc, err := e.lc.GetServeConfig(ctx) + if err != nil { + return err + } + if sc == nil { + sc = new(ipn.ServeConfig) + } + if sc.AllowFunnel[hp] { + return fmt.Errorf("Funnel already running on %q", hp) + } + + mak.Set(&sc.AllowFunnel, hp, true) + printFunnelWarning(sc) + + stream, err := e.lc.StreamFunnel(ctx, hp) + if err != nil { + return err + } + defer stream.Close() + + fmt.Fprintf(os.Stderr, "Funnel started on \"https://%s\".\n", hp) + fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop Funnel.\n\n") + _, err = io.Copy(os.Stdout, stream) + return err +} + // printFunnelWarning prints a warning if the Funnel is on but there is no serve // config for its host:port. func printFunnelWarning(sc *ipn.ServeConfig) { diff --git a/cmd/tailscale/cli/serve.go b/cmd/tailscale/cli/serve.go index 572e2148b..c41fbea82 100644 --- a/cmd/tailscale/cli/serve.go +++ b/cmd/tailscale/cli/serve.go @@ -128,6 +128,7 @@ type localServeClient interface { Status(context.Context) (*ipnstate.Status, error) GetServeConfig(context.Context) (*ipn.ServeConfig, error) SetServeConfig(context.Context, *ipn.ServeConfig) error + StreamFunnel(context.Context, ipn.HostPort) (io.ReadCloser, error) // TODO: testing :) } // serveEnv is the environment the serve command runs within. All I/O should be diff --git a/cmd/tailscale/cli/serve_test.go b/cmd/tailscale/cli/serve_test.go index 3d5b8f1f6..e458af499 100644 --- a/cmd/tailscale/cli/serve_test.go +++ b/cmd/tailscale/cli/serve_test.go @@ -8,6 +8,7 @@ import ( "context" "flag" "fmt" + "io" "os" "path/filepath" "reflect" @@ -782,6 +783,11 @@ func (lc *fakeLocalServeClient) SetServeConfig(ctx context.Context, config *ipn. return nil } +func (lc *fakeLocalServeClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) { + // TODO: testing :) + return nil, nil +} + // exactError returns an error checker that wants exactly the provided want error. // If optName is non-empty, it's used in the error message. func exactErr(want error, optName ...string) func(error) string { diff --git a/ipn/ipnlocal/local.go b/ipn/ipnlocal/local.go index 762dfba9b..57f0c30da 100644 --- a/ipn/ipnlocal/local.go +++ b/ipn/ipnlocal/local.go @@ -240,8 +240,9 @@ type LocalBackend struct { lastServeConfJSON mem.RO // last JSON that was parsed into serveConfig serveConfig ipn.ServeConfigView // or !Valid if none - serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener - serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy + serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener + serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy + funnelStreamers map[uint16]*func(ipn.FunnelRequestLog) // serve port => stream logger // statusLock must be held before calling statusChanged.Wait() or // statusChanged.Broadcast(). diff --git a/ipn/ipnlocal/serve.go b/ipn/ipnlocal/serve.go index a6a7d1421..27bcedb62 100644 --- a/ipn/ipnlocal/serve.go +++ b/ipn/ipnlocal/serve.go @@ -257,6 +257,96 @@ func (b *LocalBackend) ServeConfig() ipn.ServeConfigView { return b.serveConfig } +// StreamFunnel turns on Funnel for the given HostPort, and keeps it on +// for the duration that the context is open. +// +// When the context is done, Funnel is turned off for the port. +// +// During the stream, any incoming connections to the port are reported +// out to the provided io.Writer, in the format ipn.FunnelRequestLog. +func (b *LocalBackend) StreamFunnel(ctx context.Context, w io.Writer, hp ipn.HostPort) (err error) { + port, err := hp.Port() + if err != nil { + return err + } + + // Turn on Funnel for the given HostPort. + sc := b.ServeConfig().AsStruct() + mak.Set(&sc.AllowFunnel, hp, true) + if err := b.SetServeConfig(sc); err != nil { + return err + } + + // Defer turning off Funnel once stream ends. + cleanup := func() { + sc := b.ServeConfig().AsStruct() + delete(sc.AllowFunnel, hp) + err = errors.Join(err, b.SetServeConfig(sc)) + } + defer cleanup() + + f, ok := w.(http.Flusher) + if !ok { + return errors.New("writer not a flusher") + } + + var writeErrs []error + writeToStream := func(log ipn.FunnelRequestLog) { + jsonLog, err := json.Marshal(log) + if err != nil { + writeErrs = append(writeErrs, err) + return + } + if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil { + writeErrs = append(writeErrs, err) + return + } + f.Flush() + } + + // Hook up connections stream. + b.mu.Lock() + mak.Set(&b.funnelStreamers, port, &writeToStream) + b.mu.Unlock() + + select { + case <-ctx.Done(): + } + + b.mu.Lock() + delete(b.funnelStreamers, port) + b.mu.Unlock() + + return errors.Join(writeErrs...) +} + +// maybeLogFunnelConnection logs out an ipn.FunnelRequestLog for the +// connection if a streamer is currently active for the given destPort. +func (b *LocalBackend) maybeLogFunnelConnection(destPort uint16, srcAddr netip.AddrPort) { + b.mu.Lock() + stream := b.funnelStreamers[destPort] + b.mu.Unlock() + if stream == nil { + return + } + + var log ipn.FunnelRequestLog + log.SrcAddr = srcAddr + log.Time = time.Now() // TODO: use a different clock somewhere? + + if node, user, ok := b.WhoIs(srcAddr); ok { + log.NodeName = node.ComputedName + if node.IsTagged() { + log.NodeTags = node.Tags + } else { + log.UserLoginName = user.LoginName + log.UserDisplayName = user.DisplayName + } + } + + (*stream)(log) +} + func (b *LocalBackend) HandleIngressTCPConn(ingressPeer *tailcfg.Node, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) { b.mu.Lock() sc := b.serveConfig @@ -359,6 +449,7 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort) if backDst := tcph.TCPForward(); backDst != "" { return func(conn net.Conn) error { defer conn.Close() + b.maybeLogFunnelConnection(dport, srcAddr) ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst) cancel() @@ -522,6 +613,10 @@ func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) { http.NotFound(w, r) return } + if c, ok := getServeHTTPContext(r); ok { + b.maybeLogFunnelConnection(c.DestPort, c.SrcAddr) + } + if s := h.Text(); s != "" { w.Header().Set("Content-Type", "text/plain; charset=utf-8") io.WriteString(w, s) diff --git a/ipn/localapi/localapi.go b/ipn/localapi/localapi.go index d99dcad88..f4a2f40f8 100644 --- a/ipn/localapi/localapi.go +++ b/ipn/localapi/localapi.go @@ -95,6 +95,7 @@ var handler = map[string]localAPIHandler{ "set-expiry-sooner": (*Handler).serveSetExpirySooner, "start": (*Handler).serveStart, "status": (*Handler).serveStatus, + "stream-funnel": (*Handler).serveStreamFunnel, "tka/init": (*Handler).serveTKAInit, "tka/log": (*Handler).serveTKALog, "tka/modify": (*Handler).serveTKAModify, @@ -842,6 +843,30 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) { } } +func (h *Handler) serveStreamFunnel(w http.ResponseWriter, r *http.Request) { + if !h.PermitWrite { + http.Error(w, "funnel stream denied", http.StatusForbidden) + return + } + if r.Method != "POST" { + http.Error(w, "POST required", http.StatusMethodNotAllowed) + return + } + hp := new(ipn.HostPort) + if err := json.NewDecoder(r.Body).Decode(hp); err != nil { + writeErrorJSON(w, fmt.Errorf("decoding HostPort: %w", err)) + return + } + + w.(http.Flusher).Flush() + w.Header().Set("Content-Type", "application/json") + if err := h.b.StreamFunnel(r.Context(), w, *hp); err != nil { + writeErrorJSON(w, fmt.Errorf("streaming funnel: %w", err)) + return + } + w.WriteHeader(http.StatusOK) +} + func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) { if !h.PermitRead { http.Error(w, "IP forwarding check access denied", http.StatusForbidden) diff --git a/ipn/serve.go b/ipn/serve.go index 48e3343a1..543354643 100644 --- a/ipn/serve.go +++ b/ipn/serve.go @@ -11,6 +11,7 @@ import ( "net/url" "strconv" "strings" + "time" "golang.org/x/exp/slices" "tailscale.com/tailcfg" @@ -42,6 +43,21 @@ type ServeConfig struct { // There is no implicit port 443. It must contain a colon. type HostPort string +// Port extracts just the port number from hp. +// An error is reported in the case that the hp does not +// have a valid numeric port ending. +func (hp HostPort) Port() (uint16, error) { + _, port, err := net.SplitHostPort(string(hp)) + if err != nil { + return 0, err + } + port16, err := strconv.ParseUint(port, 10, 16) + if err != nil { + return 0, err + } + return uint16(port16), nil +} + // A FunnelConn wraps a net.Conn that is coming over a // Funnel connection. It can be used to determine further // information about the connection, like the source address @@ -62,6 +78,25 @@ type FunnelConn struct { Src netip.AddrPort } +// FunnelRequestLog is the JSON type written out to io.Writers +// watching funnel connections via ipnlocal.StreamFunnel. +// +// This structure is in development and subject to change. +type FunnelRequestLog struct { + Time time.Time `json:",omitempty"` // time of request forwarding + + // SrcAddr is the address that initiated the Funnel request. + SrcAddr netip.AddrPort `json:",omitempty"` + + // The following fields are only populated if the connection + // initiated from another node on the client's tailnet. + + NodeName string `json:",omitempty"` // src node MagicDNS name + NodeTags []string `json:",omitempty"` // src node tags + UserLoginName string `json:",omitempty"` // src node's owner login (if not tagged) + UserDisplayName string `json:",omitempty"` // src node's owner name (if not tagged) +} + // WebServerConfig describes a web server's configuration. type WebServerConfig struct { Handlers map[string]*HTTPHandler // mountPoint => handler