Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/chunkenc/unordered.go

630 lines
17 KiB

package chunkenc
import (
"bytes"
"context"
"encoding/binary"
"fmt"
"io"
"math"
"time"
"github.com/Workiva/go-datastructures/rangetree"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
)
// noopStreamPipeline is the no-op log pipeline bound to an empty label set.
var noopStreamPipeline = log.NewNoopPipeline().ForStream(labels.Labels{})
type HeadBlock interface {
IsEmpty() bool
CheckpointTo(w io.Writer) error
CheckpointBytes(b []byte) ([]byte, error)
CheckpointSize() int
LoadBytes(b []byte) error
Serialise(pool WriterPool) ([]byte, error)
Reset()
Bounds() (mint, maxt int64)
Entries() int
UncompressedSize() int
Convert(HeadBlockFmt, *symbolizer) (HeadBlock, error)
Append(int64, string, labels.Labels) (bool, error)
Iterator(
ctx context.Context,
direction logproto.Direction,
mint,
maxt int64,
pipeline log.StreamPipeline,
) iter.EntryIterator
SampleIterator(
ctx context.Context,
mint,
maxt int64,
extractor log.StreamSampleExtractor,
) iter.SampleIterator
Format() HeadBlockFmt
}
// unorderedHeadBlock is a head block that keeps its entries in a range tree
// keyed by timestamp, so entries do not have to be appended in timestamp order.
type unorderedHeadBlock struct {
	format HeadBlockFmt

	// rt holds the entries. A range tree was preferred over a skiplist for
	// space reduction.
	//   Inserts: O(log(n))
	//   Scans:   O(k+log(n)) where k=num_scanned_entries & n=total_entries
	rt         rangetree.RangeTree
	symbolizer *symbolizer

	lines int   // number of entries
	size  int   // size of uncompressed bytes
	mint  int64 // lower timestamp bound
	maxt  int64 // upper timestamp bound
}
// newUnorderedHeadBlock returns an empty unordered head block of the given
// format that uses symbolizer for structured metadata. The backing range tree
// is created with a single dimension (the timestamp).
func newUnorderedHeadBlock(headBlockFmt HeadBlockFmt, symbolizer *symbolizer) *unorderedHeadBlock {
	hb := new(unorderedHeadBlock)
	hb.format = headBlockFmt
	hb.symbolizer = symbolizer
	hb.rt = rangetree.New(1)
	return hb
}
// Format returns the head block format this block was created with.
func (hb *unorderedHeadBlock) Format() HeadBlockFmt {
	return hb.format
}
// IsEmpty reports whether the block holds no entries.
//
// The check is based on the entry count rather than the byte size: entries
// with a zero-length line and no structured metadata contribute nothing to
// hb.size, yet they are real entries that must still be iterated and
// checkpointed.
func (hb *unorderedHeadBlock) IsEmpty() bool {
	return hb.lines == 0
}
// Bounds returns the lowest and highest timestamps recorded for the block.
func (hb *unorderedHeadBlock) Bounds() (mint, maxt int64) {
	return hb.mint, hb.maxt
}
// Entries returns the number of entries stored in the block.
func (hb *unorderedHeadBlock) Entries() int {
	count := hb.lines
	return count
}
// UncompressedSize returns the accumulated uncompressed size, in bytes, of
// the entries stored in the block.
func (hb *unorderedHeadBlock) UncompressedSize() int {
	total := hb.size
	return total
}
// Reset replaces the block's contents with those of a freshly created block
// that has the same format and symbolizer.
func (hb *unorderedHeadBlock) Reset() {
	*hb = *newUnorderedHeadBlock(hb.format, hb.symbolizer)
}
// nsEntry is a single log line together with the symbols that reference its
// structured metadata.
type nsEntry struct {
	line                      string
	structuredMetadataSymbols symbols
}
// nsEntries groups all entries that share the same nanosecond timestamp.
// It is the element type stored in the head block's range tree.
type nsEntries struct {
	ts      int64
	entries []nsEntry
}

