Implement `Series` and `Label` for MultiTenantQuerier (#5566)

**What this PR does / why we need it**:

re: #5490, #5517

This PR continues implementing the `Querier` interface by adding `Label` and `Series` methods for `MultiTenantQuerier`. 

Two new functions are also added to `pkg/logproto`: `MergeSeriesResponses` and `MergeLabelResponses` which consolidate multi-tenant responses into a single response.

**Which issue(s) this PR fixes**:

N/A

**Special notes for your reviewer**:

N/A

**Checklist**
- [ ] Documentation added
- [x] Tests updated
- [ ] Add an entry in the `CHANGELOG.md` about the changes.

Signed-off-by: JordanRushing <rushing.jordan@gmail.com>
Co-authored-by: Karsten Jeschkies <k@jeschkies.xyz>
pull/5732/head
JordanRushing 3 years ago committed by GitHub
parent a5921f791a
commit 1cfff09117
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 51
      pkg/logproto/compat.go
  2. 193
      pkg/logproto/compat_test.go
  3. 59
      pkg/querier/multi_tenant_querier.go
  4. 149
      pkg/querier/multi_tenant_querier_test.go
  5. 6
      pkg/querier/querier_mock_test.go

@ -214,3 +214,54 @@ func init() {
jsoniter.RegisterTypeEncoderFunc("logproto.LegacySample", SampleJsoniterEncode, func(unsafe.Pointer) bool { return false })
jsoniter.RegisterTypeDecoderFunc("logproto.LegacySample", SampleJsoniterDecode)
}
// Combine unique values from multiple LabelResponses into a single, sorted LabelResponse.
func MergeLabelResponses(responses []*LabelResponse) (*LabelResponse, error) {
if len(responses) == 0 {
return &LabelResponse{}, nil
} else if len(responses) == 1 {
return responses[0], nil
}
unique := map[string]struct{}{}
for _, r := range responses {
for _, v := range r.Values {
if _, ok := unique[v]; !ok {
unique[v] = struct{}{}
} else {
continue
}
}
}
result := &LabelResponse{Values: make([]string, 0, len(unique))}
for value := range unique {
result.Values = append(result.Values, value)
}
// Sort the unique values before returning because we can't rely on map key ordering
sort.Strings(result.Values)
return result, nil
}
// Combine unique label sets from multiple SeriesResponse and return a single SeriesResponse.
func MergeSeriesResponses(responses []*SeriesResponse) (*SeriesResponse, error) {
if len(responses) == 0 {
return &SeriesResponse{}, nil
} else if len(responses) == 1 {
return responses[0], nil
}
result := &SeriesResponse{
Series: make([]SeriesIdentifier, 0, len(responses)),
}
for _, r := range responses {
result.Series = append(result.Series, r.Series...)
}
return result, nil
}

@ -166,3 +166,196 @@ func TestLegacyLabelPairCompatibilityUnmarshalling(t *testing.T) {
require.NoError(t, err)
require.NotEqualValues(t, expectedLabelPair, incompatibleLabelPair)
}
func TestMergeLabelResponses(t *testing.T) {
for _, tc := range []struct {
desc string
responses []*LabelResponse
expected []*LabelResponse
err error
}{
{
desc: "merge two label responses",
responses: []*LabelResponse{
{Values: []string{"test"}},
{Values: []string{"test2"}},
},
expected: []*LabelResponse{
{Values: []string{"test", "test2"}},
},
},
{
desc: "merge three label responses",
responses: []*LabelResponse{
{Values: []string{"test"}},
{Values: []string{"test2"}},
{Values: []string{"test3"}},
},
expected: []*LabelResponse{
{Values: []string{"test", "test2", "test3"}},
},
},
{
desc: "merge three label responses with one non-unique",
responses: []*LabelResponse{
{Values: []string{"test"}},
{Values: []string{"test"}},
{Values: []string{"test2"}},
{Values: []string{"test3"}},
},
expected: []*LabelResponse{
{Values: []string{"test", "test2", "test3"}},
},
},
{
desc: "merge one and expect one",
responses: []*LabelResponse{
{Values: []string{"test"}},
},
expected: []*LabelResponse{
{Values: []string{"test"}},
},
},
{
desc: "merge empty and expect empty",
responses: []*LabelResponse{},
expected: []*LabelResponse{},
},
} {
t.Run(tc.desc, func(t *testing.T) {
merged, err := MergeLabelResponses(tc.responses)
if err != nil {
require.Equal(t, tc.err, err)
} else if len(tc.expected) == 0 {
require.Empty(t, merged)
} else {
require.ElementsMatch(t, tc.expected[0].Values, merged.Values)
}
})
}
}
func TestMergeSeriesResponses(t *testing.T) {
mockSeriesResponse := func(series []map[string]string) *SeriesResponse {
resp := &SeriesResponse{}
for _, s := range series {
resp.Series = append(resp.Series, SeriesIdentifier{
Labels: s,
})
}
return resp
}
for _, tc := range []struct {
desc string
responses []*SeriesResponse
expected []*SeriesResponse
err error
}{
{
desc: "merge one series response and expect one",
responses: []*SeriesResponse{
{Series: []SeriesIdentifier{{Labels: map[string]string{"test": "test"}}}},
},
expected: []*SeriesResponse{
mockSeriesResponse([]map[string]string{{"test": "test"}}),
},
},
{
desc: "merge two series responses",
responses: []*SeriesResponse{
{Series: []SeriesIdentifier{{Labels: map[string]string{"test": "test"}}}},
{Series: []SeriesIdentifier{{Labels: map[string]string{"test2": "test2"}}}},
},
expected: []*SeriesResponse{
mockSeriesResponse([]map[string]string{{"test": "test"}, {"test2": "test2"}}),
},
},
{
desc: "merge three series responses",
responses: []*SeriesResponse{
{Series: []SeriesIdentifier{{Labels: map[string]string{"test": "test"}}}},
{Series: []SeriesIdentifier{{Labels: map[string]string{"test2": "test2"}}}},
{Series: []SeriesIdentifier{{Labels: map[string]string{"test3": "test3"}}}},
},
expected: []*SeriesResponse{
mockSeriesResponse([]map[string]string{{"test": "test"}, {"test2": "test2"}, {"test3": "test3"}}),
},
},
{
desc: "merge empty and expect empty",
responses: []*SeriesResponse{},
expected: []*SeriesResponse{},
},
} {
t.Run(tc.desc, func(t *testing.T) {
merged, err := MergeSeriesResponses(tc.responses)
if err != nil {
require.Equal(t, tc.err, err)
} else if len(tc.expected) == 0 {
require.Empty(t, merged)
} else {
require.ElementsMatch(t, tc.expected[0].Series, merged.Series)
}
})
}
}
func benchmarkMergeLabelResponses(b *testing.B, responses []*LabelResponse) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
MergeLabelResponses(responses) //nolint:errcheck
}
}
func benchmarkMergeSeriesResponses(b *testing.B, responses []*SeriesResponse) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
MergeSeriesResponses(responses) //nolint:errcheck
}
}
func BenchmarkMergeALabelResponse(b *testing.B) {
response := []*LabelResponse{{Values: []string{"test"}}}
benchmarkMergeLabelResponses(b, response)
}
func BenchmarkMergeASeriesResponse(b *testing.B) {
response := []*SeriesResponse{{Series: []SeriesIdentifier{{Labels: map[string]string{"test": "test"}}}}}
benchmarkMergeSeriesResponses(b, response)
}
func BenchmarkMergeSomeLabelResponses(b *testing.B) {
responses := []*LabelResponse{
{Values: []string{"test"}},
{Values: []string{"test2"}},
{Values: []string{"test3"}},
}
benchmarkMergeLabelResponses(b, responses)
}
func BenchmarkMergeSomeSeriesResponses(b *testing.B) {
responses := []*SeriesResponse{
{Series: []SeriesIdentifier{{Labels: map[string]string{"test": "test"}}}},
{Series: []SeriesIdentifier{{Labels: map[string]string{"test2": "test2"}}}},
{Series: []SeriesIdentifier{{Labels: map[string]string{"test3": "test3"}}}},
}
benchmarkMergeSeriesResponses(b, responses)
}
func BenchmarkMergeManyLabelResponses(b *testing.B) {
responses := []*LabelResponse{}
for i := 0; i < 20; i++ {
responses = append(responses, &LabelResponse{Values: []string{fmt.Sprintf("test%d", i)}})
}
benchmarkMergeLabelResponses(b, responses)
}
func BenchmarkMergeManySeriesResponses(b *testing.B) {
responses := []*SeriesResponse{}
for i := 0; i < 20; i++ {
test := fmt.Sprintf("test%d", i)
responses = append(responses, &SeriesResponse{Series: []SeriesIdentifier{{Labels: map[string]string{test: test}}}})
}
benchmarkMergeSeriesResponses(b, responses)
}

