Dual writer: mode 3 (#90045)

* Dual writer: mode 3

* Add integration tests for playlits in mode 3

* Remove todo

* Update pkg/apiserver/rest/dualwriter_mode3.go

Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com>

* Admin: Fixes an issue where user accounts could not be enabled (#88117)

Fix: unable to enable user

* [REVIEW] FInish mode 3 and add tests

* Improve logging

* Update dependencies

* Update pkg/apiserver/rest/dualwriter_mode3_test.go

Co-authored-by: maicon <maiconscosta@gmail.com>

* remove test assertion

* Use mode log when dual writer is initiated

---------

Co-authored-by: Arati R. <33031346+suntala@users.noreply.github.com>
Co-authored-by: gonvee <gonvee@qq.com>
Co-authored-by: maicon <maiconscosta@gmail.com>
pull/90882/head
Leonor Oliveira 10 months ago committed by GitHub
parent 5e3a5b355e
commit 67b74e1e8a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 16
      go.work.sum
  2. 1
      pkg/apiserver/rest/dualwriter_mode1_test.go
  3. 158
      pkg/apiserver/rest/dualwriter_mode3.go
  4. 424
      pkg/apiserver/rest/dualwriter_mode3_test.go
  5. 56
      pkg/tests/apis/playlist/playlist_test.go

@ -402,6 +402,12 @@ github.com/grafana/e2e v0.1.1/go.mod h1:RpNLgae5VT+BUHvPE+/zSypmOXKwEu4t+tnEMS1A
github.com/grafana/grafana-cloud-migration-snapshot v1.2.0 h1:FCUWASPPzGGbF2jTutR5i3rmoQdmnC4bypwJswdW3fI=
github.com/grafana/grafana-cloud-migration-snapshot v1.2.0/go.mod h1:bd6Cm06EK0MzRO5ahUpbDz1SxNOKu+fzladbaRPHZPY=
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1/go.mod h1:DkxMin+qOh1Fgkxfbt+CUfBqqsCQJMG9op8Os/irBPA=
github.com/grafana/grafana-azure-sdk-go/v2 v2.1.0 h1:lajVqTWaE96MpbjZToj7EshvqgRWOfYNkD4MbIZizaY=
github.com/grafana/grafana-azure-sdk-go/v2 v2.1.0/go.mod h1:aKlFPE36IDa8qccRg3KbgZX3MQ5xymS3RelT4j6kkVU=
github.com/grafana/grafana-plugin-sdk-go v0.235.0/go.mod h1:6n9LbrjGL3xAATntYVNcIi90G9BVHRJjzHKz5FXVfWw=
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240701135906-559738ce6ae1/go.mod h1:DkxMin+qOh1Fgkxfbt+CUfBqqsCQJMG9op8Os/irBPA=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240422145632-c33c6b5b6e6b h1:HCbWyVL6vi7gxyO76gQksSPH203oBJ1MJ3JcG1OQlsg=
github.com/grafana/prometheus-alertmanager v0.25.1-0.20240422145632-c33c6b5b6e6b/go.mod h1:01sXtHoRwI8W324IPAzuxDFOmALqYLCOhvSC2fUHWXc=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645 h1:MJG/KsmcqMwFAkh8mTnAwhyKoB+sTAnY4CACC110tbU=
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645/go.mod h1:6iZfnjpejD4L/4DwD7NryNaJyCQdzwWwH2MWhCA90Kw=
@ -793,15 +799,24 @@ go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1 h1:ZqR
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetricgrpc v1.23.1/go.mod h1:D7ynngPWlGJrqyGSDOdscuv7uqttfCE3jcBvffDv9y4=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1 h1:q/Nj5/2TZRIt6PderQ9oU0M00fzoe8UZuINGw6ETGTw=
go.opentelemetry.io/otel/exporters/otlp/otlpmetric/otlpmetrichttp v1.23.1/go.mod h1:DTE9yAu6r08jU3xa68GiSeI7oRcSEQ2RpKbbQGO+dWM=
go.opentelemetry.io/otel/exporters/otlp/otlptrace v1.26.0/go.mod h1:z46paqbJ9l7c9fIPCXTqTGwhQZ5XoTIsfeFYWboizjs=
go.opentelemetry.io/otel/exporters/otlp/otlptrace/otlptracegrpc v1.26.0/go.mod h1:wnJIG4fOqyynOnnQF/eQb4/16VlX2EJAHhHgqIqWfAo=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0 h1:NQc0epfL0xItsmGgSXgfbH2C1fq2VLXkZoDFsfRNHpc=
go.opentelemetry.io/otel/exporters/prometheus v0.37.0/go.mod h1:hB8qWjsStK36t50/R0V2ULFb4u95X/Q6zupXLgvjTh8=
go.opentelemetry.io/otel/exporters/prometheus v0.46.0 h1:I8WIFXR351FoLJYuloU4EgXbtNX2URfU/85pUPheIEQ=
go.opentelemetry.io/otel/exporters/prometheus v0.46.0/go.mod h1:ztwVUHe5DTR/1v7PeuGRnU5Bbd4QKYwApWmuutKsJSs=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.23.1 h1:C8r95vDR125t815KD+b1tI0Fbc1pFnwHTBxkbIZ6Szc=
go.opentelemetry.io/otel/exporters/stdout/stdoutmetric v1.23.1/go.mod h1:Qr0qomr64jentMtOjWMbtYeJMSuMSlsPEjmnRA2sWZ4=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0 h1:s0PHtIkN+3xrbDOpt2M8OTG92cWqUESvzh2MxiR5xY8=
go.opentelemetry.io/otel/exporters/stdout/stdouttrace v1.24.0/go.mod h1:hZlFbDbRt++MMPCCfSJfmhkGIWnX1h3XjkfxZUjLrIA=
go.opentelemetry.io/otel/metric v1.26.0/go.mod h1:SY+rHOI4cEawI9a7N1A4nIg/nTQXe1ccCNWYOJUrpX4=
go.opentelemetry.io/otel/sdk v1.26.0/go.mod h1:0p8MXpqLeJ0pzcszQQN4F0S5FVjBLgypeGSngLsmirs=
go.opentelemetry.io/otel/sdk/metric v0.39.0 h1:Kun8i1eYf48kHH83RucG93ffz0zGV1sh46FAScOTuDI=
go.opentelemetry.io/otel/sdk/metric v0.39.0/go.mod h1:piDIRgjcK7u0HCL5pCA4e74qpK/jk3NiUoAHATVAmiI=
go.opentelemetry.io/otel/sdk/metric v1.26.0 h1:cWSks5tfriHPdWFnl+qpX3P681aAYqlZHcAyHw5aU9Y=
go.opentelemetry.io/otel/sdk/metric v1.26.0/go.mod h1:ClMFFknnThJCksebJwz7KIyEDHO+nTB6gK8obLy8RyE=
go.opentelemetry.io/otel/trace v1.26.0/go.mod h1:4iDxvGDQuUkHve82hJJ8UqrwswHYsZuWCBllGV2U2y0=
go.opentelemetry.io/proto/otlp v1.2.0/go.mod h1:gGpR8txAl5M03pDhMC79G6SdqNV26naRm/KDsgaHD8A=
go.uber.org/automaxprocs v1.5.3 h1:kWazyxZUrS3Gs4qUpbwo5kEIMGe/DAvi5Z4tl2NW4j8=
go.uber.org/tools v0.0.0-20190618225709-2cfd321de3ee h1:0mgffUl7nfd+FpvXMVz4IDEaUSmT1ysygQC7qYo7sG4=
golang.org/x/arch v0.4.0 h1:A8WCeEWhLwPBKNbFi5Wv5UTCBx5zzubnXDlMOFAzFMc=
@ -825,6 +840,7 @@ google.golang.org/genproto/googleapis/api v0.0.0-20240318140521-94a12d6c2237/go.
google.golang.org/genproto/googleapis/api v0.0.0-20240604185151-ef581f913117/go.mod h1:OimBR/bc1wPO9iV4NC2bpyjy3VnAwZh5EBPQdtaE5oo=
google.golang.org/genproto/googleapis/bytestream v0.0.0-20240325203815-454cdb8f5daa h1:wBkzraZsSqhj1M4L/nMrljUU6XasJkgHvUsq8oRGwF0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240318140521-94a12d6c2237/go.mod h1:WtryC6hu0hhx87FDGxWCDptyssuo68sk10vYjF+T9fY=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240513163218-0867130af1f8/go.mod h1:I7Y+G38R2bu5j1aLzfFmQfTcU/WnFuqDwLZAbvKTKpM=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240604185151-ef581f913117/go.mod h1:EfXuqaE1J41VCDicxHzUDm+8rk+7ZdXzHV0IhO/I6s0=
google.golang.org/genproto/googleapis/rpc v0.0.0-20240624140628-dc46fd24d27d/go.mod h1:Ue6ibwXGpU+dqIcODieyLOcgj7z8+IcskoNIgZxtrFY=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0 h1:M1YKkFIboKNieVO5DLUEVzQfGwJD30Nv2jfUgzb5UcE=

@ -135,7 +135,6 @@ func TestMode1_Get(t *testing.T) {
tt.setupStorageFn(m, tt.input)
}
p := prometheus.NewRegistry()
dw := NewDualWriter(Mode1, ls, us, p)
obj, err := dw.Get(context.Background(), tt.input, &metav1.GetOptions{})

@ -2,8 +2,9 @@ package rest
import (
"context"
"errors"
"time"
apierrors "k8s.io/apimachinery/pkg/api/errors"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
@ -21,7 +22,7 @@ type DualWriterMode3 struct {
// newDualWriterMode3 returns a new DualWriter in mode 3.
// Mode 3 represents writing to LegacyStorage and Storage and reading from Storage.
func newDualWriterMode3(legacy LegacyStorage, storage Storage, dwm *dualWriterMetrics) *DualWriterMode3 {
return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3"), dualWriterMetrics: dwm}
return &DualWriterMode3{Legacy: legacy, Storage: storage, Log: klog.NewKlogr().WithName("DualWriterMode3").WithValues("mode", mode3Str), dualWriterMetrics: dwm}
}
// Mode returns the mode of the dual writer.
@ -29,106 +30,143 @@ func (d *DualWriterMode3) Mode() DualWriterMode {
return Mode3
}
const mode3Str = "3"
// Create overrides the behavior of the generic DualWriter and writes to LegacyStorage and Storage.
func (d *DualWriterMode3) Create(ctx context.Context, obj runtime.Object, createValidation rest.ValidateObjectFunc, options *metav1.CreateOptions) (runtime.Object, error) {
log := klog.FromContext(ctx)
var method = "create"
log := d.Log.WithValues("kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
created, err := d.Storage.Create(ctx, obj, createValidation, options)
if err != nil {
log.Error(err, "unable to create object in storage")
d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage)
return created, err
}
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
if _, err := d.Legacy.Create(ctx, obj, createValidation, options); err != nil {
log.WithValues("object", created).Error(err, "unable to create object in legacy storage")
}
return created, nil
go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy create timeout"))
defer cancel()
startLegacy := time.Now()
_, errObjectSt := d.Legacy.Create(ctx, obj, createValidation, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy)
}()
return created, err
}
// Get overrides the behavior of the generic DualWriter and retrieves an object from Storage.
func (d *DualWriterMode3) Get(ctx context.Context, name string, options *metav1.GetOptions) (runtime.Object, error) {
return d.Storage.Get(ctx, name, &metav1.GetOptions{})
var method = "get"
log := d.Log.WithValues("kind", options.Kind, "name", name, "method", method)
ctx = klog.NewContext(ctx, log)
startStorage := time.Now()
res, err := d.Storage.Get(ctx, name, options)
if err != nil {
log.Error(err, "unable to get object in storage")
}
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage)
return res, err
}
func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name)
// List overrides the behavior of the generic DualWriter and reads only from Unified Store.
func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
var method = "list"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", options.ResourceVersion, "method", method)
ctx = klog.NewContext(ctx, log)
deleted, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
startStorage := time.Now()
res, err := d.Storage.List(ctx, options)
if err != nil {
if !apierrors.IsNotFound(err) {
log.Error(err, "could not delete from unified store")
return deleted, async, err
}
log.Error(err, "unable to list object in storage")
}
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startStorage)
_, _, errLS := d.Legacy.Delete(ctx, name, deleteValidation, options)
if errLS != nil {
if !apierrors.IsNotFound(errLS) {
log.WithValues("deleted", deleted).Error(errLS, "could not delete from legacy store")
}
return res, err
}
func (d *DualWriterMode3) Delete(ctx context.Context, name string, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions) (runtime.Object, bool, error) {
var method = "delete"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, d.Log)
startStorage := time.Now()
res, async, err := d.Storage.Delete(ctx, name, deleteValidation, options)
if err != nil {
log.Error(err, "unable to delete object in storage")
d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage)
return res, async, err
}
d.recordStorageDuration(false, mode3Str, name, method, startStorage)
return deleted, async, err
go func() {
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy delete timeout"))
defer cancel()
_, _, err := d.Legacy.Delete(ctx, name, deleteValidation, options)
d.recordLegacyDuration(err != nil, mode3Str, options.Kind, method, startLegacy)
}()
return res, async, err
}
// Update overrides the behavior of the generic DualWriter and writes first to Storage and then to LegacyStorage.
func (d *DualWriterMode3) Update(ctx context.Context, name string, objInfo rest.UpdatedObjectInfo, createValidation rest.ValidateObjectFunc, updateValidation rest.ValidateObjectUpdateFunc, forceAllowCreate bool, options *metav1.UpdateOptions) (runtime.Object, bool, error) {
log := d.Log.WithValues("name", name)
var method = "update"
log := d.Log.WithValues("name", name, "kind", options.Kind, "method", method)
ctx = klog.NewContext(ctx, log)
old, err := d.Storage.Get(ctx, name, &metav1.GetOptions{})
if err != nil {
log.WithValues("object", old).Error(err, "could not get object to update")
return nil, false, err
}
updated, err := objInfo.UpdatedObject(ctx, old)
startStorage := time.Now()
res, async, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.WithValues("object", updated).Error(err, "could not update or create object")
return nil, false, err
}
objInfo = &updateWrapper{
upstream: objInfo,
updated: updated,
log.Error(err, "unable to update in storage")
d.recordLegacyDuration(true, mode3Str, options.Kind, method, startStorage)
return res, async, err
}
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
obj, created, err := d.Storage.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
if err != nil {
log.WithValues("object", obj).Error(err, "could not write to US")
return obj, created, err
}
go func() {
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy update timeout"))
_, _, errLeg := d.Legacy.Update(ctx, name, &updateWrapper{
upstream: objInfo,
updated: obj,
}, createValidation, updateValidation, forceAllowCreate, options)
if errLeg != nil {
log.Error(errLeg, "could not update object in legacy store")
}
return obj, created, err
startLegacy := time.Now()
defer cancel()
_, _, errObjectSt := d.Legacy.Update(ctx, name, objInfo, createValidation, updateValidation, forceAllowCreate, options)
d.recordLegacyDuration(errObjectSt != nil, mode3Str, options.Kind, method, startLegacy)
}()
return res, async, err
}
// DeleteCollection overrides the behavior of the generic DualWriter and deletes from both LegacyStorage and Storage.
func (d *DualWriterMode3) DeleteCollection(ctx context.Context, deleteValidation rest.ValidateObjectFunc, options *metav1.DeleteOptions, listOptions *metainternalversion.ListOptions) (runtime.Object, error) {
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion)
var method = "delete-collection"
log := d.Log.WithValues("kind", options.Kind, "resourceVersion", listOptions.ResourceVersion, "method", method)
ctx = klog.NewContext(ctx, log)
deleted, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
startStorage := time.Now()
res, err := d.Storage.DeleteCollection(ctx, deleteValidation, options, listOptions)
if err != nil {
log.Error(err, "failed to delete collection successfully from Storage")
log.Error(err, "unable to delete collection in storage")
d.recordStorageDuration(true, mode3Str, options.Kind, method, startStorage)
return res, err
}
d.recordStorageDuration(false, mode3Str, options.Kind, method, startStorage)
if deleted, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions); err != nil {
log.WithValues("deleted", deleted).Error(err, "failed to delete collection successfully from LegacyStorage")
}
go func() {
startLegacy := time.Now()
ctx, cancel := context.WithTimeoutCause(ctx, time.Second*10, errors.New("legacy deletecollection timeout"))
defer cancel()
_, err := d.Legacy.DeleteCollection(ctx, deleteValidation, options, listOptions)
d.recordStorageDuration(err != nil, mode3Str, options.Kind, method, startLegacy)
}()
return deleted, err
}
func (d *DualWriterMode3) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
//TODO: implement List
klog.Error("List not implemented")
return nil, nil
return res, err
}
func (d *DualWriterMode3) Destroy() {

@ -1,72 +1,356 @@
package rest
// import (
// "context"
// "testing"
// "github.com/stretchr/testify/assert"
// metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
// metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
// "k8s.io/apimachinery/pkg/runtime"
// "k8s.io/apiserver/pkg/apis/example"
// )
// func TestMode3(t *testing.T) {
// var ls = (LegacyStorage)(nil)
// var s = (Storage)(nil)
// lsSpy := NewLegacyStorageSpyClient(ls)
// sSpy := NewStorageSpyClient(s)
// dw := NewDualWriterMode3(lsSpy, sSpy)
// // Create: it should use the Legacy Create implementation
// _, err := dw.Create(context.Background(), &dummyObject{}, func(context.Context, runtime.Object) error { return nil }, &metav1.CreateOptions{})
// assert.NoError(t, err)
// assert.Equal(t, 1, lsSpy.Counts("LegacyStorage.Create"))
// assert.Equal(t, 1, sSpy.Counts("Storage.Create"))
// // Get: it should use the Storage Get implementation
// _, err = dw.Get(context.Background(), kind, &metav1.GetOptions{})
// assert.NoError(t, err)
// assert.Equal(t, 0, lsSpy.Counts("LegacyStorage.Get"))
// assert.Equal(t, 1, sSpy.Counts("Storage.Get"))
// // List: it should use the Storage List implementation
// _, err = dw.List(context.Background(), &metainternalversion.ListOptions{})
// assert.NoError(t, err)
// assert.Equal(t, 0, lsSpy.Counts("LegacyStorage.List"))
// assert.Equal(t, 1, sSpy.Counts("Storage.List"))
// // Delete: it should use call both Legacy and Storage Delete methods
// var deleteValidation = func(ctx context.Context, obj runtime.Object) error { return nil }
// _, _, err = dw.Delete(context.Background(), kind, deleteValidation, &metav1.DeleteOptions{})
// assert.NoError(t, err)
// assert.Equal(t, 1, lsSpy.Counts("LegacyStorage.Delete"))
// assert.Equal(t, 1, sSpy.Counts("Storage.Delete"))
// // DeleteCollection: it should delete from both LegacyStorage and Storage
// _, err = dw.DeleteCollection(
// context.Background(),
// func(context.Context, runtime.Object) error { return nil },
// &metav1.DeleteOptions{},
// &metainternalversion.ListOptions{},
// )
// assert.NoError(t, err)
// assert.Equal(t, 1, lsSpy.Counts("LegacyStorage.DeleteCollection"))
// assert.Equal(t, 1, sSpy.Counts("Storage.DeleteCollection"))
// // Update: it should update in both storages
// dummy := &example.Pod{}
// uoi := UpdatedObjInfoObj{}
// _, err = uoi.UpdatedObject(context.Background(), dummy)
// assert.NoError(t, err)
// var validateObjFn = func(ctx context.Context, obj runtime.Object) error { return nil }
// var validateObjUpdateFn = func(ctx context.Context, obj, old runtime.Object) error { return nil }
// _, _, err = dw.Update(context.Background(), kind, uoi, validateObjFn, validateObjUpdateFn, false, &metav1.UpdateOptions{})
// assert.NoError(t, err)
// assert.Equal(t, 1, lsSpy.Counts("LegacyStorage.Update"))
// assert.Equal(t, 1, sSpy.Counts("Storage.Update"))
// assert.NoError(t, err)
// }
import (
"context"
"errors"
"testing"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"k8s.io/apimachinery/pkg/api/meta"
metainternalversion "k8s.io/apimachinery/pkg/apis/meta/internalversion"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/runtime"
)
func TestMode3_Create(t *testing.T) {
type testCase struct {
input runtime.Object
setupLegacyFn func(m *mock.Mock, input runtime.Object)
setupStorageFn func(m *mock.Mock)
name string
wantErr bool
}
tests :=
[]testCase{
{
name: "creating an object only in the unified store",
input: exampleObj,
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, input, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
setupStorageFn: func(m *mock.Mock) {
m.On("Create", mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "error when creating object in the unified store fails",
input: failingObj,
setupLegacyFn: func(m *mock.Mock, input runtime.Object) {
m.On("Create", mock.Anything, failingObj, mock.Anything, mock.Anything).Return(nil, errors.New("error"))
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m)
}
dw := NewDualWriter(Mode3, ls, us, p)
obj, err := dw.Create(context.Background(), tt.input, func(context.Context, runtime.Object) error { return nil }, &metav1.CreateOptions{})
if tt.wantErr {
assert.Error(t, err)
return
}
acc, err := meta.Accessor(obj)
assert.NoError(t, err)
assert.Equal(t, acc.GetResourceVersion(), "1")
assert.NotEqual(t, obj, anotherObj)
})
}
}
func TestMode3_Get(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, name string)
name string
input string
wantErr bool
}
tests :=
[]testCase{
{
name: "get an object only in unified store",
input: "foo",
setupStorageFn: func(m *mock.Mock, name string) {
m.On("Get", mock.Anything, name, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "error when getting an object in the unified store fails",
input: "object-fail",
setupStorageFn: func(m *mock.Mock, name string) {
m.On("Get", mock.Anything, name, mock.Anything).Return(nil, errors.New("error"))
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
}
p := prometheus.NewRegistry()
dw := NewDualWriter(Mode3, ls, us, p)
obj, err := dw.Get(context.Background(), tt.input, &metav1.GetOptions{})
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
})
}
}
func TestMode3_List(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, options *metainternalversion.ListOptions)
name string
options *metainternalversion.ListOptions
wantErr bool
}
tests :=
[]testCase{
{
name: "error when listing an object in the unified store is not implemented",
options: &metainternalversion.ListOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}},
setupStorageFn: func(m *mock.Mock, options *metainternalversion.ListOptions) {
m.On("List", mock.Anything, options).Return(nil, errors.New("error"))
},
wantErr: true,
},
{
name: "list objects in the unified store",
options: &metainternalversion.ListOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
setupStorageFn: func(m *mock.Mock, options *metainternalversion.ListOptions) {
m.On("List", mock.Anything, options).Return(exampleList, nil)
},
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.options)
}
dw := NewDualWriter(Mode3, ls, us, p)
res, err := dw.List(context.Background(), tt.options)
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equal(t, exampleList, res)
assert.NotEqual(t, anotherList, res)
})
}
}
func TestMode3_Delete(t *testing.T) {
type testCase struct {
setupStorageFn func(m *mock.Mock, name string)
name string
input string
wantErr bool
}
tests :=
[]testCase{
{
name: "deleting an object in the unified store",
input: "foo",
setupStorageFn: func(m *mock.Mock, name string) {
m.On("Delete", mock.Anything, name, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
},
{
name: "error when deleting an object in the unified store",
input: "object-fail",
setupStorageFn: func(m *mock.Mock, name string) {
m.On("Delete", mock.Anything, name, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
}
dw := NewDualWriter(Mode3, ls, us, p)
obj, _, err := dw.Delete(context.Background(), tt.input, func(ctx context.Context, obj runtime.Object) error { return nil }, &metav1.DeleteOptions{})
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
})
}
}
func TestMode3_DeleteCollection(t *testing.T) {
type testCase struct {
input *metav1.DeleteOptions
setupStorageFn func(m *mock.Mock, input *metav1.DeleteOptions)
name string
wantErr bool
}
tests :=
[]testCase{
{
name: "deleting a collection in the unified store",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "foo"}},
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", mock.Anything, mock.Anything, input, mock.Anything).Return(exampleObj, nil)
},
},
{
name: "error deleting a collection in the unified store",
input: &metav1.DeleteOptions{TypeMeta: metav1.TypeMeta{Kind: "fail"}},
setupStorageFn: func(m *mock.Mock, input *metav1.DeleteOptions) {
m.On("DeleteCollection", mock.Anything, mock.Anything, input, mock.Anything).Return(nil, errors.New("error"))
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
}
dw := NewDualWriter(Mode3, ls, us, p)
obj, err := dw.DeleteCollection(context.Background(), func(ctx context.Context, obj runtime.Object) error { return nil }, tt.input, &metainternalversion.ListOptions{})
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
})
}
}
func TestMode3_Update(t *testing.T) {
type testCase struct {
setupLegacyFn func(m *mock.Mock, input string)
setupStorageFn func(m *mock.Mock, input string)
name string
input string
wantErr bool
}
tests :=
[]testCase{
{
name: "update an object in unified store",
input: "foo",
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
setupLegacyFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(exampleObj, false, nil)
},
},
{
name: "error updating an object in unified store",
input: "object-fail",
setupStorageFn: func(m *mock.Mock, input string) {
m.On("Update", mock.Anything, input, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil, false, errors.New("error"))
},
wantErr: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
l := (LegacyStorage)(nil)
s := (Storage)(nil)
m := &mock.Mock{}
ls := legacyStoreMock{m, l}
us := storageMock{m, s}
if tt.setupLegacyFn != nil {
tt.setupLegacyFn(m, tt.input)
}
if tt.setupStorageFn != nil {
tt.setupStorageFn(m, tt.input)
}
dw := NewDualWriter(Mode3, ls, us, p)
obj, _, err := dw.Update(context.Background(), tt.input, updatedObjInfoObj{}, func(ctx context.Context, obj runtime.Object) error { return nil }, func(ctx context.Context, obj, old runtime.Object) error { return nil }, false, &metav1.UpdateOptions{})
if tt.wantErr {
assert.Error(t, err)
return
}
assert.Equal(t, obj, exampleObj)
assert.NotEqual(t, obj, anotherObj)
})
}
}

