mirror of https://github.com/grafana/grafana
DualWriter: Support managed DualWriter (#100881)
parent
5a7916133e
commit
5a40c84568
|
@ -0,0 +1,99 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"os" |
||||
"sync" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana-app-sdk/logging" |
||||
) |
||||
|
||||
// Simple file implementation -- useful while testing and not yet sure about the SQL structure!
|
||||
// When a path exists, read/write it from disk; otherwise it is held in memory
|
||||
type fileDB struct { |
||||
path string |
||||
changed int64 |
||||
db map[string]StorageStatus |
||||
mu sync.RWMutex |
||||
logger logging.Logger |
||||
} |
||||
|
||||
// File implementation while testing -- values are saved in the data directory
|
||||
func newFileDB(path string) *fileDB { |
||||
return &fileDB{ |
||||
db: make(map[string]StorageStatus), |
||||
path: path, |
||||
logger: logging.DefaultLogger.With("logger", "fileDB"), |
||||
} |
||||
} |
||||
|
||||
func (m *fileDB) Get(ctx context.Context, gr schema.GroupResource) (StorageStatus, bool, error) { |
||||
m.mu.RLock() |
||||
defer m.mu.RUnlock() |
||||
|
||||
info, err := os.Stat(m.path) |
||||
if err == nil && info.ModTime().UnixMilli() != m.changed { |
||||
v, err := os.ReadFile(m.path) |
||||
if err == nil { |
||||
err = json.Unmarshal(v, &m.db) |
||||
m.changed = info.ModTime().UnixMilli() |
||||
} |
||||
if err != nil { |
||||
m.logger.Warn("error reading filedb", "err", err) |
||||
} |
||||
|
||||
changed := false |
||||
for k, v := range m.db { |
||||
// Must write to unified if we are reading unified
|
||||
if v.ReadUnified && !v.WriteUnified { |
||||
v.WriteUnified = true |
||||
m.db[k] = v |
||||
changed = true |
||||
} |
||||
|
||||
// Make sure we are writing something!
|
||||
if !(v.WriteLegacy || v.WriteUnified) { |
||||
v.WriteLegacy = true |
||||
m.db[k] = v |
||||
changed = true |
||||
} |
||||
} |
||||
if changed { |
||||
err = m.save() |
||||
m.logger.Warn("error saving changes filedb", "err", err) |
||||
} |
||||
} |
||||
|
||||
v, ok := m.db[gr.String()] |
||||
return v, ok, nil |
||||
} |
||||
|
||||
func (m *fileDB) Set(ctx context.Context, status StorageStatus) error { |
||||
m.mu.Lock() |
||||
defer m.mu.Unlock() |
||||
|
||||
gr := schema.GroupResource{ |
||||
Group: status.Group, |
||||
Resource: status.Resource, |
||||
} |
||||
m.db[gr.String()] = status |
||||
|
||||
return m.save() |
||||
} |
||||
|
||||
func (m *fileDB) save() error { |
||||
if m.path != "" { |
||||
data, err := json.MarshalIndent(m.db, "", " ") |
||||
if err != nil { |
||||
return err |
||||
} |
||||
err = os.WriteFile(m.path, data, 0644) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,59 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana/pkg/apiserver/rest" |
||||
) |
||||
|
||||
func ProvideTestService(status ...StorageStatus) Service { |
||||
if len(status) < 1 { |
||||
status = []StorageStatus{{ |
||||
WriteLegacy: true, |
||||
WriteUnified: false, |
||||
Runtime: false, |
||||
ReadUnified: false, |
||||
}} |
||||
} |
||||
return &mockService{status: status[0]} |
||||
} |
||||
|
||||
type mockService struct { |
||||
status StorageStatus |
||||
} |
||||
|
||||
// NewStorage implements Service.
|
||||
func (m *mockService) NewStorage(gr schema.GroupResource, legacy rest.LegacyStorage, storage rest.Storage) (rest.Storage, error) { |
||||
return nil, fmt.Errorf("not implemented") |
||||
} |
||||
|
||||
// ReadFromUnified implements Service.
|
||||
func (m *mockService) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) { |
||||
return m.status.ReadUnified, nil |
||||
} |
||||
|
||||
// ShouldManage implements Service.
|
||||
func (m *mockService) ShouldManage(gr schema.GroupResource) bool { |
||||
return true |
||||
} |
||||
|
||||
// StartMigration implements Service.
|
||||
func (m *mockService) StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) { |
||||
return StorageStatus{}, fmt.Errorf("not implemented") |
||||
} |
||||
|
||||
// Status implements Service.
|
||||
func (m *mockService) Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) { |
||||
s := m.status |
||||
s.Group = gr.Group |
||||
s.Resource = gr.Resource |
||||
return s, nil |
||||
} |
||||
|
||||
// Update implements Service.
|
||||
func (m *mockService) Update(ctx context.Context, status StorageStatus) (StorageStatus, error) { |
||||
return m.status, fmt.Errorf("not implemented") |
||||
} |
@ -0,0 +1,162 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"net/http" |
||||
|
||||
apierrors "k8s.io/apimachinery/pkg/api/errors" |
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apiserver/pkg/registry/rest" |
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" |
||||
) |
||||
|
||||
func (m *service) NewStorage(gr schema.GroupResource, |
||||
legacy grafanarest.LegacyStorage, |
||||
storage grafanarest.Storage, |
||||
) (grafanarest.Storage, error) { |
||||
status, err := m.Status(context.Background(), gr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if m.enabled && status.Runtime { |
||||
// Dynamic storage behavior
|
||||
return &runtimeDualWriter{ |
||||
service: m, |
||||
legacy: legacy, |
||||
unified: storage, |
||||
dualwrite: grafanarest.NewDualWriter(grafanarest.Mode3, legacy, storage, m.reg, gr.String()), |
||||
gr: gr, |
||||
}, nil |
||||
} |
||||
|
||||
if status.ReadUnified { |
||||
if status.WriteLegacy { |
||||
// Write both, read unified
|
||||
return grafanarest.NewDualWriter(grafanarest.Mode3, legacy, storage, m.reg, gr.String()), nil |
||||
} |
||||
return storage, nil |
||||
} |
||||
if status.WriteUnified { |
||||
// Write both, read legacy
|
||||
return grafanarest.NewDualWriter(grafanarest.Mode2, legacy, storage, m.reg, gr.String()), nil |
||||
} |
||||
return legacy, nil |
||||
} |
||||
|
||||
// The runtime dual writer implements the various modes we have described as: mode:1/2/3/4/5
|
||||
// However the behavior can be configured at runtime rather than just at startup.
|
||||
// When a resource is marked as "migrating", all write requests will be 503 unavailable
|
||||
type runtimeDualWriter struct { |
||||
service Service |
||||
legacy grafanarest.LegacyStorage |
||||
unified grafanarest.Storage |
||||
dualwrite grafanarest.Storage |
||||
gr schema.GroupResource |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { |
||||
unified, err := d.service.ReadFromUnified(ctx, d.gr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if unified { |
||||
return d.unified.Get(ctx, name, options) |
||||
} |
||||
return d.legacy.Get(ctx, name, options) |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
unified, err := d.service.ReadFromUnified(ctx, d.gr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if unified { |
||||
return d.unified.List(ctx, options) |
||||
} |
||||
return d.legacy.List(ctx, options) |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) getWriter(ctx context.Context) (grafanarest.Storage, error) { |
||||
status, err := d.service.Status(ctx, d.gr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if status.Migrating > 0 { |
||||
return nil, &apierrors.StatusError{ |
||||
ErrStatus: metav1.Status{ |
||||
Code: http.StatusServiceUnavailable, |
||||
Message: "the system is migrating", |
||||
}, |
||||
} |
||||
} |
||||
if status.WriteLegacy { |
||||
if status.WriteUnified { |
||||
return d.dualwrite, nil |
||||
} |
||||
return d.legacy, nil // only write legacy (mode0)
|
||||
} |
||||
return d.unified, nil // only write unified (mode4)
|
||||
} |
||||
|
||||
func (d *runtimeDualWriter) Create(ctx context.Context, in runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { |
||||
store, err := d.getWriter(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return store.Create(ctx, in, createValidation, options) |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { |
||||
store, err := d.getWriter(ctx) |
||||
if err != nil { |
||||
return nil, false, err |
||||
} |
||||
return store.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { |
||||
store, err := d.getWriter(ctx) |
||||
if err != nil { |
||||
return nil, false, err |
||||
} |
||||
return store.Delete(ctx, name, deleteValidation, options) |
||||
} |
||||
|
||||
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
|
||||
func (d *runtimeDualWriter) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
store, err := d.getWriter(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
return store.DeleteCollection(ctx, deleteValidation, options, listOptions) |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) Destroy() { |
||||
d.dualwrite.Destroy() |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) GetSingularName() string { |
||||
return d.unified.GetSingularName() |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) NamespaceScoped() bool { |
||||
return d.unified.NamespaceScoped() |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) New() runtime.Object { |
||||
return d.unified.New() |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) NewList() runtime.Object { |
||||
return d.unified.NewList() |
||||
} |
||||
|
||||
func (d *runtimeDualWriter) ConvertToTable(ctx context.Context, object runtime.Object, tableOptions runtime.Object) (*metav1.Table, error) { |
||||
return d.unified.ConvertToTable(ctx, object, tableOptions) |
||||
} |
@ -0,0 +1,284 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/stretchr/testify/mock" |
||||
"github.com/stretchr/testify/require" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
"k8s.io/apiserver/pkg/apis/example" |
||||
|
||||
"github.com/grafana/grafana/pkg/apiserver/rest" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
) |
||||
|
||||
var now = time.Now() |
||||
|
||||
var createFn = func(context.Context, runtime.Object) error { return nil } |
||||
|
||||
var exampleObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "1", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}} |
||||
var exampleObjNoRV = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "foo", ResourceVersion: "", CreationTimestamp: metav1.Time{}, GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}} |
||||
var anotherObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "bar", ResourceVersion: "2", GenerateName: "foo"}, Spec: example.PodSpec{}, Status: example.PodStatus{StartTime: &metav1.Time{Time: now}}} |
||||
var failingObj = &example.Pod{TypeMeta: metav1.TypeMeta{Kind: "foo"}, ObjectMeta: metav1.ObjectMeta{Name: "object-fail", ResourceVersion: "2", GenerateName: "object-fail"}, Spec: example.PodSpec{}, Status: example.PodStatus{}} |
||||
|
||||
var p = prometheus.NewRegistry() |
||||
var kind = schema.GroupResource{Group: "g", Resource: "r"} |
||||
|
||||
func TestManagedMode3_Create(t *testing.T) { |
||||
type testCase struct { |
||||
input runtime.Object |
||||
setupLegacyFn func(m *mock.Mock, input runtime.Object) |
||||
setupStorageFn func(m *mock.Mock, input runtime.Object) |
||||
name string |
||||
wantErr bool |
||||
} |
||||
tests := |
||||
[]testCase{ |
||||
{ |
||||
name: "should succeed when creating an object in both the LegacyStorage and Storage", |
||||
input: exampleObj, |
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) { |
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, _ runtime.Object) { |
||||
// We don't use the input here, as the input is transformed before being passed to unified storage.
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
}, |
||||
{ |
||||
name: "should return an error when creating an object in the legacy store fails", |
||||
input: failingObj, |
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) { |
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(nil, errors.New("error")).Once() |
||||
}, |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
name: "should return an error when creating an object in the unified store fails and delete from LegacyStorage", |
||||
input: exampleObj, |
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) { |
||||
m.On("Delete", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, true, nil).Once() |
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, _ runtime.Object) { |
||||
// We don't use the input here, as the input is transformed before being passed to unified storage.
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(nil, errors.New("error")).Once() |
||||
}, |
||||
wantErr: true, |
||||
}, |
||||
} |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
l := (rest.LegacyStorage)(nil) |
||||
s := (rest.Storage)(nil) |
||||
|
||||
ls := legacyStoreMock{&mock.Mock{}, l} |
||||
us := storageMock{&mock.Mock{}, s} |
||||
|
||||
if tt.setupLegacyFn != nil { |
||||
tt.setupLegacyFn(ls.Mock, tt.input) |
||||
} |
||||
if tt.setupStorageFn != nil { |
||||
tt.setupStorageFn(us.Mock, tt.input) |
||||
} |
||||
|
||||
m := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), p, nil) |
||||
dw, err := m.NewStorage(kind, ls, us) |
||||
require.NoError(t, err) |
||||
|
||||
obj, err := dw.Create(context.Background(), tt.input, createFn, &metav1.CreateOptions{}) |
||||
|
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
|
||||
require.Equal(t, exampleObj, obj) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestManagedMode3_Get(t *testing.T) { |
||||
type testCase struct { |
||||
setupLegacyFn func(m *mock.Mock, name string) |
||||
setupStorageFn func(m *mock.Mock, name string) |
||||
name string |
||||
wantErr bool |
||||
} |
||||
tests := |
||||
[]testCase{ |
||||
{ |
||||
name: "should succeed when getting an object from both stores", |
||||
setupLegacyFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil) |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil) |
||||
}, |
||||
}, |
||||
{ |
||||
name: "should return an error when getting an object in the unified store fails", |
||||
setupLegacyFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil) |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(nil, errors.New("error")) |
||||
}, |
||||
wantErr: true, |
||||
}, |
||||
{ |
||||
name: "should succeed when getting an object in the LegacyStorage fails", |
||||
setupLegacyFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(nil, errors.New("error")) |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, name string) { |
||||
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil) |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
name := "foo" |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
l := (rest.LegacyStorage)(nil) |
||||
s := (rest.Storage)(nil) |
||||
|
||||
ls := legacyStoreMock{&mock.Mock{}, l} |
||||
us := storageMock{&mock.Mock{}, s} |
||||
|
||||
if tt.setupLegacyFn != nil { |
||||
tt.setupLegacyFn(ls.Mock, name) |
||||
} |
||||
if tt.setupStorageFn != nil { |
||||
tt.setupStorageFn(us.Mock, name) |
||||
} |
||||
|
||||
m := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), p, nil) |
||||
dw, err := m.NewStorage(kind, ls, us) |
||||
require.NoError(t, err) |
||||
status, err := m.Status(context.Background(), kind) |
||||
require.NoError(t, err) |
||||
status.Migrated = now.UnixMilli() |
||||
status.ReadUnified = true // Read from unified (like mode3)
|
||||
_, err = m.Update(context.Background(), status) |
||||
require.NoError(t, err) |
||||
|
||||
obj, err := dw.Get(context.Background(), name, &metav1.GetOptions{}) |
||||
|
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
|
||||
require.Equal(t, obj, exampleObj) |
||||
require.NotEqual(t, obj, anotherObj) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestManagedMode3_CreateWhileMigrating(t *testing.T) { |
||||
type testCase struct { |
||||
input runtime.Object |
||||
setupLegacyFn func(m *mock.Mock, input runtime.Object) |
||||
setupStorageFn func(m *mock.Mock, input runtime.Object) |
||||
prepare func(dual Service) (StorageStatus, error) |
||||
name string |
||||
wantErr bool |
||||
} |
||||
tests := |
||||
[]testCase{ |
||||
{ |
||||
name: "should succeed when not migrated", |
||||
input: exampleObj, |
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) { |
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, _ runtime.Object) { |
||||
// We don't use the input here, as the input is transformed before being passed to unified storage.
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
prepare: func(dual Service) (StorageStatus, error) { |
||||
status, err := dual.Status(context.Background(), kind) |
||||
require.NoError(t, err) |
||||
status.Migrating = 0 |
||||
status.Migrated = 0 |
||||
return dual.Update(context.Background(), status) |
||||
}, |
||||
}, |
||||
{ |
||||
name: "should be unavailable if migrating", |
||||
input: failingObj, |
||||
wantErr: true, |
||||
prepare: func(dual Service) (StorageStatus, error) { |
||||
status, err := dual.Status(context.Background(), kind) |
||||
require.NoError(t, err) |
||||
return dual.StartMigration(context.Background(), kind, status.UpdateKey) |
||||
}, |
||||
}, |
||||
{ |
||||
name: "should succeed after migration", |
||||
input: exampleObj, |
||||
setupLegacyFn: func(m *mock.Mock, input runtime.Object) { |
||||
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
setupStorageFn: func(m *mock.Mock, _ runtime.Object) { |
||||
// We don't use the input here, as the input is transformed before being passed to unified storage.
|
||||
m.On("Create", mock.Anything, exampleObjNoRV, mock.Anything, mock.Anything).Return(exampleObj, nil).Once() |
||||
}, |
||||
prepare: func(dual Service) (StorageStatus, error) { |
||||
status, err := dual.Status(context.Background(), kind) |
||||
require.NoError(t, err) |
||||
status.Migrating = 0 |
||||
status.Migrated = now.UnixMilli() |
||||
status.ReadUnified = true |
||||
return dual.Update(context.Background(), status) |
||||
}, |
||||
}, |
||||
} |
||||
|
||||
// Shared provider across all tests
|
||||
dual := ProvideService(featuremgmt.WithFeatures(featuremgmt.FlagManagedDualWriter), p, nil) |
||||
|
||||
for _, tt := range tests { |
||||
t.Run(tt.name, func(t *testing.T) { |
||||
l := (rest.LegacyStorage)(nil) |
||||
s := (rest.Storage)(nil) |
||||
|
||||
ls := legacyStoreMock{&mock.Mock{}, l} |
||||
us := storageMock{&mock.Mock{}, s} |
||||
|
||||
if tt.setupLegacyFn != nil { |
||||
tt.setupLegacyFn(ls.Mock, tt.input) |
||||
} |
||||
if tt.setupStorageFn != nil { |
||||
tt.setupStorageFn(us.Mock, tt.input) |
||||
} |
||||
|
||||
dw, err := dual.NewStorage(kind, ls, us) |
||||
require.NoError(t, err) |
||||
|
||||
// Apply the changes and
|
||||
if tt.prepare != nil { |
||||
_, err = tt.prepare(dual) |
||||
require.NoError(t, err) |
||||
} |
||||
|
||||
obj, err := dw.Create(context.Background(), tt.input, createFn, &metav1.CreateOptions{}) |
||||
|
||||
if tt.wantErr { |
||||
require.Error(t, err) |
||||
return |
||||
} |
||||
|
||||
require.Equal(t, exampleObj, obj) |
||||
}) |
||||
} |
||||
} |
@ -0,0 +1,155 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"path/filepath" |
||||
"time" |
||||
|
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
func ProvideService(features featuremgmt.FeatureToggles, reg prometheus.Registerer, cfg *setting.Cfg) Service { |
||||
enabled := features.IsEnabledGlobally(featuremgmt.FlagManagedDualWriter) || |
||||
features.IsEnabledGlobally(featuremgmt.FlagProvisioning) // required for git provisioning
|
||||
if !enabled && cfg != nil { |
||||
return &staticService{cfg} // fallback to using the dual write flags from cfg
|
||||
} |
||||
|
||||
path := "" // storage path
|
||||
if cfg != nil { |
||||
path = filepath.Join(cfg.DataPath, "dualwrite.json") |
||||
} |
||||
|
||||
return &service{ |
||||
db: newFileDB(path), |
||||
reg: reg, |
||||
enabled: enabled, |
||||
} |
||||
} |
||||
|
||||
type service struct { |
||||
db statusStorage |
||||
reg prometheus.Registerer |
||||
enabled bool |
||||
} |
||||
|
||||
// The storage interface has zero business logic and simply writes values to a database
|
||||
type statusStorage interface { |
||||
Get(ctx context.Context, gr schema.GroupResource) (StorageStatus, bool, error) |
||||
Set(ctx context.Context, status StorageStatus) error |
||||
} |
||||
|
||||
// Hardcoded list of resources that should be controlled by the database (eventually everything?)
|
||||
func (m *service) ShouldManage(gr schema.GroupResource) bool { |
||||
if !m.enabled { |
||||
return false |
||||
} |
||||
switch gr.String() { |
||||
case "folders.folder.grafana.app": |
||||
return true |
||||
case "dashboards.dashboard.grafana.app": |
||||
return true |
||||
} |
||||
return false |
||||
} |
||||
|
||||
func (m *service) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) { |
||||
v, ok, err := m.db.Get(ctx, gr) |
||||
return ok && v.ReadUnified, err |
||||
} |
||||
|
||||
// Status implements Service.
|
||||
func (m *service) Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) { |
||||
v, found, err := m.db.Get(ctx, gr) |
||||
if err != nil { |
||||
return v, err |
||||
} |
||||
if !found { |
||||
v = StorageStatus{ |
||||
Group: gr.Group, |
||||
Resource: gr.Resource, |
||||
WriteLegacy: true, |
||||
WriteUnified: true, // Write both, but read legacy
|
||||
ReadUnified: false, |
||||
Migrated: 0, |
||||
Migrating: 0, |
||||
Runtime: true, // need to explicitly ask for not runtime
|
||||
UpdateKey: 1, |
||||
} |
||||
err := m.db.Set(ctx, v) |
||||
return v, err |
||||
} |
||||
return v, nil |
||||
} |
||||
|
||||
// StartMigration implements Service.
|
||||
func (m *service) StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) { |
||||
now := time.Now().UnixMilli() |
||||
v, ok, err := m.db.Get(ctx, gr) |
||||
if err != nil { |
||||
return v, err |
||||
} |
||||
if ok { |
||||
if v.Migrated > 0 { |
||||
return v, fmt.Errorf("already migrated") |
||||
} |
||||
if key != v.UpdateKey { |
||||
return v, fmt.Errorf("migration key mismatch") |
||||
} |
||||
if v.Migrating > 0 { |
||||
return v, fmt.Errorf("migration in progress") |
||||
} |
||||
|
||||
v.Migrating = now |
||||
v.UpdateKey++ |
||||
} else { |
||||
v = StorageStatus{ |
||||
Group: gr.Group, |
||||
Resource: gr.Resource, |
||||
Runtime: true, |
||||
WriteLegacy: true, |
||||
WriteUnified: true, |
||||
ReadUnified: false, |
||||
Migrating: now, |
||||
Migrated: 0, // timestamp
|
||||
UpdateKey: 1, |
||||
} |
||||
} |
||||
err = m.db.Set(ctx, v) |
||||
return v, err |
||||
} |
||||
|
||||
// FinishMigration implements Service.
|
||||
func (m *service) Update(ctx context.Context, status StorageStatus) (StorageStatus, error) { |
||||
v, ok, err := m.db.Get(ctx, schema.GroupResource{Group: status.Group, Resource: status.Resource}) |
||||
if err != nil { |
||||
return v, err |
||||
} |
||||
if !ok { |
||||
return v, fmt.Errorf("unable to update status that is not yet saved") |
||||
} |
||||
if status.UpdateKey != v.UpdateKey { |
||||
return v, fmt.Errorf("key mismatch (resource: %s, expected:%d, received: %d)", v.Resource, v.UpdateKey, status.UpdateKey) |
||||
} |
||||
if status.Migrating > 0 { |
||||
return v, fmt.Errorf("update can not change migrating status") |
||||
} |
||||
if status.ReadUnified { |
||||
if status.Migrated == 0 { |
||||
return v, fmt.Errorf("can not read from unified before a migration") |
||||
} |
||||
if !status.WriteUnified { |
||||
return v, fmt.Errorf("must write to unified when reading from unified") |
||||
} |
||||
} |
||||
if !status.WriteLegacy && !status.WriteUnified { |
||||
return v, fmt.Errorf("must write either legacy or unified") |
||||
} |
||||
status.UpdateKey++ |
||||
return status, m.db.Set(ctx, status) |
||||
} |
@ -0,0 +1,57 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
) |
||||
|
||||
func TestService(t *testing.T) { |
||||
ctx := context.Background() |
||||
mode := ProvideService(featuremgmt.WithFeatures(), nil, nil) |
||||
|
||||
gr := schema.GroupResource{Group: "ggg", Resource: "rrr"} |
||||
status, err := mode.Status(ctx, gr) |
||||
require.NoError(t, err) |
||||
require.Equal(t, StorageStatus{ |
||||
Group: "ggg", |
||||
Resource: "rrr", |
||||
WriteLegacy: true, |
||||
WriteUnified: true, |
||||
ReadUnified: false, |
||||
Migrated: 0, |
||||
Migrating: 0, |
||||
Runtime: true, |
||||
UpdateKey: 1, |
||||
}, status, "should start with the right defaults") |
||||
|
||||
// Start migration
|
||||
status, err = mode.StartMigration(ctx, gr, 1) |
||||
require.NoError(t, err) |
||||
require.Equal(t, status.UpdateKey, int64(2), "the key increased") |
||||
require.True(t, status.Migrating > 0, "migration is running") |
||||
|
||||
status.Migrated = time.Now().UnixMilli() |
||||
status.Migrating = 0 |
||||
status, err = mode.Update(ctx, status) |
||||
require.NoError(t, err) |
||||
require.Equal(t, status.UpdateKey, int64(3), "the key increased") |
||||
require.Equal(t, status.Migrating, int64(0), "done migrating") |
||||
require.True(t, status.Migrated > 0, "migration is running") |
||||
|
||||
status.WriteUnified = false |
||||
status.ReadUnified = true |
||||
_, err = mode.Update(ctx, status) |
||||
require.Error(t, err) // must write unified if we read it
|
||||
|
||||
status.WriteUnified = false |
||||
status.ReadUnified = false |
||||
status.WriteLegacy = false |
||||
_, err = mode.Update(ctx, status) |
||||
require.Error(t, err) // must write something!
|
||||
} |
@ -0,0 +1,25 @@ |
||||
package dualwrite |
||||
|
||||
import "github.com/grafana/grafana/pkg/services/sqlstore/migrator" |
||||
|
||||
// Not yet used... but you get the idea
|
||||
func AddUnifiedStatusMigrations(mg *migrator.Migrator) { |
||||
resourceStorageStatus := migrator.Table{ |
||||
Name: "resource_storage_status", |
||||
Columns: []*migrator.Column{ |
||||
{Name: "group", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, |
||||
{Name: "resource", Type: migrator.DB_NVarchar, Length: 190, Nullable: false}, |
||||
{Name: "write_legacy", Type: migrator.DB_Bool, Nullable: false, Default: "TRUE"}, |
||||
{Name: "write_unified", Type: migrator.DB_Bool, Nullable: false, Default: "TRUE"}, |
||||
{Name: "read_unified", Type: migrator.DB_Bool, Nullable: false}, |
||||
{Name: "migrating", Type: migrator.DB_BigInt, Nullable: false}, // Timestamp Actively running a migration (start timestamp)
|
||||
{Name: "migrated", Type: migrator.DB_BigInt, Nullable: false}, // Timestamp job finished
|
||||
{Name: "runtime", Type: migrator.DB_Bool, Nullable: false, Default: "TRUE"}, |
||||
{Name: "update_key", Type: migrator.DB_BigInt, Nullable: false}, // optimistic lock key -- required for update
|
||||
}, |
||||
Indices: []*migrator.Index{ |
||||
{Cols: []string{"group", "resource"}, Type: migrator.UniqueIndex}, |
||||
}, |
||||
} |
||||
mg.AddMigration("create resource_storage_status table", migrator.NewAddTableMigration(resourceStorageStatus)) |
||||
} |
@ -0,0 +1,76 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana/pkg/apiserver/rest" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
type staticService struct { |
||||
cfg *setting.Cfg |
||||
} |
||||
|
||||
func (m *staticService) NewStorage(gr schema.GroupResource, legacy rest.LegacyStorage, storage rest.Storage) (rest.Storage, error) { |
||||
return nil, fmt.Errorf("not implemented") |
||||
} |
||||
|
||||
// ReadFromUnified implements Service.
|
||||
func (m *staticService) ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) { |
||||
config := m.cfg.UnifiedStorage[gr.String()] |
||||
switch config.DualWriterMode { |
||||
case rest.Mode3, rest.Mode4, rest.Mode5: |
||||
return true, nil |
||||
default: |
||||
return false, nil |
||||
} |
||||
} |
||||
|
||||
// ShouldManage implements Service.
|
||||
func (m *staticService) ShouldManage(gr schema.GroupResource) bool { |
||||
return false |
||||
} |
||||
|
||||
// StartMigration implements Service.
|
||||
func (m *staticService) StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) { |
||||
return StorageStatus{}, fmt.Errorf("not implemented") |
||||
} |
||||
|
||||
// Status implements Service.
|
||||
func (m *staticService) Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) { |
||||
status := StorageStatus{ |
||||
Group: gr.Group, |
||||
Resource: gr.Resource, |
||||
WriteLegacy: true, |
||||
} |
||||
config, ok := m.cfg.UnifiedStorage[gr.String()] |
||||
if ok { |
||||
switch config.DualWriterMode { |
||||
case rest.Mode0: |
||||
status.WriteLegacy = true |
||||
status.WriteUnified = false |
||||
status.ReadUnified = false |
||||
case rest.Mode1, rest.Mode2: // only difference is that 2 will error!
|
||||
status.WriteLegacy = true |
||||
status.WriteUnified = true |
||||
status.ReadUnified = false |
||||
case rest.Mode3: |
||||
status.WriteLegacy = true |
||||
status.WriteUnified = true |
||||
status.ReadUnified = true |
||||
case rest.Mode4, rest.Mode5: |
||||
status.WriteLegacy = false |
||||
status.WriteUnified = true |
||||
status.ReadUnified = true |
||||
} |
||||
} |
||||
return status, nil |
||||
} |
||||
|
||||
// Update implements Service.
|
||||
func (m *staticService) Update(ctx context.Context, status StorageStatus) (StorageStatus, error) { |
||||
return StorageStatus{}, fmt.Errorf("not implemented") |
||||
} |
@ -0,0 +1,199 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
|
||||
"github.com/stretchr/testify/mock" |
||||
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion" |
||||
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" |
||||
"k8s.io/apimachinery/pkg/runtime" |
||||
"k8s.io/apiserver/pkg/registry/rest" |
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" |
||||
) |
||||
|
||||
type legacyStoreMock struct { |
||||
*mock.Mock |
||||
grafanarest.LegacyStorage |
||||
} |
||||
|
||||
type storageMock struct { |
||||
*mock.Mock |
||||
grafanarest.Storage |
||||
} |
||||
|
||||
func (m legacyStoreMock) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, name, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m legacyStoreMock) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, obj, createValidation, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m legacyStoreMock) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m legacyStoreMock) NewList() runtime.Object { |
||||
return nil |
||||
} |
||||
|
||||
func (m legacyStoreMock) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, false, errors.New("context canceled") |
||||
default: |
||||
} |
||||
args := m.Called(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
||||
if err := args.Get(2); err != nil { |
||||
return nil, false, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Bool(1), args.Error(2) |
||||
} |
||||
|
||||
func (m legacyStoreMock) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, false, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, name, deleteValidation, options) |
||||
if err := args.Get(2); err != nil { |
||||
return nil, false, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Bool(1), args.Error(2) |
||||
} |
||||
|
||||
func (m legacyStoreMock) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
args := m.Called(ctx, deleteValidation, options, listOptions) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
// Unified Store
|
||||
func (m storageMock) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, name, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m storageMock) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, obj, createValidation, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m storageMock) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, options) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
||||
|
||||
func (m storageMock) NewList() runtime.Object { |
||||
return nil |
||||
} |
||||
|
||||
func (m storageMock) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, false, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options) |
||||
if err := args.Get(2); err != nil { |
||||
return nil, false, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Bool(1), args.Error(2) |
||||
} |
||||
|
||||
func (m storageMock) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, false, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, name, deleteValidation, options) |
||||
if err := args.Get(2); err != nil { |
||||
return nil, false, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Bool(1), args.Error(2) |
||||
} |
||||
|
||||
func (m storageMock) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) { |
||||
select { |
||||
case <-ctx.Done(): |
||||
return nil, errors.New("context canceled") |
||||
default: |
||||
} |
||||
|
||||
args := m.Called(ctx, deleteValidation, options, listOptions) |
||||
if err := args.Get(1); err != nil { |
||||
return nil, err.(error) |
||||
} |
||||
return args.Get(0).(runtime.Object), args.Error(1) |
||||
} |
@ -0,0 +1,52 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"context" |
||||
|
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
grafanarest "github.com/grafana/grafana/pkg/apiserver/rest" |
||||
) |
||||
|
||||
// For *legacy* services, this will indicate if we have transitioned to Unified storage yet
|
||||
type StorageStatus struct { |
||||
Group string `json:"group" xorm:"group"` |
||||
Resource string `json:"resource" xorm:"resource"` |
||||
WriteLegacy bool `json:"write_legacy" xorm:"write_legacy"` |
||||
WriteUnified bool `json:"write_unified" xorm:"write_unified"` |
||||
|
||||
// Unified is the primary source (legacy may be secondary)
|
||||
ReadUnified bool `json:"read_unified" xorm:"read_unified"` |
||||
|
||||
// Timestamp when a migration finished
|
||||
Migrated int64 `json:"migrated" xorm:"migrated"` |
||||
|
||||
// Timestamp when a migration *started* this should be cleared when finished
|
||||
// While migrating all write commands will be unavailable
|
||||
Migrating int64 `json:"migrating" xorm:"migrating"` |
||||
|
||||
// When false, the behavior will not change at runtime
|
||||
Runtime bool `json:"runtime" xorm:"runtime"` |
||||
|
||||
// UpdateKey used for optimistic locking -- requests to change the status must match previous value
|
||||
UpdateKey int64 `json:"update_key" xorm:"update_key"` |
||||
} |
||||
|
||||
type Service interface { |
||||
ShouldManage(gr schema.GroupResource) bool |
||||
|
||||
// Create a managed k8s storage instance
|
||||
NewStorage(gr schema.GroupResource, legacy grafanarest.LegacyStorage, storage grafanarest.Storage) (grafanarest.Storage, error) |
||||
|
||||
// Check if the dual writes is reading from unified storage (mode3++)
|
||||
ReadFromUnified(ctx context.Context, gr schema.GroupResource) (bool, error) |
||||
|
||||
// Get status details for a Group/Resource
|
||||
Status(ctx context.Context, gr schema.GroupResource) (StorageStatus, error) |
||||
|
||||
// Start a migration process (writes will be locked)
|
||||
StartMigration(ctx context.Context, gr schema.GroupResource, key int64) (StorageStatus, error) |
||||
|
||||
// change the status (finish migration etc)
|
||||
Update(ctx context.Context, status StorageStatus) (StorageStatus, error) |
||||
} |
@ -0,0 +1,18 @@ |
||||
package dualwrite |
||||
|
||||
import ( |
||||
"golang.org/x/net/context" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
dashboard "github.com/grafana/grafana/pkg/apis/dashboard" |
||||
folders "github.com/grafana/grafana/pkg/apis/folder/v0alpha1" |
||||
) |
||||
|
||||
func IsReadingLegacyDashboardsAndFolders(ctx context.Context, svc Service) bool { |
||||
f, _ := svc.ReadFromUnified(ctx, folders.FolderResourceInfo.GroupResource()) |
||||
d, _ := svc.ReadFromUnified(ctx, schema.GroupResource{ |
||||
Group: dashboard.GROUP, |
||||
Resource: dashboard.DASHBOARD_RESOURCE, |
||||
}) |
||||
return !(f && d) |
||||
} |
@ -1,20 +1,58 @@ |
||||
package resource |
||||
|
||||
import ( |
||||
"github.com/grafana/grafana/pkg/apiserver/rest" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
"context" |
||||
|
||||
"google.golang.org/grpc" |
||||
"k8s.io/apimachinery/pkg/runtime/schema" |
||||
|
||||
"github.com/grafana/grafana/pkg/storage/legacysql/dualwrite" |
||||
) |
||||
|
||||
func NewSearchClient(cfg *setting.Cfg, unifiedStorageConfigKey string, unifiedClient ResourceClient, legacyClient ResourceIndexClient) ResourceIndexClient { |
||||
config, ok := cfg.UnifiedStorage[unifiedStorageConfigKey] |
||||
if !ok { |
||||
return legacyClient |
||||
func NewSearchClient(dual dualwrite.Service, gr schema.GroupResource, unifiedClient ResourceIndexClient, legacyClient ResourceIndexClient) ResourceIndexClient { |
||||
status, _ := dual.Status(context.Background(), gr) |
||||
if status.Runtime && dual.ShouldManage(gr) { |
||||
return &searchWrapper{ |
||||
dual: dual, |
||||
groupResource: gr, |
||||
unifiedClient: unifiedClient, |
||||
legacyClient: legacyClient, |
||||
} |
||||
} |
||||
|
||||
switch config.DualWriterMode { |
||||
case rest.Mode0, rest.Mode1, rest.Mode2: |
||||
return legacyClient |
||||
default: |
||||
if status.ReadUnified { |
||||
return unifiedClient |
||||
} |
||||
return legacyClient |
||||
} |
||||
|
||||
type searchWrapper struct { |
||||
dual dualwrite.Service |
||||
groupResource schema.GroupResource |
||||
|
||||
unifiedClient ResourceIndexClient |
||||
legacyClient ResourceIndexClient |
||||
} |
||||
|
||||
func (s *searchWrapper) GetStats(ctx context.Context, in *ResourceStatsRequest, opts ...grpc.CallOption) (*ResourceStatsResponse, error) { |
||||
client := s.legacyClient |
||||
unified, err := s.dual.ReadFromUnified(ctx, s.groupResource) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if unified { |
||||
client = s.unifiedClient |
||||
} |
||||
return client.GetStats(ctx, in, opts...) |
||||
} |
||||
|
||||
func (s *searchWrapper) Search(ctx context.Context, in *ResourceSearchRequest, opts ...grpc.CallOption) (*ResourceSearchResponse, error) { |
||||
client := s.legacyClient |
||||
unified, err := s.dual.ReadFromUnified(ctx, s.groupResource) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
if unified { |
||||
client = s.unifiedClient |
||||
} |
||||
return client.Search(ctx, in, opts...) |
||||
} |
||||
|
Loading…
Reference in new issue