diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index f425174711..d9db5c24fd 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -509,6 +509,10 @@ func (s *testStore) Volume(_ context.Context, _ string, _, _ model.Time, _ int32 return &logproto.VolumeResponse{}, nil } +func (s *testStore) Series(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { + return nil, nil +} + func pushTestSamples(t *testing.T, ing logproto.PusherServer) map[string][]logproto.Stream { userIDs := []string{"1", "2", "3"} diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f36d048801..9169ef5c72 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -1356,7 +1356,48 @@ func (i *Ingester) series(ctx context.Context, req *logproto.SeriesRequest) (*lo if err != nil { return nil, err } - return instance.Series(ctx, req) + + resp, err := instance.Series(ctx, req) + if err != nil { + return nil, err + } + + if start, end, ok := buildStoreRequest(i.cfg, req.Start, req.End, time.Now()); ok { + var storeSeries []logproto.SeriesIdentifier + var parsed syntax.Expr + + groups := []string{""} + if len(req.Groups) != 0 { + groups = req.Groups + } + + for _, group := range groups { + if group != "" { + parsed, err = syntax.ParseExpr(group) + if err != nil { + return nil, err + } + } + storeSeries, err = i.store.SelectSeries(ctx, logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: group, + Limit: 1, + Start: start, + End: end, + Direction: logproto.FORWARD, + Shards: req.Shards, + Plan: &plan.QueryPlan{ + AST: parsed, + }, + }, + }) + if err != nil { + return nil, err + } + resp.Series = append(resp.Series, storeSeries...) + } + } + return resp, nil } func (i *Ingester) GetStats(ctx context.Context, req *logproto.IndexStatsRequest) (*logproto.IndexStatsResponse, error) { diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 5f9714267d..1df6b206b9 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -51,6 +51,7 @@ import ( "github.com/grafana/loki/v3/pkg/storage/stores/index/stats" "github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/sharding" "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/util/marshal" "github.com/grafana/loki/v3/pkg/validation" ) @@ -458,7 +459,14 @@ func (s *mockStore) SelectSamples(_ context.Context, _ logql.SelectSampleParams) return nil, nil } -func (s *mockStore) SelectSeries(_ context.Context, _ logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { +func (s *mockStore) SelectSeries(_ context.Context, req logql.SelectLogParams) ([]logproto.SeriesIdentifier, error) { + // NOTE: If the time range includes one hour before the current time, the return value is returned. + thresTime := time.Now().Add(-1 * time.Hour) + if !thresTime.Before(req.Start) && !thresTime.After(req.End) { + return []logproto.SeriesIdentifier{ + {Labels: mustParseLabels(`{a="11",c="33"}`)}, + }, nil + } return nil, nil } @@ -1390,6 +1398,141 @@ func TestVolume(t *testing.T) { }) } +func Test_Series(t *testing.T) { + ingesterConfig := defaultIngesterTestConfig(t) + limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) + require.NoError(t, err) + + store := &mockStore{ + chunks: map[string][]chunk.Chunk{}, + } + + mockRing := mockReadRingWithOneActiveIngester() + + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil, writefailures.Cfg{}, constants.Loki, log.NewNopLogger(), nil, mockRing, nil) + require.NoError(t, err) + defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck + + ctx := user.InjectOrgID(context.Background(), "test") + t.Run("only in-memory series", func(t *testing.T) { + i.cfg.QueryStore = false + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{a="11",b="22"}`, + }, + { + Labels: `{a="11",c="33"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: fmt.Sprintf("line %d", i), + }) + } + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Unix(0, 0), + End: time.Unix(1, 0), + Groups: []string{`{a="11"}`}, + }) + + expectedSeries := []logproto.SeriesIdentifier{ + {Labels: mustParseLabels(`{a="11", b="22"}`)}, + {Labels: mustParseLabels(`{a="11", c="33"}`)}, + } + // ignore order + sortLabels(res.Series) + sortLabels(expectedSeries) + + require.NoError(t, err) + require.ElementsMatch(t, expectedSeries, res.Series) + }) + + t.Run("in-memory and storage (If data exists in storage)", func(t *testing.T) { + i.cfg.QueryStore = true + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{a="11",b="22"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("line %d", i), + }, + ) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Now().Add(-20 * time.Hour), + End: time.Now(), + Groups: []string{`{a="11"}`}, + }) + + expectedSeries := []logproto.SeriesIdentifier{ + {Labels: mustParseLabels(`{a="11", b="22"}`)}, + {Labels: mustParseLabels(`{a="11", c="33"}`)}, + } + // ignore order + sortLabels(res.Series) + sortLabels(expectedSeries) + + require.NoError(t, err) + require.ElementsMatch(t, expectedSeries, res.Series) + }) + + t.Run("in-memory and storage (If no data exists in storage)", func(t *testing.T) { + i.cfg.QueryStore = true + req := logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: `{a="11",b="22"}`, + }, + }, + } + for i := 0; i < 10; i++ { + req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ + Timestamp: time.Now(), + Line: fmt.Sprintf("line %d", i), + }, + ) + } + + _, err = i.Push(ctx, &req) + require.NoError(t, err) + + res, err := i.Series(ctx, &logproto.SeriesRequest{ + Start: time.Now().Add(-30 * time.Minute), + End: time.Now(), + Groups: []string{`{a="11"}`}, + }) + + expectedSeries := []logproto.SeriesIdentifier{ + {Labels: mustParseLabels(`{a="11", b="22"}`)}, + } + // ignore order + sortLabels(res.Series) + sortLabels(expectedSeries) + + require.NoError(t, err) + require.ElementsMatch(t, expectedSeries, res.Series) + }) +} + type ingesterClient struct { logproto.PusherClient logproto.QuerierClient @@ -1662,3 +1805,24 @@ func TestUpdateOwnedStreams(t *testing.T) { ownedStreams := i.instances["test"].ownedStreamsSvc.getOwnedStreamCount() require.Equal(t, 8, ownedStreams) } + +func mustParseLabels(s string) []logproto.SeriesIdentifier_LabelsEntry { + l, err := marshal.NewLabelSet(s) + if err != nil { + panic(fmt.Sprintf("Failed to parse %s", s)) + } + + result := make([]logproto.SeriesIdentifier_LabelsEntry, 0, len(l)) + for k, v := range l { + result = append(result, logproto.SeriesIdentifier_LabelsEntry{Key: k, Value: v}) + } + return result +} + +func sortLabels(series []logproto.SeriesIdentifier) { + for i := range series { + sort.SliceStable(series[i].Labels, func(j, k int) bool { + return series[i].Labels[j].Key < series[i].Labels[k].Key + }) + } +}