Add tenant to tee (#11573)

It is useful for implementations of Tee to also know the tenant who sent
the logs. This PR adds tenant to the tee interface.
pull/11597/head
Travis Patterson 1 year ago committed by GitHub
parent e9446a93de
commit d2f43787b6
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      pkg/distributor/distributor.go
  2. 6
      pkg/distributor/distributor_test.go
  3. 4
      pkg/distributor/tee.go

@ -420,7 +420,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log
// Nil check for performance reasons, to avoid dynamic lookup and/or no-op // Nil check for performance reasons, to avoid dynamic lookup and/or no-op
// function calls that cannot be inlined. // function calls that cannot be inlined.
if d.tee != nil { if d.tee != nil {
d.tee.Duplicate(streams) d.tee.Duplicate(tenantID, streams)
} }
const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck

@ -1251,12 +1251,14 @@ func (s *fakeRateStore) RateFor(_ string, _ uint64) (int64, float64) {
type mockTee struct { type mockTee struct {
mu sync.Mutex mu sync.Mutex
duplicated [][]KeyedStream duplicated [][]KeyedStream
tenant string
} }
func (mt *mockTee) Duplicate(streams []KeyedStream) { func (mt *mockTee) Duplicate(tenant string, streams []KeyedStream) {
mt.mu.Lock() mt.mu.Lock()
defer mt.mu.Unlock() defer mt.mu.Unlock()
mt.duplicated = append(mt.duplicated, streams) mt.duplicated = append(mt.duplicated, streams)
mt.tenant = tenant
} }
func TestDistributorTee(t *testing.T) { func TestDistributorTee(t *testing.T) {
@ -1307,5 +1309,7 @@ func TestDistributorTee(t *testing.T) {
for j, streams := range td.Streams { for j, streams := range td.Streams {
assert.Equal(t, tee.duplicated[i][j].Stream.Entries, streams.Entries) assert.Equal(t, tee.duplicated[i][j].Stream.Entries, streams.Entries)
} }
require.Equal(t, "test", tee.tenant)
} }
} }

@ -1,6 +1,6 @@
package distributor package distributor
// Tee imlpementations can duplicate the log streams to another endpoint. // Tee implementations can duplicate the log streams to another endpoint.
type Tee interface { type Tee interface {
Duplicate([]KeyedStream) Duplicate(tenant string, streams []KeyedStream)
} }

Loading…
Cancel
Save