mirror of https://github.com/grafana/grafana
K8s: JSON file storage (#75759)
parent
355a794e32
commit
798e224ce6
@ -1,5 +1,5 @@ |
||||
// +k8s:deepcopy-gen=package
|
||||
// +k8s:openapi-gen=true
|
||||
// +groupName=playlist.grafana.io
|
||||
// +groupName=playlist.grafana.app
|
||||
|
||||
package v0alpha1 // import "github.com/grafana/grafana/pkg/apis/playlist/v0alpha1"
|
||||
|
@ -0,0 +1,519 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package file |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"path/filepath" |
||||
"reflect" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/bwmarrin/snowflake" |
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
"k8s.io/apimachinery/pkg/api/meta" |
||||
"k8s.io/apimachinery/pkg/conversion" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
||||
"k8s.io/client-go/tools/cache" |
||||
) |
||||
|
||||
// MaxUpdateAttempts is the maximum number of retries GuaranteedUpdate makes
// when a precondition check or tryUpdate callback fails before giving up.
const MaxUpdateAttempts = 30

// Compile-time check that Storage implements storage.Interface.
var _ storage.Interface = (*Storage)(nil)
||||
|
||||
// Storage implements storage.Interface, persisting resources as JSON files on disk.
type Storage struct {
	root         string                                   // base directory under which all resource files live
	gr           schema.GroupResource                     // group/resource served by this storage (used in API errors)
	codec        runtime.Codec                            // codec used to encode/decode the JSON files
	keyFunc      func(obj runtime.Object) (string, error) // derives the storage key for an object
	newFunc      func() runtime.Object                    // allocates an empty single object
	newListFunc  func() runtime.Object                    // allocates an empty list object
	getAttrsFunc storage.AttrFunc                         // extracts labels/fields for predicate matching
	trigger      storage.IndexerFuncs                     // NOTE(review): stored but not referenced by methods in this file
	indexers     *cache.Indexers                          // NOTE(review): stored but not referenced by methods in this file

	watchSet *WatchSet // fan-out registry for active watches
}
||||
|
||||
// ErrFileNotExists means the file doesn't actually exist.
// Declared with errors.New (not fmt.Errorf) since the message is static —
// this also keeps it consistent with ErrNamespaceNotExists below.
var ErrFileNotExists = errors.New("file doesn't exist")

