Cut new blocks once the existing ones are full (#71)

* Cut new blocks once the existing ones are full

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Address PR comments

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
pull/161/head
Goutham Veeramachaneni 7 years ago committed by GitHub
parent f6c483e2c1
commit 8609fcab74
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      pkg/chunkenc/gzip.go
  2. 35
      pkg/chunkenc/gzip_test.go

@ -18,6 +18,8 @@ import (
"github.com/pkg/errors"
)
const blocksPerChunk = 10
var (
magicNumber = uint32(0x12EE56A)
@ -307,12 +309,20 @@ func (c *MemChunk) Size() int {
// SpaceFor implements Chunk.
func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
return len(c.blocks) < 10
return len(c.blocks) < blocksPerChunk
}
// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
return c.head.append(entry.Timestamp.UnixNano(), entry.Line)
if err := c.head.append(entry.Timestamp.UnixNano(), entry.Line); err != nil {
return err
}
if c.head.size >= c.blockSize {
return c.cut()
}
return nil
}
// Close implements Chunk.
@ -341,6 +351,7 @@ func (c *MemChunk) cut() error {
c.head.entries = c.head.entries[:0]
c.head.mint = 0 // Will be set on first append.
c.head.size = 0
return nil
}

@ -172,6 +172,41 @@ func TestGZIPSerialisation(t *testing.T) {
require.True(t, bytes.Equal(byt, byt2))
}
func TestGZIPChunkFilling(t *testing.T) {
chk := NewMemChunk(EncGZIP)
chk.blockSize = 1024
// We should be able to append only 10KB of logs.
maxBytes := chk.blockSize * blocksPerChunk
lineSize := 512
lines := maxBytes / lineSize
logLine := string(make([]byte, lineSize))
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: logLine,
}
i := int64(0)
for ; chk.SpaceFor(entry) && i < 30; i++ {
entry.Timestamp = time.Unix(0, i)
require.NoError(t, chk.Append(entry))
}
require.Equal(t, int64(lines), i)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD)
require.NoError(t, err)
i = 0
for it.Next() {
entry := it.Entry()
require.Equal(t, i, entry.Timestamp.UnixNano())
i++
}
require.Equal(t, int64(lines), i)
}
func logprotoEntry(ts int64, line string) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, ts),

Loading…
Cancel
Save