Headblock interop (#3995)

* merge feature/unordered-replay

* interoperable head chunks
Branch: pull/4016/head
Author: Owen Diehl (committed via GitHub, 4 years ago)
Commit: a4db1a4359, parent: 2107148acd
Changed files:
  1. pkg/chunkenc/memchunk.go (137 changed lines)
  2. pkg/chunkenc/memchunk_test.go (18 changed lines)
  3. pkg/chunkenc/unordered.go (134 changed lines)
  4. pkg/chunkenc/unordered_test.go (54 changed lines)

@ -44,6 +44,20 @@ const (
defaultBlockSize = 256 * 1024
)
type HeadBlockFmt byte
func (f HeadBlockFmt) Byte() byte { return byte(f) }
const (
_ HeadBlockFmt = iota
// placeholders to start splitting chunk formats vs head block
// fmts at v3
_
_
OrderedHeadBlockFmt
UnorderedHeadBlockFmt
)
var magicNumber = uint32(0x12EE56A)
// The table gets initialized with sync.Once but may still cause a race
@ -74,7 +88,7 @@ type MemChunk struct {
cutBlockSize int
// Current in-mem block being appended to.
head *headBlock
head HeadBlock
// the chunk format default to v2
format byte
@ -102,11 +116,17 @@ type headBlock struct {
mint, maxt int64
}
func (hb *headBlock) isEmpty() bool {
func (hb *headBlock) Format() HeadBlockFmt { return OrderedHeadBlockFmt }
func (hb *headBlock) IsEmpty() bool {
return len(hb.entries) == 0
}
func (hb *headBlock) clear() {
func (hb *headBlock) Entries() int { return len(hb.entries) }
func (hb *headBlock) UncompressedSize() int { return hb.size }
func (hb *headBlock) Reset() {
if hb.entries != nil {
hb.entries = hb.entries[:0]
}
@ -115,8 +135,10 @@ func (hb *headBlock) clear() {
hb.maxt = 0
}
func (hb *headBlock) append(ts int64, line string) error {
if !hb.isEmpty() && hb.maxt > ts {
func (hb *headBlock) Bounds() (int64, int64) { return hb.mint, hb.maxt }
func (hb *headBlock) Append(ts int64, line string) error {
if !hb.IsEmpty() && hb.maxt > ts {
return ErrOutOfOrder
}
@ -130,7 +152,7 @@ func (hb *headBlock) append(ts int64, line string) error {
return nil
}
func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
func (hb *headBlock) Serialise(pool WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
@ -164,14 +186,14 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
func (hb *headBlock) CheckpointBytes(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
err := hb.CheckpointTo(buf)
return buf.Bytes(), err
}
// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *headBlock) CheckpointSize(version byte) int {
func (hb *headBlock) CheckpointSize() int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
@ -184,13 +206,13 @@ func (hb *headBlock) CheckpointSize(version byte) int {
}
// CheckpointTo serializes a headblock to an `io.Writer`. See `CheckpointBytes`.
func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
func (hb *headBlock) CheckpointTo(w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset()
eb.putByte(version)
eb.putByte(byte(hb.Format()))
_, err := w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock version")
@ -225,7 +247,7 @@ func (hb *headBlock) CheckpointTo(version byte, w io.Writer) error {
return nil
}
func (hb *headBlock) FromCheckpoint(b []byte) error {
func (hb *headBlock) LoadBytes(b []byte) error {
if len(b) < 1 {
return nil
}
@ -267,6 +289,20 @@ func (hb *headBlock) FromCheckpoint(b []byte) error {
return nil
}
func (hb *headBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version < UnorderedHeadBlockFmt {
return hb, nil
}
out := newUnorderedHeadBlock()
for _, e := range hb.entries {
if err := out.Append(e.t, e.s); err != nil {
return nil, err
}
}
return out, nil
}
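Convert's counterpart on unorderedHeadBlock appears later in this diff; together they let either head block implementation be rebuilt as the other. A minimal sketch of the round trip, assuming it lives inside the chunkenc package (the function and variable names are illustrative, not part of this change):
// Illustrative only: round-trip entries between the ordered and unordered
// head block implementations via their Convert methods.
func convertRoundTrip() (HeadBlock, error) {
	ordered := &headBlock{}
	if err := ordered.Append(1, "a"); err != nil {
		return nil, err
	}
	if err := ordered.Append(2, "b"); err != nil {
		return nil, err
	}
	// Asking for UnorderedHeadBlockFmt re-appends every entry into a
	// fresh unorderedHeadBlock.
	unordered, err := ordered.Convert(UnorderedHeadBlockFmt)
	if err != nil {
		return nil, err
	}
	// Converting back walks the entries in timestamp order and rebuilds
	// an ordered headBlock holding the same (timestamp, line) pairs.
	return unordered.Convert(OrderedHeadBlockFmt)
}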
type entry struct {
t int64
s string
@ -511,11 +547,11 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
return err
}
if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}
err = c.head.CheckpointTo(c.format, head)
err = c.head.CheckpointTo(head)
if err != nil {
return err
}
@ -524,7 +560,7 @@ func (c *MemChunk) SerializeForCheckpointTo(chk, head io.Writer) error {
}
func (c *MemChunk) CheckpointSize() (chunk, head int) {
return c.BytesSize(), c.head.CheckpointSize(c.format)
return c.BytesSize(), c.head.CheckpointSize()
}
func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
@ -532,7 +568,7 @@ func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*M
if err != nil {
return nil, err
}
return mc, mc.head.FromCheckpoint(head)
return mc, mc.head.LoadBytes(head)
}
// Encoding implements Chunk.
@ -547,9 +583,7 @@ func (c *MemChunk) Size() int {
ne += blk.numEntries
}
if !c.head.isEmpty() {
ne += len(c.head.entries)
}
ne += c.head.Entries()
return ne
}
@ -564,7 +598,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
if c.targetSize > 0 {
// This is looking to see if the uncompressed lines will fit which is not
// a great check, but it will guarantee we are always under the target size
newHBSize := c.head.size + len(e.Line)
newHBSize := c.head.UncompressedSize() + len(e.Line)
return (c.cutBlockSize + newHBSize) < c.targetSize
}
// if targetSize is not defined, default to the original behavior of fixed blocks per chunk
@ -575,9 +609,7 @@ func (c *MemChunk) SpaceFor(e *logproto.Entry) bool {
func (c *MemChunk) UncompressedSize() int {
size := 0
if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()
for _, b := range c.blocks {
size += b.uncompressedSize
@ -590,9 +622,7 @@ func (c *MemChunk) UncompressedSize() int {
func (c *MemChunk) CompressedSize() int {
size := 0
// Better to account for any uncompressed data than ignore it even though this isn't accurate.
if !c.head.isEmpty() {
size += c.head.size
}
size += c.head.UncompressedSize()
size += c.cutBlockSize
return size
}
@ -612,15 +642,15 @@ func (c *MemChunk) Append(entry *logproto.Entry) error {
// If the head block is empty but there are cut blocks, we have to make
// sure the new entry is not out of order compared to the previous block
if c.head.isEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
if c.head.IsEmpty() && len(c.blocks) > 0 && c.blocks[len(c.blocks)-1].maxt > entryTimestamp {
return ErrOutOfOrder
}
if err := c.head.append(entryTimestamp, entry.Line); err != nil {
if err := c.head.Append(entryTimestamp, entry.Line); err != nil {
return err
}
if c.head.size >= c.blockSize {
if c.head.UncompressedSize() >= c.blockSize {
return c.cut()
}
@ -635,44 +665,41 @@ func (c *MemChunk) Close() error {
// cut a new block and add it to finished blocks.
func (c *MemChunk) cut() error {
if c.head.isEmpty() {
if c.head.IsEmpty() {
return nil
}
b, err := c.head.serialise(getWriterPool(c.encoding))
b, err := c.head.Serialise(getWriterPool(c.encoding))
if err != nil {
return err
}
mint, maxt := c.head.Bounds()
c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
uncompressedSize: c.head.size,
numEntries: c.head.Entries(),
mint: mint,
maxt: maxt,
uncompressedSize: c.head.UncompressedSize(),
})
c.cutBlockSize += len(b)
c.head.clear()
c.head.Reset()
return nil
}
// Bounds implements Chunk.
func (c *MemChunk) Bounds() (fromT, toT time.Time) {
var from, to int64
if len(c.blocks) > 0 {
from = c.blocks[0].mint
to = c.blocks[len(c.blocks)-1].maxt
}
from, to := c.head.Bounds()
if !c.head.isEmpty() {
if from == 0 || from > c.head.mint {
from = c.head.mint
// need to check all the blocks in case they overlap
for _, b := range c.blocks {
if from == 0 || from > b.mint {
from = b.mint
}
if to < c.head.maxt {
to = c.head.maxt
if to < b.maxt {
to = b.maxt
}
}
@ -692,8 +719,8 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
blockItrs = append(blockItrs, encBlock{c.encoding, b}.Iterator(ctx, pipeline))
}
if !c.head.isEmpty() {
headIterator = c.head.iterator(ctx, direction, mint, maxt, pipeline)
if !c.head.IsEmpty() {
headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
}
if direction == logproto.FORWARD {
@ -743,8 +770,8 @@ func (c *MemChunk) SampleIterator(ctx context.Context, from, through time.Time,
its = append(its, encBlock{c.encoding, b}.SampleIterator(ctx, extractor))
}
if !c.head.isEmpty() {
its = append(its, c.head.sampleIterator(ctx, mint, maxt, extractor))
if !c.head.IsEmpty() {
its = append(its, c.head.SampleIterator(ctx, mint, maxt, extractor))
}
return iter.NewTimeRangedSampleIterator(
@ -837,8 +864,8 @@ func (b block) MaxTime() int64 {
return b.maxt
}
func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
@ -895,8 +922,8 @@ func (hb *headBlock) iterator(ctx context.Context, direction logproto.Direction,
return iter.NewStreamsIterator(ctx, streamsResult, direction)
}
func (hb *headBlock) sampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
func (hb *headBlock) SampleIterator(ctx context.Context, mint, maxt int64, extractor log.StreamSampleExtractor) iter.SampleIterator {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return iter.NoopIterator
}
chunkStats := stats.GetChunkData(ctx)

@ -283,9 +283,9 @@ func TestRoundtripV3(t *testing.T) {
require.Nil(t, err)
// have to populate then clear the head block or fail comparing against nil vs zero values
err = r.head.append(1, "1")
err = r.head.Append(1, "1")
require.Nil(t, err)
r.head.clear()
r.head.Reset()
require.Equal(t, c, r)
})
@ -418,7 +418,7 @@ func TestGZIPChunkTargetSize(t *testing.T) {
require.NoError(t, chk.Close())
require.Equal(t, 0, chk.head.size)
require.Equal(t, 0, chk.head.UncompressedSize())
// Even though the seed is static above and results should be deterministic,
// we will allow +/- 10% variance
@ -735,7 +735,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
h := headBlock{}
for i := 0; i < j; i++ {
if err := h.append(int64(i), "this is the append string"); err != nil {
if err := h.Append(int64(i), "this is the append string"); err != nil {
b.Fatal(err)
}
}
@ -743,7 +743,7 @@ func BenchmarkHeadBlockIterator(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
iter := h.iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline)
iter := h.Iterator(context.Background(), logproto.BACKWARD, 0, math.MaxInt64, noopStreamPipeline)
for iter.Next() {
_ = iter.Entry()
@ -759,7 +759,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
h := headBlock{}
for i := 0; i < j; i++ {
if err := h.append(int64(i), "this is the append string"); err != nil {
if err := h.Append(int64(i), "this is the append string"); err != nil {
b.Fatal(err)
}
}
@ -767,7 +767,7 @@ func BenchmarkHeadBlockSampleIterator(b *testing.B) {
b.ResetTimer()
for n := 0; n < b.N; n++ {
iter := h.sampleIterator(context.Background(), 0, math.MaxInt64, countExtractor)
iter := h.SampleIterator(context.Background(), 0, math.MaxInt64, countExtractor)
for iter.Next() {
_ = iter.Sample()
@ -896,11 +896,11 @@ func TestHeadBlockCheckpointing(t *testing.T) {
// ensure blocks are not cut
require.Equal(t, 0, len(c.blocks))
b, err := c.head.CheckpointBytes(c.format, nil)
b, err := c.head.CheckpointBytes(nil)
require.Nil(t, err)
hb := &headBlock{}
require.Nil(t, hb.FromCheckpoint(b))
require.Nil(t, hb.LoadBytes(b))
require.Equal(t, c.head, hb)
}

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"time"
@ -21,6 +22,35 @@ import (
var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
type HeadBlock interface {
IsEmpty() bool
CheckpointTo(w io.Writer) error
CheckpointBytes(b []byte) ([]byte, error)
CheckpointSize() int
LoadBytes(b []byte) error
Serialise(pool WriterPool) ([]byte, error)
Reset()
Bounds() (mint, maxt int64)
Entries() int
UncompressedSize() int
Convert(HeadBlockFmt) (HeadBlock, error)
Append(int64, string) error
Iterator(
ctx context.Context,
direction logproto.Direction,
mint,
maxt int64,
pipeline log.StreamPipeline,
) iter.EntryIterator
SampleIterator(
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator
Format() HeadBlockFmt
}
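MemChunk now holds this interface instead of a concrete *headBlock, so the append/cut paths shown above behave the same for either implementation. A hypothetical caller written purely against the interface, mirroring what MemChunk.cut does (the function name is illustrative, not from this diff):
// Illustrative only: flush a head block through the HeadBlock interface,
// without knowing whether it is ordered or unordered.
func flushHead(head HeadBlock, pool WriterPool) ([]byte, error) {
	if head.IsEmpty() {
		return nil, nil
	}
	// Compress the buffered entries into a finished block.
	b, err := head.Serialise(pool)
	if err != nil {
		return nil, err
	}
	// Clear the head block for reuse, as MemChunk.cut does after cutting.
	head.Reset()
	return b, nil
}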
type unorderedHeadBlock struct {
// Opted for range tree over skiplist for space reduction.
// Inserts: O(log(n))
@ -38,10 +68,29 @@ func newUnorderedHeadBlock() *unorderedHeadBlock {
}
}
func (hb *unorderedHeadBlock) isEmpty() bool {
func (hb *unorderedHeadBlock) Format() HeadBlockFmt { return UnorderedHeadBlockFmt }
func (hb *unorderedHeadBlock) IsEmpty() bool {
return hb.size == 0
}
func (hb *unorderedHeadBlock) Bounds() (int64, int64) {
return hb.mint, hb.maxt
}
func (hb *unorderedHeadBlock) Entries() int {
return hb.lines
}
func (hb *unorderedHeadBlock) UncompressedSize() int {
return hb.size
}
func (hb *unorderedHeadBlock) Reset() {
x := newUnorderedHeadBlock()
*hb = *x
}
// collection of entries belonging to the same nanosecond
type nsEntries struct {
ts int64
@ -52,7 +101,7 @@ func (e *nsEntries) ValueAtDimension(_ uint64) int64 {
return e.ts
}
func (hb *unorderedHeadBlock) append(ts int64, line string) {
func (hb *unorderedHeadBlock) Append(ts int64, line string) error {
// This is an allocation hack. The rangetree lib does not
// support the ability to pass a "mutate" function during an insert
// and instead will displace any existing entry at the specified timestamp.
@ -83,6 +132,8 @@ func (hb *unorderedHeadBlock) append(ts int64, line string) {
hb.size += len(line)
hb.lines++
return nil
}
// Implements rangetree.Interval
@ -104,7 +155,7 @@ func (hb *unorderedHeadBlock) forEntries(
maxt int64,
entryFn func(int64, string) error, // returning an error exits early
) (err error) {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return
}
@ -157,7 +208,7 @@ func (hb *unorderedHeadBlock) forEntries(
return nil
}
func (hb *unorderedHeadBlock) iterator(
func (hb *unorderedHeadBlock) Iterator(
ctx context.Context,
direction logproto.Direction,
mint,
@ -210,7 +261,7 @@ func (hb *unorderedHeadBlock) iterator(
}
// nolint:unused
func (hb *unorderedHeadBlock) sampleIterator(
func (hb *unorderedHeadBlock) SampleIterator(
ctx context.Context,
mint,
maxt int64,
@ -267,7 +318,7 @@ func (hb *unorderedHeadBlock) sampleIterator(
// nolint:unused
// serialise is used in creating an ordered, compressed block from an unorderedHeadBlock
func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) {
func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
inBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
defer func() {
inBuf.Reset()
@ -306,8 +357,26 @@ func (hb *unorderedHeadBlock) serialise(pool WriterPool) ([]byte, error) {
return outBuf.Bytes(), nil
}
func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt) (HeadBlock, error) {
if version > OrderedHeadBlockFmt {
return hb, nil
}
out := &headBlock{}
err := hb.forEntries(
context.Background(),
logproto.FORWARD,
0,
math.MaxInt64,
func(ts int64, line string) error {
return out.Append(ts, line)
},
)
return out, err
}
// CheckpointSize returns the estimated size of the headblock checkpoint.
func (hb *unorderedHeadBlock) CheckpointSize(version byte) int {
func (hb *unorderedHeadBlock) CheckpointSize() int {
size := 1 // version
size += binary.MaxVarintLen32 * 2 // total entries + total size
size += binary.MaxVarintLen64 * 2 // mint,maxt
@ -319,20 +388,20 @@ func (hb *unorderedHeadBlock) CheckpointSize(version byte) int {
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *unorderedHeadBlock) CheckpointBytes(version byte, b []byte) ([]byte, error) {
func (hb *unorderedHeadBlock) CheckpointBytes(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
err := hb.CheckpointTo(version, buf)
err := hb.CheckpointTo(buf)
return buf.Bytes(), err
}
// CheckpointTo serializes a headblock to an `io.Writer`. See `CheckpointBytes`.
func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error {
func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
eb := EncodeBufferPool.Get().(*encbuf)
defer EncodeBufferPool.Put(eb)
eb.reset()
eb.putByte(version)
eb.putByte(byte(hb.Format()))
_, err := w.Write(eb.get())
if err != nil {
return errors.Wrap(err, "write headBlock version")
@ -372,7 +441,7 @@ func (hb *unorderedHeadBlock) CheckpointTo(version byte, w io.Writer) error {
return nil
}
func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error {
func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
// ensure it's empty
*hb = *newUnorderedHeadBlock()
@ -387,9 +456,9 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error {
return errors.Wrap(db.err(), "verifying headblock header")
}
switch version {
case chunkFormatV1, chunkFormatV2, chunkFormatV3:
case UnorderedHeadBlockFmt.Byte():
default:
return errors.Errorf("incompatible headBlock version (%v), only V1,V2,V3 is currently supported", version)
return errors.Errorf("incompatible headBlock version (%v), only V4 is currently supported", version)
}
n := db.uvarint()
@ -402,7 +471,9 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error {
ts := db.varint64()
lineLn := db.uvarint()
line := string(db.bytes(lineLn))
hb.append(ts, line)
if err := hb.Append(ts, line); err != nil {
return err
}
}
if err := db.err(); err != nil {
@ -411,3 +482,36 @@ func (hb *unorderedHeadBlock) FromCheckpoint(b []byte) error {
return nil
}
// HeadFromCheckpoint handles reading any head block format and returning the desired form.
// This is particularly helpful when replaying WALs from different configurations,
// such as after enabling unordered writes.
func HeadFromCheckpoint(b []byte, desired HeadBlockFmt) (HeadBlock, error) {
db := decbuf{b: b}
version := db.byte()
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying headblock header")
}
format := HeadBlockFmt(version)
var decodedBlock HeadBlock
switch {
case format <= OrderedHeadBlockFmt:
decodedBlock = &headBlock{}
case format == UnorderedHeadBlockFmt:
decodedBlock = newUnorderedHeadBlock()
default:
return nil, fmt.Errorf("unexpected head block version: %v", version)
}
if err := decodedBlock.LoadBytes(b); err != nil {
return nil, err
}
if decodedBlock.Format() != desired {
return decodedBlock.Convert(desired)
}
return decodedBlock, nil
}
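A minimal sketch of how a WAL replay path might use this helper, assuming it runs inside the chunkenc package; the function name and error handling are illustrative, not part of this diff:
// Illustrative only: recover a checkpointed head block written in either
// format and return it in the format the caller is configured for.
func recoverHead(checkpoint []byte, desired HeadBlockFmt) (HeadBlock, error) {
	head, err := HeadFromCheckpoint(checkpoint, desired)
	if err != nil {
		return nil, err
	}
	// HeadFromCheckpoint converts the decoded block when its serialized
	// format differs from the desired one, e.g. turning an ordered
	// checkpoint into an unorderedHeadBlock after unordered writes are
	// enabled.
	return head, nil
}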

@ -30,7 +30,7 @@ func iterEq(t *testing.T, exp []entry, got iter.EntryIterator) {
func Test_forEntriesEarlyReturn(t *testing.T) {
hb := newUnorderedHeadBlock()
for i := 0; i < 10; i++ {
hb.append(int64(i), fmt.Sprint(i))
require.Nil(t, hb.Append(int64(i), fmt.Sprint(i)))
}
// forward
@ -143,10 +143,10 @@ func Test_Unordered_InsertRetrieval(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
hb := newUnorderedHeadBlock()
for _, e := range tc.input {
hb.append(e.t, e.s)
require.Nil(t, hb.Append(e.t, e.s))
}
itr := hb.iterator(
itr := hb.Iterator(
context.Background(),
tc.dir,
0,
@ -205,10 +205,10 @@ func Test_UnorderedBoundedIter(t *testing.T) {
t.Run(tc.desc, func(t *testing.T) {
hb := newUnorderedHeadBlock()
for _, e := range tc.input {
hb.append(e.t, e.s)
require.Nil(t, hb.Append(e.t, e.s))
}
itr := hb.iterator(
itr := hb.Iterator(
context.Background(),
tc.dir,
tc.mint,
@ -221,26 +221,44 @@ func Test_UnorderedBoundedIter(t *testing.T) {
}
}
func Test_UnorderedHeadBlockCheckpointRoundtrip(t *testing.T) {
hb := newUnorderedHeadBlock()
func TestHeadBlockInterop(t *testing.T) {
unordered, ordered := newUnorderedHeadBlock(), &headBlock{}
for i := 0; i < 100; i++ {
hb.append(int64(i), fmt.Sprint(i))
require.Nil(t, unordered.Append(int64(99-i), fmt.Sprint(99-i)))
require.Nil(t, ordered.Append(int64(i), fmt.Sprint(i)))
}
// turn to bytes
b, err := hb.CheckpointBytes(DefaultChunkFormat, nil)
b1, err := ordered.CheckpointBytes(nil)
require.Nil(t, err)
b2, err := unordered.CheckpointBytes(nil)
require.Nil(t, err)
// restore a copy from bytes
cpy := newUnorderedHeadBlock()
require.Nil(t, cpy.FromCheckpoint(b))
// Ensure we can recover ordered checkpoint into ordered headblock
recovered, err := HeadFromCheckpoint(b1, OrderedHeadBlockFmt)
require.Nil(t, err)
require.Equal(t, ordered, recovered)
// ensure copy's bytes match original
cpyBytes, err := cpy.CheckpointBytes(DefaultChunkFormat, nil)
// Ensure we can recover ordered checkpoint into unordered headblock
recovered, err = HeadFromCheckpoint(b1, UnorderedHeadBlockFmt)
require.Nil(t, err)
require.Equal(t, b, cpyBytes)
require.Equal(t, unordered, recovered)
// Ensure we can recover unordered checkpoint into ordered headblock
recovered, err = HeadFromCheckpoint(b2, OrderedHeadBlockFmt)
require.Nil(t, err)
require.Equal(t, ordered, recovered)
// Ensure we can recover unordered checkpoint into unordered headblock
recovered, err = HeadFromCheckpoint(b2, UnorderedHeadBlockFmt)
require.Nil(t, err)
require.Equal(t, unordered, recovered)
}
// ensure backwards compatibility from when chunk format
// and head block format was split
func TestChunkBlockFmt(t *testing.T) {
require.Equal(t, chunkFormatV3, byte(OrderedHeadBlockFmt))
}
func BenchmarkHeadBlockWrites(b *testing.B) {
@ -254,14 +272,14 @@ func BenchmarkHeadBlockWrites(b *testing.B) {
headBlockFn := func() func(int64, string) {
hb := &headBlock{}
return func(ts int64, line string) {
_ = hb.append(ts, line)
_ = hb.Append(ts, line)
}
}
unorderedHeadBlockFn := func() func(int64, string) {
hb := newUnorderedHeadBlock()
return func(ts int64, line string) {
hb.append(ts, line)
_ = hb.Append(ts, line)
}
}
