From 07ece2bc2a0ff098dc064049751e5043dd9dfb71 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 21 Jan 2021 09:37:35 +0100 Subject: [PATCH] Improve checkpoint series iterator. (#3193) * Add basic benchmark. Signed-off-by: Cyril Tovena * Improves memory usage of checkpointer series iterator. Signed-off-by: Cyril Tovena * make lint. Signed-off-by: Cyril Tovena * better size computation. Signed-off-by: Cyril Tovena * Fixes test ordering flakyness. Signed-off-by: Cyril Tovena --- pkg/chunkenc/encoding_helpers.go | 3 +- pkg/chunkenc/memchunk.go | 127 ++++++++++++----- pkg/chunkenc/memchunk_test.go | 57 +++++--- pkg/chunkenc/pool.go | 20 ++- pkg/ingester/checkpoint.go | 233 +++++++++++++++++++++---------- pkg/ingester/checkpoint_test.go | 153 +++++++++++++++++++- pkg/ingester/encoding_test.go | 6 +- pkg/ingester/flush_test.go | 2 +- pkg/util/pool/bytesbuffer.go | 69 +++++++++ 9 files changed, 532 insertions(+), 138 deletions(-) create mode 100644 pkg/util/pool/bytesbuffer.go diff --git a/pkg/chunkenc/encoding_helpers.go b/pkg/chunkenc/encoding_helpers.go index 2850c69f1d..66e90dae06 100644 --- a/pkg/chunkenc/encoding_helpers.go +++ b/pkg/chunkenc/encoding_helpers.go @@ -15,8 +15,7 @@ type encbuf struct { func (e *encbuf) reset() { e.b = e.b[:0] } func (e *encbuf) get() []byte { return e.b } -func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) } -func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } +func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) } func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) } func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index d56408fcc0..ec81e8aa99 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -33,9 +33,7 @@ const ( maxLineLength = 1024 * 1024 * 1024 ) -var ( - magicNumber = uint32(0x12EE56A) -) +var magicNumber = uint32(0x12EE56A) // The table gets initialized with sync.Once but may still cause a race // with any other use of the crc32 package anywhere. Thus we initialize it @@ -155,20 +153,36 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) { // CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, // which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but // needs to serialize/deserialize the data to disk to ensure data durability. -func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) { - encB := BytesBufferPool.Get(1 << 10).([]byte) +func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) { + buf := bytes.NewBuffer(b[:0]) + err := hb.CheckpointTo(version, buf) + return buf.Bytes(), err +} - defer func() { - BytesBufferPool.Put(encB[:0]) - }() +// CheckpointSize returns the estimated size of the headblock checkpoint. +func (hb *headBlock) CheckpointSize(version byte) int { + size := 1 // version + size += binary.MaxVarintLen32 * 2 // total entries + total size + size += binary.MaxVarintLen64 * 2 // mint,maxt + size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * len(hb.entries) // ts + len of log line. - buf := bytes.NewBuffer(make([]byte, 0, 1<<10)) - eb := encbuf{b: encB} + for _, e := range hb.entries { + size += len(e.s) + } + return size +} + +// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`. 
+func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error { + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + eb.reset() eb.putByte(version) - _, err := buf.Write(eb.get()) + _, err := w.Write(eb.get()) if err != nil { - return nil, errors.Wrap(err, "write headBlock version") + return errors.Wrap(err, "write headBlock version") } eb.reset() @@ -177,27 +191,27 @@ func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) { eb.putVarint64(hb.mint) eb.putVarint64(hb.maxt) - _, err = buf.Write(eb.get()) + _, err = w.Write(eb.get()) if err != nil { - return nil, errors.Wrap(err, "write headBlock metas") + return errors.Wrap(err, "write headBlock metas") } eb.reset() for _, entry := range hb.entries { eb.putVarint64(entry.t) eb.putUvarint(len(entry.s)) - _, err = buf.Write(eb.get()) + _, err = w.Write(eb.get()) if err != nil { - return nil, errors.Wrap(err, "write headBlock entry ts") + return errors.Wrap(err, "write headBlock entry ts") } eb.reset() - _, err := buf.WriteString(entry.s) + _, err := io.WriteString(w, entry.s) if err != nil { - return nil, errors.Wrap(err, "write headblock entry line") + return errors.Wrap(err, "write headblock entry line") } } - return buf.Bytes(), nil + return nil } func (hb *headBlock) FromCheckpoint(b []byte) error { @@ -361,6 +375,37 @@ func (c *MemChunk) Bytes() ([]byte, error) { return c.BytesWith(nil) } +// BytesSize returns the raw size of the chunk. +// NOTE: This does not account for the head block nor include any head block data. +func (c *MemChunk) BytesSize() int { + size := 4 // magic number + size++ // format + if c.format > chunkFormatV1 { + size++ // chunk format v2+ has a byte for encoding. + } + + // blocks + for _, b := range c.blocks { + size += len(b.b) + crc32.Size // size + crc + + size += binary.MaxVarintLen32 // num entries + size += binary.MaxVarintLen64 // mint + size += binary.MaxVarintLen64 // maxt + size += binary.MaxVarintLen32 // offset + if c.format == chunkFormatV3 { + size += binary.MaxVarintLen32 // uncompressed size + } + size += binary.MaxVarintLen32 // len(b) + } + + // blockmeta + size += binary.MaxVarintLen32 // len blocks + + size += crc32.Size // metablock crc + size += 8 // metaoffset + return size +} + // WriteTo Implements io.WriterTo // NOTE: Does not cut head block or include any head block data. // For this to be the case you must call Close() first. @@ -368,11 +413,16 @@ func (c *MemChunk) Bytes() ([]byte, error) { // result in different content addressable chunks in storage based on the timing of when // they were checkpointed (which would cause new blocks to be cut early). func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { - crc32Hash := newCRC32() + crc32Hash := crc32HashPool.Get().(hash.Hash32) + defer crc32HashPool.Put(crc32Hash) + crc32Hash.Reset() offset := int64(0) - eb := encbuf{b: make([]byte, 0, 1<<10)} + eb := EncodeBufferPool.Get().(*encbuf) + defer EncodeBufferPool.Put(eb) + + eb.reset() // Write the header (magicNum + version). 
eb.putBE32(magicNumber) @@ -392,11 +442,13 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { for i, b := range c.blocks { c.blocks[i].offset = int(offset) - eb.reset() - eb.putBytes(b.b) - eb.putHash(crc32Hash) + crc32Hash.Reset() + _, err := crc32Hash.Write(b.b) + if err != nil { + return offset, errors.Wrap(err, "write block") + } - n, err := w.Write(eb.get()) + n, err := w.Write(crc32Hash.Sum(b.b)) if err != nil { return offset, errors.Wrap(err, "write block") } @@ -439,25 +491,29 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { return offset, nil } -// SerializeForCheckpoint returns []bytes representing the chunk & head. This is to ensure eventually -// flushed chunks don't have different substructures depending on when they were checkpointed. +// SerializeForCheckpointTo serialize the chunk & head into different `io.Writer` for checkpointing use. +// This is to ensure eventually flushed chunks don't have different substructures depending on when they were checkpointed. // In turn this allows us to maintain a more effective dedupe ratio in storage. -func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) { - chk, err = c.BytesWith(b) +func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error { + _, err := c.WriteTo(chk) if err != nil { - return nil, nil, err + return err } if c.head.isEmpty() { - return chk, nil, nil + return nil } - head, err = c.head.CheckpointBytes(c.format) + err = c.head.CheckpointTo(c.format, head) if err != nil { - return nil, nil, err + return err } - return chk, head, nil + return nil +} + +func (c *MemChunk) CheckpointSize() (chunk, head int) { + return c.BytesSize(), c.head.CheckpointSize(c.format) } func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) { @@ -537,7 +593,6 @@ func (c *MemChunk) Utilization() float64 { } size := c.UncompressedSize() return float64(size) / float64(blocksPerChunk*c.blockSize) - } // Append implements Chunk. 
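For reference, the calling pattern intended for the new CheckpointSize/SerializeForCheckpointTo pair is sketched below. The helper name checkpointChunk is illustrative only (not part of this patch); it uses nothing beyond the exported API added above, and mirrors what toWireChunks in pkg/ingester/checkpoint.go does with pooled buffers.

package example

import (
	"bytes"

	"github.com/grafana/loki/pkg/chunkenc"
)

// checkpointChunk pre-sizes two buffers from CheckpointSize, then serializes the
// cut blocks and the head block into them for checkpointing.
func checkpointChunk(c *chunkenc.MemChunk) ([]byte, []byte, error) {
	chunkSize, headSize := c.CheckpointSize()
	chkBuf := bytes.NewBuffer(make([]byte, 0, chunkSize))
	headBuf := bytes.NewBuffer(make([]byte, 0, headSize))
	if err := c.SerializeForCheckpointTo(chkBuf, headBuf); err != nil {
		return nil, nil, err
	}
	return chkBuf.Bytes(), headBuf.Bytes(), nil
}

The two byte slices can later be handed to MemchunkFromCheckpoint to rebuild an equivalent *MemChunk, as exercised by TestCheckpointEncoding further down in this patch.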
@@ -721,9 +776,11 @@ func (b block) Offset() int { func (b block) Entries() int { return b.numEntries } + func (b block) MinTime() int64 { return b.mint } + func (b block) MaxTime() int64 { return b.maxt } diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index dfbb270830..ce63a3cbea 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -65,6 +65,8 @@ func TestBlocksInclusive(t *testing.T) { func TestBlock(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + chk := NewMemChunk(enc, testBlockSize, testTargetSize) cases := []struct { ts int64 @@ -173,6 +175,8 @@ func TestBlock(t *testing.T) { } func TestReadFormatV1(t *testing.T) { + t.Parallel() + c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) fillChunk(c) // overrides default v2 format @@ -211,6 +215,8 @@ func TestRoundtripV2(t *testing.T) { for _, enc := range testEncoding { for _, version := range []byte{chunkFormatV2, chunkFormatV3} { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + c := NewMemChunk(enc, testBlockSize, testTargetSize) c.format = version populated := fillChunk(c) @@ -258,14 +264,14 @@ func TestRoundtripV2(t *testing.T) { assertLines(loaded) }) } - } } func TestRoundtripV3(t *testing.T) { - for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + c := NewMemChunk(enc, testBlockSize, testTargetSize) c.format = chunkFormatV3 _ = fillChunk(c) @@ -281,15 +287,15 @@ func TestRoundtripV3(t *testing.T) { r.head.clear() require.Equal(t, c, r) - }) } - } func TestSerialization(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + chk := NewMemChunk(enc, testBlockSize, testTargetSize) numSamples := 50000 @@ -337,6 +343,8 @@ func TestSerialization(t *testing.T) { func TestChunkFilling(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + chk := NewMemChunk(enc, testBlockSize, 0) chk.blockSize = 1024 @@ -374,6 +382,8 @@ func TestChunkFilling(t *testing.T) { } func TestGZIPChunkTargetSize(t *testing.T) { + t.Parallel() + chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize) lineSize := 512 @@ -420,7 +430,6 @@ func TestGZIPChunkTargetSize(t *testing.T) { ut := chk.Utilization() require.Greater(t, ut, 0.99) require.Less(t, ut, 1.01) - } func TestMemChunk_AppendOutOfOrder(t *testing.T) { @@ -467,6 +476,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) { func TestChunkSize(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() c := NewMemChunk(enc, testBlockSize, testTargetSize) inserted := fillChunk(c) b, err := c.Bytes() @@ -477,7 +487,6 @@ func TestChunkSize(t *testing.T) { t.Log("characters ", humanize.Bytes(uint64(inserted))) t.Log("Ratio", float64(inserted)/float64(len(b))) }) - } } @@ -508,7 +517,6 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } for it.Next() { - } if err := it.Close(); err != nil { t.Fatal(err) @@ -537,7 +545,6 @@ func TestChunkStats(t *testing.T) { t.Fatal(err) } for it.Next() { - } if err := it.Close(); err != nil { t.Fatal(err) @@ -586,7 +593,6 @@ func TestIteratorClose(t *testing.T) { } test(iter, t) } - }) } } @@ -618,7 +624,6 @@ func BenchmarkWrite(b *testing.B) { result = chunks }) } - } type nomatchPipeline struct{} @@ -703,10 +708,8 @@ func TestGenerateDataSize(t *testing.T) { } func BenchmarkHeadBlockIterator(b *testing.B) { - for _, j := range []int{100000, 50000, 
15000, 10000} { b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { - h := headBlock{} for i := 0; i < j; i++ { @@ -729,10 +732,8 @@ func BenchmarkHeadBlockIterator(b *testing.B) { } func BenchmarkHeadBlockSampleIterator(b *testing.B) { - for _, j := range []int{100000, 50000, 15000, 10000} { b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) { - h := headBlock{} for i := 0; i < j; i++ { @@ -755,8 +756,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) { } func TestMemChunk_IteratorBounds(t *testing.T) { - - var createChunk = func() *MemChunk { + createChunk := func() *MemChunk { t.Helper() c := NewMemChunk(EncNone, 1e6, 1e6) @@ -799,6 +799,8 @@ func TestMemChunk_IteratorBounds(t *testing.T) { t.Run( fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction), func(t *testing.T) { + t.Parallel() + tt := tt c := createChunk() @@ -819,14 +821,14 @@ func TestMemChunk_IteratorBounds(t *testing.T) { } require.NoError(t, it.Close()) }) - } - } func TestMemchunkLongLine(t *testing.T) { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { + t.Parallel() + c := NewMemChunk(enc, testBlockSize, testTargetSize) for i := 1; i <= 10; i++ { require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)})) @@ -843,6 +845,8 @@ func TestMemchunkLongLine(t *testing.T) { // Ensure passing a reusable []byte doesn't affect output func TestBytesWith(t *testing.T) { + t.Parallel() + exp, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(nil) require.Nil(t, err) out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3}) @@ -852,6 +856,8 @@ func TestBytesWith(t *testing.T) { } func TestHeadBlockCheckpointing(t *testing.T) { + t.Parallel() + c := NewMemChunk(EncSnappy, 256*1024, 1500*1024) // add a few entries @@ -867,7 +873,7 @@ func TestHeadBlockCheckpointing(t *testing.T) { // ensure blocks are not cut require.Equal(t, 0, len(c.blocks)) - b, err := c.head.CheckpointBytes(c.format) + b, err := c.head.CheckpointBytes(c.format, nil) require.Nil(t, err) hb := &headBlock{} @@ -876,6 +882,8 @@ func TestHeadBlockCheckpointing(t *testing.T) { } func TestCheckpointEncoding(t *testing.T) { + t.Parallel() + blockSize, targetSize := 256*1024, 1500*1024 c := NewMemChunk(EncSnappy, blockSize, targetSize) @@ -905,10 +913,11 @@ func TestCheckpointEncoding(t *testing.T) { // ensure new blocks are not cut require.Equal(t, 1, len(c.blocks)) - chk, head, err := c.SerializeForCheckpoint(nil) + var chk, head bytes.Buffer + err := c.SerializeForCheckpointTo(&chk, &head) require.Nil(t, err) - cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize) + cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize) require.Nil(t, err) // TODO(owen-d): remove once v3+ is the default chunk version @@ -921,8 +930,10 @@ func TestCheckpointEncoding(t *testing.T) { require.Equal(t, c, cpy) } -var streams = []logproto.Stream{} -var series = []logproto.Series{} +var ( + streams = []logproto.Stream{} + series = []logproto.Series{} +) func BenchmarkBufferedIteratorLabels(b *testing.B) { c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize) diff --git a/pkg/chunkenc/pool.go b/pkg/chunkenc/pool.go index 8d37e2ffce..49c6147402 100644 --- a/pkg/chunkenc/pool.go +++ b/pkg/chunkenc/pool.go @@ -47,14 +47,32 @@ var ( New: func() interface{} { return bufio.NewReader(nil) }, }, } + // BytesBufferPool is a bytes buffer used for lines decompressed. 
// Buckets [0.5KB,1KB,2KB,4KB,8KB] - BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) }) + + // Pool of crc32 hash + crc32HashPool = sync.Pool{ + New: func() interface{} { + return newCRC32() + }, + } + serializeBytesBufferPool = sync.Pool{ New: func() interface{} { return &bytes.Buffer{} }, } + + // EncodeBufferPool is a pool used to binary encode. + EncodeBufferPool = sync.Pool{ + New: func() interface{} { + return &encbuf{ + b: make([]byte, 0, 256), + } + }, + } ) func getWriterPool(enc Encoding) WriterPool { diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index 31a8f29163..9a04667a17 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -1,6 +1,7 @@ package ingester import ( + "bytes" fmt "fmt" "io/ioutil" "os" @@ -16,44 +17,73 @@ import ( "github.com/go-kit/kit/log/level" "github.com/gogo/protobuf/proto" "github.com/pkg/errors" - "github.com/prometheus/prometheus/pkg/pool" + prompool "github.com/prometheus/prometheus/pkg/pool" tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/prometheus/prometheus/tsdb/wal" "github.com/grafana/loki/pkg/chunkenc" + "github.com/grafana/loki/pkg/util/pool" ) +var ( + // todo(ctovena) those pools should be in factor of the actual configuration (blocksize, targetsize). + // Starting with something sane first then we can refine with more experience. + + // Buckets [1KB 2KB 4KB 16KB 32KB to 4MB] by 2 + blocksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2) + // Buckets [64B 128B 256B 512B... to 2MB] by 2 + headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2) +) + +type chunkWithBuffer struct { + blocks, head *bytes.Buffer + Chunk +} + // The passed wireChunks slice is for re-use. -func toWireChunks(descs []chunkDesc, wireChunks []Chunk) ([]Chunk, error) { +func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithBuffer, error) { + // release memory from previous list of chunks. 
+ for _, wc := range wireChunks { + blocksBufferPool.Put(wc.blocks) + headBufferPool.Put(wc.head) + wc.Data = nil + wc.Head = nil + } + if cap(wireChunks) < len(descs) { - wireChunks = make([]Chunk, len(descs)) + wireChunks = make([]chunkWithBuffer, len(descs)) } else { wireChunks = wireChunks[:len(descs)] } + for i, d := range descs { from, to := d.chunk.Bounds() - wireChunk := Chunk{ - From: from, - To: to, - Closed: d.closed, - FlushedAt: d.flushed, - LastUpdated: d.lastUpdated, - Synced: d.synced, - } - - slice := wireChunks[i].Data[:0] // try to re-use the memory from last time - if cap(slice) < d.chunk.CompressedSize() { - slice = make([]byte, 0, d.chunk.CompressedSize()) + chunkSize, headSize := d.chunk.CheckpointSize() + + wireChunk := chunkWithBuffer{ + Chunk: Chunk{ + From: from, + To: to, + Closed: d.closed, + FlushedAt: d.flushed, + LastUpdated: d.lastUpdated, + Synced: d.synced, + }, + blocks: blocksBufferPool.Get(chunkSize), + head: headBufferPool.Get(headSize), } - chk, head, err := d.chunk.SerializeForCheckpoint(slice) + err := d.chunk.SerializeForCheckpointTo( + wireChunk.blocks, + wireChunk.head, + ) if err != nil { return nil, err } - wireChunk.Data = chk - wireChunk.Head = head + wireChunk.Data = wireChunk.blocks.Bytes() + wireChunk.Head = wireChunk.head.Bytes() wireChunks[i] = wireChunk } return wireChunks, nil @@ -118,7 +148,7 @@ type SeriesWithErr struct { type SeriesIter interface { Count() int - Iter() <-chan *SeriesWithErr + Iter() *streamIterator Stop() } @@ -150,56 +180,108 @@ func (i *ingesterSeriesIter) Stop() { close(i.done) } -func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr { - ch := make(chan *SeriesWithErr) - go func() { - for _, inst := range i.ing.getInstances() { - inst.streamsMtx.RLock() - // Need to buffer streams internally so the read lock isn't held trying to write to a blocked channel. - streams := make([]*stream, 0, len(inst.streams)) - inst.streamsMtx.RUnlock() - _ = inst.forAllStreams(func(stream *stream) error { - streams = append(streams, stream) - return nil - }) - - for _, stream := range streams { - stream.chunkMtx.RLock() - if len(stream.chunks) < 1 { - stream.chunkMtx.RUnlock() - // it's possible the stream has been flushed to storage - // in between starting the checkpointing process and - // checkpointing this stream. - continue - } - - // TODO(owen-d): use a pool - // Only send chunks for checkpointing that have yet to be flushed. - chunks, err := toWireChunks(unflushedChunks(stream.chunks), nil) - stream.chunkMtx.RUnlock() - - var s *Series - if err == nil { - s = &Series{ - UserID: inst.instanceID, - Fingerprint: uint64(stream.fp), - Labels: client.FromLabelsToLabelAdapters(stream.labels), - Chunks: chunks, - } - } - select { - case ch <- &SeriesWithErr{ - Err: err, - Series: s, - }: - case <-i.done: - return - } - } +func (i *ingesterSeriesIter) Iter() *streamIterator { + return newStreamsIterator(i.ing) +} + +type streamInstance struct { + id string + streams []*stream +} + +type streamIterator struct { + instances []streamInstance + + current Series + buffer []chunkWithBuffer + err error +} + +// newStreamsIterator returns a new stream iterators that iterates over one instance at a time, then +// each stream per instances. 
+func newStreamsIterator(ing ingesterInstances) *streamIterator { + instances := ing.getInstances() + streamInstances := make([]streamInstance, len(instances)) + for i, inst := range ing.getInstances() { + inst.streamsMtx.RLock() + streams := make([]*stream, 0, len(inst.streams)) + inst.streamsMtx.RUnlock() + _ = inst.forAllStreams(func(s *stream) error { + streams = append(streams, s) + return nil + }) + streamInstances[i] = streamInstance{ + streams: streams, + id: inst.instanceID, } - close(ch) - }() - return ch + } + return &streamIterator{ + instances: streamInstances, + } +} + +// Next loads the next stream of the current instance. +// If the instance is empty, it moves to the next instance until there is no more. +// Return true if there's a next stream, each successful calls will replace the current stream. +func (s *streamIterator) Next() bool { + if len(s.instances) == 0 { + s.instances = nil + return false + } + currentInstance := s.instances[0] + if len(currentInstance.streams) == 0 { + s.instances = s.instances[1:] + return s.Next() + } + + // current stream + stream := currentInstance.streams[0] + + // remove the first stream + s.instances[0].streams = s.instances[0].streams[1:] + + stream.chunkMtx.RLock() + defer stream.chunkMtx.RUnlock() + + if len(stream.chunks) < 1 { + // it's possible the stream has been flushed to storage + // in between starting the checkpointing process and + // checkpointing this stream. + return s.Next() + } + chunks, err := toWireChunks(stream.chunks, s.buffer) + if err != nil { + s.err = err + return false + } + s.buffer = chunks + + s.current.Chunks = s.current.Chunks[:0] + if cap(s.current.Chunks) == 0 { + s.current.Chunks = make([]Chunk, 0, len(chunks)) + } + + for _, c := range chunks { + s.current.Chunks = append(s.current.Chunks, c.Chunk) + } + + s.current.UserID = currentInstance.id + s.current.Fingerprint = uint64(stream.fp) + s.current.Labels = client.FromLabelsToLabelAdapters(stream.labels) + + return true +} + +// Err returns an errors thrown while iterating over the streams. +func (s *streamIterator) Error() error { + return s.err +} + +// Stream is serializable (for checkpointing) stream of chunks. +// NOTE: the series is re-used between successful Next calls. +// This means you should make a copy or use the data before calling Next. 
+func (s *streamIterator) Stream() *Series { + return &s.current } type CheckpointWriter interface { @@ -275,7 +357,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) { } // Buckets [64KB to 256MB] by 2 -var recordBufferPool = pool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) }) +var recordBufferPool = prompool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) }) func (w *WALCheckpointWriter) Write(s *Series) error { size := s.Size() + 1 // +1 for header @@ -483,11 +565,10 @@ func (c *Checkpointer) PerformCheckpoint() (err error) { level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String()) c.metrics.checkpointDuration.Observe(elapsed.Seconds()) }() - for s := range c.iter.Iter() { - if s.Err != nil { - return s.Err - } - if err := c.writer.Write(s.Series); err != nil { + + iter := c.iter.Iter() + for iter.Next() { + if err := c.writer.Write(iter.Stream()); err != nil { return err } @@ -508,6 +589,10 @@ func (c *Checkpointer) PerformCheckpoint() (err error) { } + if iter.Error() != nil { + return iter.Error() + } + return c.writer.Close(false) } diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index e7b7a26894..a0ccc84b40 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -5,6 +5,7 @@ import ( fmt "fmt" "io/ioutil" "os" + "sort" "testing" "time" @@ -12,12 +13,14 @@ import ( cortex_client "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/util/services" "github.com/prometheus/prometheus/pkg/labels" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" + "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/util/validation" ) @@ -254,6 +257,150 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) { require.True(t, found == shouldExist) } +type ingesterInstancesFunc func() []*instance + +func (i ingesterInstancesFunc) getInstances() []*instance { + return i() +} + +var currentSeries *Series + +func buildStreams() []logproto.Stream { + streams := make([]logproto.Stream, 10) + for i := range streams { + labels := makeRandomLabels().String() + entries := make([]logproto.Entry, 15*1e3) + for j := range entries { + entries[j] = logproto.Entry{ + Timestamp: time.Unix(0, int64(j)), + Line: fmt.Sprintf("entry for line %d", j), + } + } + streams[i] = logproto.Stream{ + Labels: labels, + Entries: entries, + } + } + return streams +} + +var ( + stream1 = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "1", + }, + { + Timestamp: time.Unix(0, 2), + Line: "2", + }, + }, + } + stream2 = logproto.Stream{ + Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(), + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(0, 1), + Line: "3", + }, + { + Timestamp: time.Unix(0, 2), + Line: "4", + }, + }, + } +) + +func Test_SeriesIterator(t *testing.T) { + var instances []*instance + + limits, err := validation.NewOverrides(validation.Limits{ + MaxLocalStreamsPerUser: 1000, + IngestionRateMB: 1e4, + IngestionBurstSizeMB: 1e4, + }, nil) + require.NoError(t, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + + for i := 0; i < 3; i++ { + inst := 
newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil) + require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) + require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) + instances = append(instances, inst) + } + + iter := newStreamsIterator(ingesterInstancesFunc(func() []*instance { + return instances + })) + + for i := 0; i < 3; i++ { + var streams []logproto.Stream + for j := 0; j < 2; j++ { + iter.Next() + assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID) + memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0) + require.NoError(t, err) + it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil)) + require.NoError(t, err) + stream := logproto.Stream{ + Labels: cortex_client.FromLabelAdaptersToLabels(iter.Stream().Labels).String(), + } + for it.Next() { + stream.Entries = append(stream.Entries, it.Entry()) + } + require.NoError(t, it.Close()) + streams = append(streams, stream) + } + sort.Slice(streams, func(i, j int) bool { return streams[i].Labels < streams[j].Labels }) + require.Equal(t, stream1, streams[0]) + require.Equal(t, stream2, streams[1]) + } + + require.False(t, iter.Next()) + require.Nil(t, iter.Error()) +} + +func Benchmark_SeriesIterator(b *testing.B) { + streams := buildStreams() + instances := make([]*instance, 10) + + limits, err := validation.NewOverrides(validation.Limits{ + MaxLocalStreamsPerUser: 1000, + IngestionRateMB: 1e4, + IngestionBurstSizeMB: 1e4, + }, nil) + require.NoError(b, err) + limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) + + for i := range instances { + inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil) + + require.NoError(b, + inst.Push(context.Background(), &logproto.PushRequest{ + Streams: streams, + }), + ) + instances[i] = inst + } + it := newIngesterSeriesIter(ingesterInstancesFunc(func() []*instance { + return instances + })) + defer it.Stop() + + b.ResetTimer() + b.ReportAllocs() + + for n := 0; n < b.N; n++ { + iter := it.Iter() + for iter.Next() { + currentSeries = iter.Stream() + } + require.NoError(b, iter.Error()) + } +} + type noOpWalLogger struct{} func (noOpWalLogger) Log(recs ...[]byte) error { return nil } @@ -282,6 +429,7 @@ func Benchmark_CheckpointWrite(b *testing.B) { func buildChunks(t testing.TB, size int) []Chunk { descs := make([]chunkDesc, 0, size) + chks := make([]Chunk, size) for i := 0; i < size; i++ { // build chunks of 256k blocks, 1.5MB target size. Same as default config. 
@@ -294,5 +442,8 @@ func buildChunks(t testing.TB, size int) []Chunk { there, err := toWireChunks(descs, nil) require.NoError(t, err) - return there + for i := range there { + chks[i] = there[i].Chunk + } + return chks } diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 6be331c16a..4762873657 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -164,7 +164,11 @@ func Test_EncodingChunks(t *testing.T) { } there, err := toWireChunks(from, nil) require.Nil(t, err) - backAgain, err := fromWireChunks(conf, there) + chunks := make([]Chunk, 0, len(there)) + for _, c := range there { + chunks = append(chunks, c.Chunk) + } + backAgain, err := fromWireChunks(conf, chunks) require.Nil(t, err) for i, to := range backAgain { diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 3255f50f79..2210f5d2e4 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -38,7 +38,7 @@ const ( ) func init() { - //util.Logger = log.NewLogfmtLogger(os.Stdout) + // util.Logger = log.NewLogfmtLogger(os.Stdout) } func TestChunkFlushingIdle(t *testing.T) { diff --git a/pkg/util/pool/bytesbuffer.go b/pkg/util/pool/bytesbuffer.go new file mode 100644 index 0000000000..5e1da7b8bb --- /dev/null +++ b/pkg/util/pool/bytesbuffer.go @@ -0,0 +1,69 @@ +package pool + +import ( + "bytes" + "sync" +) + +// BufferPool is a bucketed pool for variably bytes buffers. +type BufferPool struct { + buckets []sync.Pool + sizes []int +} + +// NewBuffer a new Pool with size buckets for minSize to maxSize +// increasing by the given factor. +func NewBuffer(minSize, maxSize int, factor float64) *BufferPool { + if minSize < 1 { + panic("invalid minimum pool size") + } + if maxSize < 1 { + panic("invalid maximum pool size") + } + if factor < 1 { + panic("invalid factor") + } + + var sizes []int + + for s := minSize; s <= maxSize; s = int(float64(s) * factor) { + sizes = append(sizes, s) + } + + return &BufferPool{ + buckets: make([]sync.Pool, len(sizes)), + sizes: sizes, + } +} + +// Get returns a byte buffer that fits the given size. +func (p *BufferPool) Get(sz int) *bytes.Buffer { + for i, bktSize := range p.sizes { + if sz > bktSize { + continue + } + b := p.buckets[i].Get() + if b == nil { + b = bytes.NewBuffer(make([]byte, bktSize)) + } + buf := b.(*bytes.Buffer) + buf.Reset() + return b.(*bytes.Buffer) + } + return bytes.NewBuffer(make([]byte, sz)) +} + +// Put adds a byte buffer to the right bucket in the pool. +func (p *BufferPool) Put(s *bytes.Buffer) { + if s == nil { + return + } + cap := s.Cap() + for i, size := range p.sizes { + if cap > size { + continue + } + p.buckets[i].Put(s) + return + } +}