// ErrNamespaceNotExists means the directory for the namespace doesn't actually exist.
var ErrNamespaceNotExists = errors.New("namespace does not exist")
||||
|
||||
func getResourceVersion() (*uint64, error) { |
||||
node, err := snowflake.NewNode(1) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
snowflakeNumber := node.Generate().Int64() |
||||
resourceVersion := uint64(snowflakeNumber) |
||||
return &resourceVersion, nil |
||||
} |
||||
|
||||
// NewStorage instantiates a new Storage.
|
||||
func NewStorage( |
||||
config *storagebackend.ConfigForResource, |
||||
resourcePrefix string, |
||||
keyFunc func(obj runtime.Object) (string, error), |
||||
newFunc func() runtime.Object, |
||||
newListFunc func() runtime.Object, |
||||
getAttrsFunc storage.AttrFunc, |
||||
trigger storage.IndexerFuncs, |
||||
indexers *cache.Indexers, |
||||
) (storage.Interface, factory.DestroyFunc, error) { |
||||
if err := ensureDir(resourcePrefix); err != nil { |
||||
return nil, func() {}, fmt.Errorf("could not establish a writable directory at path=%s", resourcePrefix) |
||||
} |
||||
ws := NewWatchSet() |
||||
return &Storage{ |
||||
root: resourcePrefix, |
||||
gr: config.GroupResource, |
||||
codec: config.Codec, |
||||
keyFunc: keyFunc, |
||||
newFunc: newFunc, |
||||
newListFunc: newListFunc, |
||||
getAttrsFunc: getAttrsFunc, |
||||
trigger: trigger, |
||||
indexers: indexers, |
||||
|
||||
watchSet: ws, |
||||
}, func() { |
||||
ws.cleanupWatchers() |
||||
}, nil |
||||
} |
||||
|
||||
// Returns Versioner associated with this storage.
|
||||
func (s *Storage) Versioner() storage.Versioner { |
||||
return &storage.APIObjectVersioner{} |
||||
} |
||||
|
||||
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
||||
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
||||
// set to the read value from database.
|
||||
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error { |
||||
filename := s.filePath(key) |
||||
if exists(filename) { |
||||
return storage.NewKeyExistsError(key, 0) |
||||
} |
||||
|
||||
dirname := filepath.Dir(filename) |
||||
if err := ensureDir(dirname); err != nil { |
||||
return err |
||||
} |
||||
|
||||
generatedRV, err := getResourceVersion() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
metaObj, err := meta.Accessor(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
metaObj.SetSelfLink("") |
||||
|
||||
if err := s.Versioner().UpdateObject(obj, *generatedRV); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if err := writeFile(s.codec, filename, obj); err != nil { |
||||
return err |
||||
} |
||||
|
||||
// set a timer to delete the file after ttl seconds
|
||||
if ttl > 0 { |
||||
time.AfterFunc(time.Second*time.Duration(ttl), func() { |
||||
if err := s.Delete(ctx, key, s.newFunc(), &storage.Preconditions{}, func(ctx context.Context, obj runtime.Object) error { return nil }, obj); err != nil { |
||||
panic(err) |
||||
} |
||||
}) |
||||
} |
||||
|
||||
if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil { |
||||
return err |
||||
} |
||||
|
||||
s.watchSet.notifyWatchers(watch.Event{ |
||||
Object: out.DeepCopyObject(), |
||||
Type: watch.Added, |
||||
}) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Delete removes the specified key and returns the value that existed at that spot.
// If key didn't exist, it will return NotFound storage error.
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
// current version of the object to avoid read operation from storage to get it.
// However, the implementations have to retry in case suggestion is stale.
//
// NOTE(review): currentState starts as a nil runtime.Object and is passed to
// s.Get / readFile as the decode target without being allocated via
// s.newFunc() — confirm the codec tolerates a nil target here.
// NOTE(review): preconditions.Check and validateDeletion are evaluated
// against 'out', which is only populated by the s.Get further down the loop;
// on the first iteration they see whatever the caller passed in — verify
// this matches the intended storage.Interface contract.
func (s *Storage) Delete(
	ctx context.Context,
	key string,
	out runtime.Object,
	preconditions *storage.Preconditions,
	validateDeletion storage.ValidateObjectFunc,
	cachedExistingObject runtime.Object,
) error {
	filename := s.filePath(key)
	var currentState runtime.Object
	var stateIsCurrent bool
	if cachedExistingObject != nil {
		currentState = cachedExistingObject
	} else {
		getOptions := storage.GetOptions{}
		if preconditions != nil && preconditions.ResourceVersion != nil {
			getOptions.ResourceVersion = *preconditions.ResourceVersion
		}
		if err := s.Get(ctx, key, getOptions, currentState); err == nil {
			stateIsCurrent = true
		}
	}

	for {
		if preconditions != nil {
			if err := preconditions.Check(key, out); err != nil {
				if stateIsCurrent {
					return err
				}

				// If the state is not current, we need to re-read the state and try again.
				if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err != nil {
					return err
				}
				stateIsCurrent = true
				continue
			}
		}

		if err := validateDeletion(ctx, out); err != nil {
			if stateIsCurrent {
				return err
			}

			// If the state is not current, we need to re-read the state and try again.
			// NOTE(review): unlike the precondition branch above, a failed
			// re-read here leaves stateIsCurrent false and loops again —
			// a persistently failing Get would spin forever.
			if err := s.Get(ctx, key, storage.GetOptions{}, currentState); err == nil {
				stateIsCurrent = true
			}
			continue
		}

		// Load the latest value into 'out' so the caller receives the
		// deleted object, per the method contract.
		if err := s.Get(ctx, key, storage.GetOptions{}, out); err != nil {
			return err
		}

		// Stamp a fresh resource version on the returned object.
		generatedRV, err := getResourceVersion()
		if err != nil {
			return err
		}
		if err := s.Versioner().UpdateObject(out, *generatedRV); err != nil {
			return err
		}

		if err := deleteFile(filename); err != nil {
			return err
		}

		s.watchSet.notifyWatchers(watch.Event{
			Object: out.DeepCopyObject(),
			Type:   watch.Deleted,
		})

		return nil
	}
}
||||
|
||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||
// and any items selected by 'p' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
// If resource version is "0", this interface will get current object at given key
|
||||
// and send it in an "ADDED" event, before watch starts.
|
||||
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { |
||||
p := opts.Predicate |
||||
jw := s.watchSet.newWatch() |
||||
|
||||
listObj := s.newListFunc() |
||||
|
||||
if opts.ResourceVersion == "0" { |
||||
err := s.GetList(ctx, key, opts, listObj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
initEvents := make([]watch.Event, 0) |
||||
listPtr, err := meta.GetItemsPtr(listObj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
v, err := conversion.EnforcePtr(listPtr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if v.IsNil() { |
||||
jw.Start(p, initEvents) |
||||
return jw, nil |
||||
} |
||||
|
||||
for _, obj := range v.Elem().Interface().([]runtime.Object) { |
||||
initEvents = append(initEvents, watch.Event{ |
||||
Type: watch.Added, |
||||
Object: obj.DeepCopyObject(), |
||||
}) |
||||
} |
||||
jw.Start(p, initEvents) |
||||
return jw, nil |
||||
} |
||||
|
||||
// Get unmarshals object found at key into objPtr. On a not found error, will either
|
||||
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
|
||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { |
||||
filename := s.filePath(key) |
||||
if !exists(filename) { |
||||
if opts.IgnoreNotFound { |
||||
return runtime.SetZeroValue(objPtr) |
||||
} |
||||
rv, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return storage.NewKeyNotFoundError(key, int64(rv)) |
||||
} |
||||
|
||||
obj, err := readFile(s.codec, filename, func() runtime.Object { |
||||
return objPtr |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
currentVersion, err := s.Versioner().ObjectResourceVersion(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetList unmarshalls objects found at key into a *List api object (an object
|
||||
// that satisfies runtime.IsList definition).
|
||||
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
|
||||
// is true, 'key' is used as a prefix.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { |
||||
generatedRV, err := getResourceVersion() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
remainingItems := int64(0) |
||||
if err := s.Versioner().UpdateList(listObj, *generatedRV, "", &remainingItems); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if opts.ResourceVersion != "" { |
||||
resourceVersionInt, err := s.Versioner().ParseResourceVersion(opts.ResourceVersion) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if err := s.Versioner().UpdateList(listObj, resourceVersionInt, "", &remainingItems); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
objs, err := readDirRecursive(s.codec, key, s.newFunc) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
listPtr, err := meta.GetItemsPtr(listObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
v, err := conversion.EnforcePtr(listPtr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, obj := range objs { |
||||
currentVersion, err := s.Versioner().ObjectResourceVersion(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if err = s.validateMinimumResourceVersion(opts.ResourceVersion, currentVersion); err != nil { |
||||
continue |
||||
} |
||||
|
||||
ok, err := opts.Predicate.Matches(obj) |
||||
if err == nil && ok { |
||||
v.Set(reflect.Append(v, reflect.ValueOf(obj).Elem())) |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
|
||||
// retrying the update until success if there is index conflict.
|
||||
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
|
||||
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
|
||||
// the current contents of the object when deciding how the update object should look.
|
||||
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
|
||||
// else `destination` will be set to the zero value of it's type.
|
||||
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
|
||||
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
|
||||
// contents.
|
||||
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
||||
// current version of the object to avoid read operation from storage to get it.
|
||||
// However, the implementations have to retry in case suggestion is stale.
|
||||
func (s *Storage) GuaranteedUpdate( |
||||
ctx context.Context, |
||||
key string, |
||||
destination runtime.Object, |
||||
ignoreNotFound bool, |
||||
preconditions *storage.Preconditions, |
||||
tryUpdate storage.UpdateFunc, |
||||
cachedExistingObject runtime.Object, |
||||
) error { |
||||
var res storage.ResponseMeta |
||||
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 { |
||||
var ( |
||||
filename = s.filePath(key) |
||||
|
||||
obj runtime.Object |
||||
err error |
||||
created bool |
||||
) |
||||
|
||||
if !exists(filename) && !ignoreNotFound { |
||||
return apierrors.NewNotFound(s.gr, s.nameFromKey(key)) |
||||
} |
||||
|
||||
obj, err = readFile(s.codec, filename, s.newFunc) |
||||
if err != nil { |
||||
// fallback to new object if the file is not found
|
||||
obj = s.newFunc() |
||||
created = true |
||||
} |
||||
|
||||
if err := preconditions.Check(key, obj); err != nil { |
||||
if attempt >= MaxUpdateAttempts { |
||||
return fmt.Errorf("precondition failed: %w", err) |
||||
} |
||||
continue |
||||
} |
||||
|
||||
updatedObj, _, err := tryUpdate(obj, res) |
||||
if err != nil { |
||||
if attempt >= MaxUpdateAttempts { |
||||
return err |
||||
} |
||||
continue |
||||
} |
||||
|
||||
unchanged, err := isUnchanged(s.codec, obj, updatedObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if unchanged { |
||||
u, err := conversion.EnforcePtr(updatedObj) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to enforce updated object pointer: %w", err) |
||||
} |
||||
d, err := conversion.EnforcePtr(destination) |
||||
if err != nil { |
||||
return fmt.Errorf("unable to enforce destination pointer: %w", err) |
||||
} |
||||
d.Set(u) |
||||
return nil |
||||
} |
||||
|
||||
generatedRV, err := getResourceVersion() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if err := s.Versioner().UpdateObject(updatedObj, *generatedRV); err != nil { |
||||
return err |
||||
} |
||||
if err := writeFile(s.codec, filename, updatedObj); err != nil { |
||||
return err |
||||
} |
||||
eventType := watch.Modified |
||||
if created { |
||||
eventType = watch.Added |
||||
} |
||||
s.watchSet.notifyWatchers(watch.Event{ |
||||
Object: updatedObj.DeepCopyObject(), |
||||
Type: eventType, |
||||
}) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Count returns number of different entries under the key (generally being path prefix).
// NOTE(review): not implemented for the file backend — it always reports zero.
func (s *Storage) Count(key string) (int64, error) {
	return 0, nil
}
||||
|
||||
// RequestWatchProgress requests the a watch stream progress status be sent in the
// watch response stream as soon as possible.
// Used for monitor watch progress even if watching resources with no changes.
//
// If watch is lagging, progress status might:
// * be pointing to stale resource version. Use etcd KV request to get linearizable resource version.
// * not be delivered at all. It's recommended to poll request progress periodically.
//
// Note: Only watches with matching context grpc metadata will be notified.
// https://github.com/kubernetes/kubernetes/blob/9325a57125e8502941d1b0c7379c4bb80a678d5c/vendor/go.etcd.io/etcd/client/v3/watch.go#L1037-L1042
//
// TODO: Remove when storage.Interface will be separate from etc3.store.
// Deprecated: Added temporarily to simplify exposing RequestProgress for watch cache.
//
// NOTE(review): the file backend implements this as a no-op.
func (s *Storage) RequestWatchProgress(ctx context.Context) error {
	return nil
}
||||
|
||||
// validateMinimumResourceVersion returns a 'too large resource' version error when the provided minimumResourceVersion is
|
||||
// greater than the most recent actualRevision available from storage.
|
||||
func (s *Storage) validateMinimumResourceVersion(minimumResourceVersion string, actualRevision uint64) error { |
||||
if minimumResourceVersion == "" { |
||||
return nil |
||||
} |
||||
minimumRV, err := s.Versioner().ParseResourceVersion(minimumResourceVersion) |
||||
if err != nil { |
||||
return apierrors.NewBadRequest(fmt.Sprintf("invalid resource version: %v", err)) |
||||
} |
||||
// Enforce the storage.Interface guarantee that the resource version of the returned data
|
||||
// "will be at least 'resourceVersion'".
|
||||
if minimumRV > actualRevision { |
||||
return storage.NewTooLargeResourceVersionError(minimumRV, actualRevision, 0) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func (s *Storage) nameFromKey(key string) string { |
||||
return strings.Replace(key, s.root+"/", "", 1) |
||||
} |
@ -0,0 +1,61 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package file |
||||
|
||||
import ( |
||||
"path" |
||||
"time" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apiserver/pkg/registry/generic" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" |
||||
) |
||||
|
||||
// Compile-time check that RESTOptionsGetter implements generic.RESTOptionsGetter.
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil)

// RESTOptionsGetter produces RESTOptions wired to the file-based storage.
type RESTOptionsGetter struct {
	path     string                // root directory where resource files are persisted
	original storagebackend.Config // source config whose codec/versioner/transformer are reused
}
||||
|
||||
func NewRESTOptionsGetter(path string, originalStorageConfig storagebackend.Config) *RESTOptionsGetter { |
||||
if path == "" { |
||||
path = "/tmp/grafana-apiserver" |
||||
} |
||||
|
||||
return &RESTOptionsGetter{path: path, original: originalStorageConfig} |
||||
} |
||||
|
||||
func (r *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { |
||||
storageConfig := &storagebackend.ConfigForResource{ |
||||
Config: storagebackend.Config{ |
||||
Type: "file", |
||||
Prefix: r.path, |
||||
Transport: storagebackend.TransportConfig{}, |
||||
Paging: false, |
||||
Codec: r.original.Codec, |
||||
EncodeVersioner: r.original.EncodeVersioner, |
||||
Transformer: r.original.Transformer, |
||||
CompactionInterval: 0, |
||||
CountMetricPollPeriod: 0, |
||||
DBMetricPollInterval: 0, |
||||
HealthcheckTimeout: 0, |
||||
ReadycheckTimeout: 0, |
||||
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), |
||||
}, |
||||
GroupResource: resource, |
||||
} |
||||
|
||||
ret := generic.RESTOptions{ |
||||
StorageConfig: storageConfig, |
||||
Decorator: NewStorage, |
||||
DeleteCollectionWorkers: 0, |
||||
EnableGarbageCollection: false, |
||||
ResourcePrefix: path.Join(storageConfig.Prefix, resource.Group, resource.Resource), |
||||
CountMetricPollPeriod: 1 * time.Second, |
||||
StorageObjectCountTracker: storageConfig.Config.StorageObjectCountTracker, |
||||
} |
||||
|
||||
return ret, nil |
||||
} |
@ -0,0 +1,91 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package file |
||||
|
||||
import (
	"bytes"
	"errors"
	"io/fs"
	"os"
	"path/filepath"

	"k8s.io/apimachinery/pkg/runtime"
)
||||
|
||||
// filePath maps a storage key to its on-disk JSON file path by appending the
// ".json" extension; the key itself is expected to already be a filesystem
// path under s.root.
func (s *Storage) filePath(key string) string {
	return key + ".json"
}
||||
|
||||
func writeFile(codec runtime.Codec, path string, obj runtime.Object) error { |
||||
buf := new(bytes.Buffer) |
||||
if err := codec.Encode(obj, buf); err != nil { |
||||
return err |
||||
} |
||||
return os.WriteFile(path, buf.Bytes(), 0600) |
||||
} |
||||
|
||||
func readFile(codec runtime.Codec, path string, newFunc func() runtime.Object) (runtime.Object, error) { |
||||
content, err := os.ReadFile(filepath.Clean(path)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
newObj := newFunc() |
||||
decodedObj, _, err := codec.Decode(content, nil, newObj) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return decodedObj, nil |
||||
} |
||||
|
||||
func readDirRecursive(codec runtime.Codec, path string, newFunc func() runtime.Object) ([]runtime.Object, error) { |
||||
var objs []runtime.Object |
||||
err := filepath.Walk(path, func(path string, info os.FileInfo, err error) error { |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if info.IsDir() || filepath.Ext(path) != ".json" { |
||||
return nil |
||||
} |
||||
obj, err := readFile(codec, path, newFunc) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
objs = append(objs, obj) |
||||
return nil |
||||
}) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return objs, nil |
||||
} |
||||
|
||||
func deleteFile(path string) error { |
||||
return os.Remove(path) |
||||
} |
||||
|
||||
// exists reports whether something (file or directory) is present at filepath.
func exists(filepath string) bool {
	if _, err := os.Stat(filepath); err != nil {
		return false
	}
	return true
}
||||
|
||||
func ensureDir(dirname string) error { |
||||
if !exists(dirname) { |
||||
return os.MkdirAll(dirname, 0700) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
func isUnchanged(codec runtime.Codec, obj runtime.Object, newObj runtime.Object) (bool, error) { |
||||
buf := new(bytes.Buffer) |
||||
if err := codec.Encode(obj, buf); err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
newBuf := new(bytes.Buffer) |
||||
if err := codec.Encode(newObj, newBuf); err != nil { |
||||
return false, err |
||||
} |
||||
|
||||
return bytes.Equal(buf.Bytes(), newBuf.Bytes()), nil |
||||
} |
@ -0,0 +1,103 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/tilt-dev/tilt-apiserver/blob/main/pkg/storage/filepath/watchset.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package file |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
) |
||||
|
||||
// WatchSet keeps track of which watches need to be notified.
type WatchSet struct {
	mu      sync.RWMutex       // guards nodes and counter
	nodes   map[int]*watchNode // active (started) watchers keyed by unique id
	counter int                // monotonically increasing id source for new watches
}
||||
|
||||
func NewWatchSet() *WatchSet { |
||||
return &WatchSet{ |
||||
nodes: make(map[int]*watchNode, 20), |
||||
counter: 0, |
||||
} |
||||
} |
||||
|
||||
// Creates a new watch with a unique id, but
|
||||
// does not start sending events to it until start() is called.
|
||||
func (s *WatchSet) newWatch() *watchNode { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
s.counter++ |
||||
|
||||
return &watchNode{ |
||||
id: s.counter, |
||||
s: s, |
||||
updateCh: make(chan watch.Event), |
||||
outCh: make(chan watch.Event), |
||||
} |
||||
} |
||||
|
||||
func (s *WatchSet) cleanupWatchers() { |
||||
// Doesn't protect the below access on nodes slice since it won't ever be modified during cleanup
|
||||
for _, w := range s.nodes { |
||||
w.Stop() |
||||
} |
||||
} |
||||
|
||||
// notifyWatchers fans a single event out to every registered watcher.
// NOTE(review): updateCh is unbuffered and the sends happen while holding
// the read lock, so one slow watcher goroutine blocks every other
// notification (and any Stop, which needs the write lock) until the event
// is drained — confirm this backpressure is acceptable for callers.
func (s *WatchSet) notifyWatchers(ev watch.Event) {
	s.mu.RLock()
	for _, w := range s.nodes {
		w.updateCh <- ev
	}
	s.mu.RUnlock()
}
||||
|
||||
// watchNode is a single registered watch: raw events arrive on updateCh from
// WatchSet.notifyWatchers and, after predicate filtering in Start's
// goroutine, are delivered to the consumer on outCh.
type watchNode struct {
	s        *WatchSet        // owning set; used to deregister on Stop
	id       int              // unique id assigned by WatchSet.newWatch
	updateCh chan watch.Event // unfiltered events pushed by notifyWatchers (unbuffered)
	outCh    chan watch.Event // filtered events consumed via ResultChan (unbuffered)
}
||||
|
||||
// Start registers this watch with its WatchSet and begins sending events to
// it: initEvents are replayed first, then live events from updateCh are
// filtered through the selection predicate p and forwarded to outCh. The
// goroutine exits — closing outCh — once updateCh is closed by Stop.
//
// NOTE(review): the node is registered in the map before the goroutine is
// able to receive, and the init-event replay also precedes the updateCh
// receive loop; a notifyWatchers racing with the replay will block on the
// unbuffered updateCh until the replay finishes — verify callers tolerate
// this.
func (w *watchNode) Start(p storage.SelectionPredicate, initEvents []watch.Event) {
	w.s.mu.Lock()
	w.s.nodes[w.id] = w
	w.s.mu.Unlock()

	go func() {
		// Replay the initial state before any live events.
		for _, e := range initEvents {
			w.outCh <- e
		}

		for e := range w.updateCh {
			// A predicate error silently drops the event.
			ok, err := p.Matches(e.Object)
			if err != nil {
				continue
			}

			if !ok {
				continue
			}
			w.outCh <- e
		}
		close(w.outCh)
	}()
}
||||
|
||||
// Stop deregisters the watch and closes updateCh, which terminates the
// forwarding goroutine started by Start (that goroutine then closes outCh).
// The map removal happens under the write lock BEFORE the close, so a
// concurrent notifyWatchers (which sends under the read lock) can never
// send on the already-closed channel.
func (w *watchNode) Stop() {
	w.s.mu.Lock()
	delete(w.s.nodes, w.id)
	w.s.mu.Unlock()

	close(w.updateCh)
}
||||
|
||||
func (w *watchNode) ResultChan() <-chan watch.Event { |
||||
return w.outCh |
||||
} |
Loading…
Reference in new issue