Compare commits

...

1 Commits

Author SHA1 Message Date
Maisem Ali 108933c04d ipn/store/kubestore: cache state in memory
Fixes #7671

Signed-off-by: Maisem Ali <maisem@tailscale.com>
2023-03-23 17:10:09 -07:00
2 changed files with 44 additions and 14 deletions

View File

@ -11,6 +11,7 @@ import (
"time" "time"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/ipn/store/mem"
"tailscale.com/kube" "tailscale.com/kube"
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
@ -19,6 +20,8 @@ import (
type Store struct { type Store struct {
client *kube.Client client *kube.Client
secretName string secretName string
memory mem.Store
} }
// New returns a new Store that persists to the named secret. // New returns a new Store that persists to the named secret.
@ -27,31 +30,37 @@ func New(_ logger.Logf, secretName string) (*Store, error) {
if err != nil { if err != nil {
return nil, err return nil, err
} }
return &Store{ s := &Store{
client: c, client: c,
secretName: secretName, secretName: secretName,
}, nil }
// Hydrate cache with the potentially current state
if err := s.loadState(); err != nil {
return nil, err
}
return s, nil
} }
func (s *Store) String() string { return "kube.Store" } func (s *Store) loadState() error {
// ReadState implements the StateStore interface.
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()
secret, err := s.client.GetSecret(ctx, s.secretName) secret, err := s.client.GetSecret(ctx, s.secretName)
if err != nil { if err != nil {
if st, ok := err.(*kube.Status); ok && st.Code == 404 { if st, ok := err.(*kube.Status); ok && st.Code == 404 {
return nil, ipn.ErrStateNotExist return nil
} }
return nil, err return err
} }
b, ok := secret.Data[sanitizeKey(id)] s.memory.LoadFromMap(secret.Data)
if !ok { return nil
return nil, ipn.ErrStateNotExist
} }
return b, nil
func (s *Store) String() string { return "kube.Store" }
// ReadState implements the StateStore interface.
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
return s.memory.ReadState(ipn.StateKey(sanitizeKey(id)))
} }
func sanitizeKey(k ipn.StateKey) string { func sanitizeKey(k ipn.StateKey) string {
@ -66,7 +75,12 @@ func sanitizeKey(k ipn.StateKey) string {
} }
// WriteState implements the StateStore interface. // WriteState implements the StateStore interface.
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
defer func() {
if err == nil {
s.memory.WriteState(id, bs)
}
}()
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel() defer cancel()

View File

@ -9,6 +9,7 @@ import (
"encoding/json" "encoding/json"
"sync" "sync"
"golang.org/x/exp/maps"
"tailscale.com/ipn" "tailscale.com/ipn"
"tailscale.com/types/logger" "tailscale.com/types/logger"
) )
@ -27,6 +28,7 @@ type Store struct {
func (s *Store) String() string { return "mem.Store" } func (s *Store) String() string { return "mem.Store" }
// ReadState implements the StateStore interface. // ReadState implements the StateStore interface.
// It returns ipn.ErrStateNotExist if the state does not exist.
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) { func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -38,6 +40,7 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
} }
// WriteState implements the StateStore interface. // WriteState implements the StateStore interface.
// It never returns an error.
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error { func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
s.mu.Lock() s.mu.Lock()
defer s.mu.Unlock() defer s.mu.Unlock()
@ -48,6 +51,19 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
return nil return nil
} }
// LoadFromMap loads the in-memory cache from the provided map.
// Any existing content is cleared, and the provided map is
// copied into the cache.
func (s *Store) LoadFromMap(m map[string][]byte) {
s.mu.Lock()
defer s.mu.Unlock()
maps.Clear(s.cache)
for k, v := range m {
s.cache[ipn.StateKey(k)] = v
}
return
}
// LoadFromJSON attempts to unmarshal json content into the // LoadFromJSON attempts to unmarshal json content into the
// in-memory cache. // in-memory cache.
func (s *Store) LoadFromJSON(data []byte) error { func (s *Store) LoadFromJSON(data []byte) error {