add more context to logs, improve comments for clarity in boltdb-shipper code (#5341)

Branch: pull/5396/head
Author: Sandeep Sukhani, committed by GitHub 3 years ago
Parent: 4fafc9c234
Commit: e65d1b8c69
8 changed files:
  pkg/storage/stores/shipper/compactor/index_set.go (4)
  pkg/storage/stores/shipper/compactor/retention/metrics.go (4)
  pkg/storage/stores/shipper/compactor/retention/retention.go (19)
  pkg/storage/stores/shipper/compactor/retention/retention_test.go (3)
  pkg/storage/stores/shipper/compactor/table.go (67)
  pkg/storage/stores/shipper/compactor/table_test.go (23)
  pkg/storage/stores/shipper/downloads/index_set.go (1)
  pkg/storage/stores/shipper/downloads/table.go (20)

@@ -99,7 +99,7 @@ func (is *indexSet) initUserIndexSet(workingDir string) {
  }
  compactedDBName := filepath.Join(workingDir, fmt.Sprint(time.Now().Unix()))
- seedFileIdx := findSeedFileIdx(is.sourceObjects)
+ seedFileIdx := compactedFileIdx(is.sourceObjects)
  if len(is.sourceObjects) > 0 {
  // we would only have compacted files in user index folder, so it is not expected to have -1 for seedFileIdx but
@@ -236,7 +236,7 @@ func (is *indexSet) writeBatch(_ string, batch []indexEntry) error {
  // runRetention runs the retention on index set
  func (is *indexSet) runRetention(tableMarker retention.TableMarker) error {
- empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.compactedDB)
+ empty, modified, err := tableMarker.MarkForDelete(is.ctx, is.tableName, is.userID, is.compactedDB, is.logger)
  if err != nil {
  return err
  }

@@ -59,8 +59,8 @@ func newMarkerMetrics(r prometheus.Registerer) *markerMetrics {
  tableProcessedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
  Namespace: "loki_boltdb_shipper",
  Name: "retention_marker_table_processed_total",
- Help: "Total amount of table processed per action.",
- }, []string{"table", "action"}),
+ Help: "Total amount of table processed for each user per action. Empty string for user_id is for common index",
+ }, []string{"table", "user_id", "action"}),
  tableMarksCreatedTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
  Namespace: "loki_boltdb_shipper",
  Name: "retention_marker_count_total",

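The reshaped metric can be exercised on its own. Below is a minimal sketch using a standalone registry; the metric name and label names mirror the diff above, while the table name and label values are hypothetical samples:

package main

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

func main() {
	reg := prometheus.NewRegistry()
	tableProcessedTotal := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Namespace: "loki_boltdb_shipper",
		Name:      "retention_marker_table_processed_total",
		Help:      "Total amount of table processed for each user per action. Empty string for user_id is for common index",
	}, []string{"table", "user_id", "action"})

	// Per-user index set: user_id carries the tenant ID (sample values).
	tableProcessedTotal.WithLabelValues("index_18500", "user1", "modified").Inc()
	// Common index set: user_id is the empty string by convention.
	tableProcessedTotal.WithLabelValues("index_18500", "", "none").Inc()
}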
@@ -7,6 +7,7 @@ import (
  "fmt"
  "time"
+ "github.com/go-kit/log"
  "github.com/go-kit/log/level"
  "github.com/prometheus/client_golang/prometheus"
  "github.com/prometheus/common/model"
@@ -32,7 +33,7 @@ var errNoChunksFound = errors.New("no chunks found in table, please check if the
  type TableMarker interface {
  // MarkForDelete marks chunks to delete for a given table and returns if it's empty or modified.
- MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error)
+ MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)
  }
  type Marker struct {
@@ -58,16 +59,16 @@ func NewMarker(workingDirectory string, config storage.SchemaConfig, expiration
  }
  // MarkForDelete marks all chunks expired for a given table.
- func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ func (t *Marker) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  start := time.Now()
  status := statusSuccess
  defer func() {
  t.markerMetrics.tableProcessedDurationSeconds.WithLabelValues(tableName, status).Observe(time.Since(start).Seconds())
- level.Debug(util_log.Logger).Log("msg", "finished to process table", "table", tableName, "duration", time.Since(start))
+ level.Debug(logger).Log("msg", "finished to process table", "duration", time.Since(start))
  }()
- level.Debug(util_log.Logger).Log("msg", "starting to process table", "table", tableName)
+ level.Debug(logger).Log("msg", "starting to process table")
- empty, modified, err := t.markTable(ctx, tableName, db)
+ empty, modified, err := t.markTable(ctx, tableName, userID, db)
  if err != nil {
  status = statusFailure
  return false, false, err
@@ -75,7 +76,7 @@ func (t *Marker) MarkForDelete(ctx context.Context, tableName string, db *bbolt.
  return empty, modified, nil
  }
- func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ func (t *Marker) markTable(ctx context.Context, tableName, userID string, db *bbolt.DB) (bool, bool, error) {
  schemaCfg, ok := schemaPeriodForTable(t.config, tableName)
  if !ok {
  return false, false, fmt.Errorf("could not find schema for table: %s", tableName)
@@ -119,14 +120,14 @@ func (t *Marker) markTable(ctx context.Context, tableName string, db *bbolt.DB)
  return false, false, err
  }
  if empty {
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionDeleted).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionDeleted).Inc()
  return empty, true, nil
  }
  if !modified {
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionNone).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionNone).Inc()
  return empty, modified, nil
  }
- t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, tableActionModified).Inc()
+ t.markerMetrics.tableProcessedTotal.WithLabelValues(tableName, userID, tableActionModified).Inc()
  return empty, modified, nil
  }
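The log calls above drop the explicit "table" field because the logger handed to MarkForDelete is expected to already carry that context. A minimal sketch of the go-kit log.With pattern; the field values here are hypothetical:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func main() {
	base := log.NewLogfmtLogger(os.Stderr)
	// Attach the table and user context once...
	logger := log.With(base, "table-name", "index_18500", "user-id", "user1")
	// ...so each call site stays short and still emits the full context:
	level.Debug(logger).Log("msg", "starting to process table")
	level.Debug(logger).Log("msg", "finished to process table", "duration", "1.2s")
}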

@@ -28,6 +28,7 @@ import (
  "github.com/grafana/loki/pkg/storage/chunk/objectclient"
  "github.com/grafana/loki/pkg/storage/chunk/storage"
  "github.com/grafana/loki/pkg/storage/stores/shipper/util"
+ util_log "github.com/grafana/loki/pkg/util/log"
  "github.com/grafana/loki/pkg/validation"
  )
@@ -155,7 +156,7 @@ func Test_Retention(t *testing.T) {
  marker, err := NewMarker(workDir, store.schemaCfg, expiration, nil, prometheus.NewRegistry())
  require.NoError(t, err)
  for _, table := range store.indexTables() {
- _, _, err := marker.MarkForDelete(context.Background(), table.name, table.DB)
+ _, _, err := marker.MarkForDelete(context.Background(), table.name, "", table.DB, util_log.Logger)
  require.Nil(t, err)
  table.Close()

@@ -25,6 +25,38 @@ import (
  util_log "github.com/grafana/loki/pkg/util/log"
  )
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
+ // Below we show various formats that we have for structuring index in the object store.
+ //
+ // FORMAT1:
+ //   table1
+ //     ----> db1.gz
+ //             ----> index
+ //
+ // FORMAT2:
+ //   table1
+ //     ----> db1.gz
+ //             ----> user1
+ //             ----> user2
+ //
+ // FORMAT3:
+ //   table1
+ //     ----> user1
+ //     |       ----> db1.gz
+ //     |               ----> index
+ //     ----> user2
+ //             ----> db1.gz
+ //                     ----> index
+ //
+ // FORMAT1 - `table1` has 1 db named db1.gz and 1 boltdb bucket named `index` which contains index for all the users.
+ //           It is in use when the flag to build per user index is not enabled.
+ //           Ingesters write the index in FORMAT1, which the compactor then compacts down in the same format.
+ //
+ // FORMAT2 - `table1` has 1 db named db1.gz and 1 boltdb bucket each for `user1` and `user2`, containing
+ //           index just for those users.
+ //           It is an intermediate format built by ingesters when the flag to build per user index is enabled.
+ //
+ // FORMAT3 - `table1` has 1 folder each for `user1` and `user2`, containing index files with index just for those users.
+ //           Compactor builds index in this format from FORMAT2.
+ //
+ // THE THING TO NOTE HERE IS THAT THE COMPACTOR BUILDS INDEX IN FORMAT1 FROM FORMAT1 AND FORMAT3 FROM FORMAT2.
+ ////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////////
  const (
  uploaderName = "compactor"
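FORMAT1 and FORMAT2 differ only in the bucket layout within the same boltdb file. A small sketch with go.etcd.io/bbolt to make that concrete; the file path and user IDs are made up:

package main

import (
	"log"
	"os"
	"path/filepath"

	"go.etcd.io/bbolt"
)

func main() {
	path := filepath.Join(os.TempDir(), "db1")
	db, err := bbolt.Open(path, 0o600, nil)
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	err = db.Update(func(tx *bbolt.Tx) error {
		// FORMAT1: a single bucket named "index" holds entries for all users.
		if _, err := tx.CreateBucketIfNotExists([]byte("index")); err != nil {
			return err
		}
		// FORMAT2: one bucket per user, each holding only that user's entries.
		for _, userID := range []string{"user1", "user2"} {
			if _, err := tx.CreateBucketIfNotExists([]byte(userID)); err != nil {
				return err
			}
		}
		return nil
	})
	if err != nil {
		log.Fatal(err)
	}
}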
@@ -60,7 +92,6 @@ type table struct {
  usersWithPerUserIndex []string
  uploadCompactedDB bool
  compactedDB *bbolt.DB
- seedSourceFileIdx int
  logger log.Logger
  ctx context.Context
@@ -83,7 +114,6 @@ func newTable(ctx context.Context, workingDirectory string, indexStorageClient s
  indexSets: map[string]*indexSet{},
  baseUserIndexSet: storage.NewIndexSet(indexStorageClient, true),
  baseCommonIndexSet: storage.NewIndexSet(indexStorageClient, false),
- seedSourceFileIdx: -1,
  }
  table.logger = log.With(util_log.Logger, "table-name", table.name)
@@ -130,8 +160,8 @@ func (t *table) compact(applyRetention bool) error {
  return err
  }
  } else if len(indexFiles) == 1 && (applyRetention || mustRecreateCompactedDB(indexFiles)) {
  // we have just 1 common index file which is already compacted.
+ // initialize common compacted db if we need to apply retention, or we need to recreate it
- t.seedSourceFileIdx = 0
  downloadAt := filepath.Join(t.workingDirectory, indexFiles[0].Name)
  err = shipper_util.DownloadFileFromStorage(downloadAt, shipper_util.IsCompressedFile(indexFiles[0].Name),
  false, shipper_util.LoggerWithFilename(t.logger, indexFiles[0].Name),
@@ -182,6 +212,9 @@ func (t *table) done() error {
  return err
  }
+ // initialize the user index sets for:
+ // - compaction if we have more than 1 index file, taken care of by index set initialization
+ // - recreation if mustRecreateCompactedDB says so, taken care of by indexSet.done call below
  if len(indexFiles) > 1 || mustRecreateCompactedDB(indexFiles) {
  t.indexSets[userID], err = t.getOrCreateUserIndex(userID)
  if err != nil {
@@ -243,16 +276,17 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
  level.Info(t.logger).Log("msg", "starting compaction of dbs")
  compactedDBName := filepath.Join(t.workingDirectory, fmt.Sprint(time.Now().Unix()))
- t.seedSourceFileIdx = findSeedFileIdx(files)
+ // if we find a previously compacted file, use it as a seed file to copy other index into it
+ seedSourceFileIdx := compactedFileIdx(files)
- if t.seedSourceFileIdx != -1 {
+ if seedSourceFileIdx != -1 {
  t.uploadCompactedDB = true
- compactedDBName = filepath.Join(t.workingDirectory, files[t.seedSourceFileIdx].Name)
+ compactedDBName = filepath.Join(t.workingDirectory, files[seedSourceFileIdx].Name)
- level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[t.seedSourceFileIdx].Name))
- err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[t.seedSourceFileIdx].Name),
- false, shipper_util.LoggerWithFilename(t.logger, files[t.seedSourceFileIdx].Name), func() (io.ReadCloser, error) {
- return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[t.seedSourceFileIdx].Name)
+ level.Info(t.logger).Log("msg", fmt.Sprintf("using %s as seed file", files[seedSourceFileIdx].Name))
+ err = shipper_util.DownloadFileFromStorage(compactedDBName, shipper_util.IsCompressedFile(files[seedSourceFileIdx].Name),
+ false, shipper_util.LoggerWithFilename(t.logger, files[seedSourceFileIdx].Name), func() (io.ReadCloser, error) {
+ return t.baseCommonIndexSet.GetFile(t.ctx, t.name, "", files[seedSourceFileIdx].Name)
  })
  if err != nil {
  return err
@@ -264,10 +298,11 @@ func (t *table) compactFiles(files []storage.IndexFile) error {
  return err
  }
+ // go through each file and build index in FORMAT1 from FORMAT1 files and FORMAT3 from FORMAT2 files
  return concurrency.ForEachJob(t.ctx, len(files), readDBsConcurrency, func(ctx context.Context, idx int) error {
  workNum := idx
  // skip seed file
- if workNum == t.seedSourceFileIdx {
+ if workNum == seedSourceFileIdx {
  return nil
  }
  fileName := files[idx].Name
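concurrency.ForEachJob runs the per-file work with bounded parallelism. A hedged sketch of the skip-the-seed pattern above, assuming the grafana/dskit concurrency package; the file names and the concurrency limit are made up:

package main

import (
	"context"
	"fmt"

	"github.com/grafana/dskit/concurrency"
)

func main() {
	files := []string{"compactor-1645000000.gz", "db1.gz", "db2.gz"}
	seedSourceFileIdx := 0 // pretend files[0] is the previously compacted seed

	err := concurrency.ForEachJob(context.Background(), len(files), 2, func(ctx context.Context, idx int) error {
		if idx == seedSourceFileIdx {
			return nil // the seed already is the target db; skip it
		}
		fmt.Println("copying index from", files[idx])
		return nil
	})
	if err != nil {
		panic(err)
	}
}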
@@ -293,7 +328,7 @@ func (t *table) writeBatch(bucketName string, batch []indexEntry) error {
  return t.writeUserIndex(bucketName, batch)
  }
- // writeCommonIndex writes a batch to compactedDB
+ // writeCommonIndex writes a batch to compactedDB which is for FORMAT1 index
  func (t *table) writeCommonIndex(batch []indexEntry) error {
  t.uploadCompactedDB = true
  return t.compactedDB.Batch(func(tx *bbolt.Tx) error {
@@ -313,7 +348,7 @@ func (t *table) writeCommonIndex(batch []indexEntry) error {
  })
  }
- // writeUserIndex sends a batch to write to the user index set
+ // writeUserIndex sends a batch to write to the user index set which is for FORMAT3 index
  func (t *table) writeUserIndex(userID string, batch []indexEntry) error {
  ui, err := t.getOrCreateUserIndex(userID)
  if err != nil {
@@ -364,11 +399,9 @@ func openBoltdbFileWithNoSync(path string) (*bbolt.DB, error) {
  return boltdb, nil
  }
- // findSeedFileIdx returns index of file to use as seed which would then get index from all the files written to.
- // It tries to find previously compacted file(which has uploaderName) which would be the biggest file.
- // In a large cluster, using previously compacted file as seed would significantly reduce compaction time.
+ // compactedFileIdx returns index of previously compacted file(which starts with uploaderName).
+ // If it can't find a previously compacted file, it would return -1.
- func findSeedFileIdx(files []storage.IndexFile) int {
+ func compactedFileIdx(files []storage.IndexFile) int {
  for i, file := range files {
  if strings.HasPrefix(file.Name, uploaderName) {
  return i
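Seed selection then reduces to a prefix scan over the file names. A self-contained sketch of compactedFileIdx with storage.IndexFile stubbed locally so it runs standalone; the file names are examples:

package main

import (
	"fmt"
	"strings"
)

const uploaderName = "compactor"

// IndexFile stands in for storage.IndexFile from the diff above.
type IndexFile struct{ Name string }

func compactedFileIdx(files []IndexFile) int {
	for i, file := range files {
		if strings.HasPrefix(file.Name, uploaderName) {
			return i
		}
	}
	return -1
}

func main() {
	files := []IndexFile{{Name: "db1.gz"}, {Name: "compactor-1645000000.gz"}}
	fmt.Println(compactedFileIdx(files))     // 1: the previously compacted file becomes the seed
	fmt.Println(compactedFileIdx(files[:1])) // -1: no previously compacted file found
}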

@@ -11,6 +11,7 @@ import (
  "testing"
  "time"
+ "github.com/go-kit/log"
  "github.com/prometheus/common/model"
  "github.com/stretchr/testify/require"
  "go.etcd.io/bbolt"
@@ -269,10 +270,10 @@ func TestTable_Compaction(t *testing.T) {
  }
  }
- type TableMarkerFunc func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error)
+ type TableMarkerFunc func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error)
- func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
- return t(ctx, tableName, db)
+ func (t TableMarkerFunc) MarkForDelete(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
+ return t(ctx, tableName, userID, db, logger)
  }
  type IntervalMayHaveExpiredChunksFunc func(interval model.Interval, userID string) bool
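TableMarkerFunc is the usual func-adapter trick: a function type whose method forwards to itself satisfies the single-method interface, so the tests below can stub a TableMarker inline. The same pattern in isolation; the types here are stand-ins, not the real Loki ones:

package main

import "fmt"

// Marker is a stand-in for retention.TableMarker.
type Marker interface {
	Mark(tableName string) (bool, bool, error)
}

// MarkerFunc adapts a plain function to the Marker interface.
type MarkerFunc func(tableName string) (bool, bool, error)

func (f MarkerFunc) Mark(tableName string) (bool, bool, error) { return f(tableName) }

func main() {
	var m Marker = MarkerFunc(func(tableName string) (bool, bool, error) {
		return false, true, nil // empty=false, modified=true
	})
	empty, modified, err := m.Mark("index_18500")
	fmt.Println(empty, modified, err)
}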
@@ -309,7 +310,7 @@ func TestTable_CompactionRetention(t *testing.T) {
  _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
  require.True(t, os.IsNotExist(err))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return true, true, nil
  }),
  },
@@ -321,7 +322,7 @@ func TestTable_CompactionRetention(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, true, nil
  }),
  },
@@ -333,7 +334,7 @@ func TestTable_CompactionRetention(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, false, nil
  }),
  },
@@ -554,7 +555,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, false, nil
  }),
  expectedIndexSetState: indexSetState{
@@ -571,7 +572,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, false, nil
  }),
  compactedDBMtime: time.Now().Add(-recreateCompactedDBOlderThan / 2),
@@ -585,7 +586,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, true, nil
  }),
  expectedIndexSetState: indexSetState{
@@ -599,7 +600,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
  _, err := ioutil.ReadDir(filepath.Join(storagePath, tableName))
  require.True(t, os.IsNotExist(err))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return true, true, nil
  }),
  expectedIndexSetState: indexSetState{
@@ -616,7 +617,7 @@ func TestTable_RecreateCompactedDB(t *testing.T) {
  })
  compareCompactedTable(t, filepath.Join(storagePath, tableName), filepath.Join(storagePath, fmt.Sprintf("%s-copy", tableName)))
  },
- tableMarker: TableMarkerFunc(func(ctx context.Context, tableName string, db *bbolt.DB) (bool, bool, error) {
+ tableMarker: TableMarkerFunc(func(ctx context.Context, tableName, userID string, db *bbolt.DB, logger log.Logger) (bool, bool, error) {
  return false, false, nil
  }),
  compactedDBMtime: time.Now().Add(-(recreateCompactedDBOlderThan + time.Minute)),

@@ -117,6 +117,7 @@ func (t *indexSet) Init() (err error) {
  return err
  }
+ // open all the locally present files first to avoid downloading them again during sync operation below.
  for _, fileInfo := range filesInfo {
  if fileInfo.IsDir() {
  continue

@@ -101,13 +101,15 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
  level.Debug(table.logger).Log("msg", fmt.Sprintf("opening locally present files for table %s", name), "files", fmt.Sprint(filesInfo))
+ // common index files are outside the directories and user index files are in the directories
  for _, fileInfo := range filesInfo {
  if !fileInfo.IsDir() {
  continue
  }
- userIndexSet, err := NewIndexSet(name, fileInfo.Name(), filepath.Join(cacheLocation, fileInfo.Name()),
- table.baseUserIndexSet, boltDBIndexClient, table.logger, metrics)
+ userID := fileInfo.Name()
+ userIndexSet, err := NewIndexSet(name, userID, filepath.Join(cacheLocation, userID),
+ table.baseUserIndexSet, boltDBIndexClient, loggerWithUserID(table.logger, userID), metrics)
  if err != nil {
  return nil, err
  }
@@ -117,7 +119,7 @@ func LoadTable(name, cacheLocation string, storageClient storage.Client, boltDBI
  return nil, err
  }
- table.indexSets[fileInfo.Name()] = userIndexSet
+ table.indexSets[userID] = userIndexSet
  }
  commonIndexSet, err := NewIndexSet(name, "", cacheLocation, table.baseCommonIndexSet,
@@ -155,6 +157,7 @@ func (t *Table) MultiQueries(ctx context.Context, queries []chunk.IndexQuery, ca
  return err
  }
+ // query both user and common index
  for _, uid := range []string{userID, ""} {
  indexSet, err := t.getOrCreateIndexSet(uid)
  if err != nil {
@@ -277,7 +280,8 @@ func (t *Table) getOrCreateIndexSet(id string) (IndexSet, error) {
  }
  // instantiate the index set, add it to the map
- indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.boltDBIndexClient, t.logger, t.metrics)
+ indexSet, err = NewIndexSet(t.name, id, filepath.Join(t.cacheLocation, id), baseIndexSet, t.boltDBIndexClient,
+ loggerWithUserID(t.logger, id), t.metrics)
  if err != nil {
  return nil, err
  }
@@ -336,3 +340,11 @@ func (t *Table) downloadUserIndexes(ctx context.Context, userIDs []string) error
  return indexSet.AwaitReady(ctx)
  })
  }
+ func loggerWithUserID(logger log.Logger, userID string) log.Logger {
+ if userID == "" {
+ return logger
+ }
+ return log.With(logger, "user-id", userID)
+ }
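Composed with the existing per-table logger, this yields one context-carrying logger per index set. A short usage sketch; the base logger and field values are hypothetical:

package main

import (
	"os"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
)

func loggerWithUserID(logger log.Logger, userID string) log.Logger {
	if userID == "" {
		return logger
	}
	return log.With(logger, "user-id", userID)
}

func main() {
	tableLogger := log.With(log.NewLogfmtLogger(os.Stderr), "table-name", "index_18500")
	// User index set: log lines carry both table-name and user-id.
	level.Info(loggerWithUserID(tableLogger, "user1")).Log("msg", "downloading user index")
	// Common index set: the empty userID leaves the logger unchanged.
	level.Info(loggerWithUserID(tableLogger, "")).Log("msg", "downloading common index")
}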
