fix(unified-storage): use continue token containing both formats for dualwriter (#106525)

pull/106716/head
Jean-Philippe Quéméner 1 month ago committed by GitHub
parent 8504f7ea90
commit 5f21f320f7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      pkg/setting/setting.go
  2. 1
      pkg/setting/setting_unified_storage.go
  3. 91
      pkg/storage/legacysql/dualwrite/dualwriter.go
  4. 33
      pkg/storage/legacysql/dualwrite/dualwriter_continue_token.go
  5. 98
      pkg/storage/legacysql/dualwrite/dualwriter_continue_token_test.go
  6. 38
      pkg/storage/unified/resource/server.go
  7. 6
      pkg/storage/unified/sql/server.go
  8. 91
      pkg/tests/apis/folder/folders_test.go
  9. 7
      pkg/tests/testinfra/testinfra.go

@ -551,6 +551,7 @@ type Cfg struct {
// Unified Storage
UnifiedStorage map[string]UnifiedStorageConfig
MaxPageSizeBytes int
IndexPath string
IndexWorkers int
IndexMaxBatchSize int

@ -51,6 +51,7 @@ func (cfg *Cfg) setUnifiedStorageConfig() {
// Set indexer config for unified storage
section := cfg.Raw.Section("unified_storage")
cfg.MaxPageSizeBytes = section.Key("max_page_size_bytes").MustInt(0)
cfg.IndexPath = section.Key("index_path").String()
cfg.IndexWorkers = section.Key("index_workers").MustInt(10)
cfg.IndexMaxBatchSize = section.Key("index_max_batch_size").MustInt(100)

@ -67,31 +67,106 @@ func (d *dualWriter) Get(ctx context.Context, name string, options *metav1.GetOp
}
func (d *dualWriter) List(ctx context.Context, options *metainternalversion.ListOptions) (runtime.Object, error) {
// Always work on *copies* so we never mutate the caller's ListOptions.
var (
legacyOptions = options.DeepCopy()
unifiedOptions = options.DeepCopy()
log = logging.FromContext(ctx).With("method", "List")
)
legacyToken, unifiedToken, err := parseContinueTokens(options.Continue)
if err != nil {
return nil, err
}
legacyOptions.Continue = legacyToken
unifiedOptions.Continue = unifiedToken
// If we read from unified, we can just do that and return.
if d.readUnified {
return d.unified.List(ctx, options)
unifiedList, err := d.unified.List(ctx, unifiedOptions)
if err != nil {
return nil, err
}
unifiedMeta, err := meta.ListAccessor(unifiedList)
if err != nil {
return nil, fmt.Errorf("failed to access legacy List MetaData: %w", err)
}
unifiedMeta.SetContinue(buildContinueToken("", unifiedMeta.GetContinue()))
return unifiedList, nil
}
// In some cases, the unified token might be there but legacy token is empty (i.e. finished iteration).
// This can happen, as unified storage iteration is doing paging not only based on the provided limit,
// but also based on the response size. This check prevents starting the new iteration again.
if options.Continue != "" && legacyToken == "" {
return nil, nil
}
// In some cases, where the stores are not in sync yet, the unified storage continue token might already
// be empty, while the legacy one is not, as it has more data. In that case we don't want to issue a new
// request with an empty continue token, resulting in getting the first page again.
// nolint:staticcheck
shouldDoUnifiedRequest := true
if options.Continue != "" && unifiedToken == "" {
shouldDoUnifiedRequest = false
}
// If legacy is still the main store, lets first read from it.
legacyList, err := d.legacy.List(ctx, options)
legacyList, err := d.legacy.List(ctx, legacyOptions)
if err != nil {
return nil, err
}
legacyMeta, err := meta.ListAccessor(legacyList)
if err != nil {
return nil, fmt.Errorf("failed to access legacy List MetaData: %w", err)
}
legacyToken = legacyMeta.GetContinue()
// Once we have successfully listed from legacy, we can check if we want to fail on a unified list.
// If we allow the unified list to fail, we can do it in the background and return.
if d.errorIsOK {
if d.errorIsOK && shouldDoUnifiedRequest {
// We would like to get continue token from unified storage, but
// don't want to wait for unified storage too long, since we're calling
// unified-storage asynchronously.
out := make(chan string, 1)
go func(ctxBg context.Context, cancel context.CancelFunc) {
defer cancel()
if _, err := d.unified.List(ctxBg, options); err != nil {
log := logging.FromContext(ctxBg).With("method", "List")
defer close(out)
unifiedList, err := d.unified.List(ctxBg, unifiedOptions)
if err != nil {
log.Error("failed background LIST to unified", "err", err)
return
}
unifiedMeta, err := meta.ListAccessor(unifiedList)
if err != nil {
log.Error("failed background LIST to unified", "err",
fmt.Errorf("failed to access unified List MetaData: %w", err))
}
out <- unifiedMeta.GetContinue()
}(context.WithTimeout(context.WithoutCancel(ctx), backgroundReqTimeout))
select {
case unifiedToken = <-out:
case <-time.After(300 * time.Millisecond):
log.Warn("timeout while waiting on the unified storage continue token")
break
}
legacyMeta.SetContinue(buildContinueToken(legacyToken, unifiedToken))
return legacyList, nil
}
// If it's not okay to fail, we have to check it in the foreground.
if _, err := d.unified.List(ctx, options); err != nil {
return nil, err
if shouldDoUnifiedRequest {
// If it's not okay to fail, we have to check it in the foreground.
unifiedList, err := d.unified.List(ctx, unifiedOptions)
if err != nil {
return nil, err
}
unifiedMeta, err := meta.ListAccessor(unifiedList)
if err != nil {
return nil, fmt.Errorf("failed to access unified List MetaData: %w", err)
}
unifiedToken = unifiedMeta.GetContinue()
}
legacyMeta.SetContinue(buildContinueToken(legacyToken, unifiedToken))
return legacyList, nil
}

@ -0,0 +1,33 @@
package dualwrite
import (
"encoding/base64"
"fmt"
"strings"
)
// parseContinueTokens splits a dualwriter continue token (legacy, unified) if we received one.
// If we receive a single token not separated by a comma, we return the token as-is as a legacy
// token and an empty unified token. This is to ensure a smooth transition to the new token format.
func parseContinueTokens(token string) (string, string, error) {
	if token == "" {
		return "", "", nil
	}
	decodedToken, err := base64.StdEncoding.DecodeString(token)
	if err != nil {
		return "", "", fmt.Errorf("failed to decode dualwriter continue token: %w", err)
	}
	// Split on the first comma only: if the unified token ever contains a comma
	// itself, strings.Split would silently drop everything after the second comma.
	decodedTokens := strings.SplitN(string(decodedToken), ",", 2)
	if len(decodedTokens) == 2 {
		return decodedTokens[0], decodedTokens[1], nil
	}
	// No separator found: treat it as an old-format (legacy-only) token and
	// return it untouched so pre-existing clients keep working.
	return token, "", nil
}
func buildContinueToken(legacyToken, unifiedToken string) string {
if legacyToken == "" && unifiedToken == "" {
return ""
}
return base64.StdEncoding.EncodeToString([]byte(
strings.Join([]string{legacyToken, unifiedToken}, ",")))
}

@ -0,0 +1,98 @@
package dualwrite
import (
"testing"
"github.com/stretchr/testify/require"
)
// TestParseContinueTokens verifies splitting of the combined dualwriter
// continue token, including backward compatibility with single-store tokens.
func TestParseContinueTokens(t *testing.T) {
	tcs := []struct {
		name         string
		token        string
		legacyToken  string
		unifiedToken string
	}{
		{
			name:         "Should handle empty token",
			token:        "",
			legacyToken:  "",
			unifiedToken: "",
		},
		{
			name:         "Should handle legacy token",
			token:        "MXwy",
			legacyToken:  "MXwy",
			unifiedToken: "",
		},
		{
			name: "Should handle new token format",
			// both slots taken 'MXwy,eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0='
			token:        "TVh3eSxleUp2SWpveExDSjJJam94TnpRNU5UWTFOVFU0TURjNE9Ua3dMQ0p6SWpwbVlXeHpaWDA9",
			legacyToken:  "MXwy",
			unifiedToken: "eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0=",
		},
		{
			name: "Should handle new token with only unified token (mode >= 3)",
			// first slot empty ',eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0='
			token:        "LGV5SnZJam94TENKMklqb3hOelE1TlRZMU5UVTRNRGM0T1Rrd0xDSnpJanBtWVd4elpYMD0=",
			legacyToken:  "",
			unifiedToken: "eyJvIjoxLCJ2IjoxNzQ5NTY1NTU4MDc4OTkwLCJzIjpmYWxzZX0=",
		},
	}
	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			legacyToken, unifiedToken, err := parseContinueTokens(tc.token)
			require.NoError(t, err)
			// require.Equal takes (t, expected, actual) -- expected must come
			// first so failure messages label the values correctly.
			require.Equal(t, tc.legacyToken, legacyToken)
			require.Equal(t, tc.unifiedToken, unifiedToken)
		})
	}
}
// TestBuildContinueToken round-trips tokens through buildContinueToken and
// parseContinueTokens, and checks that two empty inputs yield an empty token.
func TestBuildContinueToken(t *testing.T) {
	tcs := []struct {
		name          string
		legacyToken   string
		unifiedToken  string
		shouldBeEmpty bool
	}{
		{
			name:         "Should handle both tokens",
			legacyToken:  "abc",
			unifiedToken: "xyz",
		},
		{
			name:        "Should handle legacy token standalone",
			legacyToken: "abc",
		},
		{
			name:         "Should handle unified token standalone",
			unifiedToken: "xyz",
		},
		{
			name:          "Should handle both tokens empty",
			shouldBeEmpty: true,
		},
	}
	for _, tc := range tcs {
		t.Run(tc.name, func(t *testing.T) {
			token := buildContinueToken(tc.legacyToken, tc.unifiedToken)
			legacyToken, unifiedToken, err := parseContinueTokens(token)
			require.NoError(t, err)
			// require.Equal takes (t, expected, actual) -- expected must come
			// first so failure messages label the values correctly.
			require.Equal(t, tc.legacyToken, legacyToken)
			require.Equal(t, tc.unifiedToken, unifiedToken)
			if tc.shouldBeEmpty {
				require.Equal(t, "", token)
			}
		})
	}
}
// TestInvalidToken ensures a token that is not valid base64 is rejected
// with an error instead of being silently passed through.
func TestInvalidToken(t *testing.T) {
	// nolint: gosec
	const malformed = "325232ff4fF->"
	_, _, err := parseContinueTokens(malformed)
	require.Error(t, err)
}

@ -198,6 +198,8 @@ type ResourceServerOptions struct {
storageMetrics *StorageMetrics
IndexMetrics *BleveIndexMetrics
MaxPageSizeBytes int
}
func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
@ -222,6 +224,11 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
}
}
if opts.MaxPageSizeBytes <= 0 {
// By default, we use 2MB for the page size.
opts.MaxPageSizeBytes = 1024 * 1024 * 2
}
// Initialize the blob storage
blobstore := opts.Blob.Backend
if blobstore == nil {
@ -250,19 +257,20 @@ func NewResourceServer(opts ResourceServerOptions) (ResourceServer, error) {
// Make this cancelable
ctx, cancel := context.WithCancel(context.Background())
s := &server{
tracer: opts.Tracer,
log: logger,
backend: opts.Backend,
blob: blobstore,
diagnostics: opts.Diagnostics,
access: opts.AccessClient,
writeHooks: opts.WriteHooks,
lifecycle: opts.Lifecycle,
now: opts.Now,
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
tracer: opts.Tracer,
log: logger,
backend: opts.Backend,
blob: blobstore,
diagnostics: opts.Diagnostics,
access: opts.AccessClient,
writeHooks: opts.WriteHooks,
lifecycle: opts.Lifecycle,
now: opts.Now,
ctx: ctx,
cancel: cancel,
storageMetrics: opts.storageMetrics,
indexMetrics: opts.IndexMetrics,
maxPageSizeBytes: opts.MaxPageSizeBytes,
}
if opts.Search.Resources != nil {
@ -307,6 +315,8 @@ type server struct {
// init checking
once sync.Once
initErr error
maxPageSizeBytes int
}
// Init implements ResourceServer.
@ -791,7 +801,7 @@ func (s *server) List(ctx context.Context, req *resourcepb.ListRequest) (*resour
if req.Limit < 1 {
req.Limit = 50 // default max 50 items in a page
}
maxPageBytes := 1024 * 1024 * 2 // 2mb/page
maxPageBytes := s.maxPageSizeBytes
pageBytes := 0
rsp := &resourcepb.ListResponse{}

@ -43,6 +43,12 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
opts.Blob.URL = "file:///" + dir
}
// This is mostly for testing, being able to influence when we paginate
// based on the page size during tests.
unifiedStorageCfg := cfg.SectionWithEnvOverrides("unified_storage")
maxPageSizeBytes := unifiedStorageCfg.Key("max_page_size_bytes")
opts.MaxPageSizeBytes = maxPageSizeBytes.MustInt(0)
eDB, err := dbimpl.ProvideResourceDB(db, cfg, tracer)
if err != nil {
return nil, err

@ -177,6 +177,38 @@ func TestIntegrationFoldersApp(t *testing.T) {
}))
})
// This is a general test for the unified storage list operation. We don't have a common test
// directory for now, so we (search and storage) keep it here as we own this part of the tests.
t.Run("make sure list works with continue tokens", func(t *testing.T) {
modes := []grafanarest.DualWriterMode{
grafanarest.Mode1,
grafanarest.Mode2,
grafanarest.Mode3,
grafanarest.Mode4,
grafanarest.Mode5,
}
for _, mode := range modes {
t.Run(fmt.Sprintf("mode %d", mode), func(t *testing.T) {
doListFoldersTest(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
DisableAnonymous: true,
APIServerStorageType: "unified",
UnifiedStorageConfig: map[string]setting.UnifiedStorageConfig{
folders.RESOURCEGROUP: {
DualWriterMode: mode,
},
},
// We set it to 1 here, so we always get forced pagination based on the response size.
UnifiedStorageMaxPageSizeBytes: 1,
EnableFeatureToggles: []string{
featuremgmt.FlagKubernetesClientDashboardsFolders,
featuremgmt.FlagNestedFolders,
},
}), mode)
})
}
})
t.Run("when creating a folder it should trim leading and trailing spaces", func(t *testing.T) {
doCreateEnsureTitleIsTrimmedTest(t, apis.NewK8sTestHelper(t, testinfra.GrafanaOpts{
AppModeProduction: true,
@ -488,6 +520,65 @@ func doCreateCircularReferenceFolderTest(t *testing.T, helper *apis.K8sTestHelpe
require.Equal(t, 400, create.Response.StatusCode)
}
func doListFoldersTest(t *testing.T, helper *apis.K8sTestHelper, mode grafanarest.DualWriterMode) {
client := helper.GetResourceClient(apis.ResourceClientArgs{
User: helper.Org1.Admin,
GVR: gvr,
})
foldersCount := 3
for i := 0; i < foldersCount; i++ {
payload, err := json.Marshal(map[string]interface{}{
"title": fmt.Sprintf("Test-%d", i),
"uid": fmt.Sprintf("uid-%d", i),
})
require.NoError(t, err)
parentCreate := apis.DoRequest(helper, apis.RequestParams{
User: client.Args.User,
Method: http.MethodPost,
Path: "/api/folders",
Body: payload,
}, &folder.Folder{})
require.NotNil(t, parentCreate.Result)
require.Equal(t, http.StatusOK, parentCreate.Response.StatusCode)
}
fetchedFolders, fetchItemsPerCall := checkListRequest(t, 1, client)
require.Equal(t, []string{"uid-0", "uid-1", "uid-2"}, fetchedFolders)
require.Equal(t, []int{1, 1, 1}, fetchItemsPerCall[:3])
// Now let's see if the iterator also works when we are limited by the page size, which should be set
// to 1 byte for this test. We only need to check that if we test unified storage as the primary storage,
// as legacy doesn't have such a page size limit.
if mode == grafanarest.Mode3 || mode == grafanarest.Mode4 || mode == grafanarest.Mode5 {
t.Run("check page size iterator", func(t *testing.T) {
fetchedFolders, fetchItemsPerCall := checkListRequest(t, 3, client)
require.Equal(t, []string{"uid-0", "uid-1", "uid-2"}, fetchedFolders)
require.Equal(t, []int{1, 1, 1}, fetchItemsPerCall[:3])
})
}
}
func checkListRequest(t *testing.T, limit int64, client *apis.K8sResourceClient) ([]string, []int) {
fetchedFolders := make([]string, 0, 3)
fetchItemsPerCall := make([]int, 0, 3)
continueToken := ""
for {
res, err := client.Resource.List(context.Background(), metav1.ListOptions{
Limit: limit,
Continue: continueToken,
})
require.NoError(t, err)
fetchItemsPerCall = append(fetchItemsPerCall, len(res.Items))
for _, item := range res.Items {
fetchedFolders = append(fetchedFolders, item.GetName())
}
continueToken = res.GetContinue()
if continueToken == "" {
break
}
}
return fetchedFolders, fetchItemsPerCall
}
func TestIntegrationFolderCreatePermissions(t *testing.T) {
if testing.Short() {
t.Skip("skipping integration test")

@ -492,6 +492,12 @@ func CreateGrafDir(t *testing.T, opts GrafanaOpts) (string, string) {
require.NoError(t, err)
}
}
if opts.UnifiedStorageMaxPageSizeBytes > 0 {
section, err := getOrCreateSection("unified_storage")
require.NoError(t, err)
_, err = section.NewKey("max_page_size_bytes", fmt.Sprintf("%d", opts.UnifiedStorageMaxPageSizeBytes))
require.NoError(t, err)
}
if opts.PermittedProvisioningPaths != "" {
_, err = pathsSect.NewKey("permitted_provisioning_paths", opts.PermittedProvisioningPaths)
require.NoError(t, err)
@ -556,6 +562,7 @@ type GrafanaOpts struct {
QueryRetries int64
GrafanaComAPIURL string
UnifiedStorageConfig map[string]setting.UnifiedStorageConfig
UnifiedStorageMaxPageSizeBytes int
PermittedProvisioningPaths string
GrafanaComSSOAPIToken string
LicensePath string

Loading…
Cancel
Save