store: fix incorrect usage of period config in table range (#9754)

**What this PR does / why we need it**:
We were incorrectly taking a reference to a loop variable in store init. I
overlooked this because the reference was being taken inside the call to
`GetIndexTableNumberRange`.

As a result, the `TableRange` of each period (which is used by the
index-shipper) points to the latest `periodConfig`. This in itself is not a
problem as long as the index prefix never changes, since the `periodConfig`
is only used to generate the `tableName`. But if the index prefix changes
across periods, we lose access to the indexes that use the old prefix.
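
For illustration only (not the actual Loki code), here is a minimal sketch of the Go loop-variable aliasing behind this bug: before Go 1.22, the `range` variable is a single reused variable, so storing `&p` on each iteration leaves every stored pointer aliased to the last element. The `PeriodConfig` and `tableRanges` names below are made up for the example; the fix mirrors the `p := p` shadowing added in this PR.

```go
package main

import "fmt"

// PeriodConfig is a stand-in for a schema period config; illustrative only.
type PeriodConfig struct{ IndexPrefix string }

func main() {
	periods := []PeriodConfig{{IndexPrefix: "index_"}, {IndexPrefix: "index_tsdb_"}}

	// Buggy: &p aliases the single loop variable, so with pre-Go 1.22
	// semantics both stored pointers end up referring to the last period.
	var tableRanges []*PeriodConfig
	for _, p := range periods {
		tableRanges = append(tableRanges, &p)
	}
	for _, tr := range tableRanges {
		fmt.Println(tr.IndexPrefix) // pre-1.22: prints "index_tsdb_" twice
	}

	// Fixed: shadow the loop variable so each iteration gets its own copy,
	// analogous to the `p := p` added in store init by this PR.
	tableRanges = tableRanges[:0]
	for _, p := range periods {
		p := p
		tableRanges = append(tableRanges, &p)
	}
	for _, tr := range tableRanges {
		fmt.Println(tr.IndexPrefix) // prints "index_", then "index_tsdb_"
	}
}
```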


**Special notes for your reviewer**:

**Checklist**
- [X] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

---------

Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com>
Branch: pull/9780/head
Commit: 10e55a6359 (parent: 35465d0297), authored by Ashwanth, committed via GitHub
Changed files (8), with changed line counts:

  CHANGELOG.md (1)
  integration/cluster/cluster.go (4)
  integration/cluster/schema.go (12)
  integration/loki_micro_services_delete_test.go (2)
  integration/loki_micro_services_test.go (196)
  pkg/loki/modules.go (2)
  pkg/storage/store.go (1)
  pkg/storage/store_test.go (146)

CHANGELOG.md:
@@ -56,6 +56,7 @@
 * [9495](https://github.com/grafana/loki/pull/9495) **thampiotr**: Promtail: Fix potential goroutine leak in file tailer.
 * [9650](https://github.com/grafana/loki/pull/9650) **ashwanthgoli**: Config: ensure storage config defaults apply to named stores.
 * [9629](https://github.com/grafana/loki/pull/9629) **periklis**: Fix duplicate label values from ingester streams.
+* [9754](https://github.com/grafana/loki/pull/9754) **ashwanthgoli**: Fix an issue with indexes becoming unqueryable if the index prefix is different from the one configured in the latest period config.
 * [9763](https://github.com/grafana/loki/pull/9763) **ssncferreira**: Fix the logic of the `offset` operator for downstream queries on instant query splitting of (range) vector aggregation expressions containing an offset.
 * [9773](https://github.com/grafana/loki/pull/9773) **ssncferreira**: Fix instant query summary statistic's `splits` corresponding to the number of subqueries a query is split into based on `split_queries_by_interval`.

integration/cluster/cluster.go:
@@ -188,6 +188,10 @@ func (c *Cluster) Run() error {
 	return nil
 }
 
+func (c *Cluster) ResetSchemaConfig() {
+	c.periodCfgs = nil
+}
+
 func (c *Cluster) Restart() error {
 	if err := c.stop(false); err != nil {
 		return err

integration/cluster/schema.go:
@@ -43,19 +43,23 @@ schema_config:
       object_store: store-1
       schema: v11
       index:
-        prefix: index_
+        prefix: index_tsdb_
         period: 24h
 `
 )
 
-func WithAdditionalBoltDBPeriod(c *Cluster) {
+func SchemaWithTSDB(c *Cluster) {
+	c.periodCfgs = append(c.periodCfgs, additionalTSDBShipperSchemaConfigTemplate)
+}
+
+func SchemaWithBoltDBAndBoltDB(c *Cluster) {
 	c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, boltDBShipperSchemaConfigTemplate)
 }
 
-func WithAdditionalTSDBPeriod(c *Cluster) {
+func SchemaWithTSDBAndTSDB(c *Cluster) {
 	c.periodCfgs = append(c.periodCfgs, additionalTSDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
 }
 
-func WithBoltDBAndTSDBPeriods(c *Cluster) {
+func SchemaWithBoltDBAndTSDB(c *Cluster) {
 	c.periodCfgs = append(c.periodCfgs, additionalBoltDBShipperSchemaConfigTemplate, tsdbShipperSchemaConfigTemplate)
 }

integration/loki_micro_services_delete_test.go:
@@ -18,7 +18,7 @@ import (
 
 func TestMicroServicesDeleteRequest(t *testing.T) {
 	storage.ResetBoltDBIndexClientsWithShipper()
 
-	clu := cluster.New(nil, cluster.WithAdditionalBoltDBPeriod)
+	clu := cluster.New(nil, cluster.SchemaWithBoltDBAndBoltDB)
 	defer func() {
 		assert.NoError(t, clu.Cleanup())
 		storage.ResetBoltDBIndexClientsWithShipper()

integration/loki_micro_services_test.go:
@@ -134,11 +134,199 @@ func TestMicroServicesIngestQuery(t *testing.T) {
 	})
 }
 
-func TestMicroServicesMultipleBucketSingleProvider(t *testing.T) {
+func TestMicroServicesIngestQueryWithSchemaChange(t *testing.T) {
+	// init the cluster with a single tsdb period. Uses prefix index_tsdb_
+	clu := cluster.New(nil, cluster.SchemaWithTSDB)
+	defer func() {
+		assert.NoError(t, clu.Cleanup())
+	}()
+
+	// initially, run only compactor and distributor.
+	var (
+		tCompactor = clu.AddComponent(
+			"compactor",
+			"-target=compactor",
+			"-boltdb.shipper.compactor.compaction-interval=1s",
+		)
+		tDistributor = clu.AddComponent(
+			"distributor",
+			"-target=distributor",
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	// then, run only ingester and query-scheduler.
+	var (
+		tIngester = clu.AddComponent(
+			"ingester",
+			"-target=ingester",
+			"-ingester.flush-on-shutdown=true",
+		)
+		tQueryScheduler = clu.AddComponent(
+			"query-scheduler",
+			"-target=query-scheduler",
+			"-query-scheduler.use-scheduler-ring=false",
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	// finally, run the query-frontend and querier.
+	var (
+		tQueryFrontend = clu.AddComponent(
+			"query-frontend",
+			"-target=query-frontend",
+			"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
+			"-frontend.default-validity=0s",
+			"-common.compactor-address="+tCompactor.HTTPURL(),
+		)
+		tQuerier = clu.AddComponent(
+			"querier",
+			"-target=querier",
+			"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
+			"-common.compactor-address="+tCompactor.HTTPURL(),
+		)
+	)
+	require.NoError(t, clu.Run())
+
+	tenantID := randStringRunes()
+
+	now := time.Now()
+	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
+	cliDistributor.Now = now
+	cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
+	cliIngester.Now = now
+	cliQueryFrontend := client.New(tenantID, "", tQueryFrontend.HTTPURL())
+	cliQueryFrontend.Now = now
+
+	t.Run("ingest-logs", func(t *testing.T) {
+		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", time.Now().Add(-48*time.Hour), map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", time.Now().Add(-36*time.Hour), map[string]string{"job": "fake"}))
+	})
+
+	t.Run("query-lookback-default", func(t *testing.T) {
+		// queries ingesters with the default lookback period (3h)
+		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
+		require.NoError(t, err)
+		assert.Equal(t, "streams", resp.Data.ResultType)
+
+		var lines []string
+		for _, stream := range resp.Data.Stream {
+			for _, val := range stream.Values {
+				lines = append(lines, val[1])
+			}
+		}
+		assert.ElementsMatch(t, []string{}, lines)
+	})
+
+	t.Run("query-lookback-7d", func(t *testing.T) {
+		tQuerier.AddFlags("-querier.query-ingesters-within=168h")
+		require.NoError(t, tQuerier.Restart())
+
+		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
+		require.NoError(t, err)
+		assert.Equal(t, "streams", resp.Data.ResultType)
+
+		var lines []string
+		for _, stream := range resp.Data.Stream {
+			for _, val := range stream.Values {
+				lines = append(lines, val[1])
+			}
+		}
+		assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
+
+		tQuerier.AddFlags("-querier.query-ingesters-within=3h")
+		require.NoError(t, tQuerier.Restart())
+	})
+
+	t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
+		// restart ingester which should flush the chunks and index
+		require.NoError(t, tIngester.Restart())
+
+		// restart querier and index shipper to sync the index
+		tQuerier.AddFlags("-querier.query-store-only=true")
+		require.NoError(t, tQuerier.Restart())
+	})
+
+	// Query lines
+	t.Run("query again to verify logs being served from storage", func(t *testing.T) {
+		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
+		require.NoError(t, err)
+		assert.Equal(t, "streams", resp.Data.ResultType)
+
+		var lines []string
+		for _, stream := range resp.Data.Stream {
+			for _, val := range stream.Values {
+				lines = append(lines, val[1])
+			}
+		}
+		assert.ElementsMatch(t, []string{"lineA", "lineB"}, lines)
+
+		tQuerier.AddFlags("-querier.query-store-only=false")
+		require.NoError(t, tQuerier.Restart())
+	})
+
+	// Add new tsdb period with a different index prefix(index_)
+	clu.ResetSchemaConfig()
+	cluster.SchemaWithTSDBAndTSDB(clu)
+
+	// restart to load the new schema
+	require.NoError(t, tIngester.Restart())
+	require.NoError(t, tQuerier.Restart())
+
+	t.Run("ingest-logs-new-period", func(t *testing.T) {
+		// ingest logs to the new period
+		require.NoError(t, cliDistributor.PushLogLine("lineC", map[string]string{"job": "fake"}))
+		require.NoError(t, cliDistributor.PushLogLine("lineD", map[string]string{"job": "fake"}))
+	})
+
+	t.Run("query-both-periods-with-default-lookback", func(t *testing.T) {
+		// queries with the default lookback period (3h)
+		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
+		require.NoError(t, err)
+		assert.Equal(t, "streams", resp.Data.ResultType)
+
+		var lines []string
+		for _, stream := range resp.Data.Stream {
+			for _, val := range stream.Values {
+				lines = append(lines, val[1])
+			}
+		}
+		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
+	})
+
+	t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
+		// restart ingester which should flush the chunks and index
+		require.NoError(t, tIngester.Restart())
+
+		// restart querier and index shipper to sync the index
+		tQuerier.AddFlags("-querier.query-store-only=true")
+		require.NoError(t, tQuerier.Restart())
+	})
+
+	// Query lines
+	t.Run("query both periods to verify logs being served from storage", func(t *testing.T) {
+		resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
+		require.NoError(t, err)
+		assert.Equal(t, "streams", resp.Data.ResultType)
+
+		var lines []string
+		for _, stream := range resp.Data.Stream {
+			for _, val := range stream.Values {
+				lines = append(lines, val[1])
+			}
+		}
+		assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
+	})
+}
+
+func TestMicroServicesIngestQueryOverMultipleBucketSingleProvider(t *testing.T) {
 	for name, opt := range map[string]func(c *cluster.Cluster){
-		"boltdb-index":    cluster.WithAdditionalBoltDBPeriod,
-		"tsdb-index":      cluster.WithAdditionalTSDBPeriod,
-		"boltdb-and-tsdb": cluster.WithBoltDBAndTSDBPeriods,
+		"boltdb-index":    cluster.SchemaWithBoltDBAndBoltDB,
+		"tsdb-index":      cluster.SchemaWithTSDBAndTSDB,
+		"boltdb-and-tsdb": cluster.SchemaWithBoltDBAndTSDB,
 	} {
 		t.Run(name, func(t *testing.T) {
 			storage.ResetBoltDBIndexClientsWithShipper()

pkg/loki/modules.go:
@@ -1170,6 +1170,8 @@ func (t *Loki) initIndexGateway() (services.Service, error) {
 
 	var indexClients []indexgateway.IndexClientWithRange
 	for i, period := range t.Cfg.SchemaConfig.Configs {
+		period := period
+
 		if period.IndexType != config.BoltDBShipperType {
 			continue
 		}

pkg/storage/store.go:
@@ -149,6 +149,7 @@ func NewStore(cfg Config, storeCfg config.ChunkStoreConfig, schemaCfg config.Sch
 func (s *store) init() error {
 	for i, p := range s.schemaCfg.Configs {
+		p := p
 		chunkClient, err := s.chunkClientForPeriod(p)
 		if err != nil {
 			return err

pkg/storage/store_test.go:
@@ -994,6 +994,152 @@ type timeRange struct {
 	from, to time.Time
 }
 
+func TestStore_indexPrefixChange(t *testing.T) {
+	tempDir := t.TempDir()
+
+	shipperConfig := indexshipper.Config{}
+	flagext.DefaultValues(&shipperConfig)
+	shipperConfig.ActiveIndexDirectory = path.Join(tempDir, "index")
+	shipperConfig.CacheLocation = path.Join(tempDir, "cache")
+	shipperConfig.Mode = indexshipper.ModeReadWrite
+
+	cfg := Config{
+		FSConfig:          local.FSConfig{Directory: path.Join(tempDir, "chunks")},
+		TSDBShipperConfig: shipperConfig,
+		NamedStores: NamedStores{
+			Filesystem: map[string]NamedFSConfig{
+				"named-store": {Directory: path.Join(tempDir, "named-store")},
+			},
+		},
+	}
+	require.NoError(t, cfg.NamedStores.validate())
+
+	firstPeriodDate := parseDate("2019-01-01")
+	secondPeriodDate := parseDate("2019-01-02")
+
+	schemaConfig := config.SchemaConfig{
+		Configs: []config.PeriodConfig{
+			{
+				From:       config.DayTime{Time: timeToModelTime(firstPeriodDate)},
+				IndexType:  config.TSDBType,
+				ObjectType: config.StorageTypeFileSystem,
+				Schema:     "v9",
+				IndexTables: config.PeriodicTableConfig{
+					Prefix: "index_",
+					Period: time.Hour * 24,
+				},
+			},
+		},
+	}
+
+	// time ranges for adding chunks to the first period
+	chunksToBuildForTimeRanges := []timeRange{
+		{
+			secondPeriodDate.Add(-10 * time.Hour),
+			secondPeriodDate.Add(-9 * time.Hour),
+		},
+		{
+			secondPeriodDate.Add(-3 * time.Hour),
+			secondPeriodDate.Add(-2 * time.Hour),
+		},
+	}
+
+	limits, err := validation.NewOverrides(validation.Limits{}, nil)
+	require.NoError(t, err)
+
+	store, err := NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
+	require.NoError(t, err)
+
+	// build and add chunks to the store
+	addedChunkIDs := map[string]struct{}{}
+	for _, tr := range chunksToBuildForTimeRanges {
+		chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
+
+		err := store.PutOne(ctx, chk.From, chk.Through, chk)
+		require.NoError(t, err)
+
+		addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{}
+	}
+
+	// get all the chunks from the first period
+	chunks, _, err := store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate), newMatchers(fooLabelsWithName.String())...)
+	require.NoError(t, err)
+	var totalChunks int
+	for _, chks := range chunks {
+		totalChunks += len(chks)
+	}
+	require.Equal(t, totalChunks, len(addedChunkIDs))
+
+	// check whether we got back all the chunks which were added
+	for i := range chunks {
+		for _, c := range chunks[i] {
+			_, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)]
+			require.True(t, ok)
+		}
+	}
+
+	// update schema with a new period that uses different index prefix
+	schemaConfig.Configs = append(schemaConfig.Configs, config.PeriodConfig{
+		From:       config.DayTime{Time: timeToModelTime(secondPeriodDate)},
+		IndexType:  config.TSDBType,
+		ObjectType: "named-store",
+		Schema:     "v11",
+		IndexTables: config.PeriodicTableConfig{
+			Prefix: "index_tsdb_",
+			Period: time.Hour * 24,
+		},
+		RowShards: 2,
+	})
+
+	// time ranges adding a chunk to the new period and one that overlaps both
+	chunksToBuildForTimeRanges = []timeRange{
+		{
+			// chunk overlapping both the stores
+			secondPeriodDate.Add(-time.Hour),
+			secondPeriodDate.Add(time.Hour),
+		},
+		{
+			// chunk just for second store
+			secondPeriodDate.Add(2 * time.Hour),
+			secondPeriodDate.Add(3 * time.Hour),
+		},
+	}
+
+	// restart to load the updated schema
+	store.Stop()
+	store, err = NewStore(cfg, config.ChunkStoreConfig{}, schemaConfig, limits, cm, nil, util_log.Logger)
+	require.NoError(t, err)
+	defer store.Stop()
+
+	// build and add chunks to the store
+	for _, tr := range chunksToBuildForTimeRanges {
+		chk := newChunk(buildTestStreams(fooLabelsWithName, tr))
+
+		err := store.PutOne(ctx, chk.From, chk.Through, chk)
+		require.NoError(t, err)
+
+		addedChunkIDs[schemaConfig.ExternalKey(chk.ChunkRef)] = struct{}{}
+	}
+
+	// get all the chunks from both the stores
+	chunks, _, err = store.GetChunkRefs(ctx, "fake", timeToModelTime(firstPeriodDate), timeToModelTime(secondPeriodDate.Add(24*time.Hour)), newMatchers(fooLabelsWithName.String())...)
+	require.NoError(t, err)
+
+	totalChunks = 0
+	for _, chks := range chunks {
+		totalChunks += len(chks)
+	}
+	// we get common chunk twice because it is indexed in both the stores
+	require.Equal(t, len(addedChunkIDs)+1, totalChunks)
+
+	// check whether we got back all the chunks which were added
+	for i := range chunks {
+		for _, c := range chunks[i] {
+			_, ok := addedChunkIDs[schemaConfig.ExternalKey(c.ChunkRef)]
+			require.True(t, ok)
+		}
+	}
+}
+
 func TestStore_MultiPeriod(t *testing.T) {
 	limits, err := validation.NewOverrides(validation.Limits{}, nil)
 	require.NoError(t, err)
