mirror of https://github.com/grafana/grafana
Storage: Unified Storage based on Entity API (#71977)
* first round of entityapi updates - quote column names and clean up insert/update queries - replace grn with guid - streamline table structure fixes streamline entity history move EntitySummary into proto remove EntitySummary add guid to json fix tests change DB_Uuid to DB_NVarchar fix folder test convert interface to any more cleanup start entity store under grafana-apiserver dskit target CRUD working, kind of rough cut of wiring entity api to kube-apiserver fake grafana user in context add key to entity list working revert unnecessary changes move entity storage files to their own package, clean up use accessor to read/write grafana annotations implement separate Create and Update functions * go mod tidy * switch from Kind to resource * basic grpc storage server * basic support for grpc entity store * don't connect to database unless it's needed, pass user identity over grpc * support getting user from k8s context, fix some mysql issues * assign owner to snowflake dependency * switch from ulid to uuid for guids * cleanup, rename Search to List * remove entityListResult * EntityAPI: remove extra user abstraction (#79033) * remove extra user abstraction * add test stub (but * move grpc context setup into client wrapper, fix lint issue * remove unused constants * remove custom json stuff * basic list filtering, add todo * change target to storage-server, allow entityStore flag in prod mode * fix issue with Update * EntityAPI: make test work, need to resolve expected differences (#79123) * make test work, need to resolve expected differences * remove the fields not supported by legacy * sanitize out the bits legacy does not support * sanitize out the bits legacy does not support --------- Co-authored-by: Ryan McKinley <ryantxu@gmail.com> * update feature toggle generated files * remove unused http headers * update feature flag strategy * devmode * update readme * spelling * readme --------- Co-authored-by: Ryan McKinley <ryantxu@gmail.com>pull/79192/head
parent
07915703fe
commit
c4c9bfaf2e
|
@ -0,0 +1,91 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
|
||||
package entity |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"path" |
||||
"time" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apiserver/pkg/registry/generic" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
||||
flowcontrolrequest "k8s.io/apiserver/pkg/util/flowcontrol/request" |
||||
"k8s.io/client-go/tools/cache" |
||||
|
||||
entityStore "github.com/grafana/grafana/pkg/services/store/entity" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
var _ generic.RESTOptionsGetter = (*RESTOptionsGetter)(nil) |
||||
|
||||
type RESTOptionsGetter struct { |
||||
cfg *setting.Cfg |
||||
store entityStore.EntityStoreServer |
||||
Codec runtime.Codec |
||||
} |
||||
|
||||
func NewRESTOptionsGetter(cfg *setting.Cfg, store entityStore.EntityStoreServer, codec runtime.Codec) *RESTOptionsGetter { |
||||
return &RESTOptionsGetter{ |
||||
cfg: cfg, |
||||
store: store, |
||||
Codec: codec, |
||||
} |
||||
} |
||||
|
||||
func (f *RESTOptionsGetter) GetRESTOptions(resource schema.GroupResource) (generic.RESTOptions, error) { |
||||
// build connection string to uniquely identify the storage backend
|
||||
connectionInfo, err := json.Marshal(f.cfg.SectionWithEnvOverrides("entity_api").KeysHash()) |
||||
if err != nil { |
||||
return generic.RESTOptions{}, err |
||||
} |
||||
|
||||
storageConfig := &storagebackend.ConfigForResource{ |
||||
Config: storagebackend.Config{ |
||||
Type: "custom", |
||||
Prefix: "", |
||||
Transport: storagebackend.TransportConfig{ |
||||
ServerList: []string{ |
||||
string(connectionInfo), |
||||
}, |
||||
}, |
||||
Paging: false, |
||||
Codec: f.Codec, |
||||
EncodeVersioner: nil, |
||||
Transformer: nil, |
||||
CompactionInterval: 0, |
||||
CountMetricPollPeriod: 0, |
||||
DBMetricPollInterval: 0, |
||||
HealthcheckTimeout: 0, |
||||
ReadycheckTimeout: 0, |
||||
StorageObjectCountTracker: nil, |
||||
}, |
||||
GroupResource: resource, |
||||
} |
||||
|
||||
ret := generic.RESTOptions{ |
||||
StorageConfig: storageConfig, |
||||
Decorator: func( |
||||
config *storagebackend.ConfigForResource, |
||||
resourcePrefix string, |
||||
keyFunc func(obj runtime.Object) (string, error), |
||||
newFunc func() runtime.Object, |
||||
newListFunc func() runtime.Object, |
||||
getAttrsFunc storage.AttrFunc, |
||||
trigger storage.IndexerFuncs, |
||||
indexers *cache.Indexers, |
||||
) (storage.Interface, factory.DestroyFunc, error) { |
||||
return NewStorage(config, resource, f.store, f.Codec, keyFunc, newFunc, newListFunc, getAttrsFunc) |
||||
}, |
||||
DeleteCollectionWorkers: 0, |
||||
EnableGarbageCollection: false, |
||||
ResourcePrefix: path.Join(storageConfig.Prefix, resource.Group, resource.Resource), |
||||
CountMetricPollPeriod: 1 * time.Second, |
||||
StorageObjectCountTracker: flowcontrolrequest.NewStorageObjectCountTracker(), |
||||
} |
||||
|
||||
return ret, nil |
||||
} |
||||
@ -0,0 +1,390 @@ |
||||
// SPDX-License-Identifier: AGPL-3.0-only
|
||||
// Provenance-includes-location: https://github.com/kubernetes-sigs/apiserver-runtime/blob/main/pkg/experimental/storage/filepath/jsonfile_rest.go
|
||||
// Provenance-includes-license: Apache-2.0
|
||||
// Provenance-includes-copyright: The Kubernetes Authors.
|
||||
|
||||
package entity |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"reflect" |
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
"k8s.io/apimachinery/pkg/api/meta" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/conversion" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apimachinery/pkg/watch" |
||||
"k8s.io/apiserver/pkg/endpoints/request" |
||||
"k8s.io/apiserver/pkg/storage" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend" |
||||
"k8s.io/apiserver/pkg/storage/storagebackend/factory" |
||||
|
||||
entityStore "github.com/grafana/grafana/pkg/services/store/entity" |
||||
"github.com/grafana/grafana/pkg/util" |
||||
) |
||||
|
||||
var _ storage.Interface = (*Storage)(nil) |
||||
|
||||
const MaxUpdateAttempts = 1 |
||||
|
||||
// Storage implements storage.Interface and storage resources as JSON files on disk.
|
||||
type Storage struct { |
||||
config *storagebackend.ConfigForResource |
||||
store entityStore.EntityStoreServer |
||||
gr schema.GroupResource |
||||
codec runtime.Codec |
||||
keyFunc func(obj runtime.Object) (string, error) |
||||
newFunc func() runtime.Object |
||||
newListFunc func() runtime.Object |
||||
getAttrsFunc storage.AttrFunc |
||||
// trigger storage.IndexerFuncs
|
||||
// indexers *cache.Indexers
|
||||
|
||||
// watchSet *WatchSet
|
||||
} |
||||
|
||||
func NewStorage( |
||||
config *storagebackend.ConfigForResource, |
||||
gr schema.GroupResource, |
||||
store entityStore.EntityStoreServer, |
||||
codec runtime.Codec, |
||||
keyFunc func(obj runtime.Object) (string, error), |
||||
newFunc func() runtime.Object, |
||||
newListFunc func() runtime.Object, |
||||
getAttrsFunc storage.AttrFunc, |
||||
) (storage.Interface, factory.DestroyFunc, error) { |
||||
return &Storage{ |
||||
config: config, |
||||
gr: gr, |
||||
codec: codec, |
||||
store: store, |
||||
keyFunc: keyFunc, |
||||
newFunc: newFunc, |
||||
newListFunc: newListFunc, |
||||
getAttrsFunc: getAttrsFunc, |
||||
}, nil, nil |
||||
} |
||||
|
||||
// Create adds a new object at a key unless it already exists. 'ttl' is time-to-live
|
||||
// in seconds (0 means forever). If no error is returned and out is not nil, out will be
|
||||
// set to the read value from database.
|
||||
func (s *Storage) Create(ctx context.Context, key string, obj runtime.Object, out runtime.Object, ttl uint64) error { |
||||
requestInfo, ok := request.RequestInfoFrom(ctx) |
||||
if !ok { |
||||
return apierrors.NewInternalError(fmt.Errorf("could not get request info")) |
||||
} |
||||
|
||||
if err := s.Versioner().PrepareObjectForStorage(obj); err != nil { |
||||
return err |
||||
} |
||||
|
||||
metaAccessor, err := meta.Accessor(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// Replace the default name generation strategy
|
||||
if metaAccessor.GetGenerateName() != "" { |
||||
k, err := ParseKey(key) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
k.Name = util.GenerateShortUID() |
||||
key = k.String() |
||||
|
||||
metaAccessor.SetName(k.Name) |
||||
metaAccessor.SetGenerateName("") |
||||
} |
||||
|
||||
e, err := resourceToEntity(key, obj, requestInfo) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
req := &entityStore.CreateEntityRequest{ |
||||
Entity: e, |
||||
} |
||||
|
||||
rsp, err := s.store.Create(ctx, req) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
if rsp.Status != entityStore.CreateEntityResponse_CREATED { |
||||
return fmt.Errorf("this was not a create operation... (%s)", rsp.Status.String()) |
||||
} |
||||
|
||||
err = entityToResource(rsp.Entity, out) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
/* |
||||
s.watchSet.notifyWatchers(watch.Event{ |
||||
Object: out.DeepCopyObject(), |
||||
Type: watch.Added, |
||||
}) |
||||
*/ |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Delete removes the specified key and returns the value that existed at that spot.
|
||||
// If key didn't exist, it will return NotFound storage error.
|
||||
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
||||
// current version of the object to avoid read operation from storage to get it.
|
||||
// However, the implementations have to retry in case suggestion is stale.
|
||||
func (s *Storage) Delete(ctx context.Context, key string, out runtime.Object, preconditions *storage.Preconditions, validateDeletion storage.ValidateObjectFunc, cachedExistingObject runtime.Object) error { |
||||
grn, err := keyToGRN(key) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
previousVersion := "" |
||||
if preconditions != nil && preconditions.ResourceVersion != nil { |
||||
previousVersion = *preconditions.ResourceVersion |
||||
} |
||||
|
||||
rsp, err := s.store.Delete(ctx, &entityStore.DeleteEntityRequest{ |
||||
GRN: grn, |
||||
PreviousVersion: previousVersion, |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = entityToResource(rsp.Entity, out) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Watch begins watching the specified key. Events are decoded into API objects,
|
||||
// and any items selected by 'p' are sent down to returned watch.Interface.
|
||||
// resourceVersion may be used to specify what version to begin watching,
|
||||
// which should be the current resourceVersion, and no longer rv+1
|
||||
// (e.g. reconnecting without missing any updates).
|
||||
// If resource version is "0", this interface will get current object at given key
|
||||
// and send it in an "ADDED" event, before watch starts.
|
||||
func (s *Storage) Watch(ctx context.Context, key string, opts storage.ListOptions) (watch.Interface, error) { |
||||
return nil, apierrors.NewMethodNotSupported(schema.GroupResource{}, "watch") |
||||
} |
||||
|
||||
// Get unmarshals object found at key into objPtr. On a not found error, will either
|
||||
// return a zero object of the requested type, or an error, depending on 'opts.ignoreNotFound'.
|
||||
// Treats empty responses and nil response nodes exactly like a not found error.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) Get(ctx context.Context, key string, opts storage.GetOptions, objPtr runtime.Object) error { |
||||
rsp, err := s.store.Read(ctx, &entityStore.ReadEntityRequest{ |
||||
Key: key, |
||||
WithMeta: true, |
||||
WithBody: true, |
||||
WithStatus: true, |
||||
WithSummary: true, |
||||
}) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if rsp.GRN == nil { |
||||
if opts.IgnoreNotFound { |
||||
return nil |
||||
} |
||||
|
||||
return apierrors.NewNotFound(s.gr, key) |
||||
} |
||||
|
||||
err = entityToResource(rsp, objPtr) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GetList unmarshalls objects found at key into a *List api object (an object
|
||||
// that satisfies runtime.IsList definition).
|
||||
// If 'opts.Recursive' is false, 'key' is used as an exact match. If `opts.Recursive'
|
||||
// is true, 'key' is used as a prefix.
|
||||
// The returned contents may be delayed, but it is guaranteed that they will
|
||||
// match 'opts.ResourceVersion' according 'opts.ResourceVersionMatch'.
|
||||
func (s *Storage) GetList(ctx context.Context, key string, opts storage.ListOptions, listObj runtime.Object) error { |
||||
listPtr, err := meta.GetItemsPtr(listObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
v, err := conversion.EnforcePtr(listPtr) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
rsp, err := s.store.List(ctx, &entityStore.EntityListRequest{ |
||||
Key: []string{key}, |
||||
WithBody: true, |
||||
WithLabels: true, |
||||
WithFields: true, |
||||
NextPageToken: opts.Predicate.Continue, |
||||
Limit: opts.Predicate.Limit, |
||||
// TODO push label/field matching down to storage
|
||||
}) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
for _, r := range rsp.Results { |
||||
res := s.newFunc() |
||||
|
||||
err := entityToResource(r, res) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
// TODO filter in storage
|
||||
matches, err := opts.Predicate.Matches(res) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
if !matches { |
||||
continue |
||||
} |
||||
|
||||
v.Set(reflect.Append(v, reflect.ValueOf(res).Elem())) |
||||
} |
||||
|
||||
listAccessor, err := meta.ListAccessor(listObj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if rsp.NextPageToken != "" { |
||||
listAccessor.SetContinue(rsp.NextPageToken) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// GuaranteedUpdate keeps calling 'tryUpdate()' to update key 'key' (of type 'destination')
|
||||
// retrying the update until success if there is index conflict.
|
||||
// Note that object passed to tryUpdate may change across invocations of tryUpdate() if
|
||||
// other writers are simultaneously updating it, so tryUpdate() needs to take into account
|
||||
// the current contents of the object when deciding how the update object should look.
|
||||
// If the key doesn't exist, it will return NotFound storage error if ignoreNotFound=false
|
||||
// else `destination` will be set to the zero value of it's type.
|
||||
// If the eventual successful invocation of `tryUpdate` returns an output with the same serialized
|
||||
// contents as the input, it won't perform any update, but instead set `destination` to an object with those
|
||||
// contents.
|
||||
// If 'cachedExistingObject' is non-nil, it can be used as a suggestion about the
|
||||
// current version of the object to avoid read operation from storage to get it.
|
||||
// However, the implementations have to retry in case suggestion is stale.
|
||||
func (s *Storage) GuaranteedUpdate( |
||||
ctx context.Context, |
||||
key string, |
||||
destination runtime.Object, |
||||
ignoreNotFound bool, |
||||
preconditions *storage.Preconditions, |
||||
tryUpdate storage.UpdateFunc, |
||||
cachedExistingObject runtime.Object, |
||||
) error { |
||||
var err error |
||||
for attempt := 1; attempt <= MaxUpdateAttempts; attempt = attempt + 1 { |
||||
err = s.guaranteedUpdate(ctx, key, destination, ignoreNotFound, preconditions, tryUpdate, cachedExistingObject) |
||||
if err == nil { |
||||
return nil |
||||
} |
||||
} |
||||
|
||||
return err |
||||
} |
||||
|
||||
func (s *Storage) guaranteedUpdate( |
||||
ctx context.Context, |
||||
key string, |
||||
destination runtime.Object, |
||||
ignoreNotFound bool, |
||||
preconditions *storage.Preconditions, |
||||
tryUpdate storage.UpdateFunc, |
||||
cachedExistingObject runtime.Object, |
||||
) error { |
||||
requestInfo, ok := request.RequestInfoFrom(ctx) |
||||
if !ok { |
||||
return apierrors.NewInternalError(fmt.Errorf("could not get request info")) |
||||
} |
||||
|
||||
err := s.Get(ctx, key, storage.GetOptions{}, destination) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
res := &storage.ResponseMeta{} |
||||
updatedObj, _, err := tryUpdate(destination, *res) |
||||
if err != nil { |
||||
var statusErr *apierrors.StatusError |
||||
if errors.As(err, &statusErr) { |
||||
// For now, forbidden may come from a mutation handler
|
||||
if statusErr.ErrStatus.Reason == metav1.StatusReasonForbidden { |
||||
return statusErr |
||||
} |
||||
} |
||||
|
||||
return apierrors.NewInternalError(fmt.Errorf("could not successfully update object. key=%s, err=%s", key, err.Error())) |
||||
} |
||||
|
||||
e, err := resourceToEntity(key, updatedObj, requestInfo) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
// e.GRN.ResourceKind = destination.GetObjectKind().GroupVersionKind().Kind
|
||||
|
||||
previousVersion := "" |
||||
if preconditions != nil && preconditions.ResourceVersion != nil { |
||||
previousVersion = *preconditions.ResourceVersion |
||||
} |
||||
|
||||
req := &entityStore.UpdateEntityRequest{ |
||||
Entity: e, |
||||
PreviousVersion: previousVersion, |
||||
} |
||||
|
||||
rsp, err := s.store.Update(ctx, req) |
||||
if err != nil { |
||||
return err // continue???
|
||||
} |
||||
|
||||
if rsp.Status == entityStore.UpdateEntityResponse_UNCHANGED { |
||||
return nil // destination is already set
|
||||
} |
||||
|
||||
err = entityToResource(rsp.Entity, destination) |
||||
if err != nil { |
||||
return apierrors.NewInternalError(err) |
||||
} |
||||
|
||||
/* |
||||
s.watchSet.notifyWatchers(watch.Event{ |
||||
Object: destination.DeepCopyObject(), |
||||
Type: watch.Modified, |
||||
}) |
||||
*/ |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// Count returns number of different entries under the key (generally being path prefix).
|
||||
func (s *Storage) Count(key string) (int64, error) { |
||||
return 0, nil |
||||
} |
||||
|
||||
func (s *Storage) Versioner() storage.Versioner { |
||||
return &storage.APIObjectVersioner{} |
||||
} |
||||
|
||||
func (s *Storage) RequestWatchProgress(ctx context.Context) error { |
||||
return nil |
||||
} |
||||
@ -0,0 +1,263 @@ |
||||
package entity |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"fmt" |
||||
"reflect" |
||||
"strconv" |
||||
"strings" |
||||
"time" |
||||
|
||||
"k8s.io/apimachinery/pkg/api/meta" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/types" |
||||
"k8s.io/apiserver/pkg/endpoints/request" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/grn" |
||||
"github.com/grafana/grafana/pkg/kinds" |
||||
entityStore "github.com/grafana/grafana/pkg/services/store/entity" |
||||
) |
||||
|
||||
type Key struct { |
||||
Group string |
||||
Resource string |
||||
Namespace string |
||||
Name string |
||||
Subresource string |
||||
} |
||||
|
||||
func ParseKey(key string) (*Key, error) { |
||||
// /<group>/<resource>/<namespace>/<name>(/<subresource>)
|
||||
parts := strings.SplitN(key, "/", 6) |
||||
if len(parts) != 5 && len(parts) != 6 { |
||||
return nil, fmt.Errorf("invalid key (expecting 4 or 5 parts) " + key) |
||||
} |
||||
|
||||
if parts[0] != "" { |
||||
return nil, fmt.Errorf("invalid key (expecting leading slash) " + key) |
||||
} |
||||
|
||||
k := &Key{ |
||||
Group: parts[1], |
||||
Resource: parts[2], |
||||
Namespace: parts[3], |
||||
Name: parts[4], |
||||
} |
||||
|
||||
if len(parts) == 6 { |
||||
k.Subresource = parts[5] |
||||
} |
||||
|
||||
return k, nil |
||||
} |
||||
|
||||
func (k *Key) String() string { |
||||
if len(k.Subresource) > 0 { |
||||
return fmt.Sprintf("/%s/%s/%s/%s/%s", k.Group, k.Resource, k.Namespace, k.Name, k.Subresource) |
||||
} |
||||
return fmt.Sprintf("/%s/%s/%s/%s", k.Group, k.Resource, k.Namespace, k.Name) |
||||
} |
||||
|
||||
func (k *Key) IsEqual(other *Key) bool { |
||||
return k.Group == other.Group && |
||||
k.Resource == other.Resource && |
||||
k.Namespace == other.Namespace && |
||||
k.Name == other.Name && |
||||
k.Subresource == other.Subresource |
||||
} |
||||
|
||||
func (k *Key) TenantID() (int64, error) { |
||||
if k.Namespace == "default" { |
||||
return 1, nil |
||||
} |
||||
tid := strings.Split(k.Namespace, "-") |
||||
if len(tid) != 2 || !(tid[0] == "org" || tid[0] == "tenant") { |
||||
return 0, fmt.Errorf("invalid namespace, expected org|tenant-${#}") |
||||
} |
||||
intVar, err := strconv.ParseInt(tid[1], 10, 64) |
||||
if err != nil { |
||||
return 0, fmt.Errorf("invalid namespace, expected number") |
||||
} |
||||
return intVar, nil |
||||
} |
||||
|
||||
func (k *Key) ToGRN() (*grn.GRN, error) { |
||||
tid, err := k.TenantID() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
fullResource := k.Resource |
||||
if k.Subresource != "" { |
||||
fullResource = fmt.Sprintf("%s/%s", k.Resource, k.Subresource) |
||||
} |
||||
|
||||
return &grn.GRN{ |
||||
ResourceGroup: k.Group, |
||||
ResourceKind: fullResource, |
||||
ResourceIdentifier: k.Name, |
||||
TenantID: tid, |
||||
}, nil |
||||
} |
||||
|
||||
// Convert an etcd key to GRN style
|
||||
func keyToGRN(key string) (*grn.GRN, error) { |
||||
k, err := ParseKey(key) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return k.ToGRN() |
||||
} |
||||
|
||||
// this is terrible... but just making it work!!!!
|
||||
func entityToResource(rsp *entityStore.Entity, res runtime.Object) error { |
||||
var err error |
||||
|
||||
metaAccessor, err := meta.Accessor(res) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if rsp.GRN == nil { |
||||
return fmt.Errorf("invalid entity, missing GRN") |
||||
} |
||||
|
||||
if len(rsp.Meta) > 0 { |
||||
err = json.Unmarshal(rsp.Meta, res) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
metaAccessor.SetName(rsp.GRN.ResourceIdentifier) |
||||
if rsp.GRN.TenantID != 1 { |
||||
metaAccessor.SetNamespace(fmt.Sprintf("tenant-%d", rsp.GRN.TenantID)) |
||||
} else { |
||||
metaAccessor.SetNamespace("default") // org 1
|
||||
} |
||||
metaAccessor.SetUID(types.UID(rsp.Guid)) |
||||
metaAccessor.SetResourceVersion(rsp.Version) |
||||
metaAccessor.SetCreationTimestamp(metav1.Unix(rsp.CreatedAt/1000, rsp.CreatedAt%1000*1000000)) |
||||
|
||||
grafanaAccessor := kinds.MetaAccessor(metaAccessor) |
||||
|
||||
if rsp.Folder != "" { |
||||
grafanaAccessor.SetFolder(rsp.Folder) |
||||
} |
||||
if rsp.CreatedBy != "" { |
||||
grafanaAccessor.SetCreatedBy(rsp.CreatedBy) |
||||
} |
||||
if rsp.UpdatedBy != "" { |
||||
grafanaAccessor.SetUpdatedBy(rsp.UpdatedBy) |
||||
} |
||||
if rsp.UpdatedAt != 0 { |
||||
updatedAt := time.UnixMilli(rsp.UpdatedAt).UTC() |
||||
grafanaAccessor.SetUpdatedTimestamp(&updatedAt) |
||||
} |
||||
grafanaAccessor.SetSlug(rsp.Slug) |
||||
|
||||
if rsp.Origin != nil { |
||||
originTime := time.UnixMilli(rsp.Origin.Time).UTC() |
||||
grafanaAccessor.SetOriginInfo(&kinds.ResourceOriginInfo{ |
||||
Name: rsp.Origin.Source, |
||||
Key: rsp.Origin.Key, |
||||
// Path: rsp.Origin.Path,
|
||||
Timestamp: &originTime, |
||||
}) |
||||
} |
||||
|
||||
if len(rsp.Labels) > 0 { |
||||
metaAccessor.SetLabels(rsp.Labels) |
||||
} |
||||
|
||||
// TODO fields?
|
||||
|
||||
if len(rsp.Body) > 0 { |
||||
spec := reflect.ValueOf(res).Elem().FieldByName("Spec") |
||||
if spec != (reflect.Value{}) && spec.CanSet() { |
||||
err = json.Unmarshal(rsp.Body, spec.Addr().Interface()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
|
||||
if len(rsp.Status) > 0 { |
||||
status := reflect.ValueOf(res).Elem().FieldByName("Status") |
||||
if status != (reflect.Value{}) && status.CanSet() { |
||||
err = json.Unmarshal(rsp.Status, status.Addr().Interface()) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func resourceToEntity(key string, res runtime.Object, requestInfo *request.RequestInfo) (*entityStore.Entity, error) { |
||||
metaAccessor, err := meta.Accessor(res) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
g, err := keyToGRN(key) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
grafanaAccessor := kinds.MetaAccessor(metaAccessor) |
||||
|
||||
rsp := &entityStore.Entity{ |
||||
GRN: g, |
||||
GroupVersion: requestInfo.APIVersion, |
||||
Key: key, |
||||
Name: metaAccessor.GetName(), |
||||
Guid: string(metaAccessor.GetUID()), |
||||
Version: metaAccessor.GetResourceVersion(), |
||||
Folder: grafanaAccessor.GetFolder(), |
||||
CreatedAt: metaAccessor.GetCreationTimestamp().Time.UnixMilli(), |
||||
CreatedBy: grafanaAccessor.GetCreatedBy(), |
||||
UpdatedBy: grafanaAccessor.GetUpdatedBy(), |
||||
Slug: grafanaAccessor.GetSlug(), |
||||
Origin: &entityStore.EntityOriginInfo{ |
||||
Source: grafanaAccessor.GetOriginName(), |
||||
Key: grafanaAccessor.GetOriginKey(), |
||||
// Path: grafanaAccessor.GetOriginPath(),
|
||||
}, |
||||
Labels: metaAccessor.GetLabels(), |
||||
} |
||||
|
||||
if t := grafanaAccessor.GetUpdatedTimestamp(); t != nil { |
||||
rsp.UpdatedAt = t.UnixMilli() |
||||
} |
||||
|
||||
if t := grafanaAccessor.GetOriginTimestamp(); t != nil { |
||||
rsp.Origin.Time = t.UnixMilli() |
||||
} |
||||
|
||||
rsp.Meta, err = json.Marshal(meta.AsPartialObjectMetadata(metaAccessor)) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// TODO: store entire object in body?
|
||||
spec := reflect.ValueOf(res).Elem().FieldByName("Spec") |
||||
if spec != (reflect.Value{}) { |
||||
rsp.Body, err = json.Marshal(spec.Interface()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
status := reflect.ValueOf(res).Elem().FieldByName("Status") |
||||
if status != (reflect.Value{}) { |
||||
rsp.Status, err = json.Marshal(status.Interface()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} |
||||
|
||||
return rsp, nil |
||||
} |
||||
@ -0,0 +1,110 @@ |
||||
package entity |
||||
|
||||
import ( |
||||
context "context" |
||||
"strconv" |
||||
|
||||
grpc "google.golang.org/grpc" |
||||
codes "google.golang.org/grpc/codes" |
||||
"google.golang.org/grpc/metadata" |
||||
status "google.golang.org/grpc/status" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext" |
||||
) |
||||
|
||||
var _ EntityStoreServer = (*entityStoreClientWrapper)(nil) |
||||
|
||||
// wrapper for EntityStoreClient that implements EntityStore interface
|
||||
type entityStoreClientWrapper struct { |
||||
EntityStoreClient |
||||
} |
||||
|
||||
func (c *entityStoreClientWrapper) Read(ctx context.Context, in *ReadEntityRequest) (*Entity, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.Read(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) BatchRead(ctx context.Context, in *BatchReadEntityRequest) (*BatchReadEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.BatchRead(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) Write(ctx context.Context, in *WriteEntityRequest) (*WriteEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.Write(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) Create(ctx context.Context, in *CreateEntityRequest) (*CreateEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.Create(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) Update(ctx context.Context, in *UpdateEntityRequest) (*UpdateEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.Update(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) Delete(ctx context.Context, in *DeleteEntityRequest) (*DeleteEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.Delete(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) History(ctx context.Context, in *EntityHistoryRequest) (*EntityHistoryResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.History(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) List(ctx context.Context, in *EntityListRequest) (*EntityListResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.List(ctx, in) |
||||
} |
||||
func (c *entityStoreClientWrapper) Watch(*EntityWatchRequest, EntityStore_WatchServer) error { |
||||
return status.Errorf(codes.Unimplemented, "method Watch not implemented") |
||||
} |
||||
|
||||
func (c *entityStoreClientWrapper) wrapContext(ctx context.Context) (context.Context, error) { |
||||
user, err := appcontext.User(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
// set grpc metadata into the context to pass to the grpc server
|
||||
ctx = metadata.NewOutgoingContext(ctx, metadata.Pairs( |
||||
"grafana-idtoken", user.IDToken, |
||||
"grafana-userid", strconv.FormatInt(user.UserID, 10), |
||||
"grafana-orgid", strconv.FormatInt(user.OrgID, 10), |
||||
"grafana-login", user.Login, |
||||
)) |
||||
|
||||
return ctx, nil |
||||
} |
||||
|
||||
// TEMPORARY... while we split this into a new service (see below)
|
||||
func (c *entityStoreClientWrapper) AdminWrite(ctx context.Context, in *AdminWriteEntityRequest) (*WriteEntityResponse, error) { |
||||
ctx, err := c.wrapContext(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return c.EntityStoreClient.AdminWrite(ctx, in) |
||||
} |
||||
|
||||
func NewEntityStoreClientWrapper(cc grpc.ClientConnInterface) EntityStoreServer { |
||||
return &entityStoreClientWrapper{&entityStoreClient{cc}} |
||||
} |
||||
@ -0,0 +1,153 @@ |
||||
package db |
||||
|
||||
import ( |
||||
"fmt" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/jmoiron/sqlx" |
||||
"xorm.io/xorm" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/db" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
|
||||
// "github.com/grafana/grafana/pkg/services/sqlstore"
|
||||
"github.com/grafana/grafana/pkg/services/sqlstore/session" |
||||
"github.com/grafana/grafana/pkg/services/store/entity/migrations" |
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
"github.com/grafana/grafana/pkg/util" |
||||
) |
||||
|
||||
var _ sqlstash.EntityDB = (*EntityDB)(nil) |
||||
|
||||
func ProvideEntityDB(db db.DB, cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*EntityDB, error) { |
||||
return &EntityDB{ |
||||
db: db, |
||||
cfg: cfg, |
||||
features: features, |
||||
}, nil |
||||
} |
||||
|
||||
type EntityDB struct { |
||||
db db.DB |
||||
features featuremgmt.FeatureToggles |
||||
engine *xorm.Engine |
||||
cfg *setting.Cfg |
||||
} |
||||
|
||||
func (db *EntityDB) Init() error { |
||||
_, err := db.GetEngine() |
||||
return err |
||||
} |
||||
|
||||
func (db *EntityDB) GetEngine() (*xorm.Engine, error) { |
||||
if db.engine != nil { |
||||
return db.engine, nil |
||||
} |
||||
|
||||
var engine *xorm.Engine |
||||
var err error |
||||
|
||||
cfgSection := db.cfg.SectionWithEnvOverrides("entity_api") |
||||
dbType := cfgSection.Key("db_type").MustString("") |
||||
|
||||
// if explicit connection settings are provided, use them
|
||||
if dbType != "" { |
||||
dbHost := cfgSection.Key("db_host").MustString("") |
||||
dbName := cfgSection.Key("db_name").MustString("") |
||||
dbUser := cfgSection.Key("db_user").MustString("") |
||||
dbPass := cfgSection.Key("db_pass").MustString("") |
||||
|
||||
if dbType == "postgres" { |
||||
// TODO: support all postgres connection options
|
||||
dbSslMode := cfgSection.Key("db_sslmode").MustString("disable") |
||||
|
||||
addr, err := util.SplitHostPortDefault(dbHost, "127.0.0.1", "5432") |
||||
if err != nil { |
||||
return nil, fmt.Errorf("invalid host specifier '%s': %w", dbHost, err) |
||||
} |
||||
|
||||
connectionString := fmt.Sprintf( |
||||
"user=%s password=%s host=%s port=%s dbname=%s sslmode=%s", // sslcert=%s sslkey=%s sslrootcert=%s",
|
||||
dbUser, dbPass, addr.Host, addr.Port, dbName, dbSslMode, // ss.dbCfg.ClientCertPath, ss.dbCfg.ClientKeyPath, ss.dbCfg.CaCertPath
|
||||
) |
||||
|
||||
engine, err = xorm.NewEngine("postgres", connectionString) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
_, err = engine.Exec("SET SESSION enable_experimental_alter_column_type_general=true") |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} else if dbType == "mysql" { |
||||
// TODO: support all mysql connection options
|
||||
protocol := "tcp" |
||||
if strings.HasPrefix(dbHost, "/") { |
||||
protocol = "unix" |
||||
} |
||||
|
||||
connectionString := fmt.Sprintf("%s:%s@%s(%s)/%s?collation=utf8mb4_unicode_ci&allowNativePasswords=true&clientFoundRows=true", |
||||
dbUser, dbPass, protocol, dbHost, dbName) |
||||
|
||||
engine, err = xorm.NewEngine("mysql", connectionString) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
engine.SetMaxOpenConns(0) |
||||
engine.SetMaxIdleConns(2) |
||||
engine.SetConnMaxLifetime(time.Second * time.Duration(14400)) |
||||
|
||||
_, err = engine.Exec("SELECT 1") |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
} else { |
||||
// TODO: sqlite support
|
||||
return nil, fmt.Errorf("invalid db type specified: %s", dbType) |
||||
} |
||||
|
||||
// configure sql logging
|
||||
debugSQL := cfgSection.Key("log_queries").MustBool(false) |
||||
if !debugSQL { |
||||
engine.SetLogger(&xorm.DiscardLogger{}) |
||||
} else { |
||||
// add stack to database calls to be able to see what repository initiated queries. Top 7 items from the stack as they are likely in the xorm library.
|
||||
// engine.SetLogger(sqlstore.NewXormLogger(log.LvlInfo, log.WithSuffix(log.New("sqlstore.xorm"), log.CallerContextKey, log.StackCaller(log.DefaultCallerDepth))))
|
||||
engine.ShowSQL(true) |
||||
engine.ShowExecTime(true) |
||||
} |
||||
// otherwise, try to use the grafana db connection
|
||||
} else { |
||||
if db.db == nil { |
||||
return nil, fmt.Errorf("no db connection provided") |
||||
} |
||||
|
||||
engine = db.db.GetEngine() |
||||
} |
||||
|
||||
db.engine = engine |
||||
|
||||
if err := migrations.MigrateEntityStore(db, db.features); err != nil { |
||||
db.engine = nil |
||||
return nil, err |
||||
} |
||||
|
||||
return db.engine, nil |
||||
} |
||||
|
||||
func (db *EntityDB) GetSession() (*session.SessionDB, error) { |
||||
engine, err := db.GetEngine() |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return session.GetSession(sqlx.NewDb(engine.DB().DB, engine.DriverName())), nil |
||||
} |
||||
|
||||
func (db *EntityDB) GetCfg() *setting.Cfg { |
||||
return db.cfg |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -1,303 +0,0 @@ |
||||
package entity |
||||
|
||||
import ( |
||||
"encoding/base64" |
||||
"encoding/json" |
||||
"fmt" |
||||
"unsafe" |
||||
|
||||
jsoniter "github.com/json-iterator/go" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/grn" |
||||
) |
||||
|
||||
func init() { //nolint:gochecknoinits
|
||||
jsoniter.RegisterTypeEncoder("entity.EntitySearchResult", &searchResultCodec{}) |
||||
jsoniter.RegisterTypeEncoder("entity.WriteEntityResponse", &writeResponseCodec{}) |
||||
|
||||
jsoniter.RegisterTypeEncoder("entity.Entity", &rawEntityCodec{}) |
||||
jsoniter.RegisterTypeDecoder("entity.Entity", &rawEntityCodec{}) |
||||
} |
||||
|
||||
func writeRawJson(stream *jsoniter.Stream, val []byte) { |
||||
if json.Valid(val) { |
||||
_, _ = stream.Write(val) |
||||
} else { |
||||
stream.WriteString(string(val)) |
||||
} |
||||
} |
||||
|
||||
// Unlike the standard JSON marshal, this will write bytes as JSON when it can
|
||||
type rawEntityCodec struct{} |
||||
|
||||
func (obj *Entity) MarshalJSON() ([]byte, error) { |
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary |
||||
return json.Marshal(obj) |
||||
} |
||||
|
||||
// UnmarshalJSON will read JSON into a Entity
|
||||
func (obj *Entity) UnmarshalJSON(b []byte) error { |
||||
if obj == nil { |
||||
return fmt.Errorf("unexpected nil for raw objcet") |
||||
} |
||||
iter := jsoniter.ParseBytes(jsoniter.ConfigDefault, b) |
||||
readEntity(iter, obj) |
||||
return iter.Error |
||||
} |
||||
|
||||
func (codec *rawEntityCodec) IsEmpty(ptr unsafe.Pointer) bool { |
||||
f := (*Entity)(ptr) |
||||
return f.GRN == nil && f.Body == nil |
||||
} |
||||
|
||||
func (codec *rawEntityCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) { |
||||
obj := (*Entity)(ptr) |
||||
stream.WriteObjectStart() |
||||
stream.WriteObjectField("GRN") |
||||
stream.WriteVal(obj.GRN) |
||||
|
||||
if obj.Version != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("version") |
||||
stream.WriteString(obj.Version) |
||||
} |
||||
if obj.CreatedAt > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("createdAt") |
||||
stream.WriteInt64(obj.CreatedAt) |
||||
} |
||||
if obj.UpdatedAt > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("updatedAt") |
||||
stream.WriteInt64(obj.UpdatedAt) |
||||
} |
||||
if obj.CreatedBy != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("createdBy") |
||||
stream.WriteString(obj.CreatedBy) |
||||
} |
||||
if obj.UpdatedBy != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("updatedBy") |
||||
stream.WriteString(obj.UpdatedBy) |
||||
} |
||||
if obj.Folder != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("folder") |
||||
stream.WriteString(obj.Folder) |
||||
} |
||||
if obj.Body != nil { |
||||
stream.WriteMore() |
||||
if json.Valid(obj.Body) { |
||||
stream.WriteObjectField("body") |
||||
stream.WriteRaw(string(obj.Body)) // works for strings
|
||||
} else { |
||||
sEnc := base64.StdEncoding.EncodeToString(obj.Body) |
||||
stream.WriteObjectField("body_base64") |
||||
stream.WriteString(sEnc) // works for strings
|
||||
} |
||||
} |
||||
if len(obj.SummaryJson) > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("summary") |
||||
writeRawJson(stream, obj.SummaryJson) |
||||
} |
||||
if obj.ETag != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("etag") |
||||
stream.WriteString(obj.ETag) |
||||
} |
||||
if obj.Size > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("size") |
||||
stream.WriteInt64(obj.Size) |
||||
} |
||||
if obj.Origin != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("origin") |
||||
stream.WriteVal(obj.Origin) |
||||
} |
||||
stream.WriteObjectEnd() |
||||
} |
||||
|
||||
func (codec *rawEntityCodec) Decode(ptr unsafe.Pointer, iter *jsoniter.Iterator) { |
||||
*(*Entity)(ptr) = Entity{} |
||||
raw := (*Entity)(ptr) |
||||
readEntity(iter, raw) |
||||
} |
||||
|
||||
func readEntity(iter *jsoniter.Iterator, raw *Entity) { |
||||
for l1Field := iter.ReadObject(); l1Field != ""; l1Field = iter.ReadObject() { |
||||
switch l1Field { |
||||
case "GRN": |
||||
raw.GRN = &grn.GRN{} |
||||
iter.ReadVal(raw.GRN) |
||||
case "updatedAt": |
||||
raw.UpdatedAt = iter.ReadInt64() |
||||
case "updatedBy": |
||||
raw.UpdatedBy = iter.ReadString() |
||||
case "createdAt": |
||||
raw.CreatedAt = iter.ReadInt64() |
||||
case "createdBy": |
||||
raw.CreatedBy = iter.ReadString() |
||||
case "size": |
||||
raw.Size = iter.ReadInt64() |
||||
case "etag": |
||||
raw.ETag = iter.ReadString() |
||||
case "version": |
||||
raw.Version = iter.ReadString() |
||||
case "folder": |
||||
raw.Folder = iter.ReadString() |
||||
case "origin": |
||||
raw.Origin = &EntityOriginInfo{} |
||||
iter.ReadVal(raw.Origin) |
||||
case "summary": |
||||
var val interface{} |
||||
iter.ReadVal(&val) // ??? is there a smarter way to just keep the underlying bytes without read+marshal
|
||||
body, err := json.Marshal(val) |
||||
if err != nil { |
||||
iter.ReportError("raw entity", "error reading summary body") |
||||
return |
||||
} |
||||
raw.SummaryJson = body |
||||
|
||||
case "body": |
||||
var val interface{} |
||||
iter.ReadVal(&val) // ??? is there a smarter way to just keep the underlying bytes without read+marshal
|
||||
body, err := json.Marshal(val) |
||||
if err != nil { |
||||
iter.ReportError("raw entity", "error creating json from body") |
||||
return |
||||
} |
||||
raw.Body = body |
||||
|
||||
case "body_base64": |
||||
val := iter.ReadString() |
||||
body, err := base64.StdEncoding.DecodeString(val) |
||||
if err != nil { |
||||
iter.ReportError("raw entity", "error decoding base64 body") |
||||
return |
||||
} |
||||
raw.Body = body |
||||
|
||||
default: |
||||
iter.ReportError("raw object", "unexpected field: "+l1Field) |
||||
return |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Unlike the standard JSON marshal, this will write bytes as JSON when it can
|
||||
type searchResultCodec struct{} |
||||
|
||||
func (obj *EntitySearchResult) MarshalJSON() ([]byte, error) { |
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary |
||||
return json.Marshal(obj) |
||||
} |
||||
|
||||
func (codec *searchResultCodec) IsEmpty(ptr unsafe.Pointer) bool { |
||||
f := (*EntitySearchResult)(ptr) |
||||
return f.GRN == nil && f.Body == nil |
||||
} |
||||
|
||||
func (codec *searchResultCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) { |
||||
obj := (*EntitySearchResult)(ptr) |
||||
stream.WriteObjectStart() |
||||
stream.WriteObjectField("GRN") |
||||
stream.WriteVal(obj.GRN) |
||||
|
||||
if obj.Name != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("name") |
||||
stream.WriteString(obj.Name) |
||||
} |
||||
if obj.Description != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("description") |
||||
stream.WriteString(obj.Description) |
||||
} |
||||
if obj.Size > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("size") |
||||
stream.WriteInt64(obj.Size) |
||||
} |
||||
if obj.UpdatedAt > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("updatedAt") |
||||
stream.WriteInt64(obj.UpdatedAt) |
||||
} |
||||
if obj.UpdatedBy != "" { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("updatedBy") |
||||
stream.WriteVal(obj.UpdatedBy) |
||||
} |
||||
if obj.Body != nil { |
||||
stream.WriteMore() |
||||
if json.Valid(obj.Body) { |
||||
stream.WriteObjectField("body") |
||||
_, _ = stream.Write(obj.Body) // works for strings
|
||||
} else { |
||||
stream.WriteObjectField("body_base64") |
||||
stream.WriteVal(obj.Body) // works for strings
|
||||
} |
||||
} |
||||
if obj.Labels != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("labels") |
||||
stream.WriteVal(obj.Labels) |
||||
} |
||||
if obj.ErrorJson != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("error") |
||||
writeRawJson(stream, obj.ErrorJson) |
||||
} |
||||
if obj.FieldsJson != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("fields") |
||||
writeRawJson(stream, obj.FieldsJson) |
||||
} |
||||
|
||||
stream.WriteObjectEnd() |
||||
} |
||||
|
||||
// Unlike the standard JSON marshal, this will write bytes as JSON when it can
|
||||
type writeResponseCodec struct{} |
||||
|
||||
func (obj *WriteEntityResponse) MarshalJSON() ([]byte, error) { |
||||
var json = jsoniter.ConfigCompatibleWithStandardLibrary |
||||
return json.Marshal(obj) |
||||
} |
||||
|
||||
func (codec *writeResponseCodec) IsEmpty(ptr unsafe.Pointer) bool { |
||||
f := (*WriteEntityResponse)(ptr) |
||||
return f == nil |
||||
} |
||||
|
||||
func (codec *writeResponseCodec) Encode(ptr unsafe.Pointer, stream *jsoniter.Stream) { |
||||
obj := (*WriteEntityResponse)(ptr) |
||||
stream.WriteObjectStart() |
||||
stream.WriteObjectField("status") |
||||
stream.WriteString(obj.Status.String()) |
||||
|
||||
if obj.Error != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("error") |
||||
stream.WriteVal(obj.Error) |
||||
} |
||||
if obj.GRN != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("GRN") |
||||
stream.WriteVal(obj.GRN) |
||||
} |
||||
if obj.Entity != nil { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("entity") |
||||
stream.WriteVal(obj.Entity) |
||||
} |
||||
if len(obj.SummaryJson) > 0 { |
||||
stream.WriteMore() |
||||
stream.WriteObjectField("summary") |
||||
writeRawJson(stream, obj.SummaryJson) |
||||
} |
||||
stream.WriteObjectEnd() |
||||
} |
||||
@ -1,50 +0,0 @@ |
||||
package entity |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/grn" |
||||
) |
||||
|
||||
func TestRawEncoders(t *testing.T) { |
||||
body, err := json.Marshal(map[string]any{ |
||||
"hello": "world", |
||||
"field": 1.23, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
raw := &Entity{ |
||||
GRN: &grn.GRN{ |
||||
ResourceIdentifier: "a", |
||||
ResourceKind: "b", |
||||
}, |
||||
Version: "c", |
||||
ETag: "d", |
||||
Body: body, |
||||
} |
||||
|
||||
b, err := json.MarshalIndent(raw, "", " ") |
||||
require.NoError(t, err) |
||||
|
||||
str := string(b) |
||||
|
||||
require.JSONEq(t, `{ |
||||
"GRN": { |
||||
"ResourceKind": "b", |
||||
"ResourceIdentifier": "a" |
||||
}, |
||||
"version": "c", |
||||
"body": { |
||||
"field": 1.23, |
||||
"hello": "world" |
||||
}, |
||||
"etag": "d" |
||||
}`, str) |
||||
|
||||
copy := &Entity{} |
||||
err = json.Unmarshal(b, copy) |
||||
require.NoError(t, err) |
||||
} |
||||
@ -0,0 +1,52 @@ |
||||
package server |
||||
|
||||
import ( |
||||
"fmt" |
||||
"net" |
||||
"strconv" |
||||
|
||||
// "github.com/grafana/grafana/pkg/services/featuremgmt"
|
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
type config struct { |
||||
enabled bool |
||||
devMode bool |
||||
|
||||
ip net.IP |
||||
port int |
||||
host string |
||||
apiURL string |
||||
|
||||
logLevel int |
||||
} |
||||
|
||||
func newConfig(cfg *setting.Cfg) *config { |
||||
defaultLogLevel := 0 |
||||
// TODO
|
||||
ip := net.ParseIP(cfg.HTTPAddr) |
||||
apiURL := cfg.AppURL |
||||
port, err := strconv.Atoi(cfg.HTTPPort) |
||||
if err != nil { |
||||
port = 3001 |
||||
} |
||||
|
||||
if cfg.Env == setting.Dev { |
||||
defaultLogLevel = 10 |
||||
port = 3001 |
||||
ip = net.ParseIP("127.0.0.1") |
||||
apiURL = fmt.Sprintf("https://%s:%d", ip, port) |
||||
} |
||||
|
||||
host := fmt.Sprintf("%s:%d", ip, port) |
||||
|
||||
return &config{ |
||||
enabled: true, // cfg.IsFeatureToggleEnabled(featuremgmt.FlagGrafanaStorageServer),
|
||||
devMode: cfg.Env == setting.Dev, |
||||
ip: ip, |
||||
port: port, |
||||
host: host, |
||||
logLevel: cfg.SectionWithEnvOverrides("storage-server").Key("log_level").MustInt(defaultLogLevel), |
||||
apiURL: apiURL, |
||||
} |
||||
} |
||||
@ -0,0 +1,196 @@ |
||||
package server |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"strconv" |
||||
|
||||
"github.com/go-jose/go-jose/v3/jwt" |
||||
"github.com/grafana/dskit/services" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"google.golang.org/grpc/metadata" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/appcontext" |
||||
"github.com/grafana/grafana/pkg/infra/tracing" |
||||
"github.com/grafana/grafana/pkg/modules" |
||||
"github.com/grafana/grafana/pkg/registry" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/grafana/grafana/pkg/services/grpcserver" |
||||
"github.com/grafana/grafana/pkg/services/grpcserver/interceptors" |
||||
"github.com/grafana/grafana/pkg/services/store/entity" |
||||
entityDB "github.com/grafana/grafana/pkg/services/store/entity/db" |
||||
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash" |
||||
"github.com/grafana/grafana/pkg/services/user" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
var ( |
||||
_ Service = (*service)(nil) |
||||
_ registry.BackgroundService = (*service)(nil) |
||||
_ registry.CanBeDisabled = (*service)(nil) |
||||
) |
||||
|
||||
func init() { |
||||
// do nothing
|
||||
} |
||||
|
||||
type Service interface { |
||||
services.NamedService |
||||
registry.BackgroundService |
||||
registry.CanBeDisabled |
||||
} |
||||
|
||||
type service struct { |
||||
*services.BasicService |
||||
|
||||
config *config |
||||
|
||||
cfg *setting.Cfg |
||||
features featuremgmt.FeatureToggles |
||||
|
||||
stopCh chan struct{} |
||||
stoppedCh chan error |
||||
|
||||
handler grpcserver.Provider |
||||
|
||||
tracing *tracing.TracingService |
||||
|
||||
authenticator interceptors.Authenticator |
||||
} |
||||
|
||||
type Authenticator struct{} |
||||
|
||||
func (f *Authenticator) Authenticate(ctx context.Context) (context.Context, error) { |
||||
md, ok := metadata.FromIncomingContext(ctx) |
||||
if !ok { |
||||
return nil, fmt.Errorf("no metadata found") |
||||
} |
||||
|
||||
// TODO: use id token instead of these fields
|
||||
login := md.Get("grafana-login")[0] |
||||
if login == "" { |
||||
return nil, fmt.Errorf("no login found in context") |
||||
} |
||||
userID, err := strconv.ParseInt(md.Get("grafana-userid")[0], 10, 64) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("invalid user id: %w", err) |
||||
} |
||||
orgID, err := strconv.ParseInt(md.Get("grafana-orgid")[0], 10, 64) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("invalid org id: %w", err) |
||||
} |
||||
|
||||
// TODO: validate id token
|
||||
idToken := md.Get("grafana-idtoken")[0] |
||||
if idToken == "" { |
||||
return nil, fmt.Errorf("no id token found in context") |
||||
} |
||||
jwtToken, err := jwt.ParseSigned(idToken) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("invalid id token: %w", err) |
||||
} |
||||
claims := jwt.Claims{} |
||||
err = jwtToken.UnsafeClaimsWithoutVerification(&claims) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("invalid id token: %w", err) |
||||
} |
||||
// fmt.Printf("JWT CLAIMS: %+v\n", claims)
|
||||
|
||||
return appcontext.WithUser(ctx, &user.SignedInUser{ |
||||
Login: login, |
||||
UserID: userID, |
||||
OrgID: orgID, |
||||
}), nil |
||||
} |
||||
|
||||
var _ interceptors.Authenticator = (*Authenticator)(nil) |
||||
|
||||
func ProvideService( |
||||
cfg *setting.Cfg, |
||||
features featuremgmt.FeatureToggles, |
||||
) (*service, error) { |
||||
tracing, err := tracing.ProvideService(cfg) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
authn := &Authenticator{} |
||||
|
||||
s := &service{ |
||||
config: newConfig(cfg), |
||||
cfg: cfg, |
||||
features: features, |
||||
stopCh: make(chan struct{}), |
||||
authenticator: authn, |
||||
tracing: tracing, |
||||
} |
||||
|
||||
// This will be used when running as a dskit service
|
||||
s.BasicService = services.NewBasicService(s.start, s.running, nil).WithName(modules.StorageServer) |
||||
|
||||
return s, nil |
||||
} |
||||
|
||||
func (s *service) IsDisabled() bool { |
||||
return !s.config.enabled |
||||
} |
||||
|
||||
// Run is an adapter for the BackgroundService interface.
|
||||
func (s *service) Run(ctx context.Context) error { |
||||
if err := s.start(ctx); err != nil { |
||||
return err |
||||
} |
||||
return s.running(ctx) |
||||
} |
||||
|
||||
func (s *service) start(ctx context.Context) error { |
||||
// TODO: use wire
|
||||
|
||||
// TODO: support using grafana db connection?
|
||||
eDB, err := entityDB.ProvideEntityDB(nil, s.cfg, s.features) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = eDB.Init() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
store, err := sqlstash.ProvideSQLEntityServer(eDB) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
s.handler, err = grpcserver.ProvideService(s.cfg, s.features, s.authenticator, s.tracing, prometheus.DefaultRegisterer) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
entity.RegisterEntityStoreServer(s.handler.GetServer(), store) |
||||
|
||||
err = s.handler.Run(ctx) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (s *service) running(ctx context.Context) error { |
||||
// skip waiting for the server in prod mode
|
||||
if !s.config.devMode { |
||||
<-ctx.Done() |
||||
return nil |
||||
} |
||||
|
||||
select { |
||||
case err := <-s.stoppedCh: |
||||
if err != nil { |
||||
return err |
||||
} |
||||
case <-ctx.Done(): |
||||
close(s.stopCh) |
||||
} |
||||
return nil |
||||
} |
||||
File diff suppressed because it is too large
Load Diff
@ -1,116 +0,0 @@ |
||||
package sqlstash |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/grn" |
||||
"github.com/grafana/grafana/pkg/services/store/entity" |
||||
) |
||||
|
||||
type summarySupport struct { |
||||
model *entity.EntitySummary |
||||
name string |
||||
description *string // null or empty
|
||||
slug *string // null or empty
|
||||
labels *string |
||||
fields *string |
||||
errors *string // should not allow saving with this!
|
||||
marshaled []byte |
||||
|
||||
// metadata for nested objects
|
||||
parent_grn *grn.GRN |
||||
folder string |
||||
isNested bool // set when this is for a nested item
|
||||
} |
||||
|
||||
func newSummarySupport(summary *entity.EntitySummary) (*summarySupport, error) { |
||||
var err error |
||||
var js []byte |
||||
s := &summarySupport{ |
||||
model: summary, |
||||
} |
||||
if summary != nil { |
||||
s.marshaled, err = json.Marshal(summary) |
||||
if err != nil { |
||||
return s, err |
||||
} |
||||
|
||||
s.name = summary.Name |
||||
if summary.Description != "" { |
||||
s.description = &summary.Description |
||||
} |
||||
if summary.Slug != "" { |
||||
s.slug = &summary.Slug |
||||
} |
||||
if len(summary.Labels) > 0 { |
||||
js, err = json.Marshal(summary.Labels) |
||||
if err != nil { |
||||
return s, err |
||||
} |
||||
str := string(js) |
||||
s.labels = &str |
||||
} |
||||
|
||||
if len(summary.Fields) > 0 { |
||||
js, err = json.Marshal(summary.Fields) |
||||
if err != nil { |
||||
return s, err |
||||
} |
||||
str := string(js) |
||||
s.fields = &str |
||||
} |
||||
|
||||
if summary.Error != nil { |
||||
js, err = json.Marshal(summary.Error) |
||||
if err != nil { |
||||
return s, err |
||||
} |
||||
str := string(js) |
||||
s.errors = &str |
||||
} |
||||
} |
||||
return s, err |
||||
} |
||||
|
||||
func (s summarySupport) toEntitySummary() (*entity.EntitySummary, error) { |
||||
var err error |
||||
summary := &entity.EntitySummary{ |
||||
Name: s.name, |
||||
} |
||||
if s.description != nil { |
||||
summary.Description = *s.description |
||||
} |
||||
if s.slug != nil { |
||||
summary.Slug = *s.slug |
||||
} |
||||
if s.labels != nil { |
||||
b := []byte(*s.labels) |
||||
err = json.Unmarshal(b, &summary.Labels) |
||||
if err != nil { |
||||
return summary, err |
||||
} |
||||
} |
||||
if s.fields != nil { |
||||
b := []byte(*s.fields) |
||||
err = json.Unmarshal(b, &summary.Fields) |
||||
if err != nil { |
||||
return summary, err |
||||
} |
||||
} |
||||
if s.errors != nil { |
||||
b := []byte(*s.errors) |
||||
err = json.Unmarshal(b, &summary.Error) |
||||
if err != nil { |
||||
return summary, err |
||||
} |
||||
} |
||||
return summary, err |
||||
} |
||||
|
||||
func (s *summarySupport) getParentGRN() *string { |
||||
if s.isNested { |
||||
t := s.parent_grn.ToGRNString() |
||||
return &t |
||||
} |
||||
return nil |
||||
} |
||||
Loading…
Reference in new issue