From 0db0e132487620d65bb8ff4fd912aeacf323542a Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Tue, 2 Jul 2024 14:37:28 -0700 Subject: [PATCH] make list preparelist --- pkg/registry/apis/dashboard/legacy/storage.go | 2 +- pkg/registry/apis/dashboard/legacy/types.go | 2 +- pkg/registry/apis/dashboard/legacy_storage.go | 6 +- .../unified/entitybridge/entitybridge.go | 6 +- pkg/storage/unified/resource/cdk_appender.go | 298 ------------------ pkg/storage/unified/resource/resource.proto | 8 + .../unified/resource/resource_grpc.pb.go | 12 + pkg/storage/unified/resource/server.go | 45 +-- pkg/storage/unified/resource/server_test.go | 4 +- pkg/storage/unified/sqlnext/sql_resources.go | 4 +- 10 files changed, 57 insertions(+), 330 deletions(-) delete mode 100644 pkg/storage/unified/resource/cdk_appender.go diff --git a/pkg/registry/apis/dashboard/legacy/storage.go b/pkg/registry/apis/dashboard/legacy/storage.go index fee75814046..4e99b6c0835 100644 --- a/pkg/registry/apis/dashboard/legacy/storage.go +++ b/pkg/registry/apis/dashboard/legacy/storage.go @@ -145,7 +145,7 @@ func (a *dashboardSqlAccess) Read(ctx context.Context, req *resource.ReadRequest } // List implements AppendingStore. -func (a *dashboardSqlAccess) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { +func (a *dashboardSqlAccess) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { opts := req.Options info, err := request.ParseNamespace(opts.Key.Namespace) if err == nil { diff --git a/pkg/registry/apis/dashboard/legacy/types.go b/pkg/registry/apis/dashboard/legacy/types.go index 84c8eb96c0a..1bf48e137bb 100644 --- a/pkg/registry/apis/dashboard/legacy/types.go +++ b/pkg/registry/apis/dashboard/legacy/types.go @@ -24,7 +24,7 @@ type DashboardQuery struct { } type DashboardAccess interface { - resource.AppendingStore + resource.StorageBackend resource.BlobStore resource.ResourceIndexServer diff --git a/pkg/registry/apis/dashboard/legacy_storage.go b/pkg/registry/apis/dashboard/legacy_storage.go index c546934ad71..b41c995386f 100644 --- a/pkg/registry/apis/dashboard/legacy_storage.go +++ b/pkg/registry/apis/dashboard/legacy_storage.go @@ -24,9 +24,9 @@ type dashboardStorage struct { func (s *dashboardStorage) newStore(scheme *runtime.Scheme, defaultOptsGetter generic.RESTOptionsGetter) (grafanarest.LegacyStorage, error) { server, err := resource.NewResourceServer(resource.ResourceServerOptions{ - Store: s.access, - Search: s.access, - Blob: s.access, + Backend: s.access, + Search: s.access, + Blob: s.access, // WriteAccess: resource.WriteAccessHooks{ // Folder: func(ctx context.Context, user identity.Requester, uid string) bool { // // ??? diff --git a/pkg/storage/unified/entitybridge/entitybridge.go b/pkg/storage/unified/entitybridge/entitybridge.go index 16ccc24e216..51f75279f40 100644 --- a/pkg/storage/unified/entitybridge/entitybridge.go +++ b/pkg/storage/unified/entitybridge/entitybridge.go @@ -72,7 +72,7 @@ func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.Feat server: server, client: client, } - opts.Store = bridge + opts.Backend = bridge opts.Diagnostics = bridge opts.Lifecycle = bridge } else { @@ -88,7 +88,7 @@ func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.Feat if err != nil { return nil, err } - opts.Store, err = resource.NewCDKAppendingStore(context.Background(), resource.CDKAppenderOptions{ + opts.Backend, err = resource.NewCDKBackend(context.Background(), resource.CDKBackendOptions{ Tracer: tracer, Bucket: bucket, }) @@ -262,7 +262,7 @@ func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*re } // List implements ResourceServer. -func (b *entityBridge) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { +func (b *entityBridge) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { key := req.Options.Key query := &entity.EntityListRequest{ NextPageToken: req.NextPageToken, diff --git a/pkg/storage/unified/resource/cdk_appender.go b/pkg/storage/unified/resource/cdk_appender.go deleted file mode 100644 index 214feea7502..00000000000 --- a/pkg/storage/unified/resource/cdk_appender.go +++ /dev/null @@ -1,298 +0,0 @@ -package resource - -import ( - "bytes" - context "context" - "fmt" - "io" - "sort" - "strconv" - "strings" - "sync" - "time" - - "go.opentelemetry.io/otel/trace" - "go.opentelemetry.io/otel/trace/noop" - "gocloud.dev/blob" - _ "gocloud.dev/blob/fileblob" - _ "gocloud.dev/blob/memblob" - apierrors "k8s.io/apimachinery/pkg/api/errors" - "k8s.io/apimachinery/pkg/apis/meta/v1/unstructured" - "k8s.io/apimachinery/pkg/runtime/schema" -) - -type CDKAppenderOptions struct { - Tracer trace.Tracer - Bucket *blob.Bucket - RootFolder string - - NextResourceVersion NextResourceVersion -} - -func NewCDKAppendingStore(ctx context.Context, opts CDKAppenderOptions) (AppendingStore, error) { - if opts.Tracer == nil { - opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store") - } - - if opts.Bucket == nil { - return nil, fmt.Errorf("missing bucket") - } - - found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{ - Prefix: opts.RootFolder, - Delimiter: "/", - }) - if err != nil { - return nil, err - } - if found == nil { - return nil, fmt.Errorf("the root folder does not exist") - } - - // This is not safe when running in HA! - if opts.NextResourceVersion == nil { - opts.NextResourceVersion = newResourceVersionCounter(time.Now().UnixMilli()) - } - - return &cdkAppender{ - tracer: opts.Tracer, - bucket: opts.Bucket, - root: opts.RootFolder, - nextRV: opts.NextResourceVersion, - }, nil -} - -type cdkAppender struct { - tracer trace.Tracer - bucket *blob.Bucket - root string - nextRV NextResourceVersion - mutex sync.Mutex - - // Typically one... the server wrapper - subscribers []chan *WrittenEvent -} - -func (s *cdkAppender) getPath(key *ResourceKey, rv int64) string { - var buffer bytes.Buffer - buffer.WriteString(s.root) - - if key.Group == "" { - return buffer.String() - } - buffer.WriteString(key.Group) - - if key.Resource == "" { - return buffer.String() - } - buffer.WriteString("/") - buffer.WriteString(key.Resource) - - if key.Namespace == "" { - if key.Name == "" { - return buffer.String() - } - buffer.WriteString("/__cluster__") - } else { - buffer.WriteString("/") - buffer.WriteString(key.Namespace) - } - - if key.Name == "" { - return buffer.String() - } - buffer.WriteString("/") - buffer.WriteString(key.Name) - - if rv > 0 { - buffer.WriteString(fmt.Sprintf("/%d.json", rv)) - } - return buffer.String() -} - -func (s *cdkAppender) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) { - // Scope the lock - { - s.mutex.Lock() - defer s.mutex.Unlock() - - rv = s.nextRV() - err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{ - ContentType: "application/json", - }) - } - - // Async notify all subscribers - if s.subscribers != nil { - go func() { - write := &WrittenEvent{ - WriteEvent: event, - - Timestamp: time.Now().UnixMilli(), - ResourceVersion: rv, - } - for _, sub := range s.subscribers { - sub <- write - } - }() - } - - return rv, err -} - -// Read implements ResourceStoreServer. -func (s *cdkAppender) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { - rv := req.ResourceVersion - - path := s.getPath(req.Key, req.ResourceVersion) - if rv < 1 { - iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"}) - for { - obj, err := iter.Next(ctx) - if err == io.EOF { - break - } - if strings.HasSuffix(obj.Key, ".json") { - idx := strings.LastIndex(obj.Key, "/") + 1 - edx := strings.LastIndex(obj.Key, ".") - if idx > 0 { - v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) - if err == nil && v > rv { - rv = v - path = obj.Key // find the path with biggest resource version - } - } - } - } - } - - raw, err := s.bucket.ReadAll(ctx, path) - if err == nil && bytes.Contains(raw, []byte(`"DeletedMarker"`)) { - tmp := &unstructured.Unstructured{} - err = tmp.UnmarshalJSON(raw) - if err == nil && tmp.GetKind() == "DeletedMarker" { - return nil, apierrors.NewNotFound(schema.GroupResource{ - Group: req.Key.Group, - Resource: req.Key.Resource, - }, req.Key.Name) - } - } - - return &ReadResponse{ - ResourceVersion: rv, - Value: raw, - }, err -} - -// List implements AppendingStore. -func (s *cdkAppender) List(ctx context.Context, req *ListRequest) (*ListResponse, error) { - resources, err := buildTree(ctx, s, req.Options.Key) - if err != nil { - return nil, err - } - - rsp := &ListResponse{} - for _, item := range resources { - latest := item.versions[0] - raw, err := s.bucket.ReadAll(ctx, latest.key) - if err != nil { - return nil, err - } - rsp.Items = append(rsp.Items, &ResourceWrapper{ - ResourceVersion: latest.rv, - Value: raw, - }) - } - return rsp, nil -} - -// Watch implements AppendingStore. -func (s *cdkAppender) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { - stream := make(chan *WrittenEvent, 10) - { - s.mutex.Lock() - defer s.mutex.Unlock() - - // Add the event stream - s.subscribers = append(s.subscribers, stream) - } - - // Wait for context done - go func() { - // Wait till the context is done - <-ctx.Done() - - // Then remove the subscription - s.mutex.Lock() - defer s.mutex.Unlock() - - // Copy all streams without our listener - subs := []chan *WrittenEvent{} - for _, sub := range s.subscribers { - if sub != stream { - subs = append(subs, sub) - } - } - s.subscribers = subs - }() - return stream, nil -} - -// group > resource > namespace > name > versions -type cdkResource struct { - prefix string - versions []cdkVersion -} -type cdkVersion struct { - rv int64 - key string -} - -func buildTree(ctx context.Context, s *cdkAppender, key *ResourceKey) ([]cdkResource, error) { - byPrefix := make(map[string]*cdkResource) - - path := s.getPath(key, 0) - iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive - for { - obj, err := iter.Next(ctx) - if err == io.EOF { - break - } - if strings.HasSuffix(obj.Key, ".json") { - idx := strings.LastIndex(obj.Key, "/") + 1 - edx := strings.LastIndex(obj.Key, ".") - if idx > 0 { - rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64) - if err == nil { - prefix := obj.Key[:idx] - res, ok := byPrefix[prefix] - if !ok { - res = &cdkResource{prefix: prefix} - byPrefix[prefix] = res - } - - res.versions = append(res.versions, cdkVersion{ - rv: rv, - key: obj.Key, - }) - } - } - } - } - - // Now sort all versions - resources := make([]cdkResource, 0, len(byPrefix)) - for _, res := range byPrefix { - sort.Slice(res.versions, func(i, j int) bool { - return res.versions[i].rv > res.versions[j].rv - }) - resources = append(resources, *res) - } - sort.Slice(resources, func(i, j int) bool { - a := resources[i].versions[0].rv - b := resources[j].versions[0].rv - return a > b - }) - - return resources, nil -} diff --git a/pkg/storage/unified/resource/resource.proto b/pkg/storage/unified/resource/resource.proto index b8bfe391813..6d4b408fec0 100644 --- a/pkg/storage/unified/resource/resource.proto +++ b/pkg/storage/unified/resource/resource.proto @@ -444,7 +444,15 @@ service ResourceStore { rpc Create(CreateRequest) returns (CreateResponse); rpc Update(UpdateRequest) returns (UpdateResponse); rpc Delete(DeleteRequest) returns (DeleteResponse); + + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering rpc List(ListRequest) returns (ListResponse); + + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering rpc Watch(WatchRequest) returns (stream WatchEvent); } diff --git a/pkg/storage/unified/resource/resource_grpc.pb.go b/pkg/storage/unified/resource/resource_grpc.pb.go index 5428c73847b..ff2f9abde25 100644 --- a/pkg/storage/unified/resource/resource_grpc.pb.go +++ b/pkg/storage/unified/resource/resource_grpc.pb.go @@ -40,7 +40,13 @@ type ResourceStoreClient interface { Create(ctx context.Context, in *CreateRequest, opts ...grpc.CallOption) (*CreateResponse, error) Update(ctx context.Context, in *UpdateRequest, opts ...grpc.CallOption) (*UpdateResponse, error) Delete(ctx context.Context, in *DeleteRequest, opts ...grpc.CallOption) (*DeleteResponse, error) + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering List(ctx context.Context, in *ListRequest, opts ...grpc.CallOption) (*ListResponse, error) + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering Watch(ctx context.Context, in *WatchRequest, opts ...grpc.CallOption) (ResourceStore_WatchClient, error) } @@ -148,7 +154,13 @@ type ResourceStoreServer interface { Create(context.Context, *CreateRequest) (*CreateResponse, error) Update(context.Context, *UpdateRequest) (*UpdateResponse, error) Delete(context.Context, *DeleteRequest) (*DeleteResponse, error) + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering List(context.Context, *ListRequest) (*ListResponse, error) + // The results *may* include values that should not be returned to the user + // This will perform best-effort filtering to increase performace. + // NOTE: storage.Interface is ultimatly responsible for the final filtering Watch(*WatchRequest, ResourceStore_WatchServer) error } diff --git a/pkg/storage/unified/resource/server.go b/pkg/storage/unified/resource/server.go index ef9cc1ef892..4080718832d 100644 --- a/pkg/storage/unified/resource/server.go +++ b/pkg/storage/unified/resource/server.go @@ -40,19 +40,24 @@ type ResourceServer interface { LifecycleHooks } -// This store is not exposed directly to users, it is a helper to implement writing -// resources as a stream of WriteEvents -type AppendingStore interface { +// The StorageBackend is an internal abstraction that supports interacting with +// the underlying raw storage medium. This interface is never exposed directly, +// it is provided by concrete instances that actually write values. +type StorageBackend interface { // Write a Create/Update/Delete, // NOTE: the contents of WriteEvent have been validated // Return the revisionVersion for this event or error WriteEvent(context.Context, WriteEvent) (int64, error) - // Read a value from storage + // Read a value from storage optionally at an explicit version Read(context.Context, *ReadRequest) (*ReadResponse, error) - // Implement List -- this expects the read after write semantics - List(context.Context, *ListRequest) (*ListResponse, error) + // When the ResourceServer executes a List request, it will first + // query the backend for potential results. All results will be + // checked against the kubernetes requirements before finally returning + // results. The list options can be used to improve performance + // but are the the final answer. + PrepareList(context.Context, *ListRequest) (*ListResponse, error) // Get all events from the store // For HA setups, this will be more events than the local WriteEvent above! @@ -81,7 +86,7 @@ type ResourceServerOptions struct { Tracer trace.Tracer // Real storage backend - Store AppendingStore + Backend StorageBackend // The blob storage engine Blob BlobStore @@ -108,8 +113,8 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { opts.Tracer = noop.NewTracerProvider().Tracer("resource-server") } - if opts.Store == nil { - return nil, fmt.Errorf("missing AppendingStore implementation") + if opts.Backend == nil { + return nil, fmt.Errorf("missing Backend implementation") } if opts.Search == nil { opts.Search = &noopService{} @@ -137,7 +142,7 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) { return &server{ tracer: opts.Tracer, log: slog.Default().With("logger", "resource-server"), - store: opts.Store, + backend: opts.Backend, search: opts.Search, diagnostics: opts.Diagnostics, access: opts.WriteAccess, @@ -153,7 +158,7 @@ var _ ResourceServer = &server{} type server struct { tracer trace.Tracer log *slog.Logger - store AppendingStore + backend StorageBackend search ResourceIndexServer blob BlobStore diagnostics DiagnosticsServer @@ -323,7 +328,7 @@ func (s *server) Create(ctx context.Context, req *CreateRequest) (*CreateRespons return rsp, err } - rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) if err == nil { rsp.Value = event.Value // with mutated fields } else { @@ -370,7 +375,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons return rsp, nil } - latest, err := s.store.Read(ctx, &ReadRequest{ + latest, err := s.backend.Read(ctx, &ReadRequest{ Key: req.Key, }) if err != nil { @@ -399,7 +404,7 @@ func (s *server) Update(ctx context.Context, req *UpdateRequest) (*UpdateRespons event.Type = WatchEvent_MODIFIED event.PreviousRV = latest.ResourceVersion - rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) rsp.Status, err = errToStatus(err) if err == nil { rsp.Value = event.Value // with mutated fields @@ -422,7 +427,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons return nil, apierrors.NewBadRequest("update must include the previous version") } - latest, err := s.store.Read(ctx, &ReadRequest{ + latest, err := s.backend.Read(ctx, &ReadRequest{ Key: req.Key, }) if err != nil { @@ -468,7 +473,7 @@ func (s *server) Delete(ctx context.Context, req *DeleteRequest) (*DeleteRespons fmt.Sprintf("unable creating deletion marker, %v", err)) } - rsp.ResourceVersion, err = s.store.WriteEvent(ctx, event) + rsp.ResourceVersion, err = s.backend.WriteEvent(ctx, event) rsp.Status, err = errToStatus(err) return rsp, err } @@ -487,7 +492,7 @@ func (s *server) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, err return &ReadResponse{Status: status}, nil } - rsp, err := s.store.Read(ctx, req) + rsp, err := s.backend.Read(ctx, req) if err != nil { if rsp == nil { rsp = &ReadResponse{} @@ -502,7 +507,7 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err return nil, err } - rsp, err := s.store.List(ctx, req) + rsp, err := s.backend.PrepareList(ctx, req) // Status??? return rsp, err } @@ -510,7 +515,7 @@ func (s *server) List(ctx context.Context, req *ListRequest) (*ListResponse, err func (s *server) initWatcher() error { var err error s.broadcaster, err = NewBroadcaster(s.ctx, func(out chan<- *WrittenEvent) error { - events, err := s.store.WatchWriteEvents(s.ctx) + events, err := s.backend.WatchWriteEvents(s.ctx) if err != nil { return err } @@ -595,7 +600,7 @@ func (s *server) PutBlob(ctx context.Context, req *PutBlobRequest) (*PutBlobResp } func (s *server) getPartialObject(ctx context.Context, key *ResourceKey, rv int64) (utils.GrafanaMetaAccessor, *StatusResult) { - rsp, err := s.store.Read(ctx, &ReadRequest{ + rsp, err := s.backend.Read(ctx, &ReadRequest{ Key: key, ResourceVersion: rv, }) diff --git a/pkg/storage/unified/resource/server_test.go b/pkg/storage/unified/resource/server_test.go index 5b69495a145..9125bca00dc 100644 --- a/pkg/storage/unified/resource/server_test.go +++ b/pkg/storage/unified/resource/server_test.go @@ -41,13 +41,13 @@ func TestSimpleServer(t *testing.T) { fmt.Printf("ROOT: %s\n\n", tmp) } - store, err := NewCDKAppendingStore(ctx, CDKAppenderOptions{ + store, err := NewCDKBackend(ctx, CDKBackendOptions{ Bucket: bucket, }) require.NoError(t, err) server, err := NewResourceServer(ResourceServerOptions{ - Store: store, + Backend: store, }) require.NoError(t, err) diff --git a/pkg/storage/unified/sqlnext/sql_resources.go b/pkg/storage/unified/sqlnext/sql_resources.go index 1a35fb4bfc0..6f5cda1e7e3 100644 --- a/pkg/storage/unified/sqlnext/sql_resources.go +++ b/pkg/storage/unified/sqlnext/sql_resources.go @@ -43,7 +43,7 @@ func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (r return resource.NewResourceServer(resource.ResourceServerOptions{ Tracer: tracer, - Store: store, + Backend: store, Diagnostics: store, Lifecycle: store, }) @@ -152,7 +152,7 @@ func (s *sqlResourceStore) Read(ctx context.Context, req *resource.ReadRequest) return nil, ErrNotImplementedYet } -func (s *sqlResourceStore) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { +func (s *sqlResourceStore) PrepareList(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) { _, span := s.tracer.Start(ctx, "storage_server.List") defer span.End()