@ -129,6 +129,20 @@ func TestIntegrationPlaylist(t *testing.T) {
}))
})
t.Run("with dual write (file, mode 3)", func(t *testing.T) {
doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
DisableAnonymous: true,
APIServerStorageType: "file", // write the files to disk
EnableFeatureToggles: []string{
featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written
},
DualWriterDesiredModes: map[string]grafanarest.DualWriterMode{
playlistv0alpha1.GROUPRESOURCE: grafanarest.Mode3,
},
}))
})
t.Run("with dual write (unified storage, mode 0)", func(t *testing.T) {
doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: false, // required for unified storage
@ -174,6 +188,21 @@ func TestIntegrationPlaylist(t *testing.T) {
}))
})
t.Run("with dual write (unified storage, mode 3)", func(t *testing.T) {
doPlaylistTests(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: false, // required for unified storage
DisableAnonymous: true,
APIServerStorageType: "unified", // use the entity api tables
EnableFeatureToggles: []string{
featuremgmt.FlagUnifiedStorage,
featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written
},
DualWriterDesiredModes: map[string]grafanarest.DualWriterMode{
playlistv0alpha1.GROUPRESOURCE: grafanarest.Mode3,
},
}))
})
t.Run("with dual write (etcd, mode 0)", func(t *testing.T) {
// NOTE: running local etcd, that will be wiped clean!
t.Skip("local etcd testing")
@ -254,6 +283,33 @@ func TestIntegrationPlaylist(t *testing.T) {
doPlaylistTests(t, helper)
})
t.Run("with dual write (etcd, mode 3)", func(t *testing.T) {
// NOTE: running local etcd, that will be wiped clean!
t.Skip("local etcd testing")
helper := apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
DisableAnonymous: true,
APIServerStorageType: "etcd", // requires etcd running on localhost:2379
EnableFeatureToggles: []string{
featuremgmt.FlagKubernetesPlaylists, // Required so that legacy calls are also written
},
DualWriterDesiredModes: map[string]grafanarest.DualWriterMode{
playlistv0alpha1.GROUPRESOURCE: grafanarest.Mode3,
},
})
// Clear the collection before starting (etcd)
client := helper.GetResourceClient(apis.ResourceClientArgs{
User: helper.Org1.Admin,
GVR: gvr,
})
err := client.Resource.DeleteCollection(context.Background(), metav1.DeleteOptions{}, metav1.ListOptions{})
require.NoError(t, err)
doPlaylistTests(t, helper)
})
}
func doPlaylistTests(t *testing.T, helper *apis.K8sTestHelper) *apis.K8sTestHelper {

Loading…
Cancel
Save