diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go index efbad487ce..8dc8a01bc4 100644 --- a/pkg/chunkenc/gzip.go +++ b/pkg/chunkenc/gzip.go @@ -18,6 +18,8 @@ import ( "github.com/pkg/errors" ) +const blocksPerChunk = 10 + var ( magicNumber = uint32(0x12EE56A) @@ -307,12 +309,20 @@ func (c *MemChunk) Size() int { // SpaceFor implements Chunk. func (c *MemChunk) SpaceFor(*logproto.Entry) bool { - return len(c.blocks) < 10 + return len(c.blocks) < blocksPerChunk } // Append implements Chunk. func (c *MemChunk) Append(entry *logproto.Entry) error { - return c.head.append(entry.Timestamp.UnixNano(), entry.Line) + if err := c.head.append(entry.Timestamp.UnixNano(), entry.Line); err != nil { + return err + } + + if c.head.size >= c.blockSize { + return c.cut() + } + + return nil } // Close implements Chunk. @@ -341,6 +351,7 @@ func (c *MemChunk) cut() error { c.head.entries = c.head.entries[:0] c.head.mint = 0 // Will be set on first append. + c.head.size = 0 return nil } diff --git a/pkg/chunkenc/gzip_test.go b/pkg/chunkenc/gzip_test.go index c316094bf9..0f6f0e80e4 100644 --- a/pkg/chunkenc/gzip_test.go +++ b/pkg/chunkenc/gzip_test.go @@ -172,6 +172,41 @@ func TestGZIPSerialisation(t *testing.T) { require.True(t, bytes.Equal(byt, byt2)) } +func TestGZIPChunkFilling(t *testing.T) { + chk := NewMemChunk(EncGZIP) + chk.blockSize = 1024 + + // We should be able to append only 10KB of logs. + maxBytes := chk.blockSize * blocksPerChunk + lineSize := 512 + lines := maxBytes / lineSize + + logLine := string(make([]byte, lineSize)) + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: logLine, + } + + i := int64(0) + for ; chk.SpaceFor(entry) && i < 30; i++ { + entry.Timestamp = time.Unix(0, i) + require.NoError(t, chk.Append(entry)) + } + + require.Equal(t, int64(lines), i) + + it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD) + require.NoError(t, err) + i = 0 + for it.Next() { + entry := it.Entry() + require.Equal(t, i, entry.Timestamp.UnixNano()) + i++ + } + + require.Equal(t, int64(lines), i) +} + func logprotoEntry(ts int64, line string) *logproto.Entry { return &logproto.Entry{ Timestamp: time.Unix(0, ts),