Fixed out of order entries allowed in a chunk on edge case (#859)

pull/867/head
Marco Pracucci 6 years ago committed by Cyril Tovena
parent 2b8a3d9794
commit c955702b32
  1. 10
      pkg/chunkenc/gzip.go
  2. 43
      pkg/chunkenc/gzip_test.go

@ -314,7 +314,15 @@ func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
if err := c.head.append(entry.Timestamp.UnixNano(), entry.Line); err != nil {
entryTimestamp := entry.Timestamp.UnixNano()
// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}
if err := c.head.append(entryTimestamp, entry.Line); err != nil {
return err
}

@ -8,6 +8,8 @@ import (
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
@ -172,6 +174,47 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
}
func TestMemChunk_AppendOutOfOrder(t *testing.T) {
t.Parallel()
type tester func(t *testing.T, chk *MemChunk)
tests := map[string]tester{
"append out of order in the same block": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
"append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
"append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.cut())
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
},
}
for testName, tester := range tests {
tester := tester
t.Run(testName, func(t *testing.T) {
t.Parallel()
tester(t, NewMemChunk(EncGZIP))
})
}
}
var result []Chunk
func BenchmarkWriteGZIP(b *testing.B) {

Loading…
Cancel
Save