Compare commits

...

1 Commits

Author SHA1 Message Date
Sonia Appasamy 83c8653eed
cmd/tailscale/cli, ipn/ipnlocal: [funnel] add stream mode
Adds ability to start Funnel in the foreground and stream incoming
connections. When foreground process is stopped, Funnel is turned
back off for the port.

Exampe usage:
```
TS_DEBUG_FUNNEL_STREAM=on tailscale funnel 443 stream
```

Updates #8489

Signed-off-by: Sonia Appasamy <sonia@tailscale.com>
2023-06-29 12:25:38 -04:00
8 changed files with 245 additions and 18 deletions

View File

@ -946,6 +946,14 @@ func (lc *LocalClient) NetworkLockForceLocalDisable(ctx context.Context) error {
return nil
}
// NetworkLockDisable shuts down network-lock across the tailnet.
func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error {
if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil {
return fmt.Errorf("error: %w", err)
}
return nil
}
// SetServeConfig sets or replaces the serving settings.
// If config is nil, settings are cleared and serving is disabled.
func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConfig) error {
@ -956,14 +964,6 @@ func (lc *LocalClient) SetServeConfig(ctx context.Context, config *ipn.ServeConf
return nil
}
// NetworkLockDisable shuts down network-lock across the tailnet.
func (lc *LocalClient) NetworkLockDisable(ctx context.Context, secret []byte) error {
if _, err := lc.send(ctx, "POST", "/localapi/v0/tka/disable", 200, bytes.NewReader(secret)); err != nil {
return fmt.Errorf("error: %w", err)
}
return nil
}
// GetServeConfig return the current serve config.
//
// If the serve config is empty, it returns (nil, nil).
@ -982,6 +982,30 @@ func getServeConfigFromJSON(body []byte) (sc *ipn.ServeConfig, err error) {
return sc, nil
}
// StreamFunnel opens funnel on the provided HostPort and keeps the
// connection to the local backend open, streaming connections made
// to the funnel URL.
//
// When the context is cancelled, the funnel is closed on HostPort.
//
// During the duration that the connection is open, the client's
// ServeConfig will show that funnel is streaming for this HostPort.
func (lc *LocalClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) {
req, err := http.NewRequestWithContext(ctx, "POST", "http://"+apitype.LocalAPIHost+"/localapi/v0/stream-funnel", jsonBody(hp))
if err != nil {
return nil, err
}
res, err := lc.doLocalRequestNiceError(req)
if err != nil {
return nil, err
}
if res.StatusCode != 200 {
res.Body.Close()
return nil, errors.New(res.Status)
}
return res.Body, nil
}
// tailscaledConnectHint gives a little thing about why tailscaled (or
// platform equivalent) is not answering localapi connections.
//

View File

@ -7,6 +7,7 @@ import (
"context"
"flag"
"fmt"
"io"
"net"
"os"
"strconv"
@ -66,20 +67,20 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
return flag.ErrHelp
}
var on bool
var stream, on bool
switch args[1] {
case "stream":
if s := os.Getenv("TS_DEBUG_FUNNEL_STREAM"); s == "on" {
stream = true
} else {
return flag.ErrHelp
}
case "on", "off":
on = args[1] == "on"
default:
return flag.ErrHelp
}
sc, err := e.lc.GetServeConfig(ctx)
if err != nil {
return err
}
if sc == nil {
sc = new(ipn.ServeConfig)
}
st, err := e.getLocalClientStatus(ctx)
if err != nil {
return fmt.Errorf("getting client status: %w", err)
@ -96,6 +97,18 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
}
dnsName := strings.TrimSuffix(st.Self.DNSName, ".")
hp := ipn.HostPort(dnsName + ":" + strconv.Itoa(int(port)))
if stream {
return e.streamFunnel(ctx, hp)
}
sc, err := e.lc.GetServeConfig(ctx)
if err != nil {
return err
}
if sc == nil {
sc = new(ipn.ServeConfig)
}
if on == sc.AllowFunnel[hp] {
printFunnelWarning(sc)
// Nothing to do.
@ -117,6 +130,33 @@ func (e *serveEnv) runFunnel(ctx context.Context, args []string) error {
return nil
}
func (e *serveEnv) streamFunnel(ctx context.Context, hp ipn.HostPort) error {
sc, err := e.lc.GetServeConfig(ctx)
if err != nil {
return err
}
if sc == nil {
sc = new(ipn.ServeConfig)
}
if sc.AllowFunnel[hp] {
return fmt.Errorf("Funnel already running on %q", hp)
}
mak.Set(&sc.AllowFunnel, hp, true)
printFunnelWarning(sc)
stream, err := e.lc.StreamFunnel(ctx, hp)
if err != nil {
return err
}
defer stream.Close()
fmt.Fprintf(os.Stderr, "Funnel started on \"https://%s\".\n", hp)
fmt.Fprintf(os.Stderr, "Press Ctrl-C to stop Funnel.\n\n")
_, err = io.Copy(os.Stdout, stream)
return err
}
// printFunnelWarning prints a warning if the Funnel is on but there is no serve
// config for its host:port.
func printFunnelWarning(sc *ipn.ServeConfig) {

View File

@ -128,6 +128,7 @@ type localServeClient interface {
Status(context.Context) (*ipnstate.Status, error)
GetServeConfig(context.Context) (*ipn.ServeConfig, error)
SetServeConfig(context.Context, *ipn.ServeConfig) error
StreamFunnel(context.Context, ipn.HostPort) (io.ReadCloser, error) // TODO: testing :)
}
// serveEnv is the environment the serve command runs within. All I/O should be

View File

@ -8,6 +8,7 @@ import (
"context"
"flag"
"fmt"
"io"
"os"
"path/filepath"
"reflect"
@ -782,6 +783,11 @@ func (lc *fakeLocalServeClient) SetServeConfig(ctx context.Context, config *ipn.
return nil
}
func (lc *fakeLocalServeClient) StreamFunnel(ctx context.Context, hp ipn.HostPort) (io.ReadCloser, error) {
// TODO: testing :)
return nil, nil
}
// exactError returns an error checker that wants exactly the provided want error.
// If optName is non-empty, it's used in the error message.
func exactErr(want error, optName ...string) func(error) string {

View File

@ -240,8 +240,9 @@ type LocalBackend struct {
lastServeConfJSON mem.RO // last JSON that was parsed into serveConfig
serveConfig ipn.ServeConfigView // or !Valid if none
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
serveListeners map[netip.AddrPort]*serveListener // addrPort => serveListener
serveProxyHandlers sync.Map // string (HTTPHandler.Proxy) => *httputil.ReverseProxy
funnelStreamers map[uint16]*func(ipn.FunnelRequestLog) // serve port => stream logger
// statusLock must be held before calling statusChanged.Wait() or
// statusChanged.Broadcast().

View File

@ -257,6 +257,96 @@ func (b *LocalBackend) ServeConfig() ipn.ServeConfigView {
return b.serveConfig
}
// StreamFunnel turns on Funnel for the given HostPort, and keeps it on
// for the duration that the context is open.
//
// When the context is done, Funnel is turned off for the port.
//
// During the stream, any incoming connections to the port are reported
// out to the provided io.Writer, in the format ipn.FunnelRequestLog.
func (b *LocalBackend) StreamFunnel(ctx context.Context, w io.Writer, hp ipn.HostPort) (err error) {
port, err := hp.Port()
if err != nil {
return err
}
// Turn on Funnel for the given HostPort.
sc := b.ServeConfig().AsStruct()
mak.Set(&sc.AllowFunnel, hp, true)
if err := b.SetServeConfig(sc); err != nil {
return err
}
// Defer turning off Funnel once stream ends.
cleanup := func() {
sc := b.ServeConfig().AsStruct()
delete(sc.AllowFunnel, hp)
err = errors.Join(err, b.SetServeConfig(sc))
}
defer cleanup()
f, ok := w.(http.Flusher)
if !ok {
return errors.New("writer not a flusher")
}
var writeErrs []error
writeToStream := func(log ipn.FunnelRequestLog) {
jsonLog, err := json.Marshal(log)
if err != nil {
writeErrs = append(writeErrs, err)
return
}
if _, err := fmt.Fprintf(w, "%s\n", jsonLog); err != nil {
writeErrs = append(writeErrs, err)
return
}
f.Flush()
}
// Hook up connections stream.
b.mu.Lock()
mak.Set(&b.funnelStreamers, port, &writeToStream)
b.mu.Unlock()
select {
case <-ctx.Done():
}
b.mu.Lock()
delete(b.funnelStreamers, port)
b.mu.Unlock()
return errors.Join(writeErrs...)
}
// maybeLogFunnelConnection logs out an ipn.FunnelRequestLog for the
// connection if a streamer is currently active for the given destPort.
func (b *LocalBackend) maybeLogFunnelConnection(destPort uint16, srcAddr netip.AddrPort) {
b.mu.Lock()
stream := b.funnelStreamers[destPort]
b.mu.Unlock()
if stream == nil {
return
}
var log ipn.FunnelRequestLog
log.SrcAddr = srcAddr
log.Time = time.Now() // TODO: use a different clock somewhere?
if node, user, ok := b.WhoIs(srcAddr); ok {
log.NodeName = node.ComputedName
if node.IsTagged() {
log.NodeTags = node.Tags
} else {
log.UserLoginName = user.LoginName
log.UserDisplayName = user.DisplayName
}
}
(*stream)(log)
}
func (b *LocalBackend) HandleIngressTCPConn(ingressPeer *tailcfg.Node, target ipn.HostPort, srcAddr netip.AddrPort, getConnOrReset func() (net.Conn, bool), sendRST func()) {
b.mu.Lock()
sc := b.serveConfig
@ -359,6 +449,7 @@ func (b *LocalBackend) tcpHandlerForServe(dport uint16, srcAddr netip.AddrPort)
if backDst := tcph.TCPForward(); backDst != "" {
return func(conn net.Conn) error {
defer conn.Close()
b.maybeLogFunnelConnection(dport, srcAddr)
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
backConn, err := b.dialer.SystemDial(ctx, "tcp", backDst)
cancel()
@ -522,6 +613,10 @@ func (b *LocalBackend) serveWebHandler(w http.ResponseWriter, r *http.Request) {
http.NotFound(w, r)
return
}
if c, ok := getServeHTTPContext(r); ok {
b.maybeLogFunnelConnection(c.DestPort, c.SrcAddr)
}
if s := h.Text(); s != "" {
w.Header().Set("Content-Type", "text/plain; charset=utf-8")
io.WriteString(w, s)

View File

@ -95,6 +95,7 @@ var handler = map[string]localAPIHandler{
"set-expiry-sooner": (*Handler).serveSetExpirySooner,
"start": (*Handler).serveStart,
"status": (*Handler).serveStatus,
"stream-funnel": (*Handler).serveStreamFunnel,
"tka/init": (*Handler).serveTKAInit,
"tka/log": (*Handler).serveTKALog,
"tka/modify": (*Handler).serveTKAModify,
@ -842,6 +843,30 @@ func (h *Handler) serveServeConfig(w http.ResponseWriter, r *http.Request) {
}
}
func (h *Handler) serveStreamFunnel(w http.ResponseWriter, r *http.Request) {
if !h.PermitWrite {
http.Error(w, "funnel stream denied", http.StatusForbidden)
return
}
if r.Method != "POST" {
http.Error(w, "POST required", http.StatusMethodNotAllowed)
return
}
hp := new(ipn.HostPort)
if err := json.NewDecoder(r.Body).Decode(hp); err != nil {
writeErrorJSON(w, fmt.Errorf("decoding HostPort: %w", err))
return
}
w.(http.Flusher).Flush()
w.Header().Set("Content-Type", "application/json")
if err := h.b.StreamFunnel(r.Context(), w, *hp); err != nil {
writeErrorJSON(w, fmt.Errorf("streaming funnel: %w", err))
return
}
w.WriteHeader(http.StatusOK)
}
func (h *Handler) serveCheckIPForwarding(w http.ResponseWriter, r *http.Request) {
if !h.PermitRead {
http.Error(w, "IP forwarding check access denied", http.StatusForbidden)

View File

@ -11,6 +11,7 @@ import (
"net/url"
"strconv"
"strings"
"time"
"golang.org/x/exp/slices"
"tailscale.com/tailcfg"
@ -42,6 +43,21 @@ type ServeConfig struct {
// There is no implicit port 443. It must contain a colon.
type HostPort string
// Port extracts just the port number from hp.
// An error is reported in the case that the hp does not
// have a valid numeric port ending.
func (hp HostPort) Port() (uint16, error) {
_, port, err := net.SplitHostPort(string(hp))
if err != nil {
return 0, err
}
port16, err := strconv.ParseUint(port, 10, 16)
if err != nil {
return 0, err
}
return uint16(port16), nil
}
// A FunnelConn wraps a net.Conn that is coming over a
// Funnel connection. It can be used to determine further
// information about the connection, like the source address
@ -62,6 +78,25 @@ type FunnelConn struct {
Src netip.AddrPort
}
// FunnelRequestLog is the JSON type written out to io.Writers
// watching funnel connections via ipnlocal.StreamFunnel.
//
// This structure is in development and subject to change.
type FunnelRequestLog struct {
Time time.Time `json:",omitempty"` // time of request forwarding
// SrcAddr is the address that initiated the Funnel request.
SrcAddr netip.AddrPort `json:",omitempty"`
// The following fields are only populated if the connection
// initiated from another node on the client's tailnet.
NodeName string `json:",omitempty"` // src node MagicDNS name
NodeTags []string `json:",omitempty"` // src node tags
UserLoginName string `json:",omitempty"` // src node's owner login (if not tagged)
UserDisplayName string `json:",omitempty"` // src node's owner name (if not tagged)
}
// WebServerConfig describes a web server's configuration.
type WebServerConfig struct {
Handlers map[string]*HTTPHandler // mountPoint => handler