diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index 9aa3e384a6..ac30baf5df 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -221,36 +221,39 @@ func (i *Ingester) transferOut(ctx context.Context) error { for instanceID, inst := range i.instances { for _, istream := range inst.streams { - chunks := make([]*logproto.Chunk, 0, len(istream.chunks)) + lbls := []*logproto.LabelPair{} + for _, lbl := range istream.labels { + lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) + } + // We moved to sending one chunk at a time in a stream instead of sending all chunks for a stream + // as large chunks can create large payloads of >16MB which can hit GRPC limits, + // typically streams won't have many chunks in memory so sending one at a time + // shouldn't add too much overhead. for _, c := range istream.chunks { bb, err := c.chunk.Bytes() if err != nil { return err } - chunks = append(chunks, &logproto.Chunk{ + chunks := make([]*logproto.Chunk, 1, 1) + chunks[0] = &logproto.Chunk{ Data: bb, - }) - } + } - lbls := []*logproto.LabelPair{} - for _, lbl := range istream.labels { - lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value}) - } + err = stream.Send(&logproto.TimeSeriesChunk{ + Chunks: chunks, + UserId: instanceID, + Labels: lbls, + FromIngesterId: i.lifecycler.ID, + }) + if err != nil { + level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) + return err + } - err := stream.Send(&logproto.TimeSeriesChunk{ - Chunks: chunks, - UserId: instanceID, - Labels: lbls, - FromIngesterId: i.lifecycler.ID, - }) - if err != nil { - level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err) - return err + sentChunks.Add(float64(len(chunks))) } - - sentChunks.Add(float64(len(chunks))) } } diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go index 9b81495bca..76e567b3a2 100644 --- a/pkg/ingester/transfer_test.go +++ b/pkg/ingester/transfer_test.go @@ -20,6 +20,7 @@ import ( "golang.org/x/net/context" "google.golang.org/grpc" + "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" @@ -62,8 +63,9 @@ func TestTransferOut(t *testing.T) { Streams: []logproto.Stream{ { Entries: []logproto.Entry{ - {Line: "out of order line", Timestamp: time.Unix(0, 0)}, {Line: "line 4", Timestamp: time.Unix(2, 0)}, + {Line: "ooo", Timestamp: time.Unix(0, 0)}, + {Line: "line 5", Timestamp: time.Unix(3, 0)}, }, Labels: `{foo="bar",bar="baz1"}`, }, @@ -72,7 +74,7 @@ func TestTransferOut(t *testing.T) { require.Error(t, err2) require.Contains(t, err2.Error(), "out of order") - require.Contains(t, err2.Error(), "total ignored: 1 out of 2") + require.Contains(t, err2.Error(), "total ignored: 1 out of 3") // Create a new ingester and transfer data to it ing2 := f.getIngester(time.Second*60, t) @@ -108,7 +110,7 @@ func TestTransferOut(t *testing.T) { assert.Equal( t, - []string{"line 0", "line 1", "line 2", "line 3", "line 4"}, + []string{"line 0", "line 1", "line 2", "line 3", "line 4", "line 5"}, lines, ) } @@ -141,6 +143,11 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T) cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store cfg.LifecyclerConfig.JoinAfter = joinAfter cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID + // Force a tiny chunk size and no encoding so we can guarantee multiple chunks + // These values are also crafted around the specific use of `line _` in the log line which is 6 bytes long + cfg.BlockSize = 3 // Block size needs to be less than chunk size so we can get more than one block per chunk + cfg.TargetChunkSize = 24 + cfg.ChunkEncoding = chunkenc.EncNone.String() cfg.ingesterClientFactory = func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error) { ingester, ok := f.ingesters[addr]