always retry syncing index files from indexshipper when the index list cache is stale (#6327)

Branch: pull/6330/head
Author: Sandeep Sukhani, 3 years ago (committed by GitHub)
Parent: 36e0979cf5
Commit: 086dbf7270
1. pkg/storage/stores/indexshipper/downloads/index_set.go (30 changed lines)
2. pkg/storage/stores/indexshipper/downloads/index_set_test.go (65 changed lines)
3. pkg/storage/stores/indexshipper/downloads/table.go (9 changed lines)
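For orientation, here is a minimal, self-contained Go sketch of the behaviour this commit introduces: a failed sync is retried up to maxSyncRetries extra times, refreshing the index list cache first when the failure was errIndexListCacheTooStale. The fakeIndexSet type and its methods are illustrative stand-ins, not Loki types; only the constant, the sentinel error, and the refresh-then-retry flow mirror the diff below.

// Minimal sketch (not Loki code) of the retry-on-stale-list-cache pattern
// that the diff below centralizes in indexSet.syncWithRetry.
package main

import (
	"context"
	"errors"
	"fmt"
)

const maxSyncRetries = 1

var errIndexListCacheTooStale = errors.New("index list cache too stale")

// fakeIndexSet simulates an index set whose cached listing of files can go stale.
type fakeIndexSet struct {
	listCacheStale bool
}

// RefreshIndexListCache stands in for re-listing index files from object storage.
func (f *fakeIndexSet) RefreshIndexListCache(_ context.Context) {
	f.listCacheStale = false
}

// sync fails with the sentinel error while the list cache is stale.
func (f *fakeIndexSet) sync(_ context.Context) error {
	if f.listCacheStale {
		return errIndexListCacheTooStale
	}
	return nil
}

// syncWithRetry mirrors the shape of the new helper in the diff: retry a failed
// sync, refreshing the index list cache first when staleness was the cause.
func (f *fakeIndexSet) syncWithRetry(ctx context.Context) error {
	var err error
	for i := 0; i <= maxSyncRetries; i++ {
		err = f.sync(ctx)
		if err == nil {
			return nil
		}
		if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries {
			f.RefreshIndexListCache(ctx)
		}
	}
	return err
}

func main() {
	f := &fakeIndexSet{listCacheStale: true}
	// The first attempt hits the stale cache; the retry succeeds after a refresh.
	fmt.Println(f.syncWithRetry(context.Background())) // prints: <nil>
}

Because the retry now lives on the index set itself, both Init and Sync (first file below) get the same stale-cache handling, which is why the per-table retry in table.go (last file below) is removed.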

pkg/storage/stores/indexshipper/downloads/index_set.go

@@ -2,6 +2,7 @@ package downloads
 
 import (
 	"context"
+	"errors"
 	"fmt"
 	"io"
 	"io/ioutil"
@@ -25,7 +26,8 @@ import (
 )
 
 const (
-	gzipExtension = ".gz"
+	gzipExtension  = ".gz"
+	maxSyncRetries = 1
 )
 
 var errIndexListCacheTooStale = fmt.Errorf("index list cache too stale")
@@ -144,7 +146,7 @@ func (t *indexSet) Init(forQuerying bool) (err error) {
 	level.Debug(logger).Log("msg", fmt.Sprintf("opened %d local files, now starting sync operation", len(t.index)))
 
 	// sync the table to get new files and remove the deleted ones from storage.
-	err = t.sync(ctx, false, forQuerying)
+	err = t.syncWithRetry(ctx, false, forQuerying)
 	if err != nil {
 		return
 	}
@@ -243,11 +245,31 @@ func (t *indexSet) cleanupDB(fileName string) error {
 }
 
 func (t *indexSet) Sync(ctx context.Context) (err error) {
-	return t.sync(ctx, true, false)
+	return t.syncWithRetry(ctx, true, false)
 }
 
+// syncWithRetry runs a sync with upto maxSyncRetries on failure
+func (t *indexSet) syncWithRetry(ctx context.Context, lock, bypassListCache bool) error {
+	var err error
+	for i := 0; i <= maxSyncRetries; i++ {
+		err = t.sync(ctx, lock, bypassListCache)
+		if err == nil {
+			return nil
+		}
+
+		if errors.Is(err, errIndexListCacheTooStale) && i < maxSyncRetries {
+			level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it before retrying")
+			t.baseIndexSet.RefreshIndexListCache(ctx)
+		}
+
+		level.Error(t.logger).Log("msg", "sync failed, retrying it", "err", err)
+	}
+
+	return err
+}
+
 // sync downloads updated and new files from the storage relevant for the table and removes the deleted ones
-func (t *indexSet) sync(ctx context.Context, lock bool, bypassListCache bool) (err error) {
+func (t *indexSet) sync(ctx context.Context, lock, bypassListCache bool) (err error) {
 	level.Debug(t.logger).Log("msg", fmt.Sprintf("syncing files for table %s", t.tableName))
 
 	toDownload, toDelete, err := t.checkStorageForUpdates(ctx, lock, bypassListCache)

pkg/storage/stores/indexshipper/downloads/index_set_test.go

@@ -3,12 +3,14 @@ package downloads
 import (
 	"context"
 	"fmt"
+	"io/ioutil"
+	"os"
 	"path/filepath"
 	"testing"
 
 	"github.com/stretchr/testify/require"
 
 	"github.com/grafana/loki/pkg/storage/chunk/client/util"
 	"github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/storage"
 	util_log "github.com/grafana/loki/pkg/util/log"
@@ -89,3 +91,66 @@ func TestIndexSet_doConcurrentDownload(t *testing.T) {
 		})
 	}
 }
+
+func TestIndexSet_Sync(t *testing.T) {
+	tempDir := t.TempDir()
+	objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
+	tablePathInStorage := filepath.Join(objectStoragePath, tableName)
+
+	var indexesSetup []string
+
+	indexSet, stopFunc := buildTestIndexSet(t, "", tempDir)
+	defer stopFunc()
+
+	checkIndexSet := func() {
+		require.Len(t, indexSet.index, len(indexesSetup))
+		verifyIndexForEach(t, indexesSetup, func(callbackFunc func(index.Index) error) error {
+			return indexSet.ForEach(context.Background(), callbackFunc)
+		})
+	}
+
+	// setup some indexes in object storage
+	setupIndexesAtPath(t, "", tablePathInStorage, 0, 10)
+	indexesSetup = buildListOfExpectedIndexes("", 0, 10)
+
+	// sync and verify the indexSet
+	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+	require.NoError(t, indexSet.Sync(context.Background()))
+
+	// check index set twice; first run to have new files to download, second run to test with no changes in storage.
+	for i := 0; i < 2; i++ {
+		checkIndexSet()
+	}
+
+	// delete a file from storage which should get removed from local as well
+	require.NoError(t, os.Remove(filepath.Join(tablePathInStorage, indexesSetup[0])))
+	indexesSetup = indexesSetup[1:]
+
+	// sync and verify the indexSet
+	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+	require.NoError(t, indexSet.Sync(context.Background()))
+	checkIndexSet()
+
+	// let us simulate a compaction to test stale index list cache handling
+
+	// first, let us add a new file and refresh the index list cache
+	oneMoreDB := "one-more-db"
+	require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, oneMoreDB), []byte(oneMoreDB), 0755))
+	indexSet.baseIndexSet.RefreshIndexListCache(context.Background())
+
+	// now, without syncing the indexset, let us compact the index in storage
+	compactedDBName := "compacted-db"
+	require.NoError(t, os.RemoveAll(tablePathInStorage))
+	require.NoError(t, util.EnsureDirectory(tablePathInStorage))
+	require.NoError(t, ioutil.WriteFile(filepath.Join(tablePathInStorage, compactedDBName), []byte(compactedDBName), 0755))
+	indexesSetup = []string{compactedDBName}
+
+	// verify that we are getting errIndexListCacheTooStale without refreshing the list cache
+	require.ErrorIs(t, errIndexListCacheTooStale, indexSet.sync(context.Background(), true, false))
+
+	// let us run a sync which should detect the stale index list cache and sync the table after refreshing the cache
+	require.NoError(t, indexSet.Sync(context.Background()))
+
+	// verify that table has got only compacted db
+	checkIndexSet()
+}

pkg/storage/stores/indexshipper/downloads/table.go

@@ -235,15 +235,6 @@ func (t *table) Sync(ctx context.Context) error {
 	for userID, indexSet := range t.indexSets {
 		if err := indexSet.Sync(ctx); err != nil {
-			if errors.Is(err, errIndexListCacheTooStale) {
-				level.Info(t.logger).Log("msg", "we have hit stale list cache, refreshing it and running sync again")
-				t.storageClient.RefreshIndexListCache(ctx)
-
-				err = indexSet.Sync(ctx)
-				if err == nil {
-					continue
-				}
-			}
 			return errors.Wrap(err, fmt.Sprintf("failed to sync index set %s for table %s", userID, t.name))
 		}
 	}
