diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go
index 2414d5e891..4296d48219 100644
--- a/pkg/ingester/ingester_test.go
+++ b/pkg/ingester/ingester_test.go
@@ -300,10 +300,6 @@ func (s *mockStore) PutOne(ctx context.Context, from, through model.Time, chunk
 	return nil
 }
 
-func (s *mockStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
-	return nil, nil
-}
-
 func (s *mockStore) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
 	return nil, nil, nil
 }
@@ -320,14 +316,6 @@ func (s *mockStore) GetChunkFetcher(tm model.Time) *chunk.Fetcher {
 	return nil
 }
 
-func (s *mockStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
-	return nil
-}
-
-func (s *mockStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
-	return nil
-}
-
 func (s *mockStore) Stop() {}
 
 type mockQuerierServer struct {
diff --git a/pkg/loki/config_test.go b/pkg/loki/config_test.go
index 76601fcb18..8df99b8980 100644
--- a/pkg/loki/config_test.go
+++ b/pkg/loki/config_test.go
@@ -28,14 +28,6 @@ func TestCrossComponentValidation(t *testing.T) {
 			SchemaConfig: storage.SchemaConfig{
 				SchemaConfig: chunk.SchemaConfig{
 					Configs: []chunk.PeriodConfig{
-						{
-							// zero should not error
-							RowShards: 0,
-							Schema:    "v6",
-							From: chunk.DayTime{
-								Time: model.Now().Add(-48 * time.Hour),
-							},
-						},
 						{
 							RowShards: 16,
 							Schema:    "v11",
diff --git a/pkg/querier/querier_mock_test.go b/pkg/querier/querier_mock_test.go
index b6250a41b7..2b40406d90 100644
--- a/pkg/querier/querier_mock_test.go
+++ b/pkg/querier/querier_mock_test.go
@@ -275,11 +275,6 @@ func (s *storeMock) SelectSamples(ctx context.Context, req logql.SelectSamplePar
 	return res.(iter.SampleIterator), args.Error(1)
 }
 
-func (s *storeMock) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]chunk.Chunk, error) {
-	args := s.Called(ctx, userID, from, through, matchers)
-	return args.Get(0).([]chunk.Chunk), args.Error(1)
-}
-
 func (s *storeMock) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]chunk.Chunk, []*chunk.Fetcher, error) {
 	args := s.Called(ctx, userID, from, through, matchers)
 	return args.Get(0).([][]chunk.Chunk), args.Get(1).([]*chunk.Fetcher), args.Error(2)
 }
@@ -303,14 +298,6 @@ func (s *storeMock) LabelNamesForMetricName(ctx context.Context, userID string,
 	return args.Get(0).([]string), args.Error(1)
 }
 
-func (s *storeMock) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
-	panic("don't call me please")
-}
-
-func (s *storeMock) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
-	panic("don't call me please")
-}
-
 func (s *storeMock) GetChunkFetcher(_ model.Time) *chunk.Fetcher {
 	panic("don't call me please")
 }
diff --git a/pkg/storage/chunk/chunk.go b/pkg/storage/chunk/chunk.go
index f278540aec..3ad05515ea 100644
--- a/pkg/storage/chunk/chunk.go
+++ b/pkg/storage/chunk/chunk.go
@@ -430,11 +430,3 @@ func (c *Chunk) Slice(from, through model.Time) (*Chunk, error) {
 	nc := NewChunk(c.UserID, c.Fingerprint, c.Metric, pc, from, through)
 	return &nc, nil
 }
-
-func intervalsOverlap(interval1, interval2 model.Interval) bool {
-	if interval1.Start > interval2.End || interval2.Start > interval1.End {
-		return false
-	}
-
-	return true
-}
diff --git a/pkg/storage/chunk/chunk_store.go b/pkg/storage/chunk/chunk_store.go
index 0f1f7107b6..95e07c2d76 100644
--- a/pkg/storage/chunk/chunk_store.go
+++ b/pkg/storage/chunk/chunk_store.go
@@ -17,8 +17,6 @@ import (
 	"github.com/prometheus/prometheus/model/labels"
 
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
-	"github.com/grafana/loki/pkg/storage/chunk/encoding"
-	"github.com/grafana/loki/pkg/util"
 	"github.com/grafana/loki/pkg/util/extract"
 	util_log "github.com/grafana/loki/pkg/util/log"
 	"github.com/grafana/loki/pkg/util/spanlogger"
@@ -121,106 +119,6 @@ func (c *baseStore) Stop() {
 	c.index.Stop()
 }
 
-// store implements Store
-type store struct {
-	baseStore
-	schema StoreSchema
-}
-
-func newStore(cfg StoreConfig, scfg SchemaConfig, schema StoreSchema, index IndexClient, chunks Client, limits StoreLimits, chunksCache cache.Cache) (Store, error) {
-	rs, err := newBaseStore(cfg, scfg, schema, index, chunks, limits, chunksCache)
-	if err != nil {
-		return nil, err
-	}
-
-	return &store{
-		baseStore: rs,
-		schema:    schema,
-	}, nil
-}
-
-// Put implements Store
-func (c *store) Put(ctx context.Context, chunks []Chunk) error {
-	for _, chunk := range chunks {
-		if err := c.PutOne(ctx, chunk.From, chunk.Through, chunk); err != nil {
-			return err
-		}
-	}
-	return nil
-}
-
-// PutOne implements Store
-func (c *store) PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.PutOne")
-	defer log.Finish()
-	chunks := []Chunk{chunk}
-
-	err := c.fetcher.storage.PutChunks(ctx, chunks)
-	if err != nil {
-		return err
-	}
-
-	if cacheErr := c.fetcher.writeBackCache(ctx, chunks); cacheErr != nil {
-		level.Warn(log).Log("msg", "could not store chunks in chunk cache", "err", cacheErr)
-	}
-
-	writeReqs, err := c.calculateIndexEntries(chunk.UserID, from, through, chunk)
-	if err != nil {
-		return err
-	}
-
-	return c.index.BatchWrite(ctx, writeReqs)
-}
-
-// calculateIndexEntries creates a set of batched WriteRequests for all the chunks it is given.
-func (c *store) calculateIndexEntries(userID string, from, through model.Time, chunk Chunk) (WriteBatch, error) {
-	seenIndexEntries := map[string]struct{}{}
-
-	metricName := chunk.Metric.Get(labels.MetricName)
-	if metricName == "" {
-		return nil, ErrMetricNameLabelMissing
-	}
-
-	entries, err := c.schema.GetWriteEntries(from, through, userID, metricName, chunk.Metric, c.baseStore.schemaCfg.ExternalKey(chunk))
-	if err != nil {
-		return nil, err
-	}
-	indexEntriesPerChunk.Observe(float64(len(entries)))
-
-	// Remove duplicate entries based on tableName:hashValue:rangeValue
-	result := c.index.NewWriteBatch()
-	for _, entry := range entries {
-		key := fmt.Sprintf("%s:%s:%x", entry.TableName, entry.HashValue, entry.RangeValue)
-		if _, ok := seenIndexEntries[key]; !ok {
-			seenIndexEntries[key] = struct{}{}
-			result.Add(entry.TableName, entry.HashValue, entry.RangeValue, entry.Value)
-		}
-	}
-	return result, nil
-}
-
-// Get implements Store
-func (c *store) Get(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([]Chunk, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.Get")
-	defer log.Span.Finish()
-	level.Debug(log).Log("from", from, "through", through, "matchers", len(allMatchers))
-
-	// Validate the query is within reasonable bounds.
-	metricName, matchers, shortcut, err := c.validateQuery(ctx, userID, &from, &through, allMatchers)
-	if err != nil {
-		return nil, err
-	} else if shortcut {
-		return nil, nil
-	}
-
-	log.Span.SetTag("metric", metricName)
-	return c.getMetricNameChunks(ctx, userID, from, through, matchers, metricName)
-}
-
-func (c *store) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, allMatchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
-	return nil, nil, errors.New("not implemented")
-}
-
 // LabelValuesForMetricName retrieves all label values for a single label name and metric name.
 func (c *baseStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName, labelName string, matchers ...*labels.Matcher) ([]string, error) {
 	log, ctx := spanlogger.New(ctx, "ChunkStore.LabelValues")
@@ -257,40 +155,6 @@ func (c *baseStore) LabelValuesForMetricName(ctx context.Context, userID string,
 	}
 
 	return nil, errors.New("unimplemented: Matchers are not supported by chunk store")
-
-}
-
-// LabelNamesForMetricName retrieves all label names for a metric name.
-func (c *store) LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.LabelNamesForMetricName")
-	defer log.Span.Finish()
-	level.Debug(log).Log("from", from, "through", through, "metricName", metricName)
-
-	shortcut, err := c.validateQueryTimeRange(ctx, userID, &from, &through)
-	if err != nil {
-		return nil, err
-	} else if shortcut {
-		return nil, nil
-	}
-
-	chunks, err := c.lookupChunksByMetricName(ctx, userID, from, through, nil, metricName)
-	if err != nil {
-		return nil, err
-	}
-	level.Debug(log).Log("msg", "Chunks in index", "chunks", len(chunks))
-
-	// Filter out chunks that are not in the selected time range and keep a single chunk per fingerprint
-	filtered := filterChunksByTime(from, through, chunks)
-	filtered, keys := filterChunksByUniqueFingerprint(c.baseStore.schemaCfg, filtered)
-	level.Debug(log).Log("msg", "Chunks post filtering", "chunks", len(chunks))
-
-	// Now fetch the actual chunk data from Memcache / S3
-	allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys)
-	if err != nil {
-		level.Error(log).Log("msg", "FetchChunks", "err", err)
-		return nil, err
-	}
-	return labelNamesFromChunks(allChunks), nil
 }
 
 func (c *baseStore) validateQueryTimeRange(ctx context.Context, userID string, from *model.Time, through *model.Time) (bool, error) {
@@ -340,108 +204,6 @@ func (c *baseStore) validateQuery(ctx context.Context, userID string, from *mode
 	return metricNameMatcher.Value, matchers, false, nil
 }
 
-func (c *store) getMetricNameChunks(ctx context.Context, userID string, from, through model.Time, allMatchers []*labels.Matcher, metricName string) ([]Chunk, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.getMetricNameChunks")
-	defer log.Finish()
-	level.Debug(log).Log("from", from, "through", through, "metricName", metricName, "matchers", len(allMatchers))
-
-	filters, matchers := util.SplitFiltersAndMatchers(allMatchers)
-	chunks, err := c.lookupChunksByMetricName(ctx, userID, from, through, matchers, metricName)
-	if err != nil {
-		return nil, err
-	}
-	level.Debug(log).Log("Chunks in index", len(chunks))
-
-	// Filter out chunks that are not in the selected time range.
-	filtered := filterChunksByTime(from, through, chunks)
-	level.Debug(log).Log("Chunks post filtering", len(chunks))
-
-	maxChunksPerQuery := c.limits.MaxChunksPerQueryFromStore(userID)
-	if maxChunksPerQuery > 0 && len(filtered) > maxChunksPerQuery {
-		err := QueryError(fmt.Sprintf("Query %v fetched too many chunks (%d > %d)", allMatchers, len(filtered), maxChunksPerQuery))
-		level.Error(log).Log("err", err)
-		return nil, err
-	}
-
-	// Now fetch the actual chunk data from Memcache / S3
-	keys := keysFromChunks(c.baseStore.schemaCfg, filtered)
-	allChunks, err := c.fetcher.FetchChunks(ctx, filtered, keys)
-	if err != nil {
-		return nil, err
-	}
-
-	// Filter out chunks based on the empty matchers in the query.
-	filteredChunks := filterChunksByMatchers(allChunks, filters)
-	return filteredChunks, nil
-}
-
-func (c *store) lookupChunksByMetricName(ctx context.Context, userID string, from, through model.Time, matchers []*labels.Matcher, metricName string) ([]Chunk, error) {
-	log, ctx := spanlogger.New(ctx, "ChunkStore.lookupChunksByMetricName")
-	defer log.Finish()
-
-	// Just get chunks for metric if there are no matchers
-	if len(matchers) == 0 {
-		queries, err := c.schema.GetReadQueriesForMetric(from, through, userID, metricName)
-		if err != nil {
-			return nil, err
-		}
-		level.Debug(log).Log("queries", len(queries))
-
-		entries, err := c.lookupEntriesByQueries(ctx, queries)
-		if err != nil {
-			return nil, err
-		}
-		level.Debug(log).Log("entries", len(entries))
-
-		chunkIDs, err := c.parseIndexEntries(ctx, entries, nil)
-		if err != nil {
-			return nil, err
-		}
-		level.Debug(log).Log("chunkIDs", len(chunkIDs))
-
-		return c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
-	}
-
-	// Otherwise get chunks which include other matchers
-	incomingChunkIDs := make(chan []string)
-	incomingErrors := make(chan error)
-	for _, matcher := range matchers {
-		go func(matcher *labels.Matcher) {
-			chunkIDs, err := c.lookupIdsByMetricNameMatcher(ctx, from, through, userID, metricName, matcher, nil)
-			if err != nil {
-				incomingErrors <- err
-			} else {
-				incomingChunkIDs <- chunkIDs
-			}
-		}(matcher)
-	}
-
-	// Receive chunkSets from all matchers
-	var chunkIDs []string
-	var lastErr error
-	var initialized bool
-	for i := 0; i < len(matchers); i++ {
-		select {
-		case incoming := <-incomingChunkIDs:
-			if !initialized {
-				chunkIDs = incoming
-				initialized = true
-			} else {
-				chunkIDs = intersectStrings(chunkIDs, incoming)
-			}
-		case err := <-incomingErrors:
-			lastErr = err
-		}
-	}
-	if lastErr != nil {
-		return nil, lastErr
-	}
-	level.Debug(log).Log("msg", "post intersection", "chunkIDs", len(chunkIDs))
-
-	// Convert IndexEntry's into chunks
-	return c.convertChunkIDsToChunks(ctx, userID, chunkIDs)
-}
-
 func (c *baseStore) lookupIdsByMetricNameMatcher(ctx context.Context, from, through model.Time, userID, metricName string, matcher *labels.Matcher, filter func([]IndexQuery) []IndexQuery) ([]string, error) {
 	var err error
 	var queries []IndexQuery
@@ -586,129 +348,6 @@ func (c *baseStore) convertChunkIDsToChunks(_ context.Context, userID string, ch
 	return chunkSet, nil
 }
 
-func (c *store) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
-	metricName := metric.Get(model.MetricNameLabel)
-	if metricName == "" {
-		return ErrMetricNameLabelMissing
-	}
-
-	chunkWriteEntries, err := c.schema.GetWriteEntries(from, through, userID, metricName, metric, chunkID)
-	if err != nil {
-		return errors.Wrapf(err, "when getting index entries to delete for chunkID=%s", chunkID)
-	}
-
-	return c.deleteChunk(ctx, userID, chunkID, metric, chunkWriteEntries, partiallyDeletedInterval, func(chunk Chunk) error {
-		return c.PutOne(ctx, chunk.From, chunk.Through, chunk)
-	})
-}
-
-func (c *baseStore) deleteChunk(ctx context.Context,
-	userID string,
-	chunkID string,
-	metric labels.Labels,
-	chunkWriteEntries []IndexEntry,
-	partiallyDeletedInterval *model.Interval,
-	putChunkFunc func(chunk Chunk) error) error {
-
-	metricName := metric.Get(model.MetricNameLabel)
-	if metricName == "" {
-		return ErrMetricNameLabelMissing
-	}
-
-	// if chunk is partially deleted, fetch it, slice non-deleted portion and put it to store before deleting original chunk
-	if partiallyDeletedInterval != nil {
-		err := c.reboundChunk(ctx, userID, chunkID, *partiallyDeletedInterval, putChunkFunc)
-		if err != nil {
-			return errors.Wrapf(err, "chunkID=%s", chunkID)
-		}
-	}
-
-	batch := c.index.NewWriteBatch()
-	for i := range chunkWriteEntries {
-		batch.Delete(chunkWriteEntries[i].TableName, chunkWriteEntries[i].HashValue, chunkWriteEntries[i].RangeValue)
-	}
-
-	err := c.index.BatchWrite(ctx, batch)
-	if err != nil {
-		return errors.Wrapf(err, "when deleting index entries for chunkID=%s", chunkID)
-	}
-
-	err = c.chunks.DeleteChunk(ctx, userID, chunkID)
-	if err != nil {
-		if c.chunks.IsChunkNotFoundErr(err) {
-			return nil
-		}
-		return errors.Wrapf(err, "when deleting chunk from storage with chunkID=%s", chunkID)
-	}
-
-	return nil
-}
-
-func (c *baseStore) reboundChunk(ctx context.Context, userID, chunkID string, partiallyDeletedInterval model.Interval, putChunkFunc func(chunk Chunk) error) error {
-	chunk, err := ParseExternalKey(userID, chunkID)
-	if err != nil {
-		return errors.Wrap(err, "when parsing external key")
-	}
-
-	if !intervalsOverlap(model.Interval{Start: chunk.From, End: chunk.Through}, partiallyDeletedInterval) {
-		return ErrParialDeleteChunkNoOverlap
-	}
-
-	chunks, err := c.fetcher.FetchChunks(ctx, []Chunk{chunk}, []string{chunkID})
-	if err != nil {
-		if c.fetcher.IsChunkNotFoundErr(err) {
-			return nil
-		}
-		return errors.Wrap(err, "when fetching chunk from storage for slicing")
-	}
-
-	if len(chunks) != 1 {
-		return fmt.Errorf("expected to get 1 chunk from storage got %d instead", len(chunks))
-	}
-
-	chunk = chunks[0]
-	var newChunks []*Chunk
-	if partiallyDeletedInterval.Start > chunk.From {
-		newChunk, err := chunk.Slice(chunk.From, partiallyDeletedInterval.Start-1)
-		if err != nil && err != encoding.ErrSliceNoDataInRange {
-			return errors.Wrapf(err, "when slicing chunk for interval %d - %d", chunk.From, partiallyDeletedInterval.Start-1)
-		}
-
-		if newChunk != nil {
-			newChunks = append(newChunks, newChunk)
-		}
-	}
-
-	if partiallyDeletedInterval.End < chunk.Through {
-		newChunk, err := chunk.Slice(partiallyDeletedInterval.End+1, chunk.Through)
-		if err != nil && err != encoding.ErrSliceNoDataInRange {
-			return errors.Wrapf(err, "when slicing chunk for interval %d - %d", partiallyDeletedInterval.End+1, chunk.Through)
-		}
-
-		if newChunk != nil {
-			newChunks = append(newChunks, newChunk)
-		}
-	}
-
-	for _, newChunk := range newChunks {
-		if err := newChunk.Encode(); err != nil {
-			return errors.Wrapf(err, "when encoding new chunk formed after slicing for interval %d - %d", newChunk.From, newChunk.Through)
-		}
-
-		err = putChunkFunc(*newChunk)
-		if err != nil {
-			return errors.Wrapf(err, "when putting new chunk formed after slicing for interval %d - %d", newChunk.From, newChunk.Through)
-		}
-	}
-
-	return nil
-}
-
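
Aside: the rebound logic above boils down to interval arithmetic around the deleted range. A condensed sketch for reference, assuming it lives in package chunk next to the code above; the helper name sliceAroundDeleted is ours, and the overlap check, re-encoding, and re-insertion that reboundChunk performs are elided:

	// sliceAroundDeleted condenses the rebound logic removed above: a chunk
	// covering [c.From, c.Through] survives a partial delete as at most two
	// slices, one on either side of the deleted interval.
	func sliceAroundDeleted(c Chunk, deleted model.Interval) ([]*Chunk, error) {
		var out []*Chunk
		if deleted.Start > c.From {
			// keep the part strictly before the deleted interval
			left, err := c.Slice(c.From, deleted.Start-1)
			if err != nil && err != encoding.ErrSliceNoDataInRange {
				return nil, err
			}
			if left != nil {
				out = append(out, left)
			}
		}
		if deleted.End < c.Through {
			// keep the part strictly after the deleted interval
			right, err := c.Slice(deleted.End+1, c.Through)
			if err != nil && err != encoding.ErrSliceNoDataInRange {
				return nil, err
			}
			if right != nil {
				out = append(out, right)
			}
		}
		return out, nil
	}
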
-func (c *store) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
-	// SeriesID is something which is only used in SeriesStore so we need not do anything here
-	return nil
-}
-
 func (c *baseStore) GetChunkFetcher(_ model.Time) *Fetcher {
 	return c.fetcher
 }
diff --git a/pkg/storage/chunk/chunk_store_test.go b/pkg/storage/chunk/chunk_store_test.go
index d8d50c6165..3faf050986 100644
--- a/pkg/storage/chunk/chunk_store_test.go
+++ b/pkg/storage/chunk/chunk_store_test.go
@@ -5,12 +5,12 @@ import (
 	"fmt"
 	"math/rand"
 	"reflect"
+	"sort"
 	"testing"
 	"time"
 
 	"github.com/go-kit/log"
 	"github.com/grafana/dskit/flagext"
-	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/common/model"
 	"github.com/prometheus/prometheus/model/labels"
@@ -26,9 +26,7 @@ import (
 
 type configFactory func() StoreConfig
 
-var seriesStoreSchemas = []string{"v9", "v10", "v11"}
-
-var schemas = append([]string{"v1", "v2", "v3", "v4", "v5", "v6"}, seriesStoreSchemas...)
+var schemas = []string{"v9", "v10", "v11", "v12"}
 
 var stores = []struct {
 	name     string
@@ -102,146 +100,6 @@ func newTestChunkStoreConfigWithMockStorage(t require.TestingT, schemaCfg Schema
 	return store
 }
 
-// TestChunkStore_Get tests results are returned correctly depending on the type of query
-func TestChunkStore_Get(t *testing.T) {
-	ctx := context.Background()
-	now := model.Now()
-
-	fooMetric1 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz"},
-		{Name: "flip", Value: "flop"},
-		{Name: "toms", Value: "code"},
-	}
-	fooMetric2 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "beep"},
-		{Name: "toms", Value: "code"},
-	}
-
-	// barMetric1 is a subset of barMetric2 to test over-matching bug.
-	barMetric1 := labels.Labels{
-		{Name: labels.MetricName, Value: "bar"},
-		{Name: "bar", Value: "baz"},
-	}
-	barMetric2 := labels.Labels{
-		{Name: labels.MetricName, Value: "bar"},
-		{Name: "bar", Value: "baz"},
-		{Name: "toms", Value: "code"},
-	}
-
-	fooChunk1 := dummyChunkFor(now, fooMetric1)
-	fooChunk2 := dummyChunkFor(now, fooMetric2)
-
-	barChunk1 := dummyChunkFor(now, barMetric1)
-	barChunk2 := dummyChunkFor(now, barMetric2)
-
-	testCases := []struct {
-		query  string
-		expect []Chunk
-		err    string
-	}{
-		{
-			query:  `foo`,
-			expect: []Chunk{fooChunk1, fooChunk2},
-		},
-		{
-			query:  `foo{flip=""}`,
-			expect: []Chunk{fooChunk2},
-		},
-		{
-			query:  `foo{bar="baz"}`,
-			expect: []Chunk{fooChunk1},
-		},
-		{
-			query:  `foo{bar="beep"}`,
-			expect: []Chunk{fooChunk2},
-		},
-		{
-			query:  `foo{toms="code"}`,
-			expect: []Chunk{fooChunk1, fooChunk2},
-		},
-		{
-			query:  `foo{bar!="baz"}`,
-			expect: []Chunk{fooChunk2},
-		},
-		{
-			query:  `foo{bar=~"beep|baz"}`,
-			expect: []Chunk{fooChunk1, fooChunk2},
-		},
-		{
-			query:  `foo{toms="code", bar=~"beep|baz"}`,
-			expect: []Chunk{fooChunk1, fooChunk2},
-		},
-		{
-			query:  `foo{toms="code", bar="baz"}`,
-			expect: []Chunk{fooChunk1},
-		},
-		{
-			query:  `foo{a="b", bar="baz"}`,
-			expect: nil,
-		},
-		{
-			query: `{__name__=~"foo"}`,
-			err:   "query must contain metric name",
-		},
-	}
-	for _, schema := range schemas {
-		for _, storeCase := range stores {
-			storeCfg := storeCase.configFn()
-			store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
-			defer store.Stop()
-
-			if err := store.Put(ctx, []Chunk{
-				fooChunk1,
-				fooChunk2,
-				barChunk1,
-				barChunk2,
-			}); err != nil {
-				t.Fatal(err)
-			}
-
-			for _, tc := range testCases {
-				t.Run(fmt.Sprintf("%s / %s / %s", tc.query, schema, storeCase.name), func(t *testing.T) {
-					t.Log("========= Running query", tc.query, "with schema", schema)
-					matchers, err := parser.ParseMetricSelector(tc.query)
-					if err != nil {
-						t.Fatal(err)
-					}
-
-					// Query with ordinary time-range
-					chunks1, err := store.Get(ctx, userID, now.Add(-time.Hour), now, matchers...)
-					if tc.err != "" {
-						require.Error(t, err)
-						require.Equal(t, tc.err, err.Error())
-						return
-					}
-					require.NoError(t, err)
-
-					if !reflect.DeepEqual(tc.expect, chunks1) {
-						t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks1))
-					}
-
-					// Pushing end of time-range into future should yield exact same resultset
-					chunks2, err := store.Get(ctx, userID, now.Add(-time.Hour), now.Add(time.Hour*24*10), matchers...)
-					require.NoError(t, err)
-					if !reflect.DeepEqual(tc.expect, chunks2) {
-						t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks2))
-					}
-
-					// Query with both begin & end of time-range in future should yield empty resultset
-					chunks3, err := store.Get(ctx, userID, now.Add(time.Hour), now.Add(time.Hour*2), matchers...)
-					require.NoError(t, err)
-					if len(chunks3) != 0 {
-						t.Fatalf("%s: future query should yield empty resultset ... actually got %v chunks: %#v", tc.query, len(chunks3), chunks3)
-					}
-				})
-			}
-		}
-	}
-}
-
 func TestChunkStore_LabelValuesForMetricName(t *testing.T) {
 	ctx := context.Background()
 	now := model.Now()
@@ -518,7 +376,8 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
 	for _, schema := range schemas {
 		for _, storeCase := range stores {
 			storeCfg := storeCase.configFn()
-			store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
+
+			store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
 			defer store.Stop()
 
 			if err := store.Put(ctx, []Chunk{chunk1, chunk2}); err != nil {
@@ -533,11 +392,36 @@ func TestChunkStore_getMetricNameChunks(t *testing.T) {
 					t.Fatal(err)
 				}
 
-				chunks, err := store.Get(ctx, userID, now.Add(-time.Hour), now, matchers...)
+				chunks, fetchers, err := store.GetChunkRefs(ctx, userID, now.Add(-time.Hour), now, matchers...)
 				require.NoError(t, err)
+				fetchedChunk := []Chunk{}
+				for _, f := range fetchers {
+					for _, cs := range chunks {
+						keys := make([]string, 0, len(cs))
+						sort.Slice(cs, func(i, j int) bool { return schemaCfg.ExternalKey(cs[i]) < schemaCfg.ExternalKey(cs[j]) })
+
+						for _, c := range cs {
+							keys = append(keys, schemaCfg.ExternalKey(c))
+						}
+						cks, err := f.FetchChunks(ctx, cs, keys)
+						if err != nil {
+							t.Fatal(err)
+						}
+					outer:
+						for _, c := range cks {
+							for _, matcher := range matchers {
+								if !matcher.Matches(c.Metric.Get(matcher.Name)) {
+									continue outer
+								}
+							}
+							fetchedChunk = append(fetchedChunk, c)
+						}
+
+					}
+				}
 
-				if !reflect.DeepEqual(tc.expect, chunks) {
-					t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, chunks))
+				if !reflect.DeepEqual(tc.expect, fetchedChunk) {
+					t.Fatalf("%s: wrong chunks - %s", tc.query, test.Diff(tc.expect, fetchedChunk))
 				}
 			})
 		}
@@ -555,7 +439,7 @@ func TestChunkStoreRandom(t *testing.T) {
 
 	for _, schema := range schemas {
 		t.Run(schema, func(t *testing.T) {
-			store, _ := newTestChunkStore(t, schema)
+			store, schemaCfg := newTestChunkStore(t, schema)
 			defer store.Stop()
 
 			// put 100 chunks from 0 to 99
@@ -599,11 +483,27 @@ func TestChunkStoreRandom(t *testing.T) {
 				mustNewLabelMatcher(labels.MatchEqual, labels.MetricName, "foo"),
 				mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"),
 			}
-			chunks, err := store.Get(ctx, userID, startTime, endTime, matchers...)
+			chunks, fetchers, err := store.GetChunkRefs(ctx, userID, startTime, endTime, matchers...)
 			require.NoError(t, err)
+			fetchedChunk := make([]Chunk, 0, len(chunks))
+			for _, f := range fetchers {
+				for _, cs := range chunks {
+					keys := make([]string, 0, len(cs))
+					sort.Slice(cs, func(i, j int) bool { return schemaCfg.ExternalKey(cs[i]) < schemaCfg.ExternalKey(cs[j]) })
+
+					for _, c := range cs {
+						keys = append(keys, schemaCfg.ExternalKey(c))
+					}
+					cks, err := f.FetchChunks(ctx, cs, keys)
+					if err != nil {
+						t.Fatal(err)
+					}
+					fetchedChunk = append(fetchedChunk, cks...)
+				}
+			}
 
 			// We need to check that each chunk is in the time range
-			for _, chunk := range chunks {
+			for _, chunk := range fetchedChunk {
 				assert.False(t, chunk.From.After(endTime))
 				assert.False(t, chunk.Through.Before(startTime))
 				samples, err := chunk.Samples(chunk.From, chunk.Through)
@@ -614,7 +514,7 @@ func TestChunkStoreRandom(t *testing.T) {
 
 			// And check we got all the chunks we want
 			numChunks := (end / chunkLen) - (start / chunkLen) + 1
-			assert.Equal(t, int(numChunks), len(chunks))
+			assert.Equal(t, int(numChunks), len(fetchedChunk))
 		}
 	})
 }
@@ -623,7 +523,7 @@ func TestChunkStoreLeastRead(t *testing.T) {
 	// Test we don't read too much from the index
 	ctx := context.Background()
-	store, _ := newTestChunkStore(t, "v6")
+	store, schemaCfg := newTestChunkStore(t, "v12")
 	defer store.Stop()
 
 	// Put 24 1hr chunks in the store
@@ -668,11 +568,27 @@ func TestChunkStoreLeastRead(t *testing.T) {
 		mustNewLabelMatcher(labels.MatchEqual, "bar", "baz"),
 	}
-	chunks, err := store.Get(ctx, userID, startTime, endTime, matchers...)
+	chunks, fetchers, err := store.GetChunkRefs(ctx, userID, startTime, endTime, matchers...)
 	require.NoError(t, err)
+	fetchedChunk := make([]Chunk, 0, len(chunks))
+	for _, f := range fetchers {
+		for _, cs := range chunks {
+			keys := make([]string, 0, len(cs))
+			sort.Slice(cs, func(i, j int) bool { return schemaCfg.ExternalKey(cs[i]) < schemaCfg.ExternalKey(cs[j]) })
+
+			for _, c := range cs {
+				keys = append(keys, schemaCfg.ExternalKey(c))
+			}
+			cks, err := f.FetchChunks(ctx, cs, keys)
+			if err != nil {
+				t.Fatal(err)
+			}
+			fetchedChunk = append(fetchedChunk, cks...)
+		}
+	}
 
 	// We need to check that each chunk is in the time range
-	for _, chunk := range chunks {
+	for _, chunk := range fetchedChunk {
 		assert.False(t, chunk.From.After(endTime))
 		assert.False(t, chunk.Through.Before(startTime))
 		samples, err := chunk.Samples(chunk.From, chunk.Through)
@@ -682,7 +598,7 @@ func TestChunkStoreLeastRead(t *testing.T) {
 
 		// And check we got all the chunks we want
 		numChunks := 24 - (start / chunkLen) + 1
-		assert.Equal(t, int(numChunks), len(chunks))
+		assert.Equal(t, int(numChunks), len(fetchedChunk))
 	}
 }
@@ -775,7 +691,7 @@ func TestChunkStoreError(t *testing.T) {
 			require.NoError(t, err)
 
 			// Query with ordinary time-range
-			_, err = store.Get(ctx, userID, tc.from, tc.through, matchers...)
+			_, _, err = store.GetChunkRefs(ctx, userID, tc.from, tc.through, matchers...)
 			require.EqualError(t, err, tc.err)
 		})
 	}
 }
@@ -785,7 +701,7 @@ func benchmarkParseIndexEntries(i int64, regex string, b *testing.B) {
 	b.ReportAllocs()
 	b.StopTimer()
-	store := &store{}
+	store := &baseStore{}
 	ctx := context.Background()
 	entries := generateIndexEntries(i)
 	matcher, err := labels.NewMatcher(labels.MatchRegexp, "", regex)
@@ -844,262 +760,6 @@ func generateIndexEntries(n int64) []IndexEntry {
 	return res
 }
 
-func getNonDeletedIntervals(originalInterval, deletedInterval model.Interval) []model.Interval {
-	if !intervalsOverlap(originalInterval, deletedInterval) {
-		return []model.Interval{originalInterval}
-	}
-
-	nonDeletedIntervals := []model.Interval{}
-	if deletedInterval.Start > originalInterval.Start {
-		nonDeletedIntervals = append(nonDeletedIntervals, model.Interval{Start: originalInterval.Start, End: deletedInterval.Start - 1})
-	}
-
-	if deletedInterval.End < originalInterval.End {
-		nonDeletedIntervals = append(nonDeletedIntervals, model.Interval{Start: deletedInterval.End + 1, End: originalInterval.End})
-	}
-
-	return nonDeletedIntervals
-}
-
-func TestStore_DeleteChunk(t *testing.T) {
-	ctx := context.Background()
-
-	metric1 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz"},
-	}
-
-	metric2 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz2"},
-	}
-
-	metric3 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz3"},
-	}
-
-	fooChunk1 := dummyChunkForEncoding(model.Now(), metric1, encoding.Varbit, 200)
-	err := fooChunk1.Encode()
-	require.NoError(t, err)
-
-	fooChunk2 := dummyChunkForEncoding(model.Now(), metric2, encoding.Varbit, 200)
-	err = fooChunk2.Encode()
-	require.NoError(t, err)
-
-	nonExistentChunk := dummyChunkForEncoding(model.Now(), metric3, encoding.Varbit, 200)
-
-	fooMetricNameMatcher, err := parser.ParseMetricSelector(`foo`)
-	require.NoError(t, err)
-	for _, schema := range schemas {
-		scfg := DefaultSchemaConfig("", schema, 0)
-		for _, tc := range []struct {
-			name                           string
-			chunks                         []Chunk
-			chunkToDelete                  Chunk
-			partialDeleteInterval          *model.Interval
-			err                            error
-			numChunksToExpectAfterDeletion int
-		}{
-			{
-				name:                           "delete whole chunk",
-				chunkToDelete:                  fooChunk1,
-				numChunksToExpectAfterDeletion: 1,
-			},
-			{
-				name:                           "delete chunk partially at start",
-				chunkToDelete:                  fooChunk1,
-				partialDeleteInterval:          &model.Interval{Start: fooChunk1.From, End: fooChunk1.From.Add(30 * time.Minute)},
-				numChunksToExpectAfterDeletion: 2,
-			},
-			{
-				name:                           "delete chunk partially at end",
-				chunkToDelete:                  fooChunk1,
-				partialDeleteInterval:          &model.Interval{Start: fooChunk1.Through.Add(-30 * time.Minute), End: fooChunk1.Through},
-				numChunksToExpectAfterDeletion: 2,
-			},
-			{
-				name:                           "delete chunk partially in the middle",
-				chunkToDelete:                  fooChunk1,
-				partialDeleteInterval:          &model.Interval{Start: fooChunk1.From.Add(15 * time.Minute), End: fooChunk1.Through.Add(-15 * time.Minute)},
-				numChunksToExpectAfterDeletion: 3,
-			},
-			{
-				name:                           "delete non-existent chunk",
-				chunkToDelete:                  nonExistentChunk,
-				numChunksToExpectAfterDeletion: 2,
-			},
-			{
-				name:                           "delete first second",
-				chunkToDelete:                  fooChunk1,
-				partialDeleteInterval:          &model.Interval{Start: fooChunk1.From, End: fooChunk1.From},
-				numChunksToExpectAfterDeletion: 2,
-			},
-			{
-				name:                           "delete chunk out of range",
-				chunkToDelete:                  fooChunk1,
-				partialDeleteInterval:          &model.Interval{Start: fooChunk1.Through.Add(time.Minute), End: fooChunk1.Through.Add(10 * time.Minute)},
-				numChunksToExpectAfterDeletion: 2,
-				err:                            errors.Wrapf(ErrParialDeleteChunkNoOverlap, "chunkID=%s", scfg.ExternalKey(fooChunk1)),
-			},
-		} {
-			t.Run(fmt.Sprintf("%s / %s", schema, tc.name), func(t *testing.T) {
-				store, scfg := newTestChunkStore(t, schema)
-				defer store.Stop()
-
-				// inserting 2 chunks with different labels but same metric name
-				err = store.Put(ctx, []Chunk{fooChunk1, fooChunk2})
-				require.NoError(t, err)
-
-				// we expect to get 2 chunks back using just metric name matcher
-				chunks, err := store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), fooMetricNameMatcher...)
-				require.NoError(t, err)
-				require.Equal(t, 2, len(chunks))
-
-				err = store.DeleteChunk(ctx, tc.chunkToDelete.From, tc.chunkToDelete.Through, userID,
-					scfg.ExternalKey(tc.chunkToDelete), tc.chunkToDelete.Metric, tc.partialDeleteInterval)
-
-				if tc.err != nil {
-					require.Error(t, err)
-					require.Equal(t, tc.err.Error(), err.Error())
-
-					// we expect to get same results back if delete operation is expected to fail
-					chunks, err := store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), fooMetricNameMatcher...)
-					require.NoError(t, err)
-
-					require.Equal(t, 2, len(chunks))
-
-					return
-				}
-				require.NoError(t, err)
-
-				matchersForDeletedChunk, err := parser.ParseMetricSelector(tc.chunkToDelete.Metric.String())
-				require.NoError(t, err)
-
-				var nonDeletedIntervals []model.Interval
-
-				if tc.partialDeleteInterval != nil {
-					nonDeletedIntervals = getNonDeletedIntervals(model.Interval{
-						Start: tc.chunkToDelete.From,
-						End:   tc.chunkToDelete.Through,
-					}, *tc.partialDeleteInterval)
-				}
-
-				// we expect to get 1 non deleted chunk + new chunks that were created (if any) after partial deletion
-				chunks, err = store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), fooMetricNameMatcher...)
-				require.NoError(t, err)
-				require.Equal(t, tc.numChunksToExpectAfterDeletion, len(chunks))
-
-				chunks, err = store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), matchersForDeletedChunk...)
-				require.NoError(t, err)
-				require.Equal(t, len(nonDeletedIntervals), len(chunks))
-
-				// comparing intervals of new chunks that were created after partial deletion
-				for i, nonDeletedInterval := range nonDeletedIntervals {
-					require.Equal(t, chunks[i].From, nonDeletedInterval.Start)
-					require.Equal(t, chunks[i].Through, nonDeletedInterval.End)
-				}
-			})
-		}
-	}
-}
-
-func TestStore_DeleteSeriesIDs(t *testing.T) {
-	ctx := context.Background()
-	metric1 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz"},
-	}
-
-	metric2 := labels.Labels{
-		{Name: labels.MetricName, Value: "foo"},
-		{Name: "bar", Value: "baz2"},
-	}
-
-	matchers, err := parser.ParseMetricSelector(`foo`)
-	if err != nil {
-		t.Fatal(err)
-	}
-
-	for _, schema := range seriesStoreSchemas {
-		t.Run(schema, func(t *testing.T) {
-			store, scfg := newTestChunkStore(t, schema)
-			defer store.Stop()
-
-			seriesStore := store.(CompositeStore).stores[0].Store.(*seriesStore)
-
-			fooChunk1 := dummyChunkForEncoding(model.Now(), metric1, encoding.Varbit, 200)
-			err := fooChunk1.Encode()
-			require.NoError(t, err)
-
-			fooChunk2 := dummyChunkForEncoding(model.Now(), metric2, encoding.Varbit, 200)
-			err = fooChunk2.Encode()
-			require.NoError(t, err)
-
-			err = store.Put(ctx, []Chunk{fooChunk1, fooChunk2})
-			require.NoError(t, err)
-
-			// we expect to have 2 series IDs in index for the chunks that were added above
-			seriesIDs, err := seriesStore.lookupSeriesByMetricNameMatcher(ctx, model.Now().Add(-time.Hour), model.Now(),
-				userID, "foo", nil, nil)
-			require.NoError(t, err)
-			require.Equal(t, 2, len(seriesIDs))
-
-			// we expect to have 2 chunks in store that were added above
-			chunks, err := store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), matchers...)
-			require.NoError(t, err)
-			require.Equal(t, 2, len(chunks))
-
-			// lets try deleting series ID without deleting the chunk
-			err = store.DeleteSeriesIDs(ctx, fooChunk1.From, fooChunk1.Through, userID, fooChunk1.Metric)
-			require.NoError(t, err)
-
-			// series IDs should still be there since chunks for them still exist
-			seriesIDs, err = seriesStore.lookupSeriesByMetricNameMatcher(ctx, model.Now().Add(-time.Hour), model.Now(),
-				userID, "foo", nil, nil)
-			require.NoError(t, err)
-			require.Equal(t, 2, len(seriesIDs))
-
-			// lets delete a chunk and then delete its series ID
-			err = store.DeleteChunk(ctx, fooChunk1.From, fooChunk1.Through, userID, scfg.ExternalKey(fooChunk1), metric1, nil)
-			require.NoError(t, err)
-
-			err = store.DeleteSeriesIDs(ctx, fooChunk1.From, fooChunk1.Through, userID, fooChunk1.Metric)
-			require.NoError(t, err)
-
-			// there should be only be 1 chunk and 1 series ID left for it
-			chunks, err = store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), matchers...)
-			require.NoError(t, err)
-			require.Equal(t, 1, len(chunks))
-
-			seriesIDs, err = seriesStore.lookupSeriesByMetricNameMatcher(ctx, model.Now().Add(-time.Hour), model.Now(),
-				userID, "foo", nil, nil)
-			require.NoError(t, err)
-			require.Equal(t, 1, len(seriesIDs))
-			require.Equal(t, string(labelsSeriesID(fooChunk2.Metric)), seriesIDs[0])
-
-			// lets delete the other chunk partially and try deleting the series ID
-			err = store.DeleteChunk(ctx, fooChunk2.From, fooChunk2.Through, userID, scfg.ExternalKey(fooChunk2), metric2,
-				&model.Interval{Start: fooChunk2.From, End: fooChunk2.From.Add(30 * time.Minute)})
-			require.NoError(t, err)
-
-			err = store.DeleteSeriesIDs(ctx, fooChunk1.From, fooChunk1.Through, userID, fooChunk1.Metric)
-			require.NoError(t, err)
-
-			// partial deletion should have left another chunk and a series ID in store
-			chunks, err = store.Get(ctx, userID, model.Now().Add(-time.Hour), model.Now(), matchers...)
-			require.NoError(t, err)
-			require.Equal(t, 1, len(chunks))
-
-			seriesIDs, err = seriesStore.lookupSeriesByMetricNameMatcher(ctx, model.Now().Add(-time.Hour), model.Now(),
-				userID, "foo", nil, nil)
-			require.NoError(t, err)
-			require.Equal(t, 1, len(seriesIDs))
-			require.Equal(t, string(labelsSeriesID(fooChunk2.Metric)), seriesIDs[0])
-		})
-	}
-}
-
 func TestDisableIndexDeduplication(t *testing.T) {
 	for i, disableIndexDeduplication := range []bool{
 		false, true,
diff --git a/pkg/storage/chunk/chunk_store_utils.go b/pkg/storage/chunk/chunk_store_utils.go
index e03df92fd2..c04b5c88f1 100644
--- a/pkg/storage/chunk/chunk_store_utils.go
+++ b/pkg/storage/chunk/chunk_store_utils.go
@@ -9,7 +9,6 @@ import (
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
 	"github.com/prometheus/common/model"
-	"github.com/prometheus/prometheus/model/labels"
 	"github.com/prometheus/prometheus/promql"
 
 	"github.com/grafana/loki/pkg/storage/chunk/cache"
@@ -46,15 +45,6 @@ func filterChunksByTime(from, through model.Time, chunks []Chunk) []Chunk {
 	return filtered
 }
 
-func keysFromChunks(s SchemaConfig, chunks []Chunk) []string {
-	keys := make([]string, 0, len(chunks))
-	for _, chk := range chunks {
-		keys = append(keys, s.ExternalKey(chk))
-	}
-
-	return keys
-}
-
 func labelNamesFromChunks(chunks []Chunk) []string {
 	var result UniqueStrings
 	for _, c := range chunks {
@@ -81,20 +71,6 @@ func filterChunksByUniqueFingerprint(s SchemaConfig, chunks []Chunk) ([]Chunk, [
 	return filtered, keys
 }
 
-func filterChunksByMatchers(chunks []Chunk, filters []*labels.Matcher) []Chunk {
-	filteredChunks := make([]Chunk, 0, len(chunks))
-outer:
-	for _, chunk := range chunks {
-		for _, filter := range filters {
-			if !filter.Matches(chunk.Metric.Get(filter.Name)) {
-				continue outer
-			}
-		}
-		filteredChunks = append(filteredChunks, chunk)
-	}
-	return filteredChunks
-}
-
 // Fetcher deals with fetching chunk contents from the cache/store,
 // and writing back any misses to the cache. Also responsible for decoding
 // chunks from the cache, in parallel.
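
Note on migration: with Store.Get removed from the interface below, readers go through GetChunkRefs and the returned fetchers instead, as the updated tests above already do. A minimal sketch of that pattern, assuming package chunk and its existing imports; the helper name getChunks is illustrative and not part of this change. Per the interface comment, fetchers[i] loads chunks[i], and matcher filtering after the fetch is now the caller's responsibility:

	// getChunks sketches a replacement for the removed Store.Get in terms of
	// GetChunkRefs plus the per-group Fetchers.
	func getChunks(ctx context.Context, store Store, schemaCfg SchemaConfig, userID string,
		from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) {
		groups, fetchers, err := store.GetChunkRefs(ctx, userID, from, through, matchers...)
		if err != nil {
			return nil, err
		}
		var result []Chunk
		for i, f := range fetchers {
			refs := groups[i] // fetchers[i] is the fetcher for groups[i]
			keys := make([]string, 0, len(refs))
			for _, c := range refs {
				keys = append(keys, schemaCfg.ExternalKey(c))
			}
			loaded, err := f.FetchChunks(ctx, refs, keys)
			if err != nil {
				return nil, err
			}
			// GetChunkRefs does not filter fetched data by label matchers,
			// so re-apply them here, as the updated tests do.
		outer:
			for _, c := range loaded {
				for _, m := range matchers {
					if !m.Matches(c.Metric.Get(m.Name)) {
						continue outer
					}
				}
				result = append(result, c)
			}
		}
		return result, nil
	}
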
diff --git a/pkg/storage/chunk/composite_store.go b/pkg/storage/chunk/composite_store.go
index a668589a44..8ec1fbdbbb 100644
--- a/pkg/storage/chunk/composite_store.go
+++ b/pkg/storage/chunk/composite_store.go
@@ -26,7 +26,6 @@ type CacheGenNumLoader interface {
 type Store interface {
 	Put(ctx context.Context, chunks []Chunk) error
 	PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
-	Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
 	// GetChunkRefs returns the un-loaded chunks and the fetchers to be used to load them. You can load each slice of chunks ([]Chunk),
 	// using the corresponding Fetcher (fetchers[i].FetchChunks(ctx, chunks[i], ...)
 	GetChunkRefs(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
@@ -34,11 +33,6 @@ type Store interface {
 	LabelNamesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string) ([]string, error)
 	GetChunkFetcher(tm model.Time) *Fetcher
 
-	// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
-	// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
-	DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error
-	// DeleteSeriesIDs is only relevant for SeriesStore.
-	DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error
 	Stop()
 }
 
@@ -83,8 +77,6 @@ func (c *CompositeStore) addSchema(storeCfg StoreConfig, schemaCfg SchemaConfig,
 	switch s := schema.(type) {
 	case SeriesStoreSchema:
 		store, err = newSeriesStore(storeCfg, schemaCfg, s, index, chunks, limits, chunksCache, writeDedupeCache)
-	case StoreSchema:
-		store, err = newStore(storeCfg, schemaCfg, s, index, chunks, limits, chunksCache)
 	default:
 		err = errors.New("invalid schema type")
 	}
@@ -113,19 +105,6 @@ func (c compositeStore) PutOne(ctx context.Context, from, through model.Time, ch
 	})
 }
 
-func (c compositeStore) Get(ctx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) {
-	var results []Chunk
-	err := c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
-		chunks, err := store.Get(innerCtx, userID, from, through, matchers...)
-		if err != nil {
-			return err
-		}
-		results = append(results, chunks...)
-		return nil
-	})
-	return results, err
-}
-
 // LabelValuesForMetricName retrieves all label values for a single label name and metric name.
 func (c compositeStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
 	var result UniqueStrings
@@ -191,21 +170,6 @@ func (c compositeStore) GetChunkFetcher(tm model.Time) *Fetcher {
 	return nil
 }
 
-// DeleteSeriesIDs deletes series IDs from index in series store
-func (c CompositeStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
-	return c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
-		return store.DeleteSeriesIDs(innerCtx, from, through, userID, metric)
-	})
-}
-
-// DeleteChunk deletes a chunks index entry and then deletes the actual chunk from chunk storage.
-// It takes care of chunks which are deleting partially by creating and inserting a new chunk first and then deleting the original chunk
-func (c CompositeStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
-	return c.forStores(ctx, userID, from, through, func(innerCtx context.Context, from, through model.Time, store Store) error {
-		return store.DeleteChunk(innerCtx, from, through, userID, chunkID, metric, partiallyDeletedInterval)
-	})
-}
-
 func (c compositeStore) Stop() {
 	for _, store := range c.stores {
 		store.Stop()
diff --git a/pkg/storage/chunk/composite_store_test.go b/pkg/storage/chunk/composite_store_test.go
index 73e8233396..2eb702da74 100644
--- a/pkg/storage/chunk/composite_store_test.go
+++ b/pkg/storage/chunk/composite_store_test.go
@@ -23,9 +23,6 @@ func (m mockStore) PutOne(ctx context.Context, from, through model.Time, chunk C
 	return nil
 }
 
-func (m mockStore) Get(tx context.Context, userID string, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error) {
-	return nil, nil
-}
 func (m mockStore) LabelValuesForMetricName(ctx context.Context, userID string, from, through model.Time, metricName string, labelName string, matchers ...*labels.Matcher) ([]string, error) {
 	return nil, nil
 }
@@ -38,13 +35,6 @@ func (m mockStore) LabelNamesForMetricName(ctx context.Context, userID string, f
 	return nil, nil
 }
 
-func (m mockStore) DeleteChunk(ctx context.Context, from, through model.Time, userID, chunkID string, metric labels.Labels, partiallyDeletedInterval *model.Interval) error {
-	return nil
-}
-func (m mockStore) DeleteSeriesIDs(ctx context.Context, from, through model.Time, userID string, metric labels.Labels) error {
-	return nil
-}
-
 func (m mockStore) GetChunkFetcher(tm model.Time) *Fetcher {
 	return nil
 }
@@ -233,7 +223,6 @@ func TestCompositeStoreLabels(t *testing.T) {
 			}
 		})
 	}
-
 }
 
 type mockStoreGetChunkFetcher struct {
@@ -287,5 +276,4 @@ func TestCompositeStore_GetChunkFetcher(t *testing.T) {
 			require.Same(t, tc.expectedFetcher, cs.GetChunkFetcher(tc.tm))
 		})
 	}
-
 }
diff --git a/pkg/storage/chunk/schema.go b/pkg/storage/chunk/schema.go
index 710063706c..70f36ea4d9 100644
--- a/pkg/storage/chunk/schema.go
+++ b/pkg/storage/chunk/schema.go
@@ -53,14 +53,6 @@ type BaseSchema interface {
 	FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
 }
 
-// StoreSchema is a schema used by store
-type StoreSchema interface {
-	BaseSchema
-
-	// When doing a write, use this method to return the list of entries you should write to.
-	GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
-}
-
 // SeriesStoreSchema is a schema used by seriesStore
 type SeriesStoreSchema interface {
 	BaseSchema
@@ -122,25 +114,12 @@ type baseSchema struct {
 	entries baseEntries
}
 
-// storeSchema implements StoreSchema given a bucketing function and and set of range key callbacks
-type storeSchema struct {
-	baseSchema
-	entries storeEntries
-}
-
 // seriesStoreSchema implements SeriesStoreSchema given a bucketing function and a set of range key callbacks
 type seriesStoreSchema struct {
 	baseSchema
 	entries seriesStoreEntries
 }
 
-func newStoreSchema(buckets schemaBucketsFunc, entries storeEntries) storeSchema {
-	return storeSchema{
-		baseSchema: baseSchema{buckets: buckets, entries: entries},
-		entries:    entries,
-	}
-}
-
 func newSeriesStoreSchema(buckets schemaBucketsFunc, entries seriesStoreEntries) seriesStoreSchema {
 	return seriesStoreSchema{
 		baseSchema: baseSchema{buckets: buckets, entries: entries},
@@ -148,19 +127,6 @@ func newSeriesStoreSchema(buckets schemaBucketsFunc, entries seriesStoreEntries)
 	}
 }
 
-func (s storeSchema) GetWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
-	var result []IndexEntry
-
-	for _, bucket := range s.buckets(from, through, userID) {
-		entries, err := s.entries.GetWriteEntries(bucket, metricName, labels, chunkID)
-		if err != nil {
-			return nil, err
-		}
-		result = append(result, entries...)
-	}
-	return result, nil
-}
-
 // returns cache key string and []IndexEntry per bucket, matched in order
 func (s seriesStoreSchema) GetCacheKeysAndLabelWriteEntries(from, through model.Time, userID string, metricName string, labels labels.Labels, chunkID string) ([]string, [][]IndexEntry, error) {
 	var keys []string
@@ -354,13 +320,6 @@ type baseEntries interface {
 	FilterReadQueries(queries []IndexQuery, shard *astmapper.ShardAnnotation) []IndexQuery
 }
 
-// used by storeSchema
-type storeEntries interface {
-	baseEntries
-
-	GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
-}
-
 // used by seriesStoreSchema
 type seriesStoreEntries interface {
 	baseEntries
@@ -372,301 +331,6 @@ type seriesStoreEntries interface {
 	GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
 }
 
-// original entries:
-// - hash key: <userid>:<bucket>:<metric name>
-// - range key: