diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 28c3e67ba9..3af6cff6b1 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -633,7 +633,8 @@ func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) ( return nil, err } - labelsToMatch, matchers, matchAny := util.PrepareLabelsAndMatchers(req.TargetLabels, matchers) + targetLabels := req.TargetLabels + labelsToMatch, matchers, matchAny := util.PrepareLabelsAndMatchers(targetLabels, matchers) matchAny = matchAny || len(matchers) == 0 seriesNames := make(map[uint64]string) @@ -673,7 +674,11 @@ func (i *instance) GetVolume(ctx context.Context, req *logproto.VolumeRequest) ( } else { labelVolumes = make(map[string]uint64, len(s.labels)) for _, l := range s.labels { - if _, ok := labelsToMatch[l.Name]; matchAny || ok { + if len(targetLabels) > 0 { + if _, ok := labelsToMatch[l.Name]; matchAny || ok { + labelVolumes[l.Name] += size + } + } else { labelVolumes[l.Name] += size } } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 944a591d45..511ee32175 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -845,11 +845,17 @@ func TestInstance_Volume(t *testing.T) { err := instance.Push(context.TODO(), &logproto.PushRequest{ Streams: []logproto.Stream{ { - Labels: `{host="other"}`, + Labels: `{fizz="buzz", host="other"}`, Entries: []logproto.Entry{ {Timestamp: time.Unix(0, 1e6), Line: `msg="other"`}, }, }, + { + Labels: `{foo="bar", host="other", log_stream="worker"}`, + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1e6), Line: `msg="other worker"`}, + }, + }, }, }) require.NoError(t, err) @@ -864,7 +870,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: "{}", - Limit: 3, + Limit: 5, AggregateBy: seriesvolume.Series, }) require.NoError(t, err) @@ -872,7 +878,8 @@ func TestInstance_Volume(t *testing.T) { require.Equal(t, []logproto.Volume{ {Name: `{host="agent", job="3", log_stream="dispatcher"}`, Volume: 90}, {Name: `{host="agent", job="3", log_stream="worker"}`, Volume: 70}, - {Name: `{host="other"}`, Volume: 11}, + {Name: `{foo="bar", host="other", log_stream="worker"}`, Volume: 18}, + {Name: `{fizz="buzz", host="other"}`, Volume: 11}, }, volumes.Volumes) }) @@ -882,7 +889,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, - Limit: 3, + Limit: 5, AggregateBy: seriesvolume.Series, }) require.NoError(t, err) @@ -898,7 +905,7 @@ func TestInstance_Volume(t *testing.T) { From: 5, Through: 1.1 * 1e3, //milliseconds Matchers: "{}", - Limit: 3, + Limit: 5, AggregateBy: seriesvolume.Series, }) require.NoError(t, err) @@ -932,7 +939,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"log_stream"}, AggregateBy: seriesvolume.Series, }) @@ -940,7 +947,7 @@ func TestInstance_Volume(t *testing.T) { require.Equal(t, []logproto.Volume{ {Name: `{log_stream="dispatcher"}`, Volume: 90}, - {Name: `{log_stream="worker"}`, Volume: 70}, + {Name: `{log_stream="worker"}`, Volume: 88}, }, volumes.Volumes) }) @@ -950,7 +957,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"host"}, AggregateBy: seriesvolume.Series, }) @@ -967,7 +974,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"host", "job"}, AggregateBy: seriesvolume.Series, }) @@ -987,32 +994,39 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: "{}", - Limit: 3, + Limit: 5, AggregateBy: seriesvolume.Labels, }) require.NoError(t, err) require.Equal(t, []logproto.Volume{ - {Name: `host`, Volume: 171}, + {Name: `host`, Volume: 189}, + {Name: `log_stream`, Volume: 178}, {Name: `job`, Volume: 160}, - {Name: `log_stream`, Volume: 160}, + {Name: `foo`, Volume: 18}, + {Name: `fizz`, Volume: 11}, }, volumes.Volumes) }) - t.Run("with matchers", func(t *testing.T) { + t.Run("with matchers it returns intersecting labels", func(t *testing.T) { instance := prepareInstance(t) volumes, err := instance.GetVolume(context.Background(), &logproto.VolumeRequest{ From: 0, Through: 1.1 * 1e3, //milliseconds - Matchers: `{log_stream="dispatcher"}`, - Limit: 3, + Matchers: `{log_stream="worker"}`, + Limit: 5, AggregateBy: seriesvolume.Labels, }) require.NoError(t, err) require.Equal(t, []logproto.Volume{ - {Name: `log_stream`, Volume: 90}, + {Name: `host`, Volume: 88}, + {Name: `log_stream`, Volume: 88}, + {Name: `job`, Volume: 70}, + {Name: `foo`, Volume: 18}, }, volumes.Volumes) + + require.NotContains(t, volumes.Volumes, logproto.Volume{Name: `fizz`, Volume: 11}) }) t.Run("excludes streams outside of time bounds", func(t *testing.T) { @@ -1021,7 +1035,7 @@ func TestInstance_Volume(t *testing.T) { From: 5, Through: 1.1 * 1e3, //milliseconds Matchers: "{}", - Limit: 3, + Limit: 5, AggregateBy: seriesvolume.Labels, }) require.NoError(t, err) @@ -1045,7 +1059,7 @@ func TestInstance_Volume(t *testing.T) { require.NoError(t, err) require.Equal(t, []logproto.Volume{ - {Name: `host`, Volume: 171}, + {Name: `host`, Volume: 189}, }, volumes.Volumes) }) @@ -1056,14 +1070,14 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"host"}, AggregateBy: seriesvolume.Labels, }) require.NoError(t, err) require.Equal(t, []logproto.Volume{ - {Name: `host`, Volume: 171}, + {Name: `host`, Volume: 189}, }, volumes.Volumes) }) @@ -1073,7 +1087,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream="dispatcher"}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"host"}, AggregateBy: seriesvolume.Labels, }) @@ -1090,7 +1104,7 @@ func TestInstance_Volume(t *testing.T) { From: 0, Through: 1.1 * 1e3, //milliseconds Matchers: `{log_stream=~".+"}`, - Limit: 3, + Limit: 5, TargetLabels: []string{"host", "job"}, AggregateBy: seriesvolume.Labels, }) diff --git a/pkg/storage/stores/tsdb/single_file_index.go b/pkg/storage/stores/tsdb/single_file_index.go index 9cbd03d9ec..b913e44c53 100644 --- a/pkg/storage/stores/tsdb/single_file_index.go +++ b/pkg/storage/stores/tsdb/single_file_index.go @@ -409,10 +409,18 @@ func (i *TSDBIndex) Volume( } } } else { + // when aggregating by labels, capture sizes for target labels if provided, + // otherwise for all intersecting labels labelVolumes = make(map[string]uint64, len(ls)) for _, l := range ls { - if _, ok := labelsToMatch[l.Name]; l.Name != TenantLabel && includeAll || ok { - labelVolumes[l.Name] += stats.KB << 10 + if len(targetLabels) > 0 { + if _, ok := labelsToMatch[l.Name]; l.Name != TenantLabel && includeAll || ok { + labelVolumes[l.Name] += stats.KB << 10 + } + } else { + if l.Name != TenantLabel { + labelVolumes[l.Name] += stats.KB << 10 + } } } } diff --git a/pkg/storage/stores/tsdb/single_file_index_test.go b/pkg/storage/stores/tsdb/single_file_index_test.go index d1952b0b05..7df713eece 100644 --- a/pkg/storage/stores/tsdb/single_file_index_test.go +++ b/pkg/storage/stores/tsdb/single_file_index_test.go @@ -376,7 +376,7 @@ func TestTSDBIndex_Volume(t *testing.T) { series := []LoadableSeries{ { - Labels: mustParseLabels(`{foo="bar", fizz="buzz", __loki_tenant__="fake"}`), + Labels: mustParseLabels(`{foo="bar", fizz="buzz", us="them", __loki_tenant__="fake"}`), Chunks: []index.ChunkMeta{ { @@ -396,7 +396,7 @@ func TestTSDBIndex_Volume(t *testing.T) { }, }, { - Labels: mustParseLabels(`{foo="bar", fizz="fizz", __loki_tenant__="fake"}`), + Labels: mustParseLabels(`{foo="bar", fizz="fizz", in="out", __loki_tenant__="fake"}`), Chunks: []index.ChunkMeta{ { MinTime: t1.UnixMilli(), @@ -451,8 +451,8 @@ func TestTSDBIndex_Volume(t *testing.T) { require.Equal(t, &logproto.VolumeResponse{ Volumes: []logproto.Volume{ {Name: `{foo="baz"}`, Volume: (50 + 60) * 1024}, - {Name: `{fizz="fizz", foo="bar"}`, Volume: (30 + 40) * 1024}, - {Name: `{fizz="buzz", foo="bar"}`, Volume: (10 + 20) * 1024}, + {Name: `{fizz="fizz", foo="bar", in="out"}`, Volume: (30 + 40) * 1024}, + {Name: `{fizz="buzz", foo="bar", us="them"}`, Volume: (10 + 20) * 1024}, }, Limit: 10, }, acc.Volumes()) @@ -602,6 +602,8 @@ func TestTSDBIndex_Volume(t *testing.T) { Volumes: []logproto.Volume{ {Name: `foo`, Volume: (10 + 20 + 30 + 40 + 50 + 60) * 1024}, {Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024}, + {Name: `in`, Volume: (30 + 40) * 1024}, + {Name: `us`, Volume: (10 + 20) * 1024}, }, Limit: 10, }, acc.Volumes()) @@ -619,6 +621,8 @@ func TestTSDBIndex_Volume(t *testing.T) { Volumes: []logproto.Volume{ {Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024}, {Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024}, + {Name: `in`, Volume: (30 + 40) * 1024}, + {Name: `us`, Volume: (10 + 20) * 1024}, }, Limit: 10, }, acc.Volumes()) @@ -635,14 +639,16 @@ func TestTSDBIndex_Volume(t *testing.T) { }, acc.Volumes()) }) - t.Run("it only returns results for the labels in the matcher", func(t *testing.T) { - matcher := labels.MustNewMatcher(labels.MatchEqual, "foo", "bar") + t.Run("it only returns labels that exist on series intersecting with the matcher ", func(t *testing.T) { + matcher := labels.MustNewMatcher(labels.MatchEqual, "us", "them") acc := seriesvolume.NewAccumulator(10, 10) err := tsdbIndex.Volume(context.Background(), "fake", from, through, acc, nil, nil, nil, seriesvolume.Labels, matcher) require.NoError(t, err) require.Equal(t, &logproto.VolumeResponse{ Volumes: []logproto.Volume{ - {Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024}, + {Name: `fizz`, Volume: (10 + 20) * 1024}, + {Name: `foo`, Volume: (10 + 20) * 1024}, + {Name: `us`, Volume: (10 + 20) * 1024}, }, Limit: 10, }, acc.Volumes()) @@ -660,6 +666,8 @@ func TestTSDBIndex_Volume(t *testing.T) { Volumes: []logproto.Volume{ {Name: `fizz`, Volume: (10 + 20 + 30 + 40) * 1024}, {Name: `foo`, Volume: (10 + 20 + 30 + 40) * 1024}, + {Name: `in`, Volume: (30 + 40) * 1024}, + {Name: `us`, Volume: (10 + 20) * 1024}, }, Limit: 10, }, acc.Volumes()) @@ -689,6 +697,8 @@ func TestTSDBIndex_Volume(t *testing.T) { Volumes: []logproto.Volume{ {Name: `foo`, Volume: (29 + 9 + 48) * 1024}, {Name: `fizz`, Volume: (29 + 9) * 1024}, + {Name: `in`, Volume: (29) * 1024}, + {Name: `us`, Volume: (9) * 1024}, }, Limit: 10, }, acc.Volumes())