// ValueAtDimension returns the timestamp, whatever dimension is requested.
func (es *nsEntries) ValueAtDimension(uint64) int64 {
	return es.ts
}
// Append stores an entry in the block. It returns true if the entry is a
// duplicate (same timestamp and same line as an existing entry), in which case
// nothing is added, and false otherwise. The returned error is always nil.
//
// Structured metadata is dropped for head block formats older than
// UnorderedWithStructuredMetadataHeadBlockFmt.
func (hb *unorderedHeadBlock) Append(ts int64, line string, structuredMetadata labels.Labels) (bool, error) {
	if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt {
		// structuredMetadata must be ignored for the previous head block formats
		structuredMetadata = nil
	}

	// This is an allocation hack. The rangetree lib does not
	// support the ability to pass a "mutate" function during an insert
	// and instead will displace any existing entry at the specified timestamp.
	// Since Loki supports multiple lines per timestamp,
	// we insert an entry without any log lines,
	// which is ordered by timestamp alone.
	// Then, we detect if we've displaced any existing entries, and
	// append the new one to the existing, preallocated slice.
	// If not, we create a slice with one entry.
	e := &nsEntries{ts: ts}
	displaced := hb.rt.Add(e)
	if displaced[0] != nil {
		existing := displaced[0].(*nsEntries).entries

		// While we support multiple entries at the same timestamp, we _do_ de-duplicate
		// entries at the same time with the same content, iterate through any existing
		// entries and ignore the line if we already have an entry with the same content
		for _, et := range existing {
			if et.line == line {
				// Hand the displaced entries back to the node that replaced them.
				e.entries = existing
				return true, nil
			}
		}
		e.entries = append(existing, nsEntry{line, hb.symbolizer.Add(structuredMetadata)})
	} else {
		e.entries = []nsEntry{{line, hb.symbolizer.Add(structuredMetadata)}}
	}

	// Update hb metadata. The bounds are seeded from the first entry; this is
	// keyed on the entry count because hb.size stays 0 while the block only
	// holds zero-length lines, which would let a later timestamp overwrite mint.
	if hb.lines == 0 || hb.mint > ts {
		hb.mint = ts
	}
	if hb.lines == 0 || hb.maxt < ts {
		hb.maxt = ts
	}

	hb.size += len(line)
	hb.size += len(structuredMetadata) * 2 * 4 // 4 bytes per label and value pair as structuredMetadataSymbols
	hb.lines++

	return false, nil
}
// metaLabelsLen returns the combined byte length of every label name and
// value in metaLabels.
func metaLabelsLen(metaLabels labels.Labels) int {
	var total int
	for i := range metaLabels {
		total += len(metaLabels[i].Name)
		total += len(metaLabels[i].Value)
	}
	return total
}
// interval is a [mint, maxt) timestamp range in nanoseconds.
// Implements rangetree.Interval.
type interval struct {
	mint int64
	maxt int64
}

// LowAtDimension returns the lower bound of the interval.
func (i interval) LowAtDimension(_ uint64) int64 {
	return i.mint
}

