From c2c94496297bc7ff3db9e0dcb943610d90cf2565 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Fri, 21 Jun 2024 09:09:01 +0300 Subject: [PATCH] working --- pkg/apiserver/endpoints/filters/requester.go | 1 + .../unified/entitybridge/entitybridge.go | 133 +++++++++++------- .../unified/resource/grpc/authenticator.go | 2 + 3 files changed, 88 insertions(+), 48 deletions(-) diff --git a/pkg/apiserver/endpoints/filters/requester.go b/pkg/apiserver/endpoints/filters/requester.go index f148c6d252b..d091246d024 100644 --- a/pkg/apiserver/endpoints/filters/requester.go +++ b/pkg/apiserver/endpoints/filters/requester.go @@ -33,6 +33,7 @@ func WithRequester(handler http.Handler) http.Handler { slices.Contains(info.GetGroups(), user.SystemPrivilegedGroup) { orgId := int64(1) requester = &identity.StaticRequester{ + Namespace: identity.NamespaceServiceAccount, // system:apiserver UserID: 1, OrgID: orgId, Name: info.GetName(), diff --git a/pkg/storage/unified/entitybridge/entitybridge.go b/pkg/storage/unified/entitybridge/entitybridge.go index 9b4b8a7504c..ec2f41230a8 100644 --- a/pkg/storage/unified/entitybridge/entitybridge.go +++ b/pkg/storage/unified/entitybridge/entitybridge.go @@ -4,6 +4,8 @@ import ( "context" "fmt" "os" + "path/filepath" + "time" "gocloud.dev/blob/fileblob" @@ -20,67 +22,100 @@ import ( // Creates a ResourceServer using the existing entity tables // NOTE: most of the field values are ignored func ProvideResourceServer(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, tracer tracing.Tracer) (resource.ResourceServer, error) { - if true { - tmp, err := os.MkdirTemp("", "xxx-*") - if err != nil { + bridge := &entityBridge{} + opts := resource.ResourceServerOptions{ + Tracer: tracer, + NodeID: 0, // From config? defaults to random + Lifecycle: bridge, + } + + supportBlobs := true + useEntitySQL := true + + // Create a local blob filesystem blob store + if supportBlobs { + dir := filepath.Join(cfg.DataPath, "unistore", "blobs") + if err := os.MkdirAll(dir, 0o755); err != nil { return nil, err } - bucket, err := fileblob.OpenBucket(tmp, &fileblob.Options{ + bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{ CreateDir: true, Metadata: fileblob.MetadataDontWrite, // skip }) if err != nil { return nil, err } - - fmt.Printf("ROOT: %s\n\n", tmp) - store, err := resource.NewCDKAppendingStore(context.Background(), resource.CDKAppenderOptions{ - Bucket: bucket, + opts.Blob, err = resource.NewCDKBlobStore(context.Background(), resource.CDKBlobStoreOptions{ + Tracer: tracer, + Bucket: bucket, + URLExpiration: time.Minute * 20, }) if err != nil { return nil, err } - - return resource.NewResourceServer(resource.ResourceServerOptions{ - Store: store, - }) } - eDB, err := dbimpl.ProvideEntityDB(db, cfg, features, tracer) - if err != nil { - return nil, err - } + if useEntitySQL { + eDB, err := dbimpl.ProvideEntityDB(db, cfg, features, tracer) + if err != nil { + return nil, err + } - entity, err := sqlstash.ProvideSQLEntityServer(eDB, tracer) - if err != nil { - return nil, err - } + bridge.server, err = sqlstash.ProvideSQLEntityServer(eDB, tracer) + if err != nil { + return nil, err + } + bridge.client = entity.NewEntityStoreClientLocal(bridge.server) + + // Use this bridge as the resource store + opts.Store = bridge + opts.Diagnostics = bridge + } else { + dir := filepath.Join(cfg.DataPath, "unistore", "resource") + if err := os.MkdirAll(dir, 0o755); err != nil { + return nil, err + } - store := &entityBridge{ - entity: entity, + bucket, err := fileblob.OpenBucket(dir, &fileblob.Options{ + CreateDir: true, + Metadata: fileblob.MetadataDontWrite, // skip + }) + if err != nil { + return nil, err + } + opts.Store, err = resource.NewCDKAppendingStore(context.Background(), resource.CDKAppenderOptions{ + Tracer: tracer, + Bucket: bucket, + }) + if err != nil { + return nil, err + } } - return resource.NewResourceServer(resource.ResourceServerOptions{ - Tracer: tracer, - Store: store, - NodeID: 234, // from config? used for snowflake ID - Diagnostics: store, - Lifecycle: store, - }) + return resource.NewResourceServer(opts) } type entityBridge struct { - entity sqlstash.SqlEntityServer + client entity.EntityStoreClient + + // When running directly + // (we need the explicit version so we have access to init+stop) + server sqlstash.SqlEntityServer } // Init implements ResourceServer. func (b *entityBridge) Init() error { - return b.entity.Init() + if b.server != nil { + return b.server.Init() + } + return nil } // Stop implements ResourceServer. func (b *entityBridge) Stop() { - b.entity.Stop() + if b.server != nil { + b.server.Stop() + } } // Convert resource key to the entity key @@ -99,7 +134,7 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent // Delete does not need to create an entity first if event.Type == resource.WatchEvent_DELETED { - rsp, err := b.entity.Delete(ctx, &entity.DeleteEntityRequest{ + rsp, err := b.client.Delete(ctx, &entity.DeleteEntityRequest{ Key: key, PreviousVersion: event.PreviousRV, }) @@ -109,18 +144,20 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent return rsp.Entity.ResourceVersion, err } + gvr := event.Object.GetGroupVersionKind() obj := event.Object msg := &entity.Entity{ - Key: key, - Group: event.Key.Group, - Resource: event.Key.Resource, - Namespace: event.Key.Namespace, - Name: event.Key.Name, - Guid: string(event.Object.GetUID()), - - // Key: fmt.Sprint("%s/%s/%s/%s", ), - Folder: obj.GetFolder(), - Body: event.Value, + Key: key, + Group: event.Key.Group, + Resource: event.Key.Resource, + Namespace: event.Key.Namespace, + Name: event.Key.Name, + Guid: string(event.Object.GetUID()), + GroupVersion: gvr.GroupVersion().String(), + + Folder: obj.GetFolder(), + Body: event.Value, + Message: event.Object.GetMessage(), Labels: obj.GetLabels(), Size: int64(len(event.Value)), @@ -129,7 +166,7 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent switch event.Type { case resource.WatchEvent_ADDED: msg.Action = entity.Entity_CREATED - rsp, err := b.entity.Create(ctx, &entity.CreateEntityRequest{Entity: msg}) + rsp, err := b.client.Create(ctx, &entity.CreateEntityRequest{Entity: msg}) if err != nil { return 0, err } @@ -137,7 +174,7 @@ func (b *entityBridge) WriteEvent(ctx context.Context, event resource.WriteEvent case resource.WatchEvent_MODIFIED: msg.Action = entity.Entity_UPDATED - rsp, err := b.entity.Update(ctx, &entity.UpdateEntityRequest{ + rsp, err := b.client.Update(ctx, &entity.UpdateEntityRequest{ Entity: msg, PreviousVersion: event.PreviousRV, }) @@ -158,7 +195,7 @@ func (b *entityBridge) WatchWriteEvents(ctx context.Context) (<-chan *resource.W // IsHealthy implements ResourceServer. func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { - rsp, err := b.entity.IsHealthy(ctx, &entity.HealthCheckRequest{ + rsp, err := b.client.IsHealthy(ctx, &entity.HealthCheckRequest{ Service: req.Service, // ?? }) if err != nil { @@ -171,7 +208,7 @@ func (b *entityBridge) IsHealthy(ctx context.Context, req *resource.HealthCheckR // Read implements ResourceServer. func (b *entityBridge) Read(ctx context.Context, req *resource.ReadRequest) (*resource.ReadResponse, error) { - v, err := b.entity.Read(ctx, &entity.ReadEntityRequest{ + v, err := b.client.Read(ctx, &entity.ReadEntityRequest{ Key: toEntityKey(req.Key), WithBody: true, }) @@ -202,7 +239,7 @@ func (b *entityBridge) List(ctx context.Context, req *resource.ListRequest) (*re } } - found, err := b.entity.List(ctx, query) + found, err := b.client.List(ctx, query) if err != nil { return nil, err } diff --git a/pkg/storage/unified/resource/grpc/authenticator.go b/pkg/storage/unified/resource/grpc/authenticator.go index f77cc87de92..16a343b6db7 100644 --- a/pkg/storage/unified/resource/grpc/authenticator.go +++ b/pkg/storage/unified/resource/grpc/authenticator.go @@ -41,6 +41,8 @@ func (f *Authenticator) Authenticate(ctx context.Context) (context.Context, erro } user, err := f.DecodeMetadata(ctx, md) if err != nil { + + fmt.Printf("???? %v\n", err) return nil, err } return identity.WithRequester(ctx, user), nil