@ -10,6 +10,7 @@ import (
"github.com/grafana/dskit/tenant"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/syntax"
)
@ -77,6 +78,64 @@ func (q *MultiTenantQuerier) SelectSamples(ctx context.Context, params logql.Sel
return iter.NewSortSampleIterator(iters), nil
}
func (q *MultiTenantQuerier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
if req.Name == defaultTenantLabel {
return &logproto.LabelResponse{Values: tenantIDs}, nil
}
if len(tenantIDs) == 1 {
return q.Querier.Label(ctx, req)
}
responses := make([]*logproto.LabelResponse, len(tenantIDs))
for i, id := range tenantIDs {
singleContext := user.InjectUserID(ctx, id)
resp, err := q.Querier.Label(singleContext, req)
if err != nil {
return nil, err
}
responses[i] = resp
}
return logproto.MergeLabelResponses(responses)
}
func (q *MultiTenantQuerier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
tenantIDs, err := tenant.TenantIDs(ctx)
if err != nil {
return nil, err
}
if len(tenantIDs) == 1 {
return q.Querier.Series(ctx, req)
}
responses := make([]*logproto.SeriesResponse, len(tenantIDs))
for i, id := range tenantIDs {
singleContext := user.InjectUserID(ctx, id)
resp, err := q.Querier.Series(singleContext, req)
if err != nil {
return nil, err
}
for _, s := range resp.GetSeries() {
if _, ok := s.Labels[defaultTenantLabel]; !ok {
s.Labels[defaultTenantLabel] = id
}
}
responses[i] = resp
}
return logproto.MergeSeriesResponses(responses)
}
type relabel struct {
tenantID string
cache map[string]labels.Labels

@ -206,3 +206,152 @@ func (it mockEntryIterator) Error() error {
func (it mockEntryIterator) Close() error {
return nil
}
func TestMultiTenantQuerier_Label(t *testing.T) {
start := time.Unix(0, 0)
end := time.Unix(10, 0)
mockLabelRequest := func(name string) *logproto.LabelRequest {
return &logproto.LabelRequest{
Name: name,
Values: true,
Start: &start,
End: &end,
}
}
tenant.WithDefaultResolver(tenant.NewMultiResolver())
for _, tc := range []struct {
desc string
name string
orgID string
expectedLabels []string
}{
{
desc: "test label request for multiple tenants",
name: "test",
orgID: "1|2",
expectedLabels: []string{"test"},
},
{
desc: "test label request for a single tenant",
name: "test",
orgID: "1",
expectedLabels: []string{"test"},
},
{
desc: "defaultTenantLabel label request for multiple tenants",
name: defaultTenantLabel,
orgID: "1|2",
expectedLabels: []string{"1", "2"},
},
{
desc: "defaultTenantLabel label request for a single tenant",
name: defaultTenantLabel,
orgID: "1",
expectedLabels: []string{"1"},
},
} {
t.Run(tc.desc, func(t *testing.T) {
querier := newQuerierMock()
querier.On("Label", mock.Anything, mock.Anything).Return(mockLabelResponse([]string{"test"}), nil)
multiTenantQuerier := NewMultiTenantQuerier(querier, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), tc.orgID)
resp, err := multiTenantQuerier.Label(ctx, mockLabelRequest(tc.name))
require.NoError(t, err)
require.Equal(t, tc.expectedLabels, resp.GetValues())
})
}
}
func TestMultiTenantQuerierSeries(t *testing.T) {
tenant.WithDefaultResolver(tenant.NewMultiResolver())
for _, tc := range []struct {
desc string
orgID string
expectedSeries []logproto.SeriesIdentifier
}{
{
desc: "two tenantIDs",
orgID: "1|2",
expectedSeries: []logproto.SeriesIdentifier{
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "2"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "3"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "4"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "5"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "2"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "3"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "4"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "5"}},
},
},
{
desc: "three tenantIDs",
orgID: "1|2|3",
expectedSeries: []logproto.SeriesIdentifier{
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "2"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "3"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "4"}},
{Labels: map[string]string{"__tenant_id__": "1", "a": "1", "b": "5"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "2"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "3"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "4"}},
{Labels: map[string]string{"__tenant_id__": "2", "a": "1", "b": "5"}},
{Labels: map[string]string{"__tenant_id__": "3", "a": "1", "b": "2"}},
{Labels: map[string]string{"__tenant_id__": "3", "a": "1", "b": "3"}},
{Labels: map[string]string{"__tenant_id__": "3", "a": "1", "b": "4"}},
{Labels: map[string]string{"__tenant_id__": "3", "a": "1", "b": "5"}},
},
},
{
desc: "single tenantID; behaves like a normal `Series` call",
orgID: "2",
expectedSeries: []logproto.SeriesIdentifier{
{Labels: map[string]string{"a": "1", "b": "2"}},
{Labels: map[string]string{"a": "1", "b": "3"}},
{Labels: map[string]string{"a": "1", "b": "4"}},
{Labels: map[string]string{"a": "1", "b": "5"}},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
querier := newQuerierMock()
querier.On("Series", mock.Anything, mock.Anything).Return(func() *logproto.SeriesResponse { return mockSeriesResponse() }, nil)
multiTenantQuerier := NewMultiTenantQuerier(querier, log.NewNopLogger())
ctx := user.InjectOrgID(context.Background(), tc.orgID)
resp, err := multiTenantQuerier.Series(ctx, mockSeriesRequest())
require.NoError(t, err)
require.Equal(t, tc.expectedSeries, resp.GetSeries())
})
}
}
func mockSeriesRequest() *logproto.SeriesRequest {
return &logproto.SeriesRequest{
Start: time.Unix(0, 0),
End: time.Unix(10, 0),
}
}
func mockSeriesResponse() *logproto.SeriesResponse {
return &logproto.SeriesResponse{
Series: []logproto.SeriesIdentifier{
{
Labels: map[string]string{"a": "1", "b": "2"},
},
{
Labels: map[string]string{"a": "1", "b": "3"},
},
{
Labels: map[string]string{"a": "1", "b": "4"},
},
{
Labels: map[string]string{"a": "1", "b": "5"},
},
},
}
}

@ -464,11 +464,13 @@ func (q *querierMock) SelectSamples(ctx context.Context, params logql.SelectSamp
}
func (q *querierMock) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
return nil, errors.New("querierMock.Label() has not been mocked")
args := q.Called(ctx, req)
return args.Get(0).(*logproto.LabelResponse), args.Error(1)
}
func (q *querierMock) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
return nil, errors.New("querierMock.Series() has not been mocked")
args := q.Called(ctx, req)
return args.Get(0).(func() *logproto.SeriesResponse)(), args.Error(1)
}
func (q *querierMock) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, error) {

Loading…
Cancel
Save