[Blooms] Use correct table address function (#11955)

pull/11957/head
Owen Diehl 2 years ago committed by GitHub
parent 3c06b360ec
commit bd12e16339
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 8
      pkg/bloomcompactor/controller.go
  2. 16
      pkg/bloomcompactor/table_utils.go
  3. 6
      pkg/bloomcompactor/tsdb.go
  4. 2
      pkg/bloomgateway/util_test.go
  5. 4
      pkg/storage/config/schema_config.go

@ -77,7 +77,7 @@ func (s *SimpleBloomController) compactTenant(
client, err := s.bloomStore.Client(table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.String())
level.Error(logger).Log("msg", "failed to get client", "err", err, "table", table.Addr())
return errors.Wrap(err, "failed to get client")
}
@ -280,7 +280,7 @@ func (s *SimpleBloomController) buildGaps(
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: table.String(),
TableName: table.Addr(),
Bounds: gap.bounds,
},
},
@ -319,7 +319,7 @@ func (s *SimpleBloomController) buildGaps(
blockCt++
blk := newBlocks.At()
built, err := bloomshipper.BlockFrom(tenant, table.String(), blk)
built, err := bloomshipper.BlockFrom(tenant, table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
return nil, errors.Wrap(err, "failed to build block")
@ -348,7 +348,7 @@ func (s *SimpleBloomController) buildGaps(
s.closeLoadedBlocks(loaded, blocksIter)
// Write the new meta
ref, err := bloomshipper.MetaRefFrom(tenant, table.String(), gap.bounds, meta.Sources, meta.Blocks)
ref, err := bloomshipper.MetaRefFrom(tenant, table.Addr(), gap.bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
return nil, errors.Wrap(err, "failed to checksum meta")

@ -1,16 +0,0 @@
package bloomcompactor
import (
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/compactor/retention"
)
func getIntervalsForTables(tables []string) map[string]model.Interval {
tablesIntervals := make(map[string]model.Interval, len(tables))
for _, table := range tables {
tablesIntervals[table] = retention.ExtractIntervalFromTableName(table)
}
return tablesIntervals
}

@ -50,12 +50,12 @@ func NewBloomTSDBStore(storage storage.Client) *BloomTSDBStore {
}
func (b *BloomTSDBStore) UsersForPeriod(ctx context.Context, table config.DayTime) ([]string, error) {
_, users, err := b.storage.ListFiles(ctx, table.Table(), true) // bypass cache for ease of testing
_, users, err := b.storage.ListFiles(ctx, table.Addr(), true) // bypass cache for ease of testing
return users, err
}
func (b *BloomTSDBStore) ResolveTSDBs(ctx context.Context, table config.DayTime, tenant string) ([]tsdb.SingleTenantTSDBIdentifier, error) {
indices, err := b.storage.ListUserFiles(ctx, table.Table(), tenant, true) // bypass cache for ease of testing
indices, err := b.storage.ListUserFiles(ctx, table.Addr(), tenant, true) // bypass cache for ease of testing
if err != nil {
return nil, errors.Wrap(err, "failed to list user files")
}
@ -87,7 +87,7 @@ func (b *BloomTSDBStore) LoadTSDB(
) (v1.CloseableIterator[*v1.Series], error) {
withCompression := id.Name() + gzipExtension
data, err := b.storage.GetUserFile(ctx, table.Table(), tenant, withCompression)
data, err := b.storage.GetUserFile(ctx, table.Addr(), tenant, withCompression)
if err != nil {
return nil, errors.Wrap(err, "failed to get file")
}

@ -311,7 +311,7 @@ func createBlocks(t *testing.T, tenant string, n int, from, through model.Time,
}
ref := bloomshipper.Ref{
TenantID: tenant,
TableName: config.NewDayTime(truncateDay(from)).Table(),
TableName: config.NewDayTime(truncateDay(from)).Addr(),
Bounds: v1.NewBounds(fromFp, throughFp),
StartTimestamp: from,
EndTimestamp: through,

@ -237,7 +237,9 @@ func (d DayTime) String() string {
return d.Time.Time().UTC().Format("2006-01-02")
}
func (d DayTime) Table() string {
// Addr returns the unix day offset as a string, which is used
// as the address for the index table in storage.
func (d DayTime) Addr() string {
return fmt.Sprintf("%d",
d.ModelTime().Time().UnixNano()/int64(ObjectStorageIndexRequiredPeriod))
}

Loading…
Cancel
Save