ipn/store/kubestore: cache state in memory
Fixes #7671 Signed-off-by: Maisem Ali <maisem@tailscale.com>maisem/k8s-cache
parent
0e203e414f
commit
108933c04d
|
@ -11,6 +11,7 @@ import (
|
|||
"time"
|
||||
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/ipn/store/mem"
|
||||
"tailscale.com/kube"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
@ -19,6 +20,8 @@ import (
|
|||
type Store struct {
|
||||
client *kube.Client
|
||||
secretName string
|
||||
|
||||
memory mem.Store
|
||||
}
|
||||
|
||||
// New returns a new Store that persists to the named secret.
|
||||
|
@ -27,31 +30,37 @@ func New(_ logger.Logf, secretName string) (*Store, error) {
|
|||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return &Store{
|
||||
s := &Store{
|
||||
client: c,
|
||||
secretName: secretName,
|
||||
}, nil
|
||||
}
|
||||
// Hydrate cache with the potentially current state
|
||||
if err := s.loadState(); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
return s, nil
|
||||
}
|
||||
|
||||
func (s *Store) String() string { return "kube.Store" }
|
||||
|
||||
// ReadState implements the StateStore interface.
|
||||
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
|
||||
func (s *Store) loadState() error {
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
secret, err := s.client.GetSecret(ctx, s.secretName)
|
||||
if err != nil {
|
||||
if st, ok := err.(*kube.Status); ok && st.Code == 404 {
|
||||
return nil, ipn.ErrStateNotExist
|
||||
return nil
|
||||
}
|
||||
return nil, err
|
||||
return err
|
||||
}
|
||||
b, ok := secret.Data[sanitizeKey(id)]
|
||||
if !ok {
|
||||
return nil, ipn.ErrStateNotExist
|
||||
}
|
||||
return b, nil
|
||||
s.memory.LoadFromMap(secret.Data)
|
||||
return nil
|
||||
}
|
||||
|
||||
func (s *Store) String() string { return "kube.Store" }
|
||||
|
||||
// ReadState implements the StateStore interface.
|
||||
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
|
||||
return s.memory.ReadState(ipn.StateKey(sanitizeKey(id)))
|
||||
}
|
||||
|
||||
func sanitizeKey(k ipn.StateKey) string {
|
||||
|
@ -66,7 +75,12 @@ func sanitizeKey(k ipn.StateKey) string {
|
|||
}
|
||||
|
||||
// WriteState implements the StateStore interface.
|
||||
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
|
||||
func (s *Store) WriteState(id ipn.StateKey, bs []byte) (err error) {
|
||||
defer func() {
|
||||
if err == nil {
|
||||
s.memory.WriteState(id, bs)
|
||||
}
|
||||
}()
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
|
||||
|
|
|
@ -9,6 +9,7 @@ import (
|
|||
"encoding/json"
|
||||
"sync"
|
||||
|
||||
"golang.org/x/exp/maps"
|
||||
"tailscale.com/ipn"
|
||||
"tailscale.com/types/logger"
|
||||
)
|
||||
|
@ -27,6 +28,7 @@ type Store struct {
|
|||
func (s *Store) String() string { return "mem.Store" }
|
||||
|
||||
// ReadState implements the StateStore interface.
|
||||
// It returns ipn.ErrStateNotExist if the state does not exist.
|
||||
func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -38,6 +40,7 @@ func (s *Store) ReadState(id ipn.StateKey) ([]byte, error) {
|
|||
}
|
||||
|
||||
// WriteState implements the StateStore interface.
|
||||
// It never returns an error.
|
||||
func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
|
@ -48,6 +51,19 @@ func (s *Store) WriteState(id ipn.StateKey, bs []byte) error {
|
|||
return nil
|
||||
}
|
||||
|
||||
// LoadFromMap loads the in-memory cache from the provided map.
|
||||
// Any existing content is cleared, and the provided map is
|
||||
// copied into the cache.
|
||||
func (s *Store) LoadFromMap(m map[string][]byte) {
|
||||
s.mu.Lock()
|
||||
defer s.mu.Unlock()
|
||||
maps.Clear(s.cache)
|
||||
for k, v := range m {
|
||||
s.cache[ipn.StateKey(k)] = v
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
// LoadFromJSON attempts to unmarshal json content into the
|
||||
// in-memory cache.
|
||||
func (s *Store) LoadFromJSON(data []byte) error {
|
||||
|
|
Loading…
Reference in New Issue