cmd/tailscale/cli, ipn/ipnlocal: [funnel] add stream mode
Adds ability to start Funnel in the foreground and stream incoming connections. When foreground process is stopped, Funnel is turned back off for the port. Exampe usage: ``` TS_DEBUG_FUNNEL_STREAM=on tailscale funnel 443 stream ``` Updates #8489 Signed-off-by: Sonia Appasamy <sonia@tailscale.com>soniaappasamy/funnel-foreground-play
parent
8e840489ed
commit
83c8653eed
|
@ -946,6 +946,14 @@ func (lc *LocalClient) NetworkLockForceLocalDisable(ctx context.Context) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// NetworkLockDisable shuts down network-lock across the tailnet.
|
||||
func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error {
|
||||
if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil {
|
||||
return fmt.Errorf("error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// SetServeConfig sets or replaces the serving settings.
|
||||
// If config is nil, settings are cleared and serving is disabled.
|
||||
func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConfig) error {
|
||||
|
@ -956,14 +964,6 @@ func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConf
|
|||
return nil
|
||||
}
|
||||
|
||||
// NetworkLockDisable shuts down network-lock across the tailnet.
|
||||
func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error {
|
||||
if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil {
|
||||
return fmt.Errorf("error: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetServeConfig return the current serve config.
|
||||
//
|
||||
// If the serve config is empty, it returns (nil, nil).
|
||||
|
@ -982,6 +982,30 @@ func getServeConfigFromJSON(body []byte) (sc *ipn.ServeConfig, err error) {
|
|||
return sc, nil
|
||||
}
|
||||
|
||||
// StreamFunnel opens funnel on the provided HostPort and keeps the
|
||||
// connection to the local backend open, streaming connections made
|
||||
// to the funnel URL.
|
||||
//
|
||||
// When the context is cancelled, the funnel is closed on HostPort.
|
||||
//
|
||||
// During the duration that the connection is open, the client's
|
||||
// ServeConfig will show that funnel is streaming for this HostPort.
|
||||
func (lc *LocalClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/stream-funnel", jsonBody(hp))
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
res, err := lc.doLocalRequestNiceError(req)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if res.StatusCode != 200 {
|
||||
res.Body.Close()
|
||||
return nil, errors.New(res.Status)
|
||||
}
|
||||
return res.Body, nil
|
||||
}
|
||||
|
||||
// tailscaledConnectHint gives a little thing about why tailscaled (or
|
||||
// platform equivalent) is not answering localapi connections.
|
||||
//
|
||||
|
|
|
@ -7,6 +7,7 @@ import (
|
|||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"net"
|
||||
"os"
|
||||
"strconv"
|
||||
|
@ -66,20 +67,20 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
|
|||
return flag.ErrHelp
|
||||
}
|
||||
|
||||
var on bool
|
||||
var stream, on bool
|
||||
switch args[1] {
|
||||
case "stream":
|
||||
if s := os.Getenv("TS_DEBUG_FUNNEL_STREAM"); s == "on" {
|
||||
stream = true
|
||||
} else {
|
||||
return flag.ErrHelp
|
||||
}
|
||||
case "on", "off":
|
||||
on = args[1] == "on"
|
||||
default:
|
||||
return flag.ErrHelp
|
||||
}
|
||||
sc, err := e.lc.GetServeConfig(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if sc == nil {
|
||||
sc = new(ipn.ServeConfig)
|
||||
}
|
||||
|
||||
st, err := e.getLocalClientStatus(ctx)
|
||||
if err != nil {
|
||||
return fmt.Errorf("getting client status: %w", err)
|
||||
|
@ -96,6 +97,18 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
|
|||
}
|
||||
dnsName := strings.TrimSuffix(st.Self.DNSName, ".")
|
||||
hp := ipn.HostPort(dnsName + ":" + strconv.Itoa(int(port)))
|
||||
|
||||
if stream {
|
||||
return e.streamFunnel(ctx, hp)
|
||||
}
|
||||
|
||||
sc, err := e.lc.GetServeConfig(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if sc == nil {
|
||||
sc = new(ipn.ServeConfig)
|
||||
}
|
||||
if on == sc.AllowFunnel[hp] {
|
||||
printFunnelWarning(sc)
|
||||
// Nothing to do.
|
||||
|
@ -117,6 +130,33 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
func (e *serveEnv) streamFunnel(ctx context.Context, hp ipn.HostPort) error {
|
||||
sc, err := e.lc.GetServeConfig(ctx)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if sc == nil {
|
||||
sc = new(ipn.ServeConfig)
|
||||
}
|
||||
if sc.AllowFunnel[hp] {
|
||||
return fmt.Errorf("Funnel already running on %q", hp)
|
||||
}
|
||||
|
||||
mak.Set(&sc.AllowFunnel, hp, true)
|
||||
printFunnelWarning(sc)
|
||||
|
||||
stream, err := e.lc.StreamFunnel(ctx, hp)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
defer stream.Close()
|
||||
|
||||
fmt.Fprintf(os.Stderr, "Funnel started on \"https://%s\".\n", hp)
|
||||
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop Funnel.\n\n")
|
||||
_, err = io.Copy(os.Stdout, stream)
|
||||
return err
|
||||
}
|
||||
|
||||
// printFunnelWarning prints a warning if the Funnel is on but there is no serve
|
||||
// config for its host:port.
|
||||
func printFunnelWarning(sc *ipn.ServeConfig) {
|
||||
|
|
|
@ -128,6 +128,7 @@ type localServeClient interface {
|
|||
Status(context.Context) (*ipnstate.Status, error)
|
||||
GetServeConfig(context.Context) (*ipn.ServeConfig, error)
|
||||
SetServeConfig(context.Context, *ipn.ServeConfig) error
|
||||
StreamFunnel(context.Context, ipn.HostPort) (io.ReadCloser, error) // TODO: testing :)
|
||||
}
|
||||
|
||||
// serveEnv is the environment the serve command runs within. All I/O should be
|
||||
|
|
|
@ -8,6 +8,7 @@ import (
|
|||
"context"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"os"
|
||||
"path/filepath"
|
||||
"reflect"
|
||||
|
@ -782,6 +783,11 @@ func (lc *fakeLocalServeClient) SetServeConfig(ctx context.Context, config *ipn.
|
|||
return nil
|
||||
}
|
||||
|
||||
func (lc *fakeLocalServeClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) {
|
||||
// TODO: testing :)
|
||||
return nil, nil
|
||||
}
|
||||
|
||||
// exactError returns an error checker that wants exactly the provided want error.
|
||||
// If optName is non-empty, it's used in the error message.
|
||||
func exactErr(want error, optName ...string) func(error) string {
|
||||
|
|
|
@ -242,6 +242,7 @@ type LocalBackend struct {
|
|||
|
||||
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
|
||||
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
|
||||
funnelStreamers map[uint16]*func(ipn.FunnelRequestLog) // serve port => stream logger
|
||||
|
||||
// statusLock must be held before calling statusChanged.Wait() or
|
||||
// statusChanged.Broadcast().
|
||||
|
|
|
@ -257,6 +257,96 @@ func (b *LocalBackend) ServeConfig() ipn.ServeConfigView {
|
|||
return b.serveConfig
|
||||
}
|
||||
|
||||
// StreamFunnel turns on Funnel for the given HostPort, and keeps it on
|
||||
// for the duration that the context is open.
|
||||
//
|
||||
// When the context is done, Funnel is turned off for the port.
|
||||
//
|
||||
// During the stream, any incoming connections to the port are reported
|
||||
// out to the provided io.Writer, in the format ipn.FunnelRequestLog.
|
||||
func (b *LocalBackend) StreamFunnel(ctx context.Context, w io.Writer, hp ipn.HostPort) (err error) {
|
||||
port, err := hp.Port()
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Turn on Funnel for the given HostPort.
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
mak.Set(&sc.AllowFunnel, hp, true)
|
||||
if err := b.SetServeConfig(sc); err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
// Defer turning off Funnel once stream ends.
|
||||
cleanup := func() {
|
||||
sc := b.ServeConfig().AsStruct()
|
||||
delete(sc.AllowFunnel, hp)
|
||||
err = errors.Join(err, b.SetServeConfig(sc))
|
||||
}
|
||||
defer cleanup()
|
||||
|
||||
f, ok := w.(http.Flusher)
|
||||
if !ok {
|
||||
return errors.New("writer not a flusher")
|
||||
}
|
||||
|
||||
var writeErrs []error
|
||||
writeToStream := func(log ipn.FunnelRequestLog) {
|
||||
jsonLog, err := json.Marshal(log)
|
||||
if err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
}
|
||||
if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil {
|
||||
writeErrs = append(writeErrs, err)
|
||||
return
|
||||
}
|
||||
f.Flush()
|
||||
}
|
||||
|
||||
// Hook up connections stream.
|
||||
b.mu.Lock()
|
||||
mak.Set(&b.funnelStreamers, port, &writeToStream)
|
||||
b.mu.Unlock()
|
||||
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
}
|
||||
|
||||
b.mu.Lock()
|
||||
delete(b.funnelStreamers, port)
|
||||
b.mu.Unlock()
|
||||
|
||||
return errors.Join(writeErrs...)
|
||||
}
|
||||
|
||||
// maybeLogFunnelConnection logs out an ipn.FunnelRequestLog for the
|
||||
// connection if a streamer is currently active for the given destPort.
|
||||
func (b *LocalBackend) maybeLogFunnelConnection(destPort uint16, srcAddr netip.AddrPort) {
|
||||
b.mu.Lock()
|
||||
stream := b.funnelStreamers[destPort]
|
||||
b.mu.Unlock()
|
||||
if stream == nil {
|
||||
return
|
||||
}
|
||||
|
||||
var log ipn.FunnelRequestLog
|
||||
log.SrcAddr = srcAddr
|
||||
log.Time = time.Now() // TODO: use a different clock somewhere?
|
||||
|
||||
if node, user, ok := b.WhoIs(srcAddr); ok {
|
||||
log.NodeName = node.ComputedName
|
||||
if node.IsTagged() {
|
||||
log.NodeTags = node.Tags
|
||||
} else {
|
||||
log.UserLoginName = user.LoginName
|
||||
log.UserDisplayName = user.DisplayName
|
||||
}
|
||||
}
|
||||
|
||||
(*stream)(log)
|
||||
}
|
||||
|
||||
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer *tailcfg.Node, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) {
|
||||
b.mu.Lock()
|
||||
sc := b.serveConfig
|
||||
|
@ -359,6 +449,7 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort)
|
|||
if backDst := tcph.TCPForward(); backDst != "" {
|
||||
return func(conn net.Conn) error {
|
||||
defer conn.Close()
|
||||
b.maybeLogFunnelConnection(dport, srcAddr)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
|
||||
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
|
||||
cancel()
|
||||
|
@ -522,6 +613,10 @@ func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) {
|
|||
http.NotFound(w, r)
|
||||
return
|
||||
}
|
||||
if c, ok := getServeHTTPContext(r); ok {
|
||||
b.maybeLogFunnelConnection(c.DestPort, c.SrcAddr)
|
||||
}
|
||||
|
||||
if s := h.Text(); s != "" {
|
||||
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
|
||||
io.WriteString(w, s)
|
||||
|
|
|
@ -95,6 +95,7 @@ var handler = map[string]localAPIHandler{
|
|||
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
|
||||
"start": (*Handler).serveStart,
|
||||
"status": (*Handler).serveStatus,
|
||||
"stream-funnel": (*Handler).serveStreamFunnel,
|
||||
"tka/init": (*Handler).serveTKAInit,
|
||||
"tka/log": (*Handler).serveTKALog,
|
||||
"tka/modify": (*Handler).serveTKAModify,
|
||||
|
@ -842,6 +843,30 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) {
|
|||
}
|
||||
}
|
||||
|
||||
func (h *Handler) serveStreamFunnel(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.PermitWrite {
|
||||
http.Error(w, "funnel stream denied", http.StatusForbidden)
|
||||
return
|
||||
}
|
||||
if r.Method != "POST" {
|
||||
http.Error(w, "POST required", http.StatusMethodNotAllowed)
|
||||
return
|
||||
}
|
||||
hp := new(ipn.HostPort)
|
||||
if err := json.NewDecoder(r.Body).Decode(hp); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("decoding HostPort: %w", err))
|
||||
return
|
||||
}
|
||||
|
||||
w.(http.Flusher).Flush()
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
if err := h.b.StreamFunnel(r.Context(), w, *hp); err != nil {
|
||||
writeErrorJSON(w, fmt.Errorf("streaming funnel: %w", err))
|
||||
return
|
||||
}
|
||||
w.WriteHeader(http.StatusOK)
|
||||
}
|
||||
|
||||
func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) {
|
||||
if !h.PermitRead {
|
||||
http.Error(w, "IP forwarding check access denied", http.StatusForbidden)
|
||||
|
|
35
ipn/serve.go
35
ipn/serve.go
|
@ -11,6 +11,7 @@ import (
|
|||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"golang.org/x/exp/slices"
|
||||
"tailscale.com/tailcfg"
|
||||
|
@ -42,6 +43,21 @@ type ServeConfig struct {
|
|||
// There is no implicit port 443. It must contain a colon.
|
||||
type HostPort string
|
||||
|
||||
// Port extracts just the port number from hp.
|
||||
// An error is reported in the case that the hp does not
|
||||
// have a valid numeric port ending.
|
||||
func (hp HostPort) Port() (uint16, error) {
|
||||
_, port, err := net.SplitHostPort(string(hp))
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
port16, err := strconv.ParseUint(port, 10, 16)
|
||||
if err != nil {
|
||||
return 0, err
|
||||
}
|
||||
return uint16(port16), nil
|
||||
}
|
||||
|
||||
// A FunnelConn wraps a net.Conn that is coming over a
|
||||
// Funnel connection. It can be used to determine further
|
||||
// information about the connection, like the source address
|
||||
|
@ -62,6 +78,25 @@ type FunnelConn struct {
|
|||
Src netip.AddrPort
|
||||
}
|
||||
|
||||
// FunnelRequestLog is the JSON type written out to io.Writers
|
||||
// watching funnel connections via ipnlocal.StreamFunnel.
|
||||
//
|
||||
// This structure is in development and subject to change.
|
||||
type FunnelRequestLog struct {
|
||||
Time time.Time `json:",omitempty"` // time of request forwarding
|
||||
|
||||
// SrcAddr is the address that initiated the Funnel request.
|
||||
SrcAddr netip.AddrPort `json:",omitempty"`
|
||||
|
||||
// The following fields are only populated if the connection
|
||||
// initiated from another node on the client's tailnet.
|
||||
|
||||
NodeName string `json:",omitempty"` // src node MagicDNS name
|
||||
NodeTags []string `json:",omitempty"` // src node tags
|
||||
UserLoginName string `json:",omitempty"` // src node's owner login (if not tagged)
|
||||
UserDisplayName string `json:",omitempty"` // src node's owner name (if not tagged)
|
||||
}
|
||||
|
||||
// WebServerConfig describes a web server's configuration.
|
||||
type WebServerConfig struct {
|
||||
Handlers map[string]*HTTPHandler // mountPoint => handler
|
||||
|
|
Loading…
Reference in New Issue