Improve checkpoint series iterator. (#3193)

* Add basic benchmark.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Improves memory usage of the checkpoint series iterator.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* make lint.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* better size computation.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes test ordering flakiness.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Cyril Tovena committed via GitHub
parent 0a81f70800
commit 07ece2bc2a
Files changed (lines changed):

1. pkg/chunkenc/encoding_helpers.go (3)
2. pkg/chunkenc/memchunk.go (127)
3. pkg/chunkenc/memchunk_test.go (57)
4. pkg/chunkenc/pool.go (20)
5. pkg/ingester/checkpoint.go (233)
6. pkg/ingester/checkpoint_test.go (153)
7. pkg/ingester/encoding_test.go (6)
8. pkg/ingester/flush_test.go (2)
9. pkg/util/pool/bytesbuffer.go (69)

@@ -15,8 +15,7 @@ type encbuf struct {
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }

@@ -33,9 +33,7 @@ const (
maxLineLength = 1024 * 1024 * 1024
)
var (
magicNumber = uint32(0x12EE56A)
)
var magicNumber = uint32(0x12EE56A)
// The table gets initialized with sync.Once but may still cause a race
// with any other use of the crc32 package anywhere. Thus we initialize it
@@ -155,20 +153,36 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
encB := BytesBufferPool.Get(1 << 10).([]byte)
defer func() {
BytesBufferPool.Put(encB[:0])
}()
buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
eb := encbuf{b: encB}
func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
return buf.Bytes(), err
}
// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *headBlock) CheckpointSize(version byte) int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
size += (binary.MaxVarintLen64 + binary.MaxVarintLen32) * len(hb.entries) // ts + len of log line.
for _, e := range hb.entries {
size += len(e.s)
}
return size
}
// CheckpointTo serializes a headblock to an `io.Writer`. See `CheckpointBytes`.
func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset()
eb.putByte(version)
_, err := buf.Write(eb.get())
_, err := w.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock version")
return errors.Wrap(err, "write headBlock version")
}
eb.reset()
@@ -177,27 +191,27 @@ func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
eb.putVarint64(hb.mint)
eb.putVarint64(hb.maxt)
_, err = buf.Write(eb.get())
_, err = w.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock metas")
return errors.Wrap(err, "write headBlock metas")
}
eb.reset()
for _, entry := range hb.entries {
eb.putVarint64(entry.t)
eb.putUvarint(len(entry.s))
_, err = buf.Write(eb.get())
_, err = w.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock entry ts")
return errors.Wrap(err, "write headBlock entry ts")
}
eb.reset()
_, err := buf.WriteString(entry.s)
_, err := io.WriteString(w, entry.s)
if err != nil {
return nil, errors.Wrap(err, "write headblock entry line")
return errors.Wrap(err, "write headblock entry line")
}
}
return buf.Bytes(), nil
return nil
}
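A minimal sketch of how these helpers compose; the calls are from this diff, while the populated hb and the pre-sized buffer are assumptions (CheckpointSize is an upper-bound estimate, so the buffer never needs to grow):

// Serialize a head block for checkpointing into a re-usable buffer, then restore it.
// hb is assumed to be a populated *headBlock.
b := make([]byte, 0, hb.CheckpointSize(chunkFormatV3))
b, err := hb.CheckpointBytes(chunkFormatV3, b)
if err != nil {
	return err
}
restored := &headBlock{}
if err := restored.FromCheckpoint(b); err != nil {
	return err
}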
func (hb *headBlock) FromCheckpoint(b []byte) error {
@@ -361,6 +375,37 @@ func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
}
// BytesSize returns the raw size of the chunk.
// NOTE: This does not account for the head block nor include any head block data.
func (c *MemChunk) BytesSize() int {
size := 4 // magic number
size++ // format
if c.format > chunkFormatV1 {
size++ // chunk format v2+ has a byte for encoding.
}
// blocks
for _, b := range c.blocks {
size += len(b.b) + crc32.Size // size + crc
size += binary.MaxVarintLen32 // num entries
size += binary.MaxVarintLen64 // mint
size += binary.MaxVarintLen64 // maxt
size += binary.MaxVarintLen32 // offset
if c.format == chunkFormatV3 {
size += binary.MaxVarintLen32 // uncompressed size
}
size += binary.MaxVarintLen32 // len(b)
}
// blockmeta
size += binary.MaxVarintLen32 // len blocks
size += crc32.Size // metablock crc
size += 8 // metaoffset
return size
}
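Because every varint field is counted at its maximum width, BytesSize is an upper bound, which makes it safe for pre-allocation. A sketch, assuming c is a *MemChunk whose head has already been cut:

buf := bytes.NewBuffer(make([]byte, 0, c.BytesSize())) // capacity >= final serialized size
if _, err := c.WriteTo(buf); err != nil {
	return err
}
serialized := buf.Bytes() // written without intermediate growth copies
_ = serialized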
// WriteTo implements io.WriterTo.
// NOTE: It does not cut the head block nor include any head block data;
// to include that data you must call Close() first.
@@ -368,11 +413,16 @@ func (c *MemChunk) Bytes() ([]byte, error) {
// result in different content addressable chunks in storage based on the timing of when
// they were checkpointed (which would cause new blocks to be cut early).
func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
crc32Hash := newCRC32()
crc32Hash := crc32HashPool.Get().(hash.Hash32)
defer crc32HashPool.Put(crc32Hash)
crc32Hash.Reset()
offset := int64(0)
eb := encbuf{b: make([]byte, 0, 1<<10)}
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset()
// Write the header (magicNum + version).
eb.putBE32(magicNumber)
@@ -392,11 +442,13 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
for i, b := range c.blocks {
c.blocks[i].offset = int(offset)
eb.reset()
eb.putBytes(b.b)
eb.putHash(crc32Hash)
crc32Hash.Reset()
_, err := crc32Hash.Write(b.b)
if err != nil {
return offset, errors.Wrap(err, "write block")
}
n, err := w.Write(eb.get())
n, err := w.Write(crc32Hash.Sum(b.b))
if err != nil {
return offset, errors.Wrap(err, "write block")
}
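The replacement leans on how hash.Hash.Sum works: it appends the checksum to the slice it is given. A sketch of the semantics, where data is a hypothetical []byte and w a hypothetical io.Writer; newCRC32 is this package's Castagnoli-table constructor:

h := newCRC32()
_, _ = h.Write(data)  // hash the block bytes
framed := h.Sum(data) // framed == data followed by its 4-byte checksum
if _, err := w.Write(framed); err != nil { // one Write, no copy into an encode buffer
	return err
}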
@@ -439,25 +491,29 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
return offset, nil
}
// SerializeForCheckpoint returns []bytes representing the chunk & head. This is to ensure eventually
// flushed chunks don't have different substructures depending on when they were checkpointed.
// SerializeForCheckpointTo serializes the chunk & head into separate `io.Writer`s for checkpointing use.
// This is to ensure eventually flushed chunks don't have different substructures depending on when they were checkpointed.
// In turn this allows us to maintain a more effective dedupe ratio in storage.
func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) {
chk, err = c.BytesWith(b)
func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
_, err := c.WriteTo(chk)
if err != nil {
return nil, nil, err
return err
}
if c.head.isEmpty() {
return chk, nil, nil
return nil
}
head, err = c.head.CheckpointBytes(c.format)
err = c.head.CheckpointTo(c.format, head)
if err != nil {
return nil, nil, err
return err
}
return chk, head, nil
return nil
}
func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize(c.format)
}
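CheckpointSize lets callers size the two destinations up front; toWireChunks in pkg/ingester/checkpoint.go uses it to pick pool buckets. A sketch with plain buffers, assuming c is a *MemChunk:

chunkSize, headSize := c.CheckpointSize()
blocks := bytes.NewBuffer(make([]byte, 0, chunkSize))
head := bytes.NewBuffer(make([]byte, 0, headSize))
if err := c.SerializeForCheckpointTo(blocks, head); err != nil {
	return err
}
// blocks.Bytes() and head.Bytes() become Chunk.Data and Chunk.Head.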
func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
@@ -537,7 +593,6 @@ func (c *MemChunk) Utilization() float64 {
}
size := c.UncompressedSize()
return float64(size) / float64(blocksPerChunk*c.blockSize)
}
// Append implements Chunk.
@@ -721,9 +776,11 @@ func (b block) Offset() int {
func (b block) Entries() int {
return b.numEntries
}
func (b block) MinTime() int64 {
return b.mint
}
func (b block) MaxTime() int64 {
return b.maxt
}

@@ -65,6 +65,8 @@ func TestBlocksInclusive(t *testing.T) {
func TestBlock(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, testTargetSize)
cases := []struct {
ts int64
@@ -173,6 +175,8 @@ func TestBlock(t *testing.T) {
}
func TestReadFormatV1(t *testing.T) {
t.Parallel()
c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
fillChunk(c)
// overrides default v2 format
@@ -211,6 +215,8 @@ func TestRoundtripV2(t *testing.T) {
for _, enc := range testEncoding {
for _, version := range []byte{chunkFormatV2, chunkFormatV3} {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c.format = version
populated := fillChunk(c)
@@ -258,14 +264,14 @@ func TestRoundtripV2(t *testing.T) {
assertLines(loaded)
})
}
}
}
func TestRoundtripV3(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c.format = chunkFormatV3
_ = fillChunk(c)
@@ -281,15 +287,15 @@ func TestRoundtripV3(t *testing.T) {
r.head.clear()
require.Equal(t, c, r)
})
}
}
func TestSerialization(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, testTargetSize)
numSamples := 50000
@@ -337,6 +343,8 @@ func TestSerialization(t *testing.T) {
func TestChunkFilling(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, 0)
chk.blockSize = 1024
@@ -374,6 +382,8 @@ func TestChunkFilling(t *testing.T) {
}
func TestGZIPChunkTargetSize(t *testing.T) {
t.Parallel()
chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
lineSize := 512
@@ -420,7 +430,6 @@ func TestGZIPChunkTargetSize(t *testing.T) {
ut := chk.Utilization()
require.Greater(t, ut, 0.99)
require.Less(t, ut, 1.01)
}
func TestMemChunk_AppendOutOfOrder(t *testing.T) {
@@ -467,6 +476,7 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
func TestChunkSize(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
@@ -477,7 +487,6 @@ func TestChunkSize(t *testing.T) {
t.Log("characters ", humanize.Bytes(uint64(inserted)))
t.Log("Ratio", float64(inserted)/float64(len(b)))
})
}
}
@@ -508,7 +517,6 @@ func TestChunkStats(t *testing.T) {
t.Fatal(err)
}
for it.Next() {
}
if err := it.Close(); err != nil {
t.Fatal(err)
@@ -537,7 +545,6 @@ func TestChunkStats(t *testing.T) {
t.Fatal(err)
}
for it.Next() {
}
if err := it.Close(); err != nil {
t.Fatal(err)
@@ -586,7 +593,6 @@ func TestIteratorClose(t *testing.T) {
}
test(iter, t)
}
})
}
}
@@ -618,7 +624,6 @@ func BenchmarkWrite(b *testing.B) {
result = chunks
})
}
}
type nomatchPipeline struct{}
@@ -703,10 +708,8 @@ func TestGenerateDataSize(t *testing.T) {
}
func BenchmarkHeadBlockIterator(b *testing.B) {
for _, j := range []int{100000, 50000, 15000, 10000} {
b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) {
h := headBlock{}
for i := 0; i < j; i++ {
@@ -729,10 +732,8 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
}
func BenchmarkHeadBlockSampleIterator(b *testing.B) {
for _, j := range []int{100000, 50000, 15000, 10000} {
b.Run(fmt.Sprintf("Size %d", j), func(b *testing.B) {
h := headBlock{}
for i := 0; i < j; i++ {
@@ -755,8 +756,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
}
func TestMemChunk_IteratorBounds(t *testing.T) {
var createChunk = func() *MemChunk {
createChunk := func() *MemChunk {
t.Helper()
c := NewMemChunk(EncNone, 1e6, 1e6)
@@ -799,6 +799,8 @@ func TestMemChunk_IteratorBounds(t *testing.T) {
t.Run(
fmt.Sprintf("mint:%d,maxt:%d,direction:%s", tt.mint.UnixNano(), tt.maxt.UnixNano(), tt.direction),
func(t *testing.T) {
t.Parallel()
tt := tt
c := createChunk()
@@ -819,14 +821,14 @@ func TestMemChunk_IteratorBounds(t *testing.T) {
}
require.NoError(t, it.Close())
})
}
}
func TestMemchunkLongLine(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
for i := 1; i <= 10; i++ {
require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)}))
@@ -843,6 +845,8 @@ func TestMemchunkLongLine(t *testing.T) {
// Ensure passing a reusable []byte doesn't affect output
func TestBytesWith(t *testing.T) {
t.Parallel()
exp, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(nil)
require.Nil(t, err)
out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
@@ -852,6 +856,8 @@ func TestBytesWith(t *testing.T) {
}
func TestHeadBlockCheckpointing(t *testing.T) {
t.Parallel()
c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)
// add a few entries
@@ -867,7 +873,7 @@ func TestHeadBlockCheckpointing(t *testing.T) {
// ensure blocks are not cut
require.Equal(t, 0, len(c.blocks))
b, err := c.head.CheckpointBytes(c.format)
b, err := c.head.CheckpointBytes(c.format, nil)
require.Nil(t, err)
hb := &headBlock{}
@@ -876,6 +882,8 @@ func TestHeadBlockCheckpointing(t *testing.T) {
}
func TestCheckpointEncoding(t *testing.T) {
t.Parallel()
blockSize, targetSize := 256*1024, 1500*1024
c := NewMemChunk(EncSnappy, blockSize, targetSize)
@@ -905,10 +913,11 @@ func TestCheckpointEncoding(t *testing.T) {
// ensure new blocks are not cut
require.Equal(t, 1, len(c.blocks))
chk, head, err := c.SerializeForCheckpoint(nil)
var chk, head bytes.Buffer
err := c.SerializeForCheckpointTo(&chk, &head)
require.Nil(t, err)
cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize)
cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize)
require.Nil(t, err)
// TODO(owen-d): remove once v3+ is the default chunk version
@@ -921,8 +930,10 @@ func TestCheckpointEncoding(t *testing.T) {
require.Equal(t, c, cpy)
}
var streams = []logproto.Stream{}
var series = []logproto.Series{}
var (
streams = []logproto.Stream{}
series = []logproto.Series{}
)
func BenchmarkBufferedIteratorLabels(b *testing.B) {
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)

@@ -47,14 +47,32 @@ var (
New: func() interface{} { return bufio.NewReader(nil) },
},
}
// BytesBufferPool is a pool of byte slices used for decompressed lines.
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
// Pool of crc32 hash
crc32HashPool = sync.Pool{
New: func() interface{} {
return newCRC32()
},
}
serializeBytesBufferPool = sync.Pool{
New: func() interface{} {
return &bytes.Buffer{}
},
}
// EncodeBufferPool is a pool of encode buffers used for binary encoding.
EncodeBufferPool = sync.Pool{
New: func() interface{} {
return &encbuf{
b: make([]byte, 0, 256),
}
},
}
)
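A sketch of the borrow/return discipline these pools expect, as followed by WriteTo and CheckpointTo; w is an assumed io.Writer:

eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset() // the pool does not clear buffers; every borrower must
eb.putByte(chunkFormatV3)
if _, err := w.Write(eb.get()); err != nil {
	return err
}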
func getWriterPool(enc Encoding) WriterPool {

@@ -1,6 +1,7 @@
package ingester
import (
"bytes"
fmt "fmt"
"io/ioutil"
"os"
@@ -16,44 +17,73 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/pool"
prompool "github.com/prometheus/prometheus/pkg/pool"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/util/pool"
)
var (
// todo(ctovena): these pools should be sized in proportion to the actual configuration (blocksize, targetsize).
// Starting with something sane first; we can refine with more experience.
// Buckets [1KB 2KB 4KB ... 4MB] by 2
blocksBufferPool = pool.NewBuffer(1024, 4*1024*1024, 2)
// Buckets [64B 128B 256B 512B... to 2MB] by 2
headBufferPool = pool.NewBuffer(64, 2*1024*1024, 2)
)
type chunkWithBuffer struct {
blocks, head *bytes.Buffer
Chunk
}
// The passed wireChunks slice is for re-use.
func toWireChunks(descs []chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
func toWireChunks(descs []chunkDesc, wireChunks []chunkWithBuffer) ([]chunkWithBuffer, error) {
// release memory from previous list of chunks.
for _, wc := range wireChunks {
blocksBufferPool.Put(wc.blocks)
headBufferPool.Put(wc.head)
wc.Data = nil
wc.Head = nil
}
if cap(wireChunks) < len(descs) {
wireChunks = make([]Chunk, len(descs))
wireChunks = make([]chunkWithBuffer, len(descs))
} else {
wireChunks = wireChunks[:len(descs)]
}
for i, d := range descs {
from, to := d.chunk.Bounds()
wireChunk := Chunk{
From: from,
To: to,
Closed: d.closed,
FlushedAt: d.flushed,
LastUpdated: d.lastUpdated,
Synced: d.synced,
}
slice := wireChunks[i].Data[:0] // try to re-use the memory from last time
if cap(slice) < d.chunk.CompressedSize() {
slice = make([]byte, 0, d.chunk.CompressedSize())
chunkSize, headSize := d.chunk.CheckpointSize()
wireChunk := chunkWithBuffer{
Chunk: Chunk{
From: from,
To: to,
Closed: d.closed,
FlushedAt: d.flushed,
LastUpdated: d.lastUpdated,
Synced: d.synced,
},
blocks: blocksBufferPool.Get(chunkSize),
head: headBufferPool.Get(headSize),
}
chk, head, err := d.chunk.SerializeForCheckpoint(slice)
err := d.chunk.SerializeForCheckpointTo(
wireChunk.blocks,
wireChunk.head,
)
if err != nil {
return nil, err
}
wireChunk.Data = chk
wireChunk.Head = head
wireChunk.Data = wireChunk.blocks.Bytes()
wireChunk.Head = wireChunk.head.Bytes()
wireChunks[i] = wireChunk
}
return wireChunks, nil
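Because toWireChunks first returns the previous call's buffers to the pools, a caller can thread one slice through repeated calls; streamIterator.Next below does exactly this with s.buffer. A sketch, where streams is an assumed []*stream:

var buf []chunkWithBuffer // re-used across streams
for _, s := range streams {
	var err error
	buf, err = toWireChunks(s.chunks, buf) // recycles the prior call's buffers
	if err != nil {
		return err
	}
	// consume buf before the next iteration overwrites it
}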
@@ -118,7 +148,7 @@ type SeriesWithErr struct {
type SeriesIter interface {
Count() int
Iter() <-chan *SeriesWithErr
Iter() *streamIterator
Stop()
}
@@ -150,56 +180,108 @@ func (i *ingesterSeriesIter) Stop() {
close(i.done)
}
func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr {
ch := make(chan *SeriesWithErr)
go func() {
for _, inst := range i.ing.getInstances() {
inst.streamsMtx.RLock()
// Need to buffer streams internally so the read lock isn't held trying to write to a blocked channel.
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(stream *stream) error {
streams = append(streams, stream)
return nil
})
for _, stream := range streams {
stream.chunkMtx.RLock()
if len(stream.chunks) < 1 {
stream.chunkMtx.RUnlock()
// it's possible the stream has been flushed to storage
// in between starting the checkpointing process and
// checkpointing this stream.
continue
}
// TODO(owen-d): use a pool
// Only send chunks for checkpointing that have yet to be flushed.
chunks, err := toWireChunks(unflushedChunks(stream.chunks), nil)
stream.chunkMtx.RUnlock()
var s *Series
if err == nil {
s = &Series{
UserID: inst.instanceID,
Fingerprint: uint64(stream.fp),
Labels: client.FromLabelsToLabelAdapters(stream.labels),
Chunks: chunks,
}
}
select {
case ch <- &SeriesWithErr{
Err: err,
Series: s,
}:
case <-i.done:
return
}
}
}
close(ch)
}()
return ch
}
func (i *ingesterSeriesIter) Iter() *streamIterator {
return newStreamsIterator(i.ing)
}
type streamInstance struct {
id string
streams []*stream
}
type streamIterator struct {
instances []streamInstance
current Series
buffer []chunkWithBuffer
err error
}
// newStreamsIterator returns a new stream iterator that iterates over one instance at a time,
// then over each stream within an instance.
func newStreamsIterator(ing ingesterInstances) *streamIterator {
instances := ing.getInstances()
streamInstances := make([]streamInstance, len(instances))
for i, inst := range instances {
inst.streamsMtx.RLock()
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(s *stream) error {
streams = append(streams, s)
return nil
})
streamInstances[i] = streamInstance{
streams: streams,
id: inst.instanceID,
}
}
return &streamIterator{
instances: streamInstances,
}
}
// Next loads the next stream of the current instance.
// If the current instance has no streams left, it moves on to the next instance until none remain.
// It returns true if there is a next stream; each successful call replaces the current stream.
func (s *streamIterator) Next() bool {
if len(s.instances) == 0 {
s.instances = nil
return false
}
currentInstance := s.instances[0]
if len(currentInstance.streams) == 0 {
s.instances = s.instances[1:]
return s.Next()
}
// current stream
stream := currentInstance.streams[0]
// remove the first stream
s.instances[0].streams = s.instances[0].streams[1:]
stream.chunkMtx.RLock()
defer stream.chunkMtx.RUnlock()
if len(stream.chunks) < 1 {
// it's possible the stream has been flushed to storage
// in between starting the checkpointing process and
// checkpointing this stream.
return s.Next()
}
chunks, err := toWireChunks(stream.chunks, s.buffer)
if err != nil {
s.err = err
return false
}
s.buffer = chunks
s.current.Chunks = s.current.Chunks[:0]
if cap(s.current.Chunks) == 0 {
s.current.Chunks = make([]Chunk, 0, len(chunks))
}
for _, c := range chunks {
s.current.Chunks = append(s.current.Chunks, c.Chunk)
}
s.current.UserID = currentInstance.id
s.current.Fingerprint = uint64(stream.fp)
s.current.Labels = client.FromLabelsToLabelAdapters(stream.labels)
return true
}
// Error returns any error encountered while iterating over the streams.
func (s *streamIterator) Error() error {
return s.err
}
// Stream returns the current series: a serializable (for checkpointing) stream of chunks.
// NOTE: the series is re-used between successful Next calls.
// This means you should make a copy or use the data before calling Next.
func (s *streamIterator) Stream() *Series {
return &s.current
}
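A sketch of the consumption contract (PerformCheckpoint below is the real caller): Stream returns a re-used *Series, so each result must be written out, or copied, before the next call to Next. Here ing is an assumed ingesterInstances and writer a hypothetical CheckpointWriter:

iter := newStreamsIterator(ing)
for iter.Next() {
	if err := writer.Write(iter.Stream()); err != nil {
		return err
	}
}
if err := iter.Error(); err != nil {
	return err
}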
type CheckpointWriter interface {
@@ -275,7 +357,7 @@ func (w *WALCheckpointWriter) Advance() (bool, error) {
}
// Buckets [64KB to 256MB] by 2
var recordBufferPool = pool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })
var recordBufferPool = prompool.New(1<<16, 1<<28, 2, func(size int) interface{} { return make([]byte, 0, size) })
func (w *WALCheckpointWriter) Write(s *Series) error {
size := s.Size() + 1 // +1 for header
@@ -483,11 +565,10 @@ func (c *Checkpointer) PerformCheckpoint() (err error) {
level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String())
c.metrics.checkpointDuration.Observe(elapsed.Seconds())
}()
for s := range c.iter.Iter() {
if s.Err != nil {
return s.Err
}
if err := c.writer.Write(s.Series); err != nil {
iter := c.iter.Iter()
for iter.Next() {
if err := c.writer.Write(iter.Stream()); err != nil {
return err
}
@@ -508,6 +589,10 @@ func (c *Checkpointer) PerformCheckpoint() (err error) {
}
if iter.Error() != nil {
return iter.Error()
}
return c.writer.Close(false)
}

@@ -5,6 +5,7 @@ import (
fmt "fmt"
"io/ioutil"
"os"
"sort"
"testing"
"time"
@@ -12,12 +13,14 @@ import (
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -254,6 +257,150 @@ func expectCheckpoint(t *testing.T, walDir string, shouldExist bool) {
require.True(t, found == shouldExist)
}
type ingesterInstancesFunc func() []*instance
func (i ingesterInstancesFunc) getInstances() []*instance {
return i()
}
var currentSeries *Series
func buildStreams() []logproto.Stream {
streams := make([]logproto.Stream, 10)
for i := range streams {
labels := makeRandomLabels().String()
entries := make([]logproto.Entry, 15*1e3)
for j := range entries {
entries[j] = logproto.Entry{
Timestamp: time.Unix(0, int64(j)),
Line: fmt.Sprintf("entry for line %d", j),
}
}
streams[i] = logproto.Stream{
Labels: labels,
Entries: entries,
}
}
return streams
}
var (
stream1 = logproto.Stream{
Labels: labels.Labels{labels.Label{Name: "stream", Value: "1"}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "1",
},
{
Timestamp: time.Unix(0, 2),
Line: "2",
},
},
}
stream2 = logproto.Stream{
Labels: labels.Labels{labels.Label{Name: "stream", Value: "2"}}.String(),
Entries: []logproto.Entry{
{
Timestamp: time.Unix(0, 1),
Line: "3",
},
{
Timestamp: time.Unix(0, 2),
Line: "4",
},
},
}
)
func Test_SeriesIterator(t *testing.T) {
var instances []*instance
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
}
iter := newStreamsIterator(ingesterInstancesFunc(func() []*instance {
return instances
}))
for i := 0; i < 3; i++ {
var streams []logproto.Stream
for j := 0; j < 2; j++ {
iter.Next()
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID)
memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0)
require.NoError(t, err)
it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.NoError(t, err)
stream := logproto.Stream{
Labels: cortex_client.FromLabelAdaptersToLabels(iter.Stream().Labels).String(),
}
for it.Next() {
stream.Entries = append(stream.Entries, it.Entry())
}
require.NoError(t, it.Close())
streams = append(streams, stream)
}
sort.Slice(streams, func(i, j int) bool { return streams[i].Labels < streams[j].Labels })
require.Equal(t, stream1, streams[0])
require.Equal(t, stream2, streams[1])
}
require.False(t, iter.Next())
require.Nil(t, iter.Error())
}
func Benchmark_SeriesIterator(b *testing.B) {
streams := buildStreams()
instances := make([]*instance, 10)
limits, err := validation.NewOverrides(validation.Limits{
MaxLocalStreamsPerUser: 1000,
IngestionRateMB: 1e4,
IngestionBurstSizeMB: 1e4,
}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil)
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{
Streams: streams,
}),
)
instances[i] = inst
}
it := newIngesterSeriesIter(ingesterInstancesFunc(func() []*instance {
return instances
}))
defer it.Stop()
b.ResetTimer()
b.ReportAllocs()
for n := 0; n < b.N; n++ {
iter := it.Iter()
for iter.Next() {
currentSeries = iter.Stream()
}
require.NoError(b, iter.Error())
}
}
type noOpWalLogger struct{}
func (noOpWalLogger) Log(recs ...[]byte) error { return nil }
@@ -282,6 +429,7 @@ func Benchmark_CheckpointWrite(b *testing.B) {
func buildChunks(t testing.TB, size int) []Chunk {
descs := make([]chunkDesc, 0, size)
chks := make([]Chunk, size)
for i := 0; i < size; i++ {
// build chunks of 256k blocks, 1.5MB target size. Same as default config.
@@ -294,5 +442,8 @@ func buildChunks(t testing.TB, size int) []Chunk {
there, err := toWireChunks(descs, nil)
require.NoError(t, err)
return there
for i := range there {
chks[i] = there[i].Chunk
}
return chks
}

@@ -164,7 +164,11 @@ func Test_EncodingChunks(t *testing.T) {
}
there, err := toWireChunks(from, nil)
require.Nil(t, err)
backAgain, err := fromWireChunks(conf, there)
chunks := make([]Chunk, 0, len(there))
for _, c := range there {
chunks = append(chunks, c.Chunk)
}
backAgain, err := fromWireChunks(conf, chunks)
require.Nil(t, err)
for i, to := range backAgain {

@@ -38,7 +38,7 @@ const (
)
func init() {
//util.Logger = log.NewLogfmtLogger(os.Stdout)
// util.Logger = log.NewLogfmtLogger(os.Stdout)
}
func TestChunkFlushingIdle(t *testing.T) {

@@ -0,0 +1,69 @@
package pool
import (
"bytes"
"sync"
)
// BufferPool is a bucketed pool of variably sized byte buffers.
type BufferPool struct {
buckets []sync.Pool
sizes []int
}
// NewBuffer returns a new BufferPool with size buckets from minSize to maxSize,
// increasing by the given factor.
func NewBuffer(minSize, maxSize int, factor float64) *BufferPool {
if minSize < 1 {
panic("invalid minimum pool size")
}
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
panic("invalid factor")
}
var sizes []int
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
return &BufferPool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
}
}
// Get returns a byte buffer that fits the given size.
func (p *BufferPool) Get(sz int) *bytes.Buffer {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b := p.buckets[i].Get()
if b == nil {
b = bytes.NewBuffer(make([]byte, bktSize))
}
buf := b.(*bytes.Buffer)
buf.Reset()
return buf
}
return bytes.NewBuffer(make([]byte, sz))
}
// Put adds a byte buffer to the right bucket in the pool.
func (p *BufferPool) Put(s *bytes.Buffer) {
if s == nil {
return
}
capacity := s.Cap()
for i, size := range p.sizes {
if capacity > size {
continue
}
p.buckets[i].Put(s)
return
}
}
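A usage sketch; the sizes mirror blocksBufferPool in pkg/ingester/checkpoint.go, and the payload is hypothetical:

p := NewBuffer(1024, 4*1024*1024, 2) // buckets double from 1KB to 4MB
buf := p.Get(64 << 10)               // a reset *bytes.Buffer from the 64KB bucket
buf.WriteString("checkpoint payload")
// ... read buf.Bytes() before returning the buffer ...
p.Put(buf)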