transfers one chunk at a time per series instead of all (#2328)

pull/2330/head
Authored by Ed Welch 6 years ago, committed by GitHub
parent e22f365025
commit edaf0ed422
  1. pkg/ingester/transfer.go (41 changed lines)
  2. pkg/ingester/transfer_test.go (13 changed lines)

@@ -221,36 +221,39 @@ func (i *Ingester) transferOut(ctx context.Context) error {
     for instanceID, inst := range i.instances {
         for _, istream := range inst.streams {
-            chunks := make([]*logproto.Chunk, 0, len(istream.chunks))
+            lbls := []*logproto.LabelPair{}
+            for _, lbl := range istream.labels {
+                lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
+            }
+            // We moved to sending one chunk at a time in a stream instead of sending all chunks for a stream
+            // as large chunks can create large payloads of >16MB which can hit GRPC limits,
+            // typically streams won't have many chunks in memory so sending one at a time
+            // shouldn't add too much overhead.
             for _, c := range istream.chunks {
                 bb, err := c.chunk.Bytes()
                 if err != nil {
                     return err
                 }
-                chunks = append(chunks, &logproto.Chunk{
+                chunks := make([]*logproto.Chunk, 1, 1)
+                chunks[0] = &logproto.Chunk{
                     Data: bb,
-                })
-            }
-            lbls := []*logproto.LabelPair{}
-            for _, lbl := range istream.labels {
-                lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
-            }
-            err := stream.Send(&logproto.TimeSeriesChunk{
-                Chunks:         chunks,
-                UserId:         instanceID,
-                Labels:         lbls,
-                FromIngesterId: i.lifecycler.ID,
-            })
-            if err != nil {
-                level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
-                return err
+                }
+                err = stream.Send(&logproto.TimeSeriesChunk{
+                    Chunks:         chunks,
+                    UserId:         instanceID,
+                    Labels:         lbls,
+                    FromIngesterId: i.lifecycler.ID,
+                })
+                if err != nil {
+                    level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
+                    return err
+                }
+                sentChunks.Add(float64(len(chunks)))
             }
-            sentChunks.Add(float64(len(chunks)))
         }
     }
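
The new in-code comment above gives the motivation for this hunk: packing every chunk of a series into a single TimeSeriesChunk message can produce payloads larger than 16MB and run into gRPC message-size limits, while sending one chunk per message bounds each payload by the largest individual chunk. Below is a minimal, self-contained Go sketch of that idea only; the message struct, sendFn type, and sendOnePerMessage function are illustrative stand-ins invented for this note, not Loki's logproto types or its real gRPC stream.

    package main

    import "fmt"

    // message is an illustrative stand-in for a per-series transfer message:
    // one labelled series plus some number of chunk payloads.
    type message struct {
        labels string
        chunks [][]byte
    }

    // sendFn stands in for a client-stream Send method.
    type sendFn func(message) error

    // sendOnePerMessage mirrors the shape of the new transferOut loop: each
    // chunk travels in its own message, so the largest payload on the wire is
    // bounded by the largest single chunk rather than by the sum of all chunks
    // held in memory for the series.
    func sendOnePerMessage(labels string, chunks [][]byte, send sendFn) error {
        for _, c := range chunks {
            if err := send(message{labels: labels, chunks: [][]byte{c}}); err != nil {
                return err
            }
        }
        return nil
    }

    func main() {
        // Three chunks of 5, 7 and 6 MiB: batched into one message they would
        // form an 18 MiB payload; sent one at a time, no message exceeds 7 MiB.
        chunks := [][]byte{make([]byte, 5<<20), make([]byte, 7<<20), make([]byte, 6<<20)}
        _ = sendOnePerMessage(`{foo="bar"}`, chunks, func(m message) error {
            size := 0
            for _, c := range m.chunks {
                size += len(c)
            }
            fmt.Printf("sending %d chunk(s) for %s, %d bytes\n", len(m.chunks), m.labels, size)
            return nil
        })
    }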

@@ -20,6 +20,7 @@ import (
     "golang.org/x/net/context"
     "google.golang.org/grpc"
 
+    "github.com/grafana/loki/pkg/chunkenc"
     "github.com/grafana/loki/pkg/ingester/client"
     "github.com/grafana/loki/pkg/logproto"
     "github.com/grafana/loki/pkg/logql"
@@ -62,8 +63,9 @@ func TestTransferOut(t *testing.T) {
         Streams: []logproto.Stream{
             {
                 Entries: []logproto.Entry{
-                    {Line: "out of order line", Timestamp: time.Unix(0, 0)},
                     {Line: "line 4", Timestamp: time.Unix(2, 0)},
+                    {Line: "ooo", Timestamp: time.Unix(0, 0)},
+                    {Line: "line 5", Timestamp: time.Unix(3, 0)},
                 },
                 Labels: `{foo="bar",bar="baz1"}`,
             },
@ -72,7 +74,7 @@ func TestTransferOut(t *testing.T) {
require.Error(t, err2)
require.Contains(t, err2.Error(), "out of order")
require.Contains(t, err2.Error(), "total ignored: 1 out of 2")
require.Contains(t, err2.Error(), "total ignored: 1 out of 3")
// Create a new ingester and transfer data to it
ing2 := f.getIngester(time.Second*60, t)
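
The updated assertion reflects the updated push above: three entries are sent, and only the "ooo" entry at timestamp 0 is out of order with respect to what the stream has already accepted, so one of three is ignored. Below is a small sketch of that counting under a simplified rule (an entry is ignored when its timestamp is not newer than the last accepted one); countIgnored is a made-up helper for this note, and the value 1 used for the last accepted timestamp is only an assumption standing in for whatever the earlier pushes in the test left behind.

    package main

    import "fmt"

    // countIgnored applies a simplified out-of-order rule: an entry is ignored
    // if its timestamp is not newer than the last accepted timestamp, otherwise
    // it is accepted and advances that watermark.
    func countIgnored(lastAccepted int64, timestamps []int64) (ignored, total int) {
        for _, ts := range timestamps {
            total++
            if ts <= lastAccepted {
                ignored++
                continue
            }
            lastAccepted = ts
        }
        return ignored, total
    }

    func main() {
        // The second push's timestamps in order: line 4 at t=2, ooo at t=0, line 5 at t=3.
        ignored, total := countIgnored(1, []int64{2, 0, 3})
        fmt.Printf("total ignored: %d out of %d\n", ignored, total) // 1 out of 3
    }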
@@ -108,7 +110,7 @@ func TestTransferOut(t *testing.T) {
     assert.Equal(
         t,
-        []string{"line 0", "line 1", "line 2", "line 3", "line 4"},
+        []string{"line 0", "line 1", "line 2", "line 3", "line 4", "line 5"},
         lines,
     )
 }
@@ -141,6 +143,11 @@ func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T)
     cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store
     cfg.LifecyclerConfig.JoinAfter = joinAfter
     cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID
+    // Force a tiny chunk size and no encoding so we can guarantee multiple chunks
+    // These values are also crafted around the specific use of `line _` in the log line which is 6 bytes long
+    cfg.BlockSize = 3 // Block size needs to be less than chunk size so we can get more than one block per chunk
+    cfg.TargetChunkSize = 24
+    cfg.ChunkEncoding = chunkenc.EncNone.String()
     cfg.ingesterClientFactory = func(cfg client.Config, addr string) (client.HealthAndIngesterClient, error) {
         ingester, ok := f.ingesters[addr]
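
The comment in this hunk explains the intent: with 6-byte lines ("line 0" through "line 5"), a BlockSize of 3 and a TargetChunkSize of 24 force each stream to cut more than one chunk, so the transfer exercises the new one-chunk-per-message path. As a rough illustration of that sizing arithmetic only, here is a deliberately naive model that treats a chunk's fill as just the sum of raw line bytes; chunkCount is a made-up helper, and Loki's real chunkenc accounting includes block and header overhead, so actual cut points differ.

    package main

    import "fmt"

    // chunkCount cuts a new chunk whenever appending another line would push
    // the current chunk past the target size, and returns how many chunks the
    // lines end up occupying under that simplified rule.
    func chunkCount(lineLen, lines, targetChunkSize int) int {
        chunks, used := 1, 0
        for i := 0; i < lines; i++ {
            if used+lineLen > targetChunkSize {
                chunks++
                used = 0
            }
            used += lineLen
        }
        return chunks
    }

    func main() {
        // Six 6-byte lines against a 24-byte target: more than one chunk is
        // needed, which is what the test relies on.
        fmt.Println(chunkCount(6, 6, 24)) // prints 2 under this simplified model
    }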