// HighAtDimension returns the upper bound of the interval.
// The rangetree library treats this bound as inclusive, but we want
// exclusivity, i.e. [from, through), so the bound is lowered by one nanosecond.
func (i interval) HighAtDimension(_ uint64) int64 {
	upper := i.maxt
	upper--
	return upper
}
// forEntries is the helper holding the base logic shared by
// {Entry,Sample}Iterator and the serialisation routines.
//
// It queries the range tree for the entries between mint and maxt (the
// interval type makes maxt exclusive) and calls entryFn for each of them.
// Timestamps are visited in ascending order for logproto.FORWARD and in
// descending order otherwise. Head chunk statistics are recorded on the stats
// context found in ctx.
//
// Iteration stops at the first error returned by entryFn, and that error is
// returned to the caller.
func (hb *unorderedHeadBlock) forEntries(
	ctx context.Context,
	direction logproto.Direction,
	mint,
	maxt int64,
	entryFn func(*stats.Context, int64, string, symbols) error, // returning an error exits early
) (err error) {
	if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
		return nil
	}

	entries := hb.rt.Query(interval{
		mint: mint,
		maxt: maxt,
	})

	chunkStats := stats.FromContext(ctx)

	// process feeds every entry of a single timestamp to entryFn and returns
	// the first error it hits.
	process := func(es *nsEntries) error {
		chunkStats.AddHeadChunkLines(int64(len(es.entries)))

		// preserve write ordering of entries with the same ts
		i := 0
		if direction == logproto.BACKWARD {
			i = len(es.entries) - 1
		}
		step := -1
		if direction == logproto.FORWARD {
			step = 1
		}

		for ; i >= 0 && i < len(es.entries); i += step {
			line := es.entries[i].line
			structuredMetadataSymbols := es.entries[i].structuredMetadataSymbols
			structuredMetadataBytes := int64(2 * len(structuredMetadataSymbols) * 4) // 2 * num_symbols * 4 bytes(uint32)
			chunkStats.AddHeadChunkStructuredMetadataBytes(structuredMetadataBytes)
			chunkStats.AddHeadChunkBytes(int64(len(line)) + structuredMetadataBytes)

			// Stop right away: continuing would let a later entry at the same
			// timestamp overwrite (and thereby hide) this error.
			if err := entryFn(chunkStats, es.ts, line, structuredMetadataSymbols); err != nil {
				return err
			}
		}
		return nil
	}

	if direction == logproto.FORWARD {
		for _, e := range entries {
			if err := process(e.(*nsEntries)); err != nil {
				return err
			}
		}
		return nil
	}

	for i := len(entries) - 1; i >= 0; i-- {
		if err := process(entries[i].(*nsEntries)); err != nil {
			return err
		}
	}
	return nil
}
Flag categorize labels on streams response (#10419) We recently introduced support for ingesting and querying structured metadata in Loki. This adds a new dimension to Loki's labels since now we arguably have three categories of labels: _stream_, _structured metadata_, and _parsed_ labels. Depending on the origin of the labels, they should be used in LogQL expressions differently to achieve optimal performance. _stream_ labels should be added to stream matchers, _structured metadata_ labels should be used in a filter expression before any parsing expression, and _parsed_ labels should be placed after the parser expression extracting them. The Grafana UI has a hard time dealing with this same problem. Before https://github.com/grafana/grafana/pull/73955, the filtering functionality in Grafana was broken since it was not able to distinguish between _stream_ and _structured metadata_ labels. Also, as soon as a parser expression was added to the query, filters added by Grafana would be appended to the end of the query regardless of the label category. The PR above implements a workaround for this problem but needs a better API on Loki's end to mitigate all corner cases. Loki currently returns the following JSON for log queries: ```json ... { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "info", "traceID": "68810cf0c94bfcca" }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n" ], ... }, { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "debug", "traceID": "a7116cj54c4bjz8s" }, "values": [ [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n" ], ... }, ... ``` As can be seen, there is no way we can distinguish the category of each label. This PR introduces a new flag `X-Loki-Response-Encoding-Flags: categorize-labels` that makes Loki return categorized labels as follows: ```json ... 
{ "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n", { "structuredMetadata": { "traceID": "68810cf0c94bfcca" }, "parsed": { "level": "info" } } ], [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n", { "structuredMetadata": { "traceID": "a7116cj54c4bjz8s" }, "parsed": { "level": "debug" } } ], ... }, ... ``` Note that this PR only supports log queries, not metric queries. From a UX perspective, being able to categorize labels in metric queries doesn't have any benefit yet. Having said that, supporting this for metric queries would require some minor refactoring on top of what has been implemented here. If we decide to do that, I think we should do it on a separate PR to avoid making this PR even larger. I also decided to leave out support for Tail queries to avoid making this PR even larger. Once this one gets merged, we can work to support tailing. --- **Note to reviewers** This PR is huge since we need to forward categorized all over the codebase (from parsing logs all the way to marshaling), fortunately, many of the changes come from updating tests and refactoring iterators. Tested out in a dev cell with query `'{stream="stdout"} | label_format new="text"`. - Without the new flag: ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED { "data": { "result": [ { "stream": { "new": "text", "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n" ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n" ], ... 
``` - With the new flag ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED X-Loki-Response-Encoding-Flags:categorize-labels { "data": { "encodingFlags": [ "categorize-labels" ], "result": [ { "stream": { "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n", { "parsed": { "new": "text" } } ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n", { "parsed": { "new": "text" } } ], ... ```
2 years ago
func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
// We are doing a copy everytime, this is because b.entries could change completely,
// the alternate would be that we allocate a new b.entries everytime we cut a block,
// but the tradeoff is that queries to near-realtime data would be much lower than
// cutting of blocks.
streams := map[string]*logproto.Stream{}
baseHash := pipeline.BaseLabels().Hash()
_ = hb.forEntries(
ctx,
direction,
mint,
maxt,
func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
newLine, parsedLbs, matches := pipeline.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...)
if !matches {
return nil
}
statsCtx.AddPostFilterLines(1)
var stream *logproto.Stream
labels := parsedLbs.String()
var ok bool
if stream, ok = streams[labels]; !ok {
stream = &logproto.Stream{
Labels: labels,
Hash: baseHash,
}
streams[labels] = stream
}
Flag categorize labels on streams response (#10419) We recently introduced support for ingesting and querying structured metadata in Loki. This adds a new dimension to Loki's labels since now we arguably have three categories of labels: _stream_, _structured metadata_, and _parsed_ labels. Depending on the origin of the labels, they should be used in LogQL expressions differently to achieve optimal performance. _stream_ labels should be added to stream matchers, _structured metadata_ labels should be used in a filter expression before any parsing expression, and _parsed_ labels should be placed after the parser expression extracting them. The Grafana UI has a hard time dealing with this same problem. Before https://github.com/grafana/grafana/pull/73955, the filtering functionality in Grafana was broken since it was not able to distinguish between _stream_ and _structured metadata_ labels. Also, as soon as a parser expression was added to the query, filters added by Grafana would be appended to the end of the query regardless of the label category. The PR above implements a workaround for this problem but needs a better API on Loki's end to mitigate all corner cases. Loki currently returns the following JSON for log queries: ```json ... { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "info", "traceID": "68810cf0c94bfcca" }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n" ], ... }, { "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", "level": "debug", "traceID": "a7116cj54c4bjz8s" }, "values": [ [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n" ], ... }, ... ``` As can be seen, there is no way we can distinguish the category of each label. This PR introduces a new flag `X-Loki-Response-Encoding-Flags: categorize-labels` that makes Loki return categorized labels as follows: ```json ... 
{ "stream": { "cluster": "us-central", "container": "query-frontend", "namespace": "loki", }, "values": [ [ "1693996529000222496", "1693996529000222496 aaaaaaaaa.....\n", { "structuredMetadata": { "traceID": "68810cf0c94bfcca" }, "parsed": { "level": "info" } } ], [ "1693996529000222497", "1693996529000222497 bbbbbbbbb.....\n", { "structuredMetadata": { "traceID": "a7116cj54c4bjz8s" }, "parsed": { "level": "debug" } } ], ... }, ... ``` Note that this PR only supports log queries, not metric queries. From a UX perspective, being able to categorize labels in metric queries doesn't have any benefit yet. Having said that, supporting this for metric queries would require some minor refactoring on top of what has been implemented here. If we decide to do that, I think we should do it on a separate PR to avoid making this PR even larger. I also decided to leave out support for Tail queries to avoid making this PR even larger. Once this one gets merged, we can work to support tailing. --- **Note to reviewers** This PR is huge since we need to forward categorized all over the codebase (from parsing logs all the way to marshaling), fortunately, many of the changes come from updating tests and refactoring iterators. Tested out in a dev cell with query `'{stream="stdout"} | label_format new="text"`. - Without the new flag: ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED { "data": { "result": [ { "stream": { "new": "text", "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n" ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n" ], ... 
``` - With the new flag ``` $ http http://127.0.0.1:3100/loki/api/v1/query_range\?direction\=BACKWARD\&end\=1693996529322486000\&limit\=30\&query\=%7Bstream%3D%22stdout%22%7D+%7C+label_format+new%3D%22text%22\&start\=1693992929322486000 X-Scope-Orgid:REDACTED X-Loki-Response-Encoding-Flags:categorize-labels { "data": { "encodingFlags": [ "categorize-labels" ], "result": [ { "stream": { "pod": "loki-canary-986bd6f4b-xqmb7", "stream": "stdout" }, "values": [ [ "1693996529000222496", "1693996529000222496 pppppppppppp...\n", { "parsed": { "new": "text" } } ], [ "1693996528499160852", "1693996528499160852 pppppppppppp...\n", { "parsed": { "new": "text" } } ], ... ```
2 years ago
stream.Entries = append(stream.Entries, logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: newLine,
StructuredMetadata: logproto.FromLabelsToLabelAdapters(parsedLbs.StructuredMetadata()),
Parsed: logproto.FromLabelsToLabelAdapters(parsedLbs.Parsed()),
})
return nil
},
)
if pipeline.ReferencedStructuredMetadata() {
stats.FromContext(ctx).SetQueryReferencedStructuredMetadata()
}
if len(streams) == 0 {
return iter.NoopEntryIterator
}
streamsResult := make([]logproto.Stream, 0, len(streams))
for _, stream := range streams {
streamsResult = append(streamsResult, *stream)
}
return iter.NewStreamsIterator(streamsResult, direction)
}
// nolint:unused
// SampleIterator runs every entry between mint and maxt through extractor, in
// forward order, and returns an iterator over the extracted samples grouped
// into one series per resulting label set. iter.NoopSampleIterator is returned
// when nothing was extracted. Closing the returned iterator hands the sample
// slices back to SamplesPool.
func (hb *unorderedHeadBlock) SampleIterator(
	ctx context.Context,
	mint,
	maxt int64,
	extractor log.StreamSampleExtractor,
) iter.SampleIterator {
	series := map[string]*logproto.Series{}
	baseHash := extractor.BaseLabels().Hash()

	// collect extracts a sample from one entry and files it under its series.
	collect := func(statsCtx *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
		value, parsedLabels, ok := extractor.ProcessString(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)...)
		if !ok {
			return nil
		}
		statsCtx.AddPostFilterLines(1)

		key := parsedLabels.String()
		target, known := series[key]
		if !known {
			target = &logproto.Series{
				Labels:     key,
				Samples:    SamplesPool.Get(hb.lines).([]logproto.Sample)[:0],
				StreamHash: baseHash,
			}
			series[key] = target
		}

		sample := logproto.Sample{
			Timestamp: ts,
			Value:     value,
			Hash:      xxhash.Sum64(unsafeGetBytes(line)),
		}
		target.Samples = append(target.Samples, sample)
		return nil
	}
	_ = hb.forEntries(ctx, logproto.FORWARD, mint, maxt, collect)

	if extractor.ReferencedStructuredMetadata() {
		stats.FromContext(ctx).SetQueryReferencedStructuredMetadata()
	}

	if len(series) == 0 {
		return iter.NoopSampleIterator
	}

	seriesRes := make([]logproto.Series, 0, len(series))
	for _, s := range series {
		seriesRes = append(seriesRes, *s)
	}

	release := func() error {
		for _, s := range series {
			SamplesPool.Put(s.Samples)
		}
		return nil
	}
	return iter.SampleIteratorWithClose(iter.NewMultiSeriesIterator(seriesRes), release)
}
// nolint:unused
// Serialise is used in creating an ordered, compressed block from an unorderedHeadBlock.
// Entries are encoded in forward order into a pooled buffer, which is then
// written through a writer obtained from pool; the writer's output is returned.
func (hb *unorderedHeadBlock) Serialise(pool WriterPool) ([]byte, error) {
	rawBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
	defer func() {
		rawBuf.Reset()
		serializeBytesBufferPool.Put(rawBuf)
	}()

	symBuf := serializeBytesBufferPool.Get().(*bytes.Buffer)
	defer func() {
		symBuf.Reset()
		serializeBytesBufferPool.Put(symBuf)
	}()

	var compressed bytes.Buffer
	scratch := make([]byte, binary.MaxVarintLen64)

	writer := pool.GetWriter(&compressed)
	defer pool.PutWriter(writer)

	// putUvarint appends v to dst using the unsigned varint encoding.
	putUvarint := func(dst *bytes.Buffer, v uint64) {
		n := binary.PutUvarint(scratch, v)
		dst.Write(scratch[:n])
	}

	encodeEntry := func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
		// timestamp, then the length-prefixed line
		n := binary.PutVarint(scratch, ts)
		rawBuf.Write(scratch[:n])
		putUvarint(rawBuf, uint64(len(line)))
		rawBuf.WriteString(line)

		if hb.format < UnorderedWithStructuredMetadataHeadBlockFmt {
			return nil
		}

		// The symbols section is length-prefixed, but its size cannot be
		// estimated beforehand because of the variable length encoding, so it
		// is staged in symBuf first.
		symBuf.Reset()
		// number of symbol pairs, followed by the pairs themselves
		putUvarint(symBuf, uint64(len(structuredMetadataSymbols)))
		for _, l := range structuredMetadataSymbols {
			putUvarint(symBuf, uint64(l.Name))
			putUvarint(symBuf, uint64(l.Value))
		}

		// length of the symbols section, then the section itself
		putUvarint(rawBuf, uint64(symBuf.Len()))
		rawBuf.Write(symBuf.Bytes())
		return nil
	}
	_ = hb.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64, encodeEntry)

	if _, err := writer.Write(rawBuf.Bytes()); err != nil {
		return nil, errors.Wrap(err, "appending entry")
	}
	if err := writer.Close(); err != nil {
		return nil, errors.Wrap(err, "flushing pending compress buffer")
	}
	return compressed.Bytes(), nil
}
// Convert returns the contents of hb as a head block of the requested format.
// hb itself is returned when it already has that format; otherwise a new block
// is created from version with the given symbolizer and every entry is
// appended to it in forward order. The new block is returned together with
// any error hit while appending.
func (hb *unorderedHeadBlock) Convert(version HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error) {
	if version == hb.format {
		return hb, nil
	}

	converted := version.NewBlock(symbolizer)
	copyEntry := func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
		_, appendErr := converted.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols))
		return appendErr
	}

	err := hb.forEntries(context.Background(), logproto.FORWARD, 0, math.MaxInt64, copyEntry)
	return converted, err
}
// CheckpointSize returns the estimated size of the headblock checkpoint.
// The estimate budgets the maximum varint width for every encoded integer.
func (hb *unorderedHeadBlock) CheckpointSize() int {
	// version byte, total entries + total size, mint + maxt
	const header = 1 + binary.MaxVarintLen32*2 + binary.MaxVarintLen64*2

	// ts + len of log line
	perEntry := binary.MaxVarintLen64 + binary.MaxVarintLen32
	if hb.format >= UnorderedWithStructuredMetadataHeadBlockFmt {
		// number of labels of structured metadata stored for each log entry
		perEntry += binary.MaxVarintLen32
	}

	// hb.size accounts for the uncompressed bytes of the lines
	return header + perEntry*hb.lines + hb.size
}
// CheckpointBytes serializes a headblock to []byte, reusing the backing array
// of b for the output. This is used by the WAL checkpointing, which does not
// want to mutate a chunk by cutting it (otherwise risking content address
// changes), but needs to serialize/deserialize the data to disk to ensure data
// durability.
func (hb *unorderedHeadBlock) CheckpointBytes(b []byte) ([]byte, error) {
	out := bytes.NewBuffer(b[:0])
	if err := hb.CheckpointTo(out); err != nil {
		return out.Bytes(), err
	}
	return out.Bytes(), nil
}
// CheckpointTo serializes a headblock to a `io.Writer`. see `CheckpointBytes`.
//
// Layout: one format byte, the number of entries as a uvarint, then for every
// entry in forward order: the timestamp (varint), the line length (uvarint)
// and the line bytes. For formats that carry structured metadata each entry is
// followed by the number of symbol pairs (uvarint) and the name/value symbol
// of every pair (uvarints).
//
// The first write error is returned, including errors hit while writing entries.
func (hb *unorderedHeadBlock) CheckpointTo(w io.Writer) error {
	eb := EncodeBufferPool.Get().(*encbuf)
	defer EncodeBufferPool.Put(eb)

	eb.reset()

	eb.putByte(byte(hb.Format()))
	if _, err := w.Write(eb.get()); err != nil {
		return errors.Wrap(err, "write headBlock version")
	}
	eb.reset()

	eb.putUvarint(hb.lines)
	if _, err := w.Write(eb.get()); err != nil {
		return errors.Wrap(err, "write headBlock metas")
	}
	eb.reset()

	// Propagate the result: a failed entry write must not be reported as a
	// successful checkpoint.
	return hb.forEntries(
		context.Background(),
		logproto.FORWARD,
		0,
		math.MaxInt64,
		func(_ *stats.Context, ts int64, line string, structuredMetadataSymbols symbols) error {
			eb.putVarint64(ts)
			eb.putUvarint(len(line))
			if _, err := w.Write(eb.get()); err != nil {
				return errors.Wrap(err, "write headBlock entry ts")
			}
			eb.reset()

			if _, err := io.WriteString(w, line); err != nil {
				return errors.Wrap(err, "write headblock entry line")
			}

			if hb.format >= UnorderedWithStructuredMetadataHeadBlockFmt {
				// structured metadata
				eb.putUvarint(len(structuredMetadataSymbols))
				if _, err := w.Write(eb.get()); err != nil {
					return errors.Wrap(err, "write headBlock entry meta labels length")
				}
				eb.reset()

				for _, l := range structuredMetadataSymbols {
					eb.putUvarint(int(l.Name))
					eb.putUvarint(int(l.Value))
					if _, err := w.Write(eb.get()); err != nil {
						return errors.Wrap(err, "write headBlock entry structuredMetadataSymbols")
					}
					eb.reset()
				}
			}
			return nil
		},
	)
}
// LoadBytes replaces the contents of hb with the entries decoded from b, which
// is expected to be in the layout written by CheckpointTo. hb is always reset
// first, keeping its format and symbolizer; an empty b leaves it empty.
//
// An error is returned when the version byte is older than
// UnorderedHeadBlockFmt, when b cannot be decoded, or when appending fails.
// On a decode error, the entries decoded before the failure remain in hb.
func (hb *unorderedHeadBlock) LoadBytes(b []byte) error {
	// ensure it's empty
	*hb = *newUnorderedHeadBlock(hb.format, hb.symbolizer)

	if len(b) < 1 {
		return nil
	}

	db := decbuf{b: b}

	version := db.byte()
	if db.err() != nil {
		return errors.Wrap(db.err(), "verifying headblock header")
	}
	if version < UnorderedHeadBlockFmt.Byte() {
		return errors.Errorf("incompatible headBlock version (%v), only V4 and the next versions are currently supported", version)
	}

	n := db.uvarint()
	if err := db.err(); err != nil {
		return errors.Wrap(err, "verifying headblock metadata")
	}

	for i := 0; i < n; i++ {
		ts := db.varint64()
		lineLn := db.uvarint()
		line := string(db.bytes(lineLn))

		var structuredMetadataSymbols symbols
		if version >= UnorderedWithStructuredMetadataHeadBlockFmt.Byte() {
			metaLn := db.uvarint()
			if metaLn > 0 && db.err() == nil {
				structuredMetadataSymbols = make([]symbol, metaLn)
				for j := 0; j < metaLn && db.err() == nil; j++ {
					structuredMetadataSymbols[j] = symbol{
						Name:  uint32(db.uvarint()),
						Value: uint32(db.uvarint()),
					}
				}
			}
		}

		// Bail out before touching the block: after a decode error the values
		// read above are not trustworthy and must not be appended or looked up.
		if err := db.err(); err != nil {
			return errors.Wrap(err, "decoding entries")
		}

		if _, err := hb.Append(ts, line, hb.symbolizer.Lookup(structuredMetadataSymbols)); err != nil {
			return err
		}
	}

	return nil
}
// HeadFromCheckpoint handles reading any head block format and returning the desired form.
// This is particularly helpful replaying WALs from different configurations
// such as after enabling unordered writes.
//
// An empty checkpoint yields a new, empty block of the desired format.
// Otherwise the block is decoded in the format named by the first byte of b;
// it is converted to desiredIfNotUnordered only when the decoded format is
// older than UnorderedHeadBlockFmt and differs from the desired one.
func HeadFromCheckpoint(b []byte, desiredIfNotUnordered HeadBlockFmt, symbolizer *symbolizer) (HeadBlock, error) {
	if len(b) == 0 {
		return desiredIfNotUnordered.NewBlock(symbolizer), nil
	}

	db := decbuf{b: b}
	version := db.byte()
	if db.err() != nil {
		return nil, errors.Wrap(db.err(), "verifying headblock header")
	}

	format := HeadBlockFmt(version)
	if format > UnorderedWithStructuredMetadataHeadBlockFmt {
		return nil, fmt.Errorf("unexpected head block version: %v", format)
	}

	decodedBlock := format.NewBlock(symbolizer)
	if loadErr := decodedBlock.LoadBytes(b); loadErr != nil {
		return nil, loadErr
	}

	needsConversion := decodedBlock.Format() < UnorderedHeadBlockFmt &&
		decodedBlock.Format() != desiredIfNotUnordered
	if !needsConversion {
		return decodedBlock, nil
	}
	return decodedBlock.Convert(desiredIfNotUnordered, nil)
}