Unordered chunks (#4001)

* merge feature/unordered-replay

* interoperable head chunks

* memchunk block interop

* retain ordered memchunk optimizations when possible

* tests+bench for unordered chunk reads

* reorder on chunk close

* [wip] ingester stream unordered

* unordered writes default in testware config, fixes OOO bug & removes unused lastChunkTimestamp var

* validity window is 1/2 of max age & fixes old transfer test

* more consistent headblock checking/creation

* more cohesive encoding tests

* unordered stream test with validity bounds

* compat - unordered

* reinstates memchunk defaults when rebounding & updates storage test compatibility

* lint

* reorder across blocks doesn't overflow

* respect chunk configs during rebounding when possible

* only sync checks on ordered writes
Owen Diehl 4 years ago committed by GitHub
parent 4c2feb32b5
commit 56256bfd5b
20 changed files:
 1. pkg/chunkenc/memchunk.go (146)
 2. pkg/chunkenc/memchunk_test.go (753)
 3. pkg/chunkenc/unordered.go (23)
 4. pkg/chunkenc/unordered_test.go (279)
 5. pkg/chunkenc/util_test.go (34)
 6. pkg/ingester/checkpoint.go (7)
 7. pkg/ingester/checkpoint.pb.go (133)
 8. pkg/ingester/checkpoint.proto (6)
 9. pkg/ingester/checkpoint_test.go (4)
10. pkg/ingester/chunk_test.go (4)
11. pkg/ingester/encoding_test.go (118)
12. pkg/ingester/flush_test.go (5)
13. pkg/ingester/ingester.go (3)
14. pkg/ingester/recovery.go (1)
15. pkg/ingester/stream.go (46)
16. pkg/ingester/stream_test.go (78)
17. pkg/ingester/transfer_test.go (1)
18. pkg/storage/hack/main.go (4)
19. pkg/storage/stores/shipper/compactor/retention/retention_test.go (2)
20. pkg/storage/util_test.go (3)

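The heart of this change is threading a head-block format through chunk construction: NewMemChunk gains a HeadBlockFmt argument, and the unordered format lifts the append-time ordering restriction. A rough usage sketch (identifiers are the ones introduced in the diff below; the sizes are illustrative, not prescriptive):

    c := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
    _ = c.Append(&logproto.Entry{Timestamp: time.Unix(2, 0), Line: "later"})
    _ = c.Append(&logproto.Entry{Timestamp: time.Unix(1, 0), Line: "earlier"}) // accepted: no ErrOutOfOrder
    _ = c.Close() // cuts the head block, then reorders cut blocks if their bounds overlap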
@@ -44,10 +44,34 @@ const (
defaultBlockSize = 256 * 1024
)
var (
HeadBlockFmts = []HeadBlockFmt{OrderedHeadBlockFmt, UnorderedHeadBlockFmt}
)
type HeadBlockFmt byte
func (f HeadBlockFmt) Byte() byte { return byte(f) }
func (f HeadBlockFmt) String() string {
switch {
case f < UnorderedHeadBlockFmt:
return "ordered"
case f == UnorderedHeadBlockFmt:
return "unordered"
default:
return fmt.Sprintf("unknown: %v", byte(f))
}
}
func (f HeadBlockFmt) NewBlock() HeadBlock {
switch {
case f < UnorderedHeadBlockFmt:
return &headBlock{}
default:
return newUnorderedHeadBlock()
}
}
const (
_ HeadBlockFmt = iota
// placeholders to start splitting chunk formats vs head block
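HeadBlockFmt doubles as a version byte and a constructor: every value below UnorderedHeadBlockFmt maps to the legacy ordered headBlock. A minimal sketch of the dispatch (the concrete type names in the comment are assumptions inferred from the constructors above):

    for _, f := range HeadBlockFmts {
        fmt.Printf("%s -> %T\n", f, f.NewBlock())
        // likely prints: ordered -> *chunkenc.headBlock, unordered -> *chunkenc.unorderedHeadBlock
    }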
@@ -93,6 +117,7 @@ type MemChunk struct {
// the chunk format default to v2
format byte
encoding Encoding
headFmt HeadBlockFmt
}
type block struct {
@@ -309,19 +334,18 @@ type entry struct {
}
// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding, blockSize, targetSize int) *MemChunk {
c := &MemChunk{
func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk {
return &MemChunk{
blockSize: blockSize, // The blockSize in bytes.
targetSize: targetSize, // Desired chunk size in compressed bytes
blocks: []block{},
head: &headBlock{},
format: DefaultChunkFormat,
head: head.NewBlock(),
encoding: enc,
headFmt: head,
}
return c
}
// NewByteChunk returns a MemChunk on the passed bytes.
@@ -563,12 +587,19 @@ func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize()
}
func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
func MemchunkFromCheckpoint(chk, head []byte, desired HeadBlockFmt, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.LoadBytes(head)
h, err := HeadFromCheckpoint(head, desired)
if err != nil {
return nil, err
}
mc.head = h
mc.headFmt = desired
return mc, nil
}
// Encoding implements Chunk.
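MemchunkFromCheckpoint now decodes the head bytes separately through HeadFromCheckpoint, so a checkpoint written under one head format can be restored under another. A sketch of the round trip, mirroring TestCheckpointEncoding further down:

    var chk, head bytes.Buffer
    _ = c.SerializeForCheckpointTo(&chk, &head)
    cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), UnorderedHeadBlockFmt, blockSize, targetSize)
    // on success, cpy.headFmt is UnorderedHeadBlockFmt regardless of the format the head bytes carried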
@@ -642,7 +673,7 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
if c.headFmt < UnorderedHeadBlockFmt && c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}
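The added headFmt guard keeps the historical rejection for ordered chunks only; unordered chunks may take entries older than the newest cut block. Mirroring the TestMemChunk_AppendOutOfOrder cases below:

    chk := NewMemChunk(EncGZIP, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
    _ = chk.Append(logprotoEntry(5, "test"))
    _ = chk.cut()
    err := chk.Append(logprotoEntry(1, "test")) // ErrOutOfOrder here; nil with UnorderedHeadBlockFmt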
@@ -660,7 +691,37 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
// Close implements Chunk.
// TODO: Fix this to check edge cases.
func (c *MemChunk) Close() error {
return c.cut()
if err := c.cut(); err != nil {
return err
}
return c.reorder()
}
// reorder ensures all blocks in a chunk are in
// monotonically increasing order.
// This mutates the chunk in place.
func (c *MemChunk) reorder() error {
var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
}
if ordered {
return nil
}
// Otherwise, we need to rebuild the blocks
from, to := c.Bounds()
newC, err := c.Rebound(from, to)
if err != nil {
return err
}
*c = *newC.(*MemChunk)
return nil
}
// cut a new block and add it to finished blocks.
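reorder stays cheap in the common case: one linear pass over block bounds, with the Rebound rebuild paid only when bounds actually overlap. In sketch form (cf. TestReorderAcrossBlocks near the end of this diff):

    c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
    // ...append entries at 1s,5s then 3s,7s, cutting a block between the batches...
    err := c.Close() // overlap between [1s,5s] and [3s,7s] forces a rebuild; entries read back as 1,3,5,7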
@@ -712,14 +773,28 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)
var headIterator iter.EntryIterator
var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
// skip this block
if maxt < b.mint || b.maxt < mint {
continue
}
if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}
if !c.head.IsEmpty() {
from, _ := c.head.Bounds()
if from < lastMax {
ordered = false
}
headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
}
@@ -728,8 +803,16 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
if headIterator != nil {
blockItrs = append(blockItrs, headIterator)
}
var it iter.EntryIterator
if ordered {
it = iter.NewNonOverlappingIterator(blockItrs, "")
} else {
it = iter.NewHeapIterator(ctx, blockItrs, direction)
}
return iter.NewTimeRangedIterator(
iter.NewNonOverlappingIterator(blockItrs, ""),
it,
time.Unix(0, mint),
time.Unix(0, maxt),
), nil
@@ -755,7 +838,11 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs[i], blockItrs[j] = blockItrs[j], blockItrs[i]
}
return iter.NewNonOverlappingIterator(blockItrs, ""), nil
if ordered {
return iter.NewNonOverlappingIterator(blockItrs, ""), nil
}
return iter.NewHeapIterator(ctx, blockItrs, direction), nil
}
// Iterator implements Chunk.
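Both read paths track lastMax across blocks to detect overlap at query time, so fully ordered chunks keep the cheaper non-overlapping merge while overlapping ones fall back to a heap merge. Either way, the caller-visible contract is sorted output (exercised by TestUnorderedChunkIterators below):

    it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(100, 0), logproto.FORWARD, noopStreamPipeline)
    for it.Next() {
        _ = it.Entry() // timestamps ascend even when appends arrived in reverse
    }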
@@ -763,19 +850,38 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
mint, maxt := from.UnixNano(), through.UnixNano()
its := make([]iter.SampleIterator, 0, len(c.blocks)+1)
var lastMax int64 // placeholder to check order across blocks
ordered := true
for _, b := range c.blocks {
// skip this block
if maxt < b.mint || b.maxt < mint {
continue
}
if b.mint < lastMax {
ordered = false
}
lastMax = b.maxt
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}
if !c.head.IsEmpty() {
from, _ := c.head.Bounds()
if from < lastMax {
ordered = false
}
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
}
var it iter.SampleIterator
if ordered {
it = iter.NewNonOverlappingSampleIterator(its, "")
} else {
it = iter.NewHeapSampleIterator(ctx, its)
}
return iter.NewTimeRangedSampleIterator(
iter.NewNonOverlappingSampleIterator(its, ""),
it,
mint,
maxt,
)
@@ -802,10 +908,18 @@ func (c *MemChunk) Rebound(start, end time.Time) (Chunk, error) {
return nil, err
}
// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as the target block size, but I (Sandeep) feel that it is not worth the complexity.
// For the target chunk size I am using the compressed size of the original chunk, since the new chunk should anyway be smaller than that.
newChunk := NewMemChunk(c.Encoding(), defaultBlockSize, c.CompressedSize())
var newChunk *MemChunk
// as close as possible, respect the block/target sizes specified. However,
// if the blockSize is not set, use reasonable defaults.
if c.blockSize > 0 {
newChunk = NewMemChunk(c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
} else {
// Using defaultBlockSize for target block size.
// The alternative here could be going over all the blocks and using the size of the largest block as the target block size, but I (Sandeep) feel that it is not worth the complexity.
// For the target chunk size I am using the compressed size of the original chunk, since the new chunk should anyway be smaller than that.
newChunk = NewMemChunk(c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())
}
for itr.Next() {
entry := itr.Entry()

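One subtlety in the Rebound change: chunks built through NewMemChunk carry their configured sizes forward, while chunks whose blockSize is zero (some checkpoint tests below pass 0, 0) fall back to the old defaults. Sketch:

    c := NewMemChunk(EncGZIP, UnorderedHeadBlockFmt, 256*1024, 1500*1024)
    from, to := c.Bounds()
    out, _ := c.Rebound(from, to)
    // out.(*MemChunk) retains the 256KB/1.5MB sizes and the unordered head format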
@@ -51,8 +51,10 @@ var (
}()
)
const DefaultHeadBlockFmt = OrderedHeadBlockFmt
func TestBlocksInclusive(t *testing.T) {
chk := NewMemChunk(EncNone, testBlockSize, testTargetSize)
chk := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
err := chk.Append(logprotoEntry(1, "1"))
require.Nil(t, err)
err = chk.cut()
@@ -68,7 +70,7 @@ func TestBlock(t *testing.T) {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, testTargetSize)
chk := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
cases := []struct {
ts int64
str string
@@ -178,7 +180,7 @@ func TestBlock(t *testing.T) {
func TestReadFormatV1(t *testing.T) {
t.Parallel()
c := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
c := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
fillChunk(c)
// overrides default v2 format
c.format = chunkFormatV1
@@ -213,179 +215,184 @@ func TestReadFormatV1(t *testing.T) {
// 1) memory populated chunks <-> []byte loaded chunks
// 2) []byte loaded chunks <-> []byte loaded chunks
func TestRoundtripV2(t *testing.T) {
for _, enc := range testEncoding {
for _, version := range []byte{chunkFormatV2, chunkFormatV3} {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
for _, version := range []byte{chunkFormatV2, chunkFormatV3} {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
c.format = version
populated := fillChunk(c)
assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
i := int64(0)
var data int64
for it.Next() {
require.Equal(t, i, it.Entry().Timestamp.UnixNano())
require.Equal(t, testdata.LogString(i), it.Entry().Line)
data += int64(len(it.Entry().Line))
i++
}
require.Equal(t, populated, data)
}
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c.format = version
populated := fillChunk(c)
assertLines(c)
assertLines := func(c *MemChunk) {
require.Equal(t, enc, c.Encoding())
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
// test MemChunk -> NewByteChunk loading
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
i := int64(0)
var data int64
for it.Next() {
require.Equal(t, i, it.Entry().Timestamp.UnixNano())
require.Equal(t, testdata.LogString(i), it.Entry().Line)
data += int64(len(it.Entry().Line))
i++
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
require.Equal(t, populated, data)
}
assertLines(c)
// test MemChunk -> NewByteChunk loading
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
if err != nil {
t.Fatal(err)
}
assertLines(r)
assertLines(r)
// test NewByteChunk -> NewByteChunk loading
rOut, err := r.Bytes()
require.Nil(t, err)
// test NewByteChunk -> NewByteChunk loading
rOut, err := r.Bytes()
require.Nil(t, err)
loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
require.Nil(t, err)
loaded, err := NewByteChunk(rOut, testBlockSize, testTargetSize)
require.Nil(t, err)
assertLines(loaded)
})
assertLines(loaded)
})
}
}
}
}
func TestRoundtripV3(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c.format = chunkFormatV3
_ = fillChunk(c)
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(fmt.Sprintf("%v-%v", f, enc), func(t *testing.T) {
t.Parallel()
b, err := c.Bytes()
require.Nil(t, err)
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
require.Nil(t, err)
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
c.format = chunkFormatV3
_ = fillChunk(c)
// have to populate then clear the head block or fail comparing against nil vs zero values
err = r.head.Append(1, "1")
require.Nil(t, err)
r.head.Reset()
b, err := c.Bytes()
require.Nil(t, err)
r, err := NewByteChunk(b, testBlockSize, testTargetSize)
require.Nil(t, err)
require.Equal(t, c, r)
})
b2, err := r.Bytes()
require.Nil(t, err)
require.Equal(t, b, b2)
})
}
}
}
func TestSerialization(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, testTargetSize)
chk := NewMemChunk(enc, f, testBlockSize, testTargetSize)
numSamples := 50000
numSamples := 50000
for i := 0; i < numSamples; i++ {
require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i))))
}
require.NoError(t, chk.Close())
for i := 0; i < numSamples; i++ {
require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i))))
}
require.NoError(t, chk.Close())
byt, err := chk.Bytes()
require.NoError(t, err)
byt, err := chk.Bytes()
require.NoError(t, err)
bc, err := NewByteChunk(byt, testBlockSize, testTargetSize)
require.NoError(t, err)
bc, err := NewByteChunk(byt, testBlockSize, testTargetSize)
require.NoError(t, err)
it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
it, err := bc.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
e := it.Entry()
require.Equal(t, int64(i), e.Timestamp.UnixNano())
require.Equal(t, strconv.Itoa(i), e.Line)
}
require.NoError(t, it.Error())
e := it.Entry()
require.Equal(t, int64(i), e.Timestamp.UnixNano())
require.Equal(t, strconv.Itoa(i), e.Line)
}
require.NoError(t, it.Error())
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)
sampleIt := bc.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
for i := 0; i < numSamples; i++ {
require.True(t, sampleIt.Next(), i)
s := sampleIt.Sample()
require.Equal(t, int64(i), s.Timestamp)
require.Equal(t, 1., s.Value)
}
require.NoError(t, sampleIt.Error())
s := sampleIt.Sample()
require.Equal(t, int64(i), s.Timestamp)
require.Equal(t, 1., s.Value)
}
require.NoError(t, sampleIt.Error())
byt2, err := chk.Bytes()
require.NoError(t, err)
byt2, err := chk.Bytes()
require.NoError(t, err)
require.True(t, bytes.Equal(byt, byt2))
})
require.True(t, bytes.Equal(byt, byt2))
})
}
}
}
func TestChunkFilling(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
chk := NewMemChunk(enc, testBlockSize, 0)
chk.blockSize = 1024
chk := NewMemChunk(enc, f, testBlockSize, 0)
chk.blockSize = 1024
// We should be able to append only 10KB of logs.
maxBytes := chk.blockSize * blocksPerChunk
lineSize := 512
lines := maxBytes / lineSize
// We should be able to append only 10KB of logs.
maxBytes := chk.blockSize * blocksPerChunk
lineSize := 512
lines := maxBytes / lineSize
logLine := string(make([]byte, lineSize))
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: logLine,
}
logLine := string(make([]byte, lineSize))
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: logLine,
}
i := int64(0)
for ; chk.SpaceFor(entry) && i < 30; i++ {
entry.Timestamp = time.Unix(0, i)
require.NoError(t, chk.Append(entry))
}
i := int64(0)
for ; chk.SpaceFor(entry) && i < 30; i++ {
entry.Timestamp = time.Unix(0, i)
require.NoError(t, chk.Append(entry))
}
require.Equal(t, int64(lines), i)
require.Equal(t, int64(lines), i)
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
i = 0
for it.Next() {
entry := it.Entry()
require.Equal(t, i, entry.Timestamp.UnixNano())
i++
}
it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, noopStreamPipeline)
require.NoError(t, err)
i = 0
for it.Next() {
entry := it.Entry()
require.Equal(t, i, entry.Timestamp.UnixNano())
i++
}
require.Equal(t, int64(lines), i)
})
require.Equal(t, int64(lines), i)
})
}
}
}
func TestGZIPChunkTargetSize(t *testing.T) {
t.Parallel()
chk := NewMemChunk(EncGZIP, testBlockSize, testTargetSize)
chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
lineSize := 512
entry := &logproto.Entry{
@@ -443,14 +450,22 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
if chk.headFmt == OrderedHeadBlockFmt {
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
} else {
assert.NoError(t, chk.Append(logprotoEntry(1, "test")))
}
},
"append out of order in a new block right after cutting the previous one": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
if chk.headFmt == OrderedHeadBlockFmt {
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
} else {
assert.NoError(t, chk.Append(logprotoEntry(1, "test")))
}
},
"append out of order in a new block after multiple cuts": func(t *testing.T, chk *MemChunk) {
assert.NoError(t, chk.Append(logprotoEntry(5, "test")))
@@ -459,40 +474,48 @@ func TestMemChunk_AppendOutOfOrder(t *testing.T) {
assert.NoError(t, chk.Append(logprotoEntry(6, "test")))
assert.NoError(t, chk.cut())
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
if chk.headFmt == OrderedHeadBlockFmt {
assert.EqualError(t, chk.Append(logprotoEntry(1, "test")), ErrOutOfOrder.Error())
} else {
assert.NoError(t, chk.Append(logprotoEntry(1, "test")))
}
},
}
for testName, tester := range tests {
tester := tester
for _, f := range HeadBlockFmts {
for testName, tester := range tests {
tester := tester
t.Run(testName, func(t *testing.T) {
t.Parallel()
t.Run(testName, func(t *testing.T) {
t.Parallel()
tester(t, NewMemChunk(EncGZIP, testBlockSize, testTargetSize))
})
tester(t, NewMemChunk(EncGZIP, f, testBlockSize, testTargetSize))
})
}
}
}
func TestChunkSize(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
t.Log("Chunk size", humanize.Bytes(uint64(len(b))))
t.Log("characters ", humanize.Bytes(uint64(inserted)))
t.Log("Ratio", float64(inserted)/float64(len(b)))
})
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
inserted := fillChunk(c)
b, err := c.Bytes()
if err != nil {
t.Fatal(err)
}
t.Log("Chunk size", humanize.Bytes(uint64(len(b))))
t.Log("characters ", humanize.Bytes(uint64(inserted)))
t.Log("Ratio", float64(inserted)/float64(len(b)))
})
}
}
}
func TestChunkStats(t *testing.T) {
c := NewMemChunk(EncSnappy, testBlockSize, 0)
c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, 0)
first := time.Now()
entry := &logproto.Entry{
Timestamp: first,
@@ -559,42 +582,44 @@ func TestChunkStats(t *testing.T) {
}
func TestIteratorClose(t *testing.T) {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
for _, test := range []func(iter iter.EntryIterator, t *testing.T){
func(iter iter.EntryIterator, t *testing.T) {
// close without iterating
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after iterating
for iter.Next() {
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
t.Run(enc.String(), func(t *testing.T) {
for _, test := range []func(iter iter.EntryIterator, t *testing.T){
func(iter iter.EntryIterator, t *testing.T) {
// close without iterating
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after iterating
for iter.Next() {
_ = iter.Entry()
}
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after a single iteration
iter.Next()
_ = iter.Entry()
}
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
func(iter iter.EntryIterator, t *testing.T) {
// close after a single iteration
iter.Next()
_ = iter.Entry()
if err := iter.Close(); err != nil {
if err := iter.Close(); err != nil {
t.Fatal(err)
}
},
} {
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
}
},
} {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
inserted := fillChunk(c)
iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline)
if err != nil {
t.Fatal(err)
test(iter, t)
}
test(iter, t)
}
})
})
}
}
}
@@ -609,21 +634,24 @@ func BenchmarkWrite(b *testing.B) {
}
i := int64(0)
for _, enc := range testEncoding {
b.Run(enc.String(), func(b *testing.B) {
for n := 0; n < b.N; n++ {
c := NewMemChunk(enc, testBlockSize, testTargetSize)
// adds until full so we trigger cut, which serializes using gzip
for c.SpaceFor(entry) {
_ = c.Append(entry)
entry.Timestamp = time.Unix(0, i)
entry.Line = testdata.LogString(i)
i++
for _, f := range HeadBlockFmts {
for _, enc := range testEncoding {
b.Run(fmt.Sprintf("%v-%v", f, enc), func(b *testing.B) {
for n := 0; n < b.N; n++ {
c := NewMemChunk(enc, f, testBlockSize, testTargetSize)
// adds until full so we trigger cut, which serializes using gzip
for c.SpaceFor(entry) {
_ = c.Append(entry)
entry.Timestamp = time.Unix(0, i)
entry.Line = testdata.LogString(i)
i++
}
chunks = append(chunks, c)
}
chunks = append(chunks, c)
}
result = chunks
})
result = chunks
})
}
}
}
@@ -686,7 +714,7 @@ func BenchmarkRead(b *testing.B) {
func BenchmarkBackwardIterator(b *testing.B) {
b.ReportAllocs()
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
_ = fillChunk(c)
b.ResetTimer()
for n := 0; n < b.N; n++ {
@@ -781,7 +809,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
func TestMemChunk_IteratorBounds(t *testing.T) {
createChunk := func() *MemChunk {
t.Helper()
c := NewMemChunk(EncNone, 1e6, 1e6)
c := NewMemChunk(EncNone, DefaultHeadBlockFmt, 1e6, 1e6)
if err := c.Append(&logproto.Entry{
Timestamp: time.Unix(0, 1),
@@ -852,7 +880,7 @@ func TestMemchunkLongLine(t *testing.T) {
t.Run(enc.String(), func(t *testing.T) {
t.Parallel()
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
for i := 1; i <= 10; i++ {
require.NoError(t, c.Append(&logproto.Entry{Timestamp: time.Unix(0, int64(i)), Line: strings.Repeat("e", 200000)}))
}
@@ -870,80 +898,58 @@ func TestMemchunkLongLine(t *testing.T) {
func TestBytesWith(t *testing.T) {
t.Parallel()
exp, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith(nil)
exp, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith(nil)
require.Nil(t, err)
out, err := NewMemChunk(EncNone, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
out, err := NewMemChunk(EncNone, DefaultHeadBlockFmt, testBlockSize, testTargetSize).BytesWith([]byte{1, 2, 3})
require.Nil(t, err)
require.Equal(t, exp, out)
}
func TestHeadBlockCheckpointing(t *testing.T) {
t.Parallel()
c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)
// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// ensure blocks are not cut
require.Equal(t, 0, len(c.blocks))
b, err := c.head.CheckpointBytes(nil)
require.Nil(t, err)
hb := &headBlock{}
require.Nil(t, hb.LoadBytes(b))
require.Equal(t, c.head, hb)
}
func TestCheckpointEncoding(t *testing.T) {
t.Parallel()
blockSize, targetSize := 256*1024, 1500*1024
c := NewMemChunk(EncSnappy, blockSize, targetSize)
// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
for _, f := range HeadBlockFmts {
t.Run(f.String(), func(t *testing.T) {
c := NewMemChunk(EncSnappy, f, blockSize, targetSize)
// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// cut it
require.Nil(t, c.cut())
// cut it
require.Nil(t, c.cut())
// add a few more to head
for i := 5; i < 10; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// add a few more to head
for i := 5; i < 10; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// ensure new blocks are not cut
require.Equal(t, 1, len(c.blocks))
// ensure new blocks are not cut
require.Equal(t, 1, len(c.blocks))
var chk, head bytes.Buffer
err := c.SerializeForCheckpointTo(&chk, &head)
require.Nil(t, err)
var chk, head bytes.Buffer
err := c.SerializeForCheckpointTo(&chk, &head)
require.Nil(t, err)
cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), blockSize, targetSize)
require.Nil(t, err)
cpy, err := MemchunkFromCheckpoint(chk.Bytes(), head.Bytes(), f, blockSize, targetSize)
require.Nil(t, err)
require.Equal(t, c, cpy)
require.Equal(t, c, cpy)
})
}
}
var (
@@ -952,131 +958,140 @@ var (
)
func BenchmarkBufferedIteratorLabels(b *testing.B) {
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
_ = fillChunk(c)
labelsSet := []labels.Labels{
{
{Name: "cluster", Value: "us-central1"},
{Name: "stream", Value: "stdout"},
{Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/query-frontend"},
{Name: "container", Value: "query-frontend"},
{Name: "pod", Value: "query-frontend-6894f97b98-89q2n"},
},
{
{Name: "cluster", Value: "us-central2"},
{Name: "stream", Value: "stderr"},
{Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/querier"},
{Name: "container", Value: "querier"},
{Name: "pod", Value: "querier-6894f97b98-89q2n"},
},
}
for _, test := range []string{
`{app="foo"}`,
`{app="foo"} != "foo"`,
`{app="foo"} != "foo" | logfmt `,
`{app="foo"} != "foo" | logfmt | duration > 10ms`,
`{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseLogSelector(test, true)
if err != nil {
b.Fatal(err)
}
p, err := expr.Pipeline()
if err != nil {
b.Fatal(err)
}
var iters []iter.EntryIterator
for _, lbs := range labelsSet {
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs))
if err != nil {
b.Fatal(err)
}
iters = append(iters, it)
for _, f := range HeadBlockFmts {
b.Run(f.String(), func(b *testing.B) {
c := NewMemChunk(EncSnappy, f, testBlockSize, testTargetSize)
_ = fillChunk(c)
labelsSet := []labels.Labels{
{
{Name: "cluster", Value: "us-central1"},
{Name: "stream", Value: "stdout"},
{Name: "filename", Value: "/var/log/pods/loki-prod_query-frontend-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/query-frontend"},
{Name: "container", Value: "query-frontend"},
{Name: "pod", Value: "query-frontend-6894f97b98-89q2n"},
},
{
{Name: "cluster", Value: "us-central2"},
{Name: "stream", Value: "stderr"},
{Name: "filename", Value: "/var/log/pods/loki-prod_querier-6894f97b98-89q2n_eac98024-f60f-44af-a46f-d099bc99d1e7/query-frontend/0.log"},
{Name: "namespace", Value: "loki-dev"},
{Name: "job", Value: "loki-prod/querier"},
{Name: "container", Value: "querier"},
{Name: "pod", Value: "querier-6894f97b98-89q2n"},
},
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}})
for _, test := range []string{
`{app="foo"}`,
`{app="foo"} != "foo"`,
`{app="foo"} != "foo" | logfmt `,
`{app="foo"} != "foo" | logfmt | duration > 10ms`,
`{app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseLogSelector(test, true)
if err != nil {
b.Fatal(err)
}
}
p, err := expr.Pipeline()
if err != nil {
b.Fatal(err)
}
var iters []iter.EntryIterator
for _, lbs := range labelsSet {
it, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Now(), logproto.FORWARD, p.ForStream(lbs))
if err != nil {
b.Fatal(err)
}
iters = append(iters, it)
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
streams = append(streams, logproto.Stream{Labels: it.Labels(), Entries: []logproto.Entry{it.Entry()}})
}
}
}
streams = streams[:0]
})
}
streams = streams[:0]
})
}
for _, test := range []string{
`rate({app="foo"}[1m])`,
`sum by (cluster) (rate({app="foo"}[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" [10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseSampleExpr(test)
if err != nil {
b.Fatal(err)
}
ex, err := expr.Extractor()
if err != nil {
b.Fatal(err)
}
var iters []iter.SampleIterator
for _, lbs := range labelsSet {
iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs)))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}})
for _, test := range []string{
`rate({app="foo"}[1m])`,
`sum by (cluster) (rate({app="foo"}[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" [10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (caller) (rate({app="foo"} != "foo" | logfmt[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms[10s]))`,
`sum by (cluster) (rate({app="foo"} != "foo" | logfmt | duration > 10ms and component="tsdb"[1m]))`,
} {
b.Run(test, func(b *testing.B) {
b.ReportAllocs()
expr, err := logql.ParseSampleExpr(test)
if err != nil {
b.Fatal(err)
}
}
ex, err := expr.Extractor()
if err != nil {
b.Fatal(err)
}
var iters []iter.SampleIterator
for _, lbs := range labelsSet {
iters = append(iters, c.SampleIterator(context.Background(), time.Unix(0, 0), time.Now(), ex.ForStream(lbs)))
}
b.ResetTimer()
for n := 0; n < b.N; n++ {
for _, it := range iters {
for it.Next() {
series = append(series, logproto.Series{Labels: it.Labels(), Samples: []logproto.Sample{it.Sample()}})
}
}
}
series = series[:0]
})
}
series = series[:0]
})
}
}
func Test_HeadIteratorReverse(t *testing.T) {
c := NewMemChunk(EncSnappy, testBlockSize, testTargetSize)
genEntry := func(i int64) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, i),
Line: fmt.Sprintf(`msg="%d"`, i),
}
}
var i int64
for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 {
require.NoError(t, c.Append(e))
}
for _, f := range HeadBlockFmts {
t.Run(f.String(), func(t *testing.T) {
c := NewMemChunk(EncSnappy, f, testBlockSize, testTargetSize)
genEntry := func(i int64) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, i),
Line: fmt.Sprintf(`msg="%d"`, i),
}
}
var i int64
for e := genEntry(i); c.SpaceFor(e); e, i = genEntry(i+1), i+1 {
require.NoError(t, c.Append(e))
}
assertOrder := func(t *testing.T, total int64) {
expr, err := logql.ParseLogSelector(`{app="foo"} | logfmt`, true)
require.NoError(t, err)
p, err := expr.Pipeline()
require.NoError(t, err)
it, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(0, i), logproto.BACKWARD, p.ForStream(labels.Labels{{Name: "app", Value: "foo"}}))
require.NoError(t, err)
for it.Next() {
total--
require.Equal(t, total, it.Entry().Timestamp.UnixNano())
}
}
assertOrder := func(t *testing.T, total int64) {
expr, err := logql.ParseLogSelector(`{app="foo"} | logfmt`, true)
require.NoError(t, err)
p, err := expr.Pipeline()
require.NoError(t, err)
it, err := c.Iterator(context.TODO(), time.Unix(0, 0), time.Unix(0, i), logproto.BACKWARD, p.ForStream(labels.Labels{{Name: "app", Value: "foo"}}))
require.NoError(t, err)
for it.Next() {
total--
require.Equal(t, total, it.Entry().Timestamp.UnixNano())
}
}
assertOrder(t, i)
// let's try again without the headblock.
require.NoError(t, c.cut())
assertOrder(t, i)
assertOrder(t, i)
// let's try again without the headblock.
require.NoError(t, c.cut())
assertOrder(t, i)
})
}
}
func TestMemChunk_Rebound(t *testing.T) {
@@ -1159,7 +1174,7 @@ func TestMemChunk_Rebound(t *testing.T) {
}
func buildTestMemChunk(t *testing.T, from, through time.Time) *MemChunk {
chk := NewMemChunk(EncGZIP, defaultBlockSize, 0)
chk := NewMemChunk(EncGZIP, DefaultHeadBlockFmt, defaultBlockSize, 0)
for ; from.Before(through); from = from.Add(time.Second) {
err := chk.Append(&logproto.Entry{
Line: from.String(),

@@ -361,7 +361,7 @@ func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version > OrderedHeadBlockFmt {
return hb, nil
}
out := &headBlock{}
out := version.NewBlock()
err := hb.forEntries(
context.Background(),
@@ -455,9 +455,8 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
if db.err() != nil {
return errors.Wrap(db.err(), "verifying headblock header")
}
switch version {
case UnorderedHeadBlockFmt.Byte():
default:
if version != UnorderedHeadBlockFmt.Byte() {
return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version)
}
@@ -487,6 +486,10 @@ func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
// This is particularly helpful when replaying WALs from different configurations
// such as after enabling unordered writes.
func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
if len(b) == 0 {
return desired.NewBlock(), nil
}
db := decbuf{b: b}
version := db.byte()
@@ -494,17 +497,11 @@ func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
return nil, errors.Wrap(db.err(), "verifying headblock header")
}
format := HeadBlockFmt(version)
var decodedBlock HeadBlock
switch {
case format <= OrderedHeadBlockFmt:
decodedBlock = &headBlock{}
case format == UnorderedHeadBlockFmt:
decodedBlock = newUnorderedHeadBlock()
default:
return nil, fmt.Errorf("unexpected head block version: %v", version)
if format > UnorderedHeadBlockFmt {
return nil, fmt.Errorf("unexpected head block version: %v", format)
}
decodedBlock := format.NewBlock()
if err := decodedBlock.LoadBytes(b); err != nil {
return nil, err
}

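HeadFromCheckpoint is what makes WAL replay tolerant of configuration changes: empty head bytes produce a fresh block of the desired format, and, per the doc comment above, a decoded block can be carried over to the desired format (e.g. after enabling unordered writes). Minimal sketch of the empty-checkpoint path:

    h, err := HeadFromCheckpoint(nil, UnorderedHeadBlockFmt)
    // err == nil; h is a fresh, empty unordered head block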
@@ -13,6 +13,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
)
func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) {
@@ -331,3 +332,281 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
})
}
}
func TestUnorderedChunkIterators(t *testing.T) {
c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
for i := 0; i < 100; i++ {
// push in reverse order
require.Nil(t, c.Append(&logproto.Entry{
Timestamp: time.Unix(int64(99-i), 0),
Line: fmt.Sprint(99 - i),
}))
// ensure we have a mix of cut blocks + head block.
if i%30 == 0 {
require.Nil(t, c.cut())
}
}
// ensure head block has data
require.Equal(t, false, c.head.IsEmpty())
forward, err := c.Iterator(
context.Background(),
time.Unix(0, 0),
time.Unix(100, 0),
logproto.FORWARD,
noopStreamPipeline,
)
require.Nil(t, err)
backward, err := c.Iterator(
context.Background(),
time.Unix(0, 0),
time.Unix(100, 0),
logproto.BACKWARD,
noopStreamPipeline,
)
require.Nil(t, err)
smpl := c.SampleIterator(
context.Background(),
time.Unix(0, 0),
time.Unix(100, 0),
countExtractor,
)
for i := 0; i < 100; i++ {
require.Equal(t, true, forward.Next())
require.Equal(t, true, backward.Next())
require.Equal(t, true, smpl.Next())
require.Equal(t, time.Unix(int64(i), 0), forward.Entry().Timestamp)
require.Equal(t, time.Unix(int64(99-i), 0), backward.Entry().Timestamp)
require.Equal(t, float64(1), smpl.Sample().Value)
require.Equal(t, time.Unix(int64(i), 0).UnixNano(), smpl.Sample().Timestamp)
}
require.Equal(t, false, forward.Next())
require.Equal(t, false, backward.Next())
}
func BenchmarkUnorderedRead(b *testing.B) {
legacy := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(legacy, false)
ordered := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkClose(ordered, false)
unordered := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(unordered, false)
tcs := []struct {
desc string
c *MemChunk
}{
{
desc: "ordered+legacy hblock",
c: legacy,
},
{
desc: "ordered+unordered hblock",
c: ordered,
},
{
desc: "unordered+unordered hblock",
c: unordered,
},
}
b.Run("itr", func(b *testing.B) {
for _, tc := range tcs {
b.Run(tc.desc, func(b *testing.B) {
for n := 0; n < b.N; n++ {
iterator, err := tc.c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
for iterator.Next() {
_ = iterator.Entry()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
})
}
})
b.Run("smpl", func(b *testing.B) {
for _, tc := range tcs {
b.Run(tc.desc, func(b *testing.B) {
for n := 0; n < b.N; n++ {
iterator := tc.c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
for iterator.Next() {
_ = iterator.Sample()
}
if err := iterator.Close(); err != nil {
b.Fatal(err)
}
}
})
}
})
}
func TestUnorderedIteratorCountsAllEntries(t *testing.T) {
c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
fillChunkRandomOrder(c, false)
ct := 0
var i int64
iterator, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline)
if err != nil {
panic(err)
}
for iterator.Next() {
next := iterator.Entry().Timestamp.UnixNano()
require.GreaterOrEqual(t, next, i)
i = next
ct++
}
if err := iterator.Close(); err != nil {
t.Fatal(err)
}
require.Equal(t, c.Size(), ct)
ct = 0
i = 0
smpl := c.SampleIterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), countExtractor)
for smpl.Next() {
next := smpl.Sample().Timestamp
require.GreaterOrEqual(t, next, i)
i = next
ct += int(smpl.Sample().Value)
}
require.Equal(t, c.Size(), ct)
if err := iterator.Close(); err != nil {
t.Fatal(err)
}
}
func chunkFrom(xs []logproto.Entry) ([]byte, error) {
c := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range xs {
if err := c.Append(&x); err != nil {
return nil, err
}
}
if err := c.Close(); err != nil {
return nil, err
}
return c.Bytes()
}
func TestReorder(t *testing.T) {
for _, tc := range []struct {
desc string
input []logproto.Entry
expected []logproto.Entry
}{
{
desc: "unordered",
input: []logproto.Entry{
{
Timestamp: time.Unix(4, 0),
Line: "x",
},
{
Timestamp: time.Unix(2, 0),
Line: "x",
},
{
Timestamp: time.Unix(3, 0),
Line: "x",
},
{
Timestamp: time.Unix(1, 0),
Line: "x",
},
},
expected: []logproto.Entry{
{
Timestamp: time.Unix(1, 0),
Line: "x",
},
{
Timestamp: time.Unix(2, 0),
Line: "x",
},
{
Timestamp: time.Unix(3, 0),
Line: "x",
},
{
Timestamp: time.Unix(4, 0),
Line: "x",
},
},
},
} {
t.Run(tc.desc, func(t *testing.T) {
c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
for _, x := range tc.input {
require.Nil(t, c.Append(&x))
}
require.Nil(t, c.Close())
b, err := c.Bytes()
require.Nil(t, err)
exp, err := chunkFrom(tc.expected)
require.Nil(t, err)
require.Equal(t, exp, b)
})
}
}
func TestReorderAcrossBlocks(t *testing.T) {
c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize)
for _, batch := range [][]int{
// ensure our blocks have overlapping bounds and must be reordered
// before closing.
{1, 5},
{3, 7},
} {
for _, x := range batch {
require.Nil(t, c.Append(&logproto.Entry{
Timestamp: time.Unix(int64(x), 0),
Line: fmt.Sprint(x),
}))
}
require.Nil(t, c.cut())
}
// get bounds before it's reordered
from, to := c.Bounds()
require.Nil(t, c.Close())
itr, err := c.Iterator(context.Background(), from, to.Add(time.Nanosecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.Nil(t, err)
exp := []entry{
{
t: time.Unix(1, 0).UnixNano(),
s: "1",
},
{
t: time.Unix(3, 0).UnixNano(),
s: "3",
},
{
t: time.Unix(5, 0).UnixNano(),
s: "5",
},
{
t: time.Unix(7, 0).UnixNano(),
s: "7",
},
}
iterEq(t, exp, itr)
}

@@ -1,6 +1,7 @@
package chunkenc
import (
"math/rand"
"time"
"github.com/grafana/loki/pkg/chunkenc/testdata"
@@ -21,7 +22,7 @@ func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) {
for n := 0; n < chunksCount; n++ {
entry := logprotoEntry(0, testdata.LogString(0))
c := NewMemChunk(enc, testBlockSize, testTargetSize)
c := NewMemChunk(enc, DefaultHeadBlockFmt, testBlockSize, testTargetSize)
for c.SpaceFor(entry) {
size += uint64(len(entry.Line))
_ = c.Append(entry)
@@ -35,6 +36,10 @@ func generateData(enc Encoding, chunksCount int) ([]Chunk, uint64) {
}
func fillChunk(c Chunk) int64 {
return fillChunkClose(c, true)
}
func fillChunkClose(c Chunk, close bool) int64 {
i := int64(0)
inserted := int64(0)
entry := &logproto.Entry{
@@ -52,6 +57,31 @@ func fillChunk(c Chunk) int64 {
entry.Line = testdata.LogString(i)
}
_ = c.Close()
if close {
_ = c.Close()
}
return inserted
}
func fillChunkRandomOrder(c Chunk, close bool) {
ub := int64(1 << 30)
i := int64(0)
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: testdata.LogString(i),
}
for c.SpaceFor(entry) {
err := c.Append(entry)
if err != nil {
panic(err)
}
i++
entry.Timestamp = time.Unix(0, rand.Int63n(ub))
entry.Line = testdata.LogString(i)
}
if close {
_ = c.Close()
}
}

@@ -100,7 +100,11 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) {
lastUpdated: c.LastUpdated,
}
mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, conf.BlockSize, conf.TargetChunkSize)
hbType := chunkenc.OrderedHeadBlockFmt
if conf.UnorderedWrites {
hbType = chunkenc.UnorderedHeadBlockFmt
}
mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize)
if err != nil {
return nil, err
}
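Downstream of this, the wire round trip rebuilds head blocks according to the ingester's current config rather than whatever format the bytes were written in, which is what the encoding test below exercises:

    there, _ := toWireChunks(from, nil)           // serialize chunkDescs for checkpoint/transfer
    // chunks: the []Chunk payloads pulled out of `there`
    backAgain, _ := fromWireChunks(&conf, chunks) // heads rebuilt per conf.UnorderedWrites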
@@ -273,6 +277,7 @@ func (s *streamIterator) Next() bool {
s.current.To = stream.lastLine.ts
s.current.LastLine = stream.lastLine.content
s.current.EntryCt = stream.entryCt
s.current.HighestTs = stream.highestTs
return true
}

@@ -141,12 +141,15 @@ type Series struct {
Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
Labels []github_com_cortexproject_cortex_pkg_cortexpb.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter" json:"labels"`
Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
// Last timestamp of the last chunk.
To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
// most recently pushed timestamp.
To time.Time `protobuf:"bytes,5,opt,name=to,proto3,stdtime" json:"to"`
// most recently pushed line.
LastLine string `protobuf:"bytes,6,opt,name=lastLine,proto3" json:"lastLine,omitempty"`
// highest counter value for pushes to this stream.
// Used to skip already applied entries during WAL replay.
EntryCt int64 `protobuf:"varint,7,opt,name=entryCt,proto3" json:"entryCt,omitempty"`
// highest timestamp pushed to this stream.
HighestTs time.Time `protobuf:"bytes,8,opt,name=highestTs,proto3,stdtime" json:"highestTs"`
}
func (m *Series) Reset() { *m = Series{} }
@@ -223,6 +226,13 @@ func (m *Series) GetEntryCt() int64 {
return 0
}
func (m *Series) GetHighestTs() time.Time {
if m != nil {
return m.HighestTs
}
return time.Time{}
}
func init() {
proto.RegisterType((*Chunk)(nil), "loki_ingester.Chunk")
proto.RegisterType((*Series)(nil), "loki_ingester.Series")
@@ -231,39 +241,40 @@ func init() {
func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) }
var fileDescriptor_00f4b7152db9bdb5 = []byte{
// 503 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0x3d, 0x8f, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0xc6, 0xba, 0x2a, 0x0d,
0xb6, 0x14, 0x28, 0xa0, 0x41, 0x8a, 0x0f, 0x21, 0x21, 0x5d, 0x81, 0x0c, 0x34, 0x34, 0xc8, 0x1f,
0x1b, 0xdb, 0xc4, 0xf1, 0x5a, 0xbb, 0x6b, 0x89, 0xeb, 0xf8, 0x09, 0xf7, 0x33, 0xf8, 0x29, 0x57,
0x46, 0x54, 0x27, 0x90, 0x0e, 0xe2, 0x34, 0x94, 0xf7, 0x13, 0xd0, 0xae, 0x6d, 0x2e, 0x94, 0xee,
0xe6, 0xbd, 0x99, 0xe7, 0x79, 0x9e, 0x7d, 0xf0, 0x71, 0xb9, 0x49, 0xdc, 0xac, 0x48, 0xa8, 0x90,
0x94, 0xbb, 0x51, 0x4a, 0xa3, 0x4d, 0xc9, 0xb2, 0x42, 0x3a, 0x25, 0x67, 0x92, 0xa1, 0xfb, 0x39,
0xdb, 0x64, 0x9f, 0xba, 0xfe, 0x6c, 0x9e, 0x30, 0x96, 0xe4, 0xd4, 0xd5, 0xcd, 0xb0, 0x5a, 0xbb,
0x32, 0xdb, 0x52, 0x21, 0x83, 0x6d, 0xd9, 0xcc, 0xcf, 0x9e, 0x24, 0x99, 0x4c, 0xab, 0xd0, 0x89,
0xd8, 0xd6, 0x4d, 0x58, 0xc2, 0xee, 0x26, 0x15, 0xd2, 0x40, 0x57, 0xed, 0xf8, 0x8b, 0xa3, 0xf1,
0x88, 0x71, 0x49, 0xbf, 0x94, 0x9c, 0x7d, 0xa6, 0x91, 0x6c, 0x91, 0xab, 0xdc, 0xb5, 0x8d, 0xb0,
0x2d, 0x1a, 0xe9, 0xe9, 0xcf, 0x01, 0x1c, 0x9d, 0xa5, 0x55, 0xb1, 0x41, 0xcf, 0xa1, 0xb9, 0xe6,
0x6c, 0x8b, 0x81, 0x0d, 0x16, 0xd3, 0xe5, 0xcc, 0x69, 0x3c, 0x3a, 0xdd, 0x66, 0xe7, 0x7d, 0xe7,
0xd1, 0x1b, 0x5f, 0xdd, 0xcc, 0x8d, 0xcb, 0x5f, 0x73, 0xe0, 0x6b, 0x05, 0x7a, 0x06, 0x07, 0x92,
0xe1, 0x41, 0x0f, 0xdd, 0x40, 0x32, 0xe4, 0xc1, 0xc9, 0x3a, 0xaf, 0x44, 0x4a, 0xe3, 0x95, 0xc4,
0xc3, 0x1e, 0xe2, 0x3b, 0x19, 0x7a, 0x0d, 0xa7, 0x79, 0x20, 0xe4, 0x87, 0x32, 0x0e, 0x24, 0x8d,
0xb1, 0xd9, 0xe3, 0x2b, 0xc7, 0x42, 0xf4, 0x08, 0x5a, 0x51, 0xce, 0x04, 0x8d, 0xf1, 0xc8, 0x06,
0x8b, 0xb1, 0xdf, 0x22, 0xc5, 0x8b, 0x8b, 0x22, 0xa2, 0x31, 0xb6, 0x1a, 0xbe, 0x41, 0x08, 0x41,
0x33, 0x0e, 0x64, 0x80, 0x4f, 0x6c, 0xb0, 0xb8, 0xe7, 0xeb, 0x5a, 0x71, 0x29, 0x0d, 0x62, 0x3c,
0x6e, 0x38, 0x55, 0x9f, 0x7e, 0x1f, 0x40, 0xeb, 0x1d, 0xe5, 0x19, 0x15, 0xea, 0x53, 0x95, 0xa0,
0xfc, 0xcd, 0x2b, 0x7d, 0xe0, 0x89, 0xdf, 0x22, 0x64, 0xc3, 0xe9, 0x5a, 0x05, 0x83, 0x97, 0x3c,
0x2b, 0xa4, 0xbe, 0xa2, 0xe9, 0x1f, 0x53, 0xa8, 0x80, 0x56, 0x1e, 0x84, 0x34, 0x17, 0x78, 0x68,
0x0f, 0x17, 0xd3, 0xe5, 0x03, 0xa7, 0x7b, 0x4a, 0xe7, 0x5c, 0xf1, 0x6f, 0x83, 0x8c, 0x7b, 0x2b,
0xf5, 0x63, 0x3f, 0x6e, 0xe6, 0xbd, 0xa2, 0xd0, 0xe8, 0x57, 0x71, 0x50, 0x4a, 0xca, 0xfd, 0x76,
0x0b, 0x5a, 0x42, 0x2b, 0x52, 0x89, 0x10, 0xd8, 0xd4, 0xfb, 0x1e, 0x3a, 0xff, 0xa5, 0xd7, 0xd1,
0x71, 0xf1, 0x4c, 0xb5, 0xd0, 0x6f, 0x27, 0xdb, 0x08, 0x8c, 0x7a, 0x46, 0x60, 0x06, 0xc7, 0xea,
0x15, 0xce, 0xb3, 0x82, 0xea, 0x03, 0x4f, 0xfc, 0x7f, 0x18, 0x61, 0x78, 0x42, 0x0b, 0xc9, 0x2f,
0xce, 0xa4, 0xbe, 0xf2, 0xd0, 0xef, 0xa0, 0xf7, 0x72, 0xb7, 0x27, 0xc6, 0xf5, 0x9e, 0x18, 0xb7,
0x7b, 0x02, 0xbe, 0xd6, 0x04, 0x7c, 0xab, 0x09, 0xb8, 0xaa, 0x09, 0xd8, 0xd5, 0x04, 0xfc, 0xae,
0x09, 0xf8, 0x53, 0x13, 0xe3, 0xb6, 0x26, 0xe0, 0xf2, 0x40, 0x8c, 0xdd, 0x81, 0x18, 0xd7, 0x07,
0x62, 0x7c, 0x1c, 0x77, 0xfe, 0x43, 0x4b, 0xfb, 0x7a, 0xfa, 0x37, 0x00, 0x00, 0xff, 0xff, 0x70,
0x87, 0xe1, 0x9b, 0xb4, 0x03, 0x00, 0x00,
// 520 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x94, 0x52, 0xb1, 0x8e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x8e, 0x2f, 0xd9, 0x40, 0xb3, 0x20, 0xb4, 0x8a, 0xc4, 0x26, 0xba, 0x2a, 0x0d,
0xb6, 0x14, 0x28, 0xa0, 0x41, 0x4a, 0x0e, 0x21, 0x21, 0x5d, 0x81, 0xcc, 0xd1, 0xd0, 0x20, 0xc7,
0x9e, 0xd8, 0x26, 0x8e, 0xd7, 0xda, 0xdd, 0x48, 0x5c, 0xc7, 0x27, 0x5c, 0xc5, 0x37, 0xf0, 0x29,
0x57, 0xa6, 0x3c, 0x81, 0x74, 0x10, 0xa7, 0xa1, 0xbc, 0x4f, 0x40, 0xbb, 0xb6, 0xef, 0x42, 0x77,
0xe9, 0xe6, 0xbd, 0x99, 0xe7, 0x19, 0xbf, 0x7d, 0xf8, 0x69, 0xb1, 0x8c, 0xbd, 0x34, 0x8f, 0x41,
0x2a, 0x10, 0x5e, 0x98, 0x40, 0xb8, 0x2c, 0x78, 0x9a, 0x2b, 0xb7, 0x10, 0x5c, 0x71, 0xf2, 0x30,
0xe3, 0xcb, 0xf4, 0x73, 0xd3, 0x1f, 0x0c, 0x63, 0xce, 0xe3, 0x0c, 0x3c, 0xd3, 0x9c, 0xaf, 0x17,
0x9e, 0x4a, 0x57, 0x20, 0x55, 0xb0, 0x2a, 0xaa, 0xf9, 0xc1, 0xb3, 0x38, 0x55, 0xc9, 0x7a, 0xee,
0x86, 0x7c, 0xe5, 0xc5, 0x3c, 0xe6, 0x77, 0x93, 0x1a, 0x19, 0x60, 0xaa, 0x7a, 0xfc, 0xd5, 0xde,
0x78, 0xc8, 0x85, 0x82, 0xaf, 0x85, 0xe0, 0x5f, 0x20, 0x54, 0x35, 0xf2, 0xf4, 0x75, 0x75, 0x63,
0x5e, 0x17, 0x95, 0xf4, 0xf8, 0x57, 0x0b, 0x77, 0x4e, 0x92, 0x75, 0xbe, 0x24, 0x2f, 0xb1, 0xbd,
0x10, 0x7c, 0x45, 0xd1, 0x08, 0x8d, 0xfb, 0x93, 0x81, 0x5b, 0xdd, 0xe8, 0x36, 0x9b, 0xdd, 0xb3,
0xe6, 0xc6, 0x59, 0xf7, 0xf2, 0x7a, 0x68, 0x5d, 0xfc, 0x1e, 0x22, 0xdf, 0x28, 0xc8, 0x0b, 0xdc,
0x52, 0x9c, 0xb6, 0x0e, 0xd0, 0xb5, 0x14, 0x27, 0x33, 0xdc, 0x5b, 0x64, 0x6b, 0x99, 0x40, 0x34,
0x55, 0xb4, 0x7d, 0x80, 0xf8, 0x4e, 0x46, 0xde, 0xe2, 0x7e, 0x16, 0x48, 0xf5, 0xb1, 0x88, 0x02,
0x05, 0x11, 0xb5, 0x0f, 0xf8, 0xca, 0xbe, 0x90, 0x3c, 0xc1, 0x4e, 0x98, 0x71, 0x09, 0x11, 0xed,
0x8c, 0xd0, 0xb8, 0xeb, 0xd7, 0x48, 0xf3, 0xf2, 0x3c, 0x0f, 0x21, 0xa2, 0x4e, 0xc5, 0x57, 0x88,
0x10, 0x6c, 0x47, 0x81, 0x0a, 0xe8, 0xd1, 0x08, 0x8d, 0x1f, 0xf8, 0xa6, 0xd6, 0x5c, 0x02, 0x41,
0x44, 0xbb, 0x15, 0xa7, 0xeb, 0xe3, 0xef, 0x6d, 0xec, 0x7c, 0x00, 0x91, 0x82, 0xd4, 0x9f, 0x5a,
0x4b, 0x10, 0xef, 0xde, 0x18, 0x83, 0x7b, 0x7e, 0x8d, 0xc8, 0x08, 0xf7, 0x17, 0x3a, 0x18, 0xa2,
0x10, 0x69, 0xae, 0x8c, 0x8b, 0xb6, 0xbf, 0x4f, 0x91, 0x1c, 0x3b, 0x59, 0x30, 0x87, 0x4c, 0xd2,
0xf6, 0xa8, 0x3d, 0xee, 0x4f, 0x1e, 0xb9, 0xcd, 0x53, 0xba, 0xa7, 0x9a, 0x7f, 0x1f, 0xa4, 0x62,
0x36, 0xd5, 0x3f, 0xf6, 0xf3, 0x7a, 0x78, 0x50, 0x14, 0x2a, 0xfd, 0x34, 0x0a, 0x0a, 0x05, 0xc2,
0xaf, 0xb7, 0x90, 0x09, 0x76, 0x42, 0x9d, 0x08, 0x49, 0x6d, 0xb3, 0xef, 0xb1, 0xfb, 0x5f, 0x7a,
0x5d, 0x13, 0x97, 0x99, 0xad, 0x17, 0xfa, 0xf5, 0x64, 0x1d, 0x81, 0xce, 0x81, 0x11, 0x18, 0xe0,
0xae, 0x7e, 0x85, 0xd3, 0x34, 0x07, 0x63, 0x70, 0xcf, 0xbf, 0xc5, 0x84, 0xe2, 0x23, 0xc8, 0x95,
0x38, 0x3f, 0x51, 0xc6, 0xe5, 0xb6, 0xdf, 0x40, 0x1d, 0x9c, 0x24, 0x8d, 0x13, 0x90, 0xea, 0x4c,
0x1a, 0xb7, 0xef, 0x1d, 0x9c, 0x5b, 0xd9, 0xec, 0xf5, 0x66, 0xcb, 0xac, 0xab, 0x2d, 0xb3, 0x6e,
0xb6, 0x0c, 0x7d, 0x2b, 0x19, 0xfa, 0x51, 0x32, 0x74, 0x59, 0x32, 0xb4, 0x29, 0x19, 0xfa, 0x53,
0x32, 0xf4, 0xb7, 0x64, 0xd6, 0x4d, 0xc9, 0xd0, 0xc5, 0x8e, 0x59, 0x9b, 0x1d, 0xb3, 0xae, 0x76,
0xcc, 0xfa, 0xd4, 0x6d, 0x3c, 0x98, 0x3b, 0x66, 0xd1, 0xf3, 0x7f, 0x01, 0x00, 0x00, 0xff, 0xff,
0x38, 0x4d, 0x9b, 0xd7, 0xf8, 0x03, 0x00, 0x00,
}
func (this *Chunk) Equal(that interface{}) bool {
@@ -361,6 +372,9 @@ func (this *Series) Equal(that interface{}) bool {
if this.EntryCt != that1.EntryCt {
return false
}
if !this.HighestTs.Equal(that1.HighestTs) {
return false
}
return true
}
func (this *Chunk) GoString() string {
@@ -384,7 +398,7 @@ func (this *Series) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 11)
s := make([]string, 0, 12)
s = append(s, "&ingester.Series{")
s = append(s, "UserID: "+fmt.Sprintf("%#v", this.UserID)+",\n")
s = append(s, "Fingerprint: "+fmt.Sprintf("%#v", this.Fingerprint)+",\n")
@@ -399,6 +413,7 @@ func (this *Series) GoString() string {
s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n")
s = append(s, "LastLine: "+fmt.Sprintf("%#v", this.LastLine)+",\n")
s = append(s, "EntryCt: "+fmt.Sprintf("%#v", this.EntryCt)+",\n")
s = append(s, "HighestTs: "+fmt.Sprintf("%#v", this.HighestTs)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@@ -561,6 +576,14 @@ func (m *Series) MarshalTo(dAtA []byte) (int, error) {
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(m.EntryCt))
}
dAtA[i] = 0x42
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.HighestTs)))
n6, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.HighestTs, dAtA[i:])
if err != nil {
return 0, err
}
i += n6
return i, nil
}
@@ -638,6 +661,8 @@ func (m *Series) Size() (n int) {
if m.EntryCt != 0 {
n += 1 + sovCheckpoint(uint64(m.EntryCt))
}
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.HighestTs)
n += 1 + l + sovCheckpoint(uint64(l))
return n
}
@@ -683,6 +708,7 @@ func (this *Series) String() string {
`To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`LastLine:` + fmt.Sprintf("%v", this.LastLine) + `,`,
`EntryCt:` + fmt.Sprintf("%v", this.EntryCt) + `,`,
`HighestTs:` + strings.Replace(strings.Replace(this.HighestTs.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`}`,
}, "")
return s
@@ -1220,6 +1246,39 @@ func (m *Series) Unmarshal(dAtA []byte) error {
break
}
}
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field HighestTs", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthCheckpoint
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthCheckpoint
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.HighestTs, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCheckpoint(dAtA[iNdEx:])

@@ -30,11 +30,13 @@ message Series {
uint64 fingerprint = 2;
repeated cortexpb.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/cortexpb.LabelAdapter"];
repeated Chunk chunks = 4 [(gogoproto.nullable) = false];
// Last timestamp of the last chunk.
// most recently pushed timestamp.
google.protobuf.Timestamp to = 5 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
// most recently pushed line.
string lastLine = 6;
// highest counter value for pushes to this stream.
// Used to skip already applied entries during WAL replay.
int64 entryCt = 7;
// highest timestamp pushed to this stream.
google.protobuf.Timestamp highestTs = 8 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
}

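The checkpointed Series now carries three complementary replay aids: To (the most recent push), EntryCt (a push counter used to skip already-applied entries), and the new highestTs, which matters because under unordered writes the most recently pushed timestamp is no longer necessarily the highest one. A hedged construction sketch (Go struct from checkpoint.pb.go above, field subset only):

    s := Series{
        To:        lastPushed, // most recently pushed timestamp
        EntryCt:   42,         // push counter for dedupe on replay
        HighestTs: maxSeen,    // can exceed To when writes arrive out of order
    }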
@@ -468,7 +468,7 @@ func Test_SeriesIterator(t *testing.T) {
for j := 0; j < 2; j++ {
iter.Next()
assert.Equal(t, fmt.Sprintf("%d", i), iter.Stream().UserID)
memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, 0, 0)
memchunk, err := chunkenc.MemchunkFromCheckpoint(iter.Stream().Chunks[0].Data, iter.Stream().Chunks[0].Head, chunkenc.UnorderedHeadBlockFmt, 0, 0)
require.NoError(t, err)
it, err := memchunk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, log.NewNoopPipeline().ForStream(nil))
require.NoError(t, err)
@ -561,7 +561,7 @@ func buildChunks(t testing.TB, size int) []Chunk {
for i := 0; i < size; i++ {
// build chunks of 256k blocks, 1.5MB target size. Same as default config.
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 1500*1024)
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 1500*1024)
fillChunk(t, c)
descs = append(descs, chunkDesc{
chunk: c,

@ -48,7 +48,9 @@ func TestIterator(t *testing.T) {
new func() chunkenc.Chunk
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
{"gzipChunk", func() chunkenc.Chunk {
return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
chunk := chk.new()

@ -230,53 +230,87 @@ func dummyConf() *Config {
}
func Test_EncodingChunks(t *testing.T) {
conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize)
fillChunk(t, c)
from := []chunkDesc{
{
chunk: c,
},
// test non zero values
{
chunk: c,
closed: true,
synced: true,
flushed: time.Unix(1, 0),
lastUpdated: time.Unix(0, 1),
},
}
there, err := toWireChunks(from, nil)
require.Nil(t, err)
chunks := make([]Chunk, 0, len(there))
for _, c := range there {
chunks = append(chunks, c.Chunk)
}
backAgain, err := fromWireChunks(conf, chunks)
require.Nil(t, err)
for i, to := range backAgain {
// test the encoding directly as the substructure may change.
// for instance the uncompressed size for each block is not included in the encoded version.
enc, err := to.chunk.Bytes()
require.Nil(t, err)
to.chunk = nil
matched := from[i]
exp, err := matched.chunk.Bytes()
require.Nil(t, err)
matched.chunk = nil
require.Equal(t, exp, enc)
require.Equal(t, matched, to)
for _, f := range chunkenc.HeadBlockFmts {
for _, close := range []bool{true, false} {
for _, tc := range []struct {
desc string
conf Config
}{
{
// mostly for historical parity
desc: "dummyConf",
conf: *dummyConf(),
},
{
desc: "default",
conf: defaultIngesterTestConfig(t),
},
} {
t.Run(fmt.Sprintf("%v-%v-%s", f, close, tc.desc), func(t *testing.T) {
conf := tc.conf
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize)
fillChunk(t, c)
if close {
require.Nil(t, c.Close())
}
from := []chunkDesc{
{
chunk: c,
},
// test non zero values
{
chunk: c,
closed: true,
synced: true,
flushed: time.Unix(1, 0),
lastUpdated: time.Unix(0, 1),
},
}
there, err := toWireChunks(from, nil)
require.Nil(t, err)
chunks := make([]Chunk, 0, len(there))
for _, c := range there {
chunks = append(chunks, c.Chunk)
// Ensure closed head chunks are empty
if close {
require.Equal(t, 0, len(c.Head))
} else {
require.Greater(t, len(c.Head), 0)
}
}
backAgain, err := fromWireChunks(&conf, chunks)
require.Nil(t, err)
for i, to := range backAgain {
// test the encoding directly as the substructure may change.
// for instance the uncompressed size for each block is not included in the encoded version.
enc, err := to.chunk.Bytes()
require.Nil(t, err)
to.chunk = nil
matched := from[i]
exp, err := matched.chunk.Bytes()
require.Nil(t, err)
matched.chunk = nil
require.Equal(t, exp, enc)
require.Equal(t, matched, to)
}
})
}
}
}
}
func Test_EncodingCheckpoint(t *testing.T) {
conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize)
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize)
require.Nil(t, c.Append(&logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "hi there",

@ -121,7 +121,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc {
for i := range res {
res[i] = &chunkDesc{
closed: true,
chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, dummyConf().BlockSize, dummyConf().TargetChunkSize),
chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize),
}
fillChunk(t, res[i].chunk)
require.NoError(t, res[i].chunk.Close())
@ -289,6 +289,9 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.LifecyclerConfig.ID = "localhost"
cfg.LifecyclerConfig.FinalSleep = 0
cfg.LifecyclerConfig.MinReadyDuration = 0
cfg.BlockSize = 256 * 1024
cfg.TargetChunkSize = 1500 * 1024
cfg.UnorderedWrites = true
return cfg
}

@ -80,6 +80,8 @@ type Config struct {
WAL WALConfig `yaml:"wal,omitempty"`
ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
UnorderedWrites bool `yaml:"unordered_writes_enabled"`
}
// RegisterFlags registers the flags.
@ -102,6 +104,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.")
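// Opt in with -ingester.unordered-writes-enabled=true, or via unordered_writes_enabled: true in the ingester config block.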
}
func (cfg *Config) Validate() error {

@ -126,6 +126,7 @@ func (r *ingesterRecoverer) Series(series *Series) error {
stream.lastLine.ts = series.To
stream.lastLine.content = series.LastLine
stream.entryCt = series.EntryCt
stream.highestTs = series.HighestTs
if err != nil {
return err

@ -72,8 +72,17 @@ type stream struct {
labels labels.Labels
labelsString string
lastLine line
metrics *ingesterMetrics
// most recently pushed line. This is used to prevent duplicate pushes.
// It also determines chunk synchronization when unordered writes are disabled.
lastLine line
// keeps track of the highest timestamp accepted by the stream.
// This is used when unordered writes are enabled to cap the validity window
// of accepted writes and for chunk synchronization.
highestTs time.Time
metrics *ingesterMetrics
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
@ -113,6 +122,7 @@ func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics
// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
if err != nil {
@ -145,7 +155,11 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er
}
func (s *stream) NewChunk() *chunkenc.MemChunk {
return chunkenc.NewMemChunk(s.cfg.parsedEncoding, s.cfg.BlockSize, s.cfg.TargetChunkSize)
hbType := chunkenc.OrderedHeadBlockFmt
if s.cfg.UnorderedWrites {
hbType = chunkenc.UnorderedHeadBlockFmt
}
return chunkenc.NewMemChunk(s.cfg.parsedEncoding, hbType, s.cfg.BlockSize, s.cfg.TargetChunkSize)
}
func (s *stream) Push(
@ -169,14 +183,11 @@ func (s *stream) Push(
var bytesAdded int
prevNumChunks := len(s.chunks)
var lastChunkTimestamp time.Time
if prevNumChunks == 0 {
s.chunks = append(s.chunks, chunkDesc{
chunk: s.NewChunk(),
})
chunksCreatedTotal.Inc()
} else {
_, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()
}
var storedEntries []logproto.Entry
@ -198,7 +209,7 @@ func (s *stream) Push(
}
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
// If the chunk has no more space, call Close to make sure anything in the head block is cut and compressed.
err := chunk.chunk.Close()
if err != nil {
@ -216,15 +227,20 @@ func (s *stream) Push(
chunk: s.NewChunk(),
})
chunk = &s.chunks[len(s.chunks)-1]
lastChunkTimestamp = time.Time{}
}
if err := chunk.chunk.Append(&entries[i]); err != nil {
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
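// e.g. with max_chunk_age=10s and highestTs=10s the bound is 5s, so an entry at 4s
// is rejected with ErrOutOfOrder (cf. TestUnorderedPush).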
if s.cfg.UnorderedWrites && !s.highestTs.IsZero() && s.highestTs.Add(-s.cfg.MaxChunkAge/2).After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrOutOfOrder})
} else if err := chunk.chunk.Append(&entries[i]); err != nil {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
} else {
storedEntries = append(storedEntries, entries[i])
lastChunkTimestamp = entries[i].Timestamp
s.lastLine.ts = lastChunkTimestamp
s.lastLine.ts = entries[i].Timestamp
s.lastLine.content = entries[i].Line
if s.highestTs.Before(entries[i].Timestamp) {
s.highestTs = entries[i].Timestamp
}
s.entryCt++
// length of string plus
@ -307,15 +323,17 @@ func (s *stream) Push(
// Returns true if the chunk should be cut before adding a new entry. This is done to make ingesters
// cut the chunk for this stream at the same moment, so that the new chunk will contain exactly the same entries.
func (s *stream) cutChunkForSynchronization(entryTimestamp, prevEntryTimestamp time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
if synchronizePeriod <= 0 || prevEntryTimestamp.IsZero() {
func (s *stream) cutChunkForSynchronization(entryTimestamp, latestTs time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
// Never sync when it's disabled, on the first push, or when a write isn't the latest ts,
// to prevent syncing on many unordered writes.
if synchronizePeriod <= 0 || latestTs.IsZero() || latestTs.After(entryTimestamp) {
return false
}
// we use the fingerprint as a jitter here, basically offsetting stream synchronization points to different
// points in time; this breaks if streams are mapped to different fingerprints on different ingesters, which is too bad.
cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
pts := (uint64(prevEntryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
pts := (uint64(latestTs.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
// if current entry timestamp has rolled over synchronization period
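// e.g. (taking fp=0 for illustration) with a 1m sync period: latestTs=59s gives pts=59s,
// while a new entry at 61s gives cts=1s; cts < pts, meaning the entry crossed a sync boundary.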
if cts < pts {

@ -15,6 +15,7 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
)
@ -137,7 +138,9 @@ func TestStreamIterator(t *testing.T) {
name string
new func() *chunkenc.MemChunk
}{
{"gzipChunk", func() *chunkenc.MemChunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
{"gzipChunk", func() *chunkenc.MemChunk {
return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
}},
} {
t.Run(chk.name, func(t *testing.T) {
var s stream
@ -177,6 +180,79 @@ func TestStreamIterator(t *testing.T) {
}
}
func TestUnorderedPush(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.MaxChunkAge = 10 * time.Second
s := newStream(
&cfg,
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
NilMetrics,
)
for _, x := range []struct {
entries []logproto.Entry
err bool
written int
}{
{
entries: []logproto.Entry{
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(1, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"}, // duplicate ts/line is ignored
{Timestamp: time.Unix(10, 0), Line: "x"},
},
written: 4, // 1 ignored
},
// highest ts is now 10, validity bound is (10-10/2) = 5
{
entries: []logproto.Entry{
{Timestamp: time.Unix(4, 0), Line: "x"}, // ordering err: below the validity bound (5s)
{Timestamp: time.Unix(8, 0), Line: "x"},
{Timestamp: time.Unix(9, 0), Line: "x"},
},
err: true,
written: 2, // 1 rejected as out of order
},
} {
written, err := s.Push(context.Background(), x.entries, recordPool.GetRecord(), 0)
if x.err {
require.NotNil(t, err)
} else {
require.Nil(t, err)
}
require.Equal(t, x.written, written)
}
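// Reads over the unordered head block are re-sorted at iterator time, so the
// expected entries below come back in timestamp order despite out-of-order pushes.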
exp := []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "x"},
{Timestamp: time.Unix(2, 0), Line: "x"},
// duplicate was allowed here because it wasn't written sequentially
{Timestamp: time.Unix(2, 0), Line: "x"},
{Timestamp: time.Unix(8, 0), Line: "x"},
{Timestamp: time.Unix(9, 0), Line: "x"},
{Timestamp: time.Unix(10, 0), Line: "x"},
}
itr, err := s.Iterator(context.Background(), nil, time.Unix(int64(0), 0), time.Unix(11, 0), logproto.FORWARD, log.NewNoopPipeline().ForStream(s.labels))
require.Nil(t, err)
iterEq(t, exp, itr)
}
func iterEq(t *testing.T, exp []logproto.Entry, got iter.EntryIterator) {
var i int
for got.Next() {
require.Equal(t, exp[i].Timestamp, got.Entry().Timestamp, "failed on the (%d) ts", i)
require.Equal(t, exp[i].Line, got.Entry().Line)
i++
}
require.Equal(t, len(exp), i)
}
func Benchmark_PushStream(b *testing.B) {
ls := labels.Labels{
labels.Label{Name: "namespace", Value: "loki-dev"},

@ -30,6 +30,7 @@ func TestTransferOut(t *testing.T) {
f := newTestIngesterFactory(t)
ing := f.getIngester(time.Duration(0), t)
ing.cfg.UnorderedWrites = false // enforce ordered writes on old testware (transfers are deprecated).
// Push some data into our original ingester
ctx := user.InjectOrgID(context.Background(), "test")

@ -107,7 +107,7 @@ func fillStore() error {
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, 262144, 1572864)
chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() {
entry := &logproto.Entry{
Timestamp: time.Unix(0, ts),
@ -130,7 +130,7 @@ func fillStore() error {
if flushCount >= maxChunks {
return
}
chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, 262144, 1572864)
chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864)
}
}
}(i)

@ -222,7 +222,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time
labelsBuilder.Set(labels.MetricName, "logs")
metric := labelsBuilder.Labels()
fp := client.Fingerprint(lbs)
chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, blockSize, targetSize)
chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, blockSize, targetSize)
for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) {
require.NoError(t, chunkEnc.Append(&logproto.Entry{

@ -99,9 +99,8 @@ func newChunk(stream logproto.Stream) chunk.Chunk {
builder.Set(labels.MetricName, "logs")
lbs = builder.Labels()
}
from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp)
chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0)
chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0)
for _, e := range stream.Entries {
_ = chk.Append(&e)
}
