Speed up re-labelling in multi-tenant queries. (#5663)

pull/5673/head
Karsten Jeschkies 3 years ago committed by GitHub
parent f3a86ded23
commit 705baa39f4
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 62
      pkg/querier/multi_tenant_querier.go
  2. 51
      pkg/querier/multi_tenant_querier_test.go

@ -76,50 +76,68 @@ func (q *MultiTenantQuerier) SelectSamples(ctx context.Context, params logql.Sel
return iter.NewSortSampleIterator(iters), nil
}
// TenantEntry Iterator wraps an entry iterator and adds the tenant label.
type TenantEntryIterator struct {
iter.EntryIterator
type relabel struct {
tenantID string
cache map[string]labels.Labels
}
func NewTenantEntryIterator(iter iter.EntryIterator, id string) *TenantEntryIterator {
return &TenantEntryIterator{EntryIterator: iter, tenantID: id}
func (r relabel) relabel(original string) string {
lbls, ok := r.cache[original]
if ok {
return lbls.String()
}
func (i *TenantEntryIterator) Labels() string {
// TODO: cache manipulated labels and add a benchmark.
lbls, _ := syntax.ParseLabels(i.EntryIterator.Labels())
lbls, _ = syntax.ParseLabels(original)
builder := labels.NewBuilder(lbls.WithoutLabels(defaultTenantLabel))
// Prefix label if it conflicts with the tenant label.
if lbls.Has(defaultTenantLabel) {
builder.Set(retainExistingPrefix+defaultTenantLabel, lbls.Get(defaultTenantLabel))
}
builder.Set(defaultTenantLabel, i.tenantID)
builder.Set(defaultTenantLabel, r.tenantID)
return builder.Labels().String()
lbls = builder.Labels()
r.cache[original] = lbls
return lbls.String()
}
// TenantEntry Iterator wraps an entry iterator and adds the tenant label.
type TenantEntryIterator struct {
iter.EntryIterator
relabel
}
func NewTenantEntryIterator(iter iter.EntryIterator, id string) *TenantEntryIterator {
return &TenantEntryIterator{
EntryIterator: iter,
relabel: relabel{
tenantID: id,
cache: map[string]labels.Labels{},
},
}
}
func (i *TenantEntryIterator) Labels() string {
return i.relabel.relabel(i.EntryIterator.Labels())
}
// TenantEntry Iterator wraps a sample iterator and adds the tenant label.
type TenantSampleIterator struct {
iter.SampleIterator
tenantID string
relabel
}
func NewTenantSampleIterator(iter iter.SampleIterator, id string) *TenantSampleIterator {
return &TenantSampleIterator{SampleIterator: iter, tenantID: id}
return &TenantSampleIterator{
SampleIterator: iter,
relabel: relabel{
tenantID: id,
cache: map[string]labels.Labels{},
},
}
func (i *TenantSampleIterator) Labels() string {
// TODO: cache manipulated labels
lbls, _ := syntax.ParseLabels(i.SampleIterator.Labels())
builder := labels.NewBuilder(lbls.WithoutLabels(defaultTenantLabel))
// Prefix label if it conflicts with the tenant label.
if lbls.Has(defaultTenantLabel) {
builder.Set(retainExistingPrefix+defaultTenantLabel, lbls.Get(defaultTenantLabel))
}
builder.Set(defaultTenantLabel, i.tenantID)
return builder.Labels().String()
func (i *TenantSampleIterator) Labels() string {
return i.relabel.relabel(i.SampleIterator.Labels())
}

@ -2,10 +2,13 @@ package querier
import (
"context"
"fmt"
"strconv"
"testing"
"time"
"github.com/go-kit/log"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@ -158,3 +161,51 @@ func newSampleIterator() iter.SampleIterator {
}),
})
}
func BenchmarkTenantEntryIteratorLabels(b *testing.B) {
it := newMockEntryIterator(12)
tenantIter := NewTenantEntryIterator(it, "tenant_1")
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
tenantIter.Labels()
}
}
type mockEntryIterator struct {
labels string
}
func newMockEntryIterator(numLabels int) mockEntryIterator {
builder := labels.NewBuilder(nil)
for i := 1; i <= numLabels; i++ {
builder.Set(fmt.Sprintf("label_%d", i), strconv.Itoa(i))
}
return mockEntryIterator{labels: builder.Labels().String()}
}
func (it mockEntryIterator) Labels() string {
return it.labels
}
func (it mockEntryIterator) Entry() logproto.Entry {
return logproto.Entry{}
}
func (it mockEntryIterator) Next() bool {
return true
}
func (it mockEntryIterator) StreamHash() uint64 {
return 0
}
func (it mockEntryIterator) Error() error {
return nil
}
func (it mockEntryIterator) Close() error {
return nil
}

Loading…
Cancel
Save