diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 14954141e7..7a4f1b0b9a 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -232,8 +232,8 @@ type streamTracker struct { // TODO taken from Cortex, see if we can refactor out an usable interface. type pushTracker struct { - samplesPending atomic.Int32 - samplesFailed atomic.Int32 + streamsPending atomic.Int32 + streamsFailed atomic.Int32 done chan struct{} err chan error } @@ -256,8 +256,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log // We also work out the hash value at the same time. streams := make([]streamTracker, 0, len(req.Streams)) keys := make([]uint32, 0, len(req.Streams)) - validatedSamplesSize := 0 - validatedSamplesCount := 0 + validatedLineSize := 0 + validatedLineCount := 0 var validationErr error validationContext := d.validator.getValidationContextForTime(time.Now(), userID) @@ -305,8 +305,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } n++ - validatedSamplesSize += len(entry.Line) - validatedSamplesCount++ + validatedLineSize += len(entry.Line) + validatedLineCount++ } stream.Entries = stream.Entries[:n] @@ -326,17 +326,17 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log } now := time.Now() - if !d.ingestionRateLimiter.AllowN(now, userID, validatedSamplesSize) { + if !d.ingestionRateLimiter.AllowN(now, userID, validatedLineSize) { // Return a 429 to indicate to the client they are being rate limited - validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesCount)) - validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedSamplesSize)) - return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, userID, int(d.ingestionRateLimiter.Limit(now, userID)), validatedSamplesCount, validatedSamplesSize) + validation.DiscardedSamples.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineCount)) + validation.DiscardedBytes.WithLabelValues(validation.RateLimited, userID).Add(float64(validatedLineSize)) + return nil, httpgrpc.Errorf(http.StatusTooManyRequests, validation.RateLimitedErrorMsg, userID, int(d.ingestionRateLimiter.Limit(now, userID)), validatedLineCount, validatedLineSize) } const maxExpectedReplicationSet = 5 // typical replication factor 3 plus one for inactive plus one for luck var descs [maxExpectedReplicationSet]ring.InstanceDesc - samplesByIngester := map[string][]*streamTracker{} + streamsByIngester := map[string][]*streamTracker{} ingesterDescs := map[string]ring.InstanceDesc{} for i, key := range keys { replicationSet, err := d.ingestersRing.Get(key, ring.Write, descs[:0], nil, nil) @@ -347,7 +347,7 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log streams[i].minSuccess = len(replicationSet.Instances) - replicationSet.MaxErrors streams[i].maxFailures = replicationSet.MaxErrors for _, ingester := range replicationSet.Instances { - samplesByIngester[ingester.Addr] = append(samplesByIngester[ingester.Addr], &streams[i]) + streamsByIngester[ingester.Addr] = append(streamsByIngester[ingester.Addr], &streams[i]) ingesterDescs[ingester.Addr] = ingester } } @@ -356,8 +356,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log done: make(chan struct{}, 1), // buffer avoids blocking if caller terminates - sendSamples() only sends once on each err: make(chan error, 1), } - tracker.samplesPending.Store(int32(len(streams))) - for ingester, samples := range samplesByIngester { + tracker.streamsPending.Store(int32(len(streams))) + for ingester, streams := range streamsByIngester { go func(ingester ring.InstanceDesc, samples []*streamTracker) { // Use a background context to make sure all ingesters get samples even if we return early localCtx, cancel := context.WithTimeout(context.Background(), d.clientCfg.RemoteTimeout) @@ -366,8 +366,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log if sp := opentracing.SpanFromContext(ctx); sp != nil { localCtx = opentracing.ContextWithSpan(localCtx, sp) } - d.sendSamples(localCtx, ingester, samples, &tracker) - }(ingesterDescs[ingester], samples) + d.sendStreams(localCtx, ingester, samples, &tracker) + }(ingesterDescs[ingester], streams) } select { case err := <-tracker.err: @@ -500,31 +500,31 @@ func (d *Distributor) truncateLines(vContext validationContext, stream *logproto } // TODO taken from Cortex, see if we can refactor out an usable interface. -func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { - err := d.sendSamplesErr(ctx, ingester, streamTrackers) - - // If we succeed, decrement each sample's pending count by one. If we reach - // the required number of successful puts on this sample, then decrement the - // number of pending samples by one. If we successfully push all samples to - // min success ingesters, wake up the waiting rpc so it can return early. - // Similarly, track the number of errors, and if it exceeds maxFailures - // shortcut the waiting rpc. +func (d *Distributor) sendStreams(ctx context.Context, ingester ring.InstanceDesc, streamTrackers []*streamTracker, pushTracker *pushTracker) { + err := d.sendStreamsErr(ctx, ingester, streamTrackers) + + // If we succeed, decrement each stream's pending count by one. + // If we reach the required number of successful puts on this stream, then + // decrement the number of pending streams by one. + // If we successfully push all streams to min success ingesters, wake up the + // waiting rpc so it can return early. Similarly, track the number of errors, + // and if it exceeds maxFailures shortcut the waiting rpc. // - // The use of atomic increments here guarantees only a single sendSamples + // The use of atomic increments here guarantees only a single sendStreams // goroutine will write to either channel. for i := range streamTrackers { if err != nil { if streamTrackers[i].failed.Inc() <= int32(streamTrackers[i].maxFailures) { continue } - if pushTracker.samplesFailed.Inc() == 1 { + if pushTracker.streamsFailed.Inc() == 1 { pushTracker.err <- err } } else { if streamTrackers[i].succeeded.Inc() != int32(streamTrackers[i].minSuccess) { continue } - if pushTracker.samplesPending.Dec() == 0 { + if pushTracker.streamsPending.Dec() == 0 { pushTracker.done <- struct{}{} } } @@ -532,7 +532,7 @@ func (d *Distributor) sendSamples(ctx context.Context, ingester ring.InstanceDes } // TODO taken from Cortex, see if we can refactor out an usable interface. -func (d *Distributor) sendSamplesErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error { +func (d *Distributor) sendStreamsErr(ctx context.Context, ingester ring.InstanceDesc, streams []*streamTracker) error { c, err := d.pool.GetClientFor(ingester.Addr) if err != nil { return err diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index ef237c7f01..c37a69c814 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -8,6 +8,7 @@ import ( "net/http" "strconv" "strings" + "sync" "testing" "time" @@ -28,6 +29,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/grafana/loki/pkg/ingester" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/syntax" @@ -38,10 +40,6 @@ import ( "github.com/grafana/loki/pkg/validation" ) -const ( - numIngesters = 5 -) - var ( success = &logproto.PushResponse{} ctx = user.InjectOrgID(context.Background(), "test") @@ -78,7 +76,7 @@ func TestDistributor(t *testing.T) { expectedError: httpgrpc.Errorf(http.StatusBadRequest, validation.InvalidLabelsErrorMsg, "{ab\"", "1:4: parse error: unterminated quoted string"), }, } { - t.Run(fmt.Sprintf("[%d](samples=%v)", i, tc.lines), func(t *testing.T) { + t.Run(fmt.Sprintf("[%d](lines=%v)", i, tc.lines), func(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.EnforceMetricName = false @@ -86,8 +84,7 @@ func TestDistributor(t *testing.T) { limits.IngestionBurstSizeMB = ingestionRateLimit limits.MaxLineSize = fe.ByteSize(tc.maxLineSize) - d := prepare(t, limits, nil, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(t, 1, 5, limits, nil) request := makeWriteRequest(tc.lines, 10) @@ -95,7 +92,7 @@ func TestDistributor(t *testing.T) { request.Streams[0].Labels = `{ab"` } - response, err := d.Push(ctx, request) + response, err := distributors[i%len(distributors)].Push(ctx, request) assert.Equal(t, tc.expectedResponse, response) assert.Equal(t, tc.expectedError, err) }) @@ -327,27 +324,107 @@ func Test_IncrementTimestamp(t *testing.T) { testData := testData t.Run(testName, func(t *testing.T) { - ingester := &mockIngester{} - d := prepare(t, testData.limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck - _, err := d.Push(ctx, testData.push) + ing := &mockIngester{} + distributors, _ := prepare(t, 1, 3, testData.limits, func(addr string) (ring_client.PoolClient, error) { return ing, nil }) + _, err := distributors[0].Push(ctx, testData.push) assert.NoError(t, err) - assert.Equal(t, testData.expectedPush, ingester.pushed[0]) + assert.Equal(t, testData.expectedPush, ing.pushed[0]) }) } } +func TestDistributorPushConcurrently(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + distributors, ingesters := prepare(t, 1, 5, limits, nil) + + numReq := 1 + var wg sync.WaitGroup + for i := 0; i < numReq; i++ { + wg.Add(1) + go func(n int) { + defer wg.Done() + request := makeWriteRequestWithLabels(100, 100, + []string{ + fmt.Sprintf(`{app="foo-%d"}`, n), + fmt.Sprintf(`{instance="bar-%d"}`, n), + }, + ) + response, err := distributors[n%len(distributors)].Push(ctx, request) + assert.NoError(t, err) + assert.Equal(t, &logproto.PushResponse{}, response) + }(i) + } + + wg.Wait() + // make sure the ingesters received the push requests + time.Sleep(10 * time.Millisecond) + + counter := 0 + labels := make(map[string]int) + + for i := range ingesters { + pushed := ingesters[i].pushed + counter = counter + len(pushed) + for _, pr := range pushed { + for _, st := range pr.Streams { + labels[st.Labels] = labels[st.Labels] + 1 + } + } + } + assert.Equal(t, numReq*3, counter) // RF=3 + // each stream is present 3 times + for i := 0; i < numReq; i++ { + l := fmt.Sprintf(`{instance="bar-%d"}`, i) + assert.Equal(t, 3, labels[l], "stream %s expected 3 times, got %d", l, labels[l]) + l = fmt.Sprintf(`{app="foo-%d"}`, i) + assert.Equal(t, 3, labels[l], "stream %s expected 3 times, got %d", l, labels[l]) + } +} + +func TestDistributorPushErrors(t *testing.T) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + t.Run("with RF=3 a single push can fail", func(t *testing.T) { + distributors, ingesters := prepare(t, 1, 3, limits, nil) + ingesters[0].failAfter = 5 * time.Millisecond + ingesters[1].succeedAfter = 10 * time.Millisecond + ingesters[2].succeedAfter = 15 * time.Millisecond + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.NoError(t, err) + require.Equal(t, 0, len(ingesters[0].pushed)) + require.Equal(t, 1, len(ingesters[1].pushed)) + require.Equal(t, 1, len(ingesters[2].pushed)) + }) + t.Run("with RF=3 two push failures result in error", func(t *testing.T) { + distributors, ingesters := prepare(t, 1, 3, limits, nil) + ingesters[0].failAfter = 5 * time.Millisecond + ingesters[1].succeedAfter = 10 * time.Millisecond + ingesters[2].failAfter = 15 * time.Millisecond + + request := makeWriteRequest(10, 64) + _, err := distributors[0].Push(ctx, request) + require.Error(t, err) + require.Equal(t, 0, len(ingesters[0].pushed)) + require.Equal(t, 1, len(ingesters[1].pushed)) + require.Equal(t, 0, len(ingesters[2].pushed)) + }) +} + func Test_SortLabelsOnPush(t *testing.T) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.EnforceMetricName = false ingester := &mockIngester{} - d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) request := makeWriteRequest(10, 10) request.Streams[0].Labels = `{buzz="f", a="b"}` - _, err := d.Push(ctx, request) + _, err := distributors[0].Push(ctx, request) require.NoError(t, err) require.Equal(t, `{a="b", buzz="f"}`, ingester.pushed[0].Streams[0].Labels) } @@ -365,11 +442,9 @@ func Test_TruncateLogLines(t *testing.T) { t.Run("it truncates lines to MaxLineSize when MaxLineSizeTruncate is true", func(t *testing.T) { limits, ingester := setup() + distributors, _ := prepare(t, 1, 5, limits, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - d := prepare(t, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck - - _, err := d.Push(ctx, makeWriteRequest(1, 10)) + _, err := distributors[0].Push(ctx, makeWriteRequest(1, 10)) require.NoError(t, err) require.Len(t, ingester.pushed[0].Streams[0].Entries[0].Line, 5) }) @@ -701,9 +776,8 @@ func Benchmark_SortLabelsOnPush(b *testing.B) { limits := &validation.Limits{} flagext.DefaultValues(limits) limits.EnforceMetricName = false - ingester := &mockIngester{} - d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) + d := distributors[0] request := makeWriteRequest(10, 10) vCtx := d.validator.getValidationContextForTime(testTime, "123") for n := 0; n < b.N; n++ { @@ -727,17 +801,14 @@ func Benchmark_Push(b *testing.B) { limits.RejectOldSamples = true limits.RejectOldSamplesMaxAge = model.Duration(24 * time.Hour) limits.CreationGracePeriod = model.Duration(24 * time.Hour) - ingester := &mockIngester{} - d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) request := makeWriteRequest(100000, 100) b.ResetTimer() b.ReportAllocs() for n := 0; n < b.N; n++ { - - _, err := d.Push(ctx, request) + _, err := distributors[0].Push(ctx, request) if err != nil { require.NoError(b, err) } @@ -752,9 +823,7 @@ func Benchmark_PushWithLineTruncation(b *testing.B) { limits.MaxLineSizeTruncate = true limits.MaxLineSize = 50 - ingester := &mockIngester{} - d := prepare(&testing.T{}, limits, nil, func(addr string) (ring_client.PoolClient, error) { return ingester, nil }) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(&testing.T{}, 1, 5, limits, nil) request := makeWriteRequest(100000, 100) b.ResetTimer() @@ -762,7 +831,7 @@ func Benchmark_PushWithLineTruncation(b *testing.B) { for n := 0; n < b.N; n++ { - _, err := d.Push(ctx, request) + _, err := distributors[0].Push(ctx, request) if err != nil { require.NoError(b, err) } @@ -831,26 +900,7 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { limits.IngestionRateMB = testData.ingestionRateMB limits.IngestionBurstSizeMB = testData.ingestionBurstSizeMB - // Init a shared KVStore - kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) - t.Cleanup(func() { assert.NoError(t, closer.Close()) }) - - // Start all expected distributors - distributors := make([]*Distributor, testData.distributors) - for i := 0; i < testData.distributors; i++ { - distributors[i] = prepare(t, limits, kvStore, nil) - defer services.StopAndAwaitTerminated(context.Background(), distributors[i]) //nolint:errcheck - } - - // If the distributors ring is setup, wait until the first distributor - // updates to the expected size - if distributors[0].distributorsLifecycler != nil { - test.Poll(t, time.Second, testData.distributors, func() interface{} { - return distributors[0].distributorsLifecycler.HealthyInstancesCount() - }) - } - - // Push samples in multiple requests to the first distributor + distributors, _ := prepare(t, testData.distributors, 5, limits, nil) for _, push := range testData.pushes { request := makeWriteRequest(1, push.bytes) response, err := distributors[0].Push(ctx, request) @@ -867,156 +917,156 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) { } } -func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory func(addr string) (ring_client.PoolClient, error)) *Distributor { - var ( - distributorConfig Config - clientConfig client.Config - ) - flagext.DefaultValues(&distributorConfig, &clientConfig) - - overrides, err := validation.NewOverrides(*limits, nil) - require.NoError(t, err) +func prepare(t *testing.T, numDistributors, numIngesters int, limits *validation.Limits, factory func(addr string) (ring_client.PoolClient, error)) ([]*Distributor, []mockIngester) { + t.Helper() - // Mock the ingesters ring - ingesters := map[string]*mockIngester{} + ingesters := make([]mockIngester, numIngesters) for i := 0; i < numIngesters; i++ { - ingesters[fmt.Sprintf("ingester%d", i)] = &mockIngester{} + ingesters[i] = mockIngester{} } - ingestersRing := &mockRing{ - replicationFactor: 3, - } - for addr := range ingesters { - ingestersRing.ingesters = append(ingestersRing.ingesters, ring.InstanceDesc{ - Addr: addr, - }) + ingesterByAddr := map[string]*mockIngester{} + ingesterDescs := map[string]ring.InstanceDesc{} + + for i := range ingesters { + addr := fmt.Sprintf("ingester-%d", i) + ingesterDescs[addr] = ring.InstanceDesc{ + Addr: addr, + State: ring.ACTIVE, + Timestamp: time.Now().Unix(), + RegisteredTimestamp: time.Now().Add(-10 * time.Minute).Unix(), + Tokens: []uint32{uint32((math.MaxUint32 / numIngesters) * i)}, + } + ingesterByAddr[addr] = &ingesters[i] } - loopbackName, err := loki_net.LoopbackInterfaceName() + kvStore, closer := consul.NewInMemoryClient(ring.GetCodec(), log.NewNopLogger(), nil) + + err := kvStore.CAS(context.Background(), ingester.RingKey, + func(_ interface{}) (interface{}, bool, error) { + return &ring.Desc{ + Ingesters: ingesterDescs, + }, true, nil + }, + ) require.NoError(t, err) - distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond - distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) - distributorConfig.DistributorRing.KVStore.Mock = kvStore - distributorConfig.DistributorRing.KVStore.Store = "inmemory" - distributorConfig.DistributorRing.InstanceInterfaceNames = []string{loopbackName} - distributorConfig.factory = factory - if factory == nil { - distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) { - return ingesters[addr], nil - } - } + ingestersRing, err := ring.New(ring.Config{ + KVStore: kv.Config{ + Mock: kvStore, + }, + HeartbeatTimeout: 60 * time.Minute, + ReplicationFactor: 3, + }, ingester.RingKey, ingester.RingKey, nil, nil) - d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, nil) require.NoError(t, err) - require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), ingestersRing)) - return d -} + test.Poll(t, time.Second, numIngesters, func() interface{} { + return ingestersRing.InstancesCount() + }) -func makeWriteRequest(lines int, size int) *logproto.PushRequest { - req := logproto.PushRequest{ - Streams: []logproto.Stream{ - { - Labels: `{foo="bar"}`, - }, - }, - } + loopbackName, err := loki_net.LoopbackInterfaceName() + require.NoError(t, err) - for i := 0; i < lines; i++ { - // Construct the log line, honoring the input size - line := strconv.Itoa(i) + strings.Repeat(" ", size) - line = line[:size] + distributors := make([]*Distributor, numDistributors) + for i := 0; i < numDistributors; i++ { + var distributorConfig Config + var clientConfig client.Config + flagext.DefaultValues(&distributorConfig, &clientConfig) + + distributorConfig.DistributorRing.HeartbeatPeriod = 100 * time.Millisecond + distributorConfig.DistributorRing.InstanceID = strconv.Itoa(rand.Int()) + distributorConfig.DistributorRing.KVStore.Mock = kvStore + distributorConfig.DistributorRing.InstanceAddr = "127.0.0.1" + distributorConfig.DistributorRing.InstanceInterfaceNames = []string{loopbackName} + distributorConfig.factory = factory + if factory == nil { + distributorConfig.factory = func(addr string) (ring_client.PoolClient, error) { + return ingesterByAddr[addr], nil + } + } + + overrides, err := validation.NewOverrides(*limits, nil) + require.NoError(t, err) - req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{ - Timestamp: time.Now().Add(time.Duration(i) * time.Millisecond), - Line: line, + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, prometheus.NewPedanticRegistry()) + require.NoError(t, err) + require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) + distributors[i] = d + } + + if distributors[0].distributorsLifecycler != nil { + test.Poll(t, time.Second, numDistributors, func() interface{} { + return distributors[0].distributorsLifecycler.HealthyInstancesCount() }) } - return &req -} -type mockIngester struct { - grpc_health_v1.HealthClient - logproto.PusherClient + t.Cleanup(func() { + assert.NoError(t, closer.Close()) + for _, d := range distributors { + assert.NoError(t, services.StopAndAwaitTerminated(context.Background(), d)) + } + ingestersRing.StopAsync() + }) - pushed []*logproto.PushRequest + return distributors, ingesters } -func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) { - i.pushed = append(i.pushed, in) - return nil, nil -} +func makeWriteRequestWithLabels(lines, size int, labels []string) *logproto.PushRequest { + streams := make([]logproto.Stream, len(labels)) + for i := 0; i < len(labels); i++ { + stream := logproto.Stream{Labels: labels[i]} -func (i *mockIngester) Close() error { - return nil -} + for j := 0; j < lines; j++ { + // Construct the log line, honoring the input size + line := strconv.Itoa(j) + strings.Repeat("0", size) + line = line[:size] -// Copied from Cortex; TODO(twilkie) - factor this our and share it. -// mockRing doesn't do virtual nodes, just returns mod(key) + replicationFactor -// ingesters. -type mockRing struct { - prometheus.Counter - ingesters []ring.InstanceDesc - replicationFactor uint32 -} + stream.Entries = append(stream.Entries, logproto.Entry{ + Timestamp: time.Now().Add(time.Duration(j) * time.Millisecond), + Line: line, + }) + } -func (r mockRing) Get(key uint32, op ring.Operation, buf []ring.InstanceDesc, _ []string, _ []string) (ring.ReplicationSet, error) { - result := ring.ReplicationSet{ - MaxErrors: 1, - Instances: buf[:0], - } - for i := uint32(0); i < r.replicationFactor; i++ { - n := (key + i) % uint32(len(r.ingesters)) - result.Instances = append(result.Instances, r.ingesters[n]) + streams[i] = stream } - return result, nil -} - -func (r mockRing) GetAllHealthy(op ring.Operation) (ring.ReplicationSet, error) { - return r.GetReplicationSetForOperation(op) -} -func (r mockRing) GetReplicationSetForOperation(op ring.Operation) (ring.ReplicationSet, error) { - return ring.ReplicationSet{ - Instances: r.ingesters, - MaxErrors: 1, - }, nil + return &logproto.PushRequest{ + Streams: streams, + } } -func (r mockRing) ReplicationFactor() int { - return int(r.replicationFactor) +func makeWriteRequest(lines, size int) *logproto.PushRequest { + return makeWriteRequestWithLabels(lines, size, []string{`{foo="bar"}`}) } -func (r mockRing) InstancesCount() int { - return len(r.ingesters) -} +type mockIngester struct { + grpc_health_v1.HealthClient + logproto.PusherClient -func (r mockRing) Subring(key uint32, n int) ring.ReadRing { - return r + failAfter time.Duration + succeedAfter time.Duration + mu sync.Mutex + pushed []*logproto.PushRequest } -func (r mockRing) HasInstance(instanceID string) bool { - for _, ing := range r.ingesters { - if ing.Addr != instanceID { - return true - } +func (i *mockIngester) Push(ctx context.Context, in *logproto.PushRequest, opts ...grpc.CallOption) (*logproto.PushResponse, error) { + if i.failAfter > 0 { + time.Sleep(i.failAfter) + return nil, fmt.Errorf("push request failed") + } + if i.succeedAfter > 0 { + time.Sleep(i.succeedAfter) } - return false -} -func (r mockRing) ShuffleShard(identifier string, size int) ring.ReadRing { - // take advantage of pass by value to bound to size: - r.ingesters = r.ingesters[:size] - return r -} + i.mu.Lock() + defer i.mu.Unlock() -func (r mockRing) ShuffleShardWithLookback(identifier string, size int, lookbackPeriod time.Duration, now time.Time) ring.ReadRing { - return r + i.pushed = append(i.pushed, in) + return nil, nil } -func (r mockRing) CleanupShuffleShardCache(identifier string) {} - -func (r mockRing) GetInstanceState(instanceID string) (ring.InstanceState, error) { - return 0, nil +func (i *mockIngester) Close() error { + return nil } diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index 783cd3f77a..ebe810acbc 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -1,14 +1,12 @@ package distributor import ( - "context" "io/ioutil" "net/http" "net/http/httptest" "testing" "github.com/grafana/dskit/flagext" - "github.com/grafana/dskit/services" "github.com/stretchr/testify/require" "github.com/grafana/loki/pkg/validation" @@ -19,11 +17,10 @@ func TestDistributorRingHandler(t *testing.T) { flagext.DefaultValues(limits) runServer := func() *httptest.Server { - d := prepare(t, limits, nil, nil) - defer services.StopAndAwaitTerminated(context.Background(), d) //nolint:errcheck + distributors, _ := prepare(t, 1, 3, limits, nil) return httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - d.ServeHTTP(w, r) + distributors[0].ServeHTTP(w, r) })) }