From 191c252d7c64ec2e5a08b5ff6b29e6ac1bd6b671 Mon Sep 17 00:00:00 2001 From: "Grot (@grafanabot)" <43478413+grafanabot@users.noreply.github.com> Date: Mon, 14 Aug 2023 14:59:23 +0100 Subject: [PATCH] [k163] Update chunk and head fmt to v4 (non-indexed labels) (#10245) Backport 287f29bc46e077f43cfa8778732bdcb43ed882a3 from #10242 --- **What this PR does / why we need it**: This PR updates the default chunk and heads format to v4 to support writing and reading non-indexed labels. Co-authored-by: Salva Corts --- pkg/chunkenc/memchunk.go | 15 +- pkg/chunkenc/memchunk_test.go | 13 +- pkg/chunkenc/unordered_test.go | 10 +- pkg/ingester/chunk_test.go | 2 +- pkg/ingester/encoding_test.go | 139 ++++++++---------- pkg/ingester/flush_test.go | 2 +- pkg/ingester/stream_test.go | 2 +- pkg/ingester/wal/encoding.go | 2 +- pkg/logql/metrics.go | 1 + pkg/storage/hack/main.go | 4 +- .../compactor/retention/retention_test.go | 16 +- pkg/storage/stores/series_store_write_test.go | 2 +- .../stores/shipper/index/compactor/util.go | 2 +- pkg/storage/util_test.go | 2 +- 14 files changed, 107 insertions(+), 105 deletions(-) diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index dccc5ced9c..90d8ef8d32 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -33,7 +33,7 @@ const ( chunkFormatV3 chunkFormatV4 - DefaultChunkFormat = chunkFormatV3 // the currently used chunk format + DefaultChunkFormat = chunkFormatV4 // the currently used chunk format blocksPerChunk = 10 maxLineLength = 1024 * 1024 * 1024 @@ -85,7 +85,7 @@ const ( UnorderedHeadBlockFmt UnorderedWithNonIndexedLabelsHeadBlockFmt - DefaultHeadBlockFmt = UnorderedHeadBlockFmt + DefaultHeadBlockFmt = UnorderedWithNonIndexedLabelsHeadBlockFmt ) var magicNumber = uint32(0x12EE56A) @@ -348,8 +348,19 @@ func NewMemChunk(enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *Me return newMemChunkWithFormat(DefaultChunkFormat, enc, head, blockSize, targetSize) } +func panicIfInvalidFormat(chunkFmt byte, head HeadBlockFmt) { + if chunkFmt == chunkFormatV2 && head != OrderedHeadBlockFmt { + panic("only OrderedHeadBlockFmt is supported for V2 chunks") + } + if chunkFmt == chunkFormatV4 && head != UnorderedWithNonIndexedLabelsHeadBlockFmt { + panic("only UnorderedWithNonIndexedLabelsHeadBlockFmt is supported for V4 chunks") + } +} + // NewMemChunk returns a new in-mem chunk. func newMemChunkWithFormat(format byte, enc Encoding, head HeadBlockFmt, blockSize, targetSize int) *MemChunk { + panicIfInvalidFormat(format, head) + symbolizer := newSymbolizer() return &MemChunk{ blockSize: blockSize, // The blockSize in bytes. diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index edce162e33..efc9c180c8 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -74,7 +74,7 @@ var ( } ) -const DefaultTestHeadBlockFmt = OrderedHeadBlockFmt +const DefaultTestHeadBlockFmt = DefaultHeadBlockFmt func TestBlocksInclusive(t *testing.T) { chk := NewMemChunk(EncNone, DefaultTestHeadBlockFmt, testBlockSize, testTargetSize) @@ -637,11 +637,11 @@ func TestChunkSize(t *testing.T) { } var result []res for _, bs := range testBlockSizes { - for _, f := range HeadBlockFmts { + for _, f := range allPossibleFormats { for _, enc := range testEncoding { name := fmt.Sprintf("%s_%s", enc.String(), humanize.Bytes(uint64(bs))) t.Run(name, func(t *testing.T) { - c := NewMemChunk(enc, f, bs, testTargetSize) + c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, bs, testTargetSize) inserted := fillChunk(c) b, err := c.Bytes() if err != nil { @@ -685,7 +685,8 @@ func TestChunkStats(t *testing.T) { inserted++ entry.Timestamp = entry.Timestamp.Add(time.Nanosecond) } - expectedSize := (inserted * len(entry.Line)) + (inserted * 2 * binary.MaxVarintLen64) + // For each entry: timestamp , line size , line , num of non-indexed labels + expectedSize := inserted * (len(entry.Line) + 3*binary.MaxVarintLen64) statsCtx, ctx := stats.NewContext(context.Background()) it, err := c.Iterator(ctx, first.Add(-time.Hour), entry.Timestamp.Add(time.Hour), logproto.BACKWARD, noopStreamPipeline) @@ -734,7 +735,7 @@ func TestChunkStats(t *testing.T) { } func TestIteratorClose(t *testing.T) { - for _, f := range HeadBlockFmts { + for _, f := range allPossibleFormats { for _, enc := range testEncoding { t.Run(enc.String(), func(t *testing.T) { for _, test := range []func(iter iter.EntryIterator, t *testing.T){ @@ -762,7 +763,7 @@ func TestIteratorClose(t *testing.T) { } }, } { - c := NewMemChunk(enc, f, testBlockSize, testTargetSize) + c := newMemChunkWithFormat(f.chunkFormat, enc, f.headBlockFmt, testBlockSize, testTargetSize) inserted := fillChunk(c) iter, err := c.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, inserted), logproto.BACKWARD, noopStreamPipeline) if err != nil { diff --git a/pkg/chunkenc/unordered_test.go b/pkg/chunkenc/unordered_test.go index 1cdfe3441b..5a3f0cc874 100644 --- a/pkg/chunkenc/unordered_test.go +++ b/pkg/chunkenc/unordered_test.go @@ -428,7 +428,7 @@ func BenchmarkHeadBlockWrites(b *testing.B) { } func TestUnorderedChunkIterators(t *testing.T) { - c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize) for i := 0; i < 100; i++ { // push in reverse order require.Nil(t, c.Append(&logproto.Entry{ @@ -546,7 +546,7 @@ func BenchmarkUnorderedRead(b *testing.B) { } func TestUnorderedIteratorCountsAllEntries(t *testing.T) { - c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize) fillChunkRandomOrder(c, false) ct := 0 @@ -583,7 +583,7 @@ func TestUnorderedIteratorCountsAllEntries(t *testing.T) { } func chunkFrom(xs []logproto.Entry) ([]byte, error) { - c := NewMemChunk(EncSnappy, OrderedHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range xs { if err := c.Append(&x); err != nil { return nil, err @@ -643,7 +643,7 @@ func TestReorder(t *testing.T) { }, } { t.Run(tc.desc, func(t *testing.T) { - c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize) for _, x := range tc.input { require.Nil(t, c.Append(&x)) } @@ -660,7 +660,7 @@ func TestReorder(t *testing.T) { } func TestReorderAcrossBlocks(t *testing.T) { - c := NewMemChunk(EncSnappy, UnorderedHeadBlockFmt, testBlockSize, testTargetSize) + c := NewMemChunk(EncSnappy, DefaultHeadBlockFmt, testBlockSize, testTargetSize) for _, batch := range [][]int{ // ensure our blocks have overlapping bounds and must be reordered // before closing. diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index cabb49fb94..a615b9030d 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -49,7 +49,7 @@ func TestIterator(t *testing.T) { }{ {"dumbChunk", chunkenc.NewDumbChunk}, {"gzipChunk", func() chunkenc.Chunk { - return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index ba1a892373..af6a63bdda 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -38,95 +38,86 @@ func dummyConf() *Config { } func Test_EncodingChunks(t *testing.T) { - for _, f := range chunkenc.HeadBlockFmts { - for _, close := range []bool{true, false} { - for _, tc := range []struct { - desc string - conf Config - }{ - { - // mostly for historical parity - desc: "dummyConf", - conf: *dummyConf(), - }, - { - desc: "default", - conf: defaultIngesterTestConfig(t), - }, - } { - - t.Run(fmt.Sprintf("%v-%v-%s", f, close, tc.desc), func(t *testing.T) { - conf := tc.conf - c := chunkenc.NewMemChunk(chunkenc.EncGZIP, f, conf.BlockSize, conf.TargetChunkSize) - fillChunk(t, c) + for _, close := range []bool{true, false} { + for _, tc := range []struct { + desc string + conf Config + }{ + { + // mostly for historical parity + desc: "dummyConf", + conf: *dummyConf(), + }, + { + desc: "default", + conf: defaultIngesterTestConfig(t), + }, + } { + + t.Run(fmt.Sprintf("%v-%s", close, tc.desc), func(t *testing.T) { + conf := tc.conf + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + fillChunk(t, c) + if close { + require.Nil(t, c.Close()) + } + + from := []chunkDesc{ + { + chunk: c, + }, + // test non zero values + { + chunk: c, + closed: true, + synced: true, + flushed: time.Unix(1, 0), + lastUpdated: time.Unix(0, 1), + }, + } + there, err := toWireChunks(from, nil) + require.Nil(t, err) + chunks := make([]Chunk, 0, len(there)) + for _, c := range there { + chunks = append(chunks, c.Chunk) + + // Ensure closed head chunks only contain the head metadata but no entries if close { - require.Nil(t, c.Close()) + const unorderedHeadSize = 2 + require.Equal(t, unorderedHeadSize, len(c.Head)) + } else { + require.Greater(t, len(c.Head), 0) } + } - from := []chunkDesc{ - { - chunk: c, - }, - // test non zero values - { - chunk: c, - closed: true, - synced: true, - flushed: time.Unix(1, 0), - lastUpdated: time.Unix(0, 1), - }, - } - there, err := toWireChunks(from, nil) - require.Nil(t, err) - chunks := make([]Chunk, 0, len(there)) - for _, c := range there { - chunks = append(chunks, c.Chunk) - - // Ensure closed head chunks only contain the head metadata but no entries - if close { - if f < chunkenc.UnorderedHeadBlockFmt { - // format + #entries + size + mint + maxt - const orderedHeadSize = 5 - require.Equal(t, orderedHeadSize, len(c.Head)) - } else { - // format + #lines - const unorderedHeadSize = 2 - require.Equal(t, unorderedHeadSize, len(c.Head)) - } - } else { - require.Greater(t, len(c.Head), 0) - } - } + backAgain, err := fromWireChunks(&conf, chunks) + require.Nil(t, err) - backAgain, err := fromWireChunks(&conf, chunks) + for i, to := range backAgain { + // test the encoding directly as the substructure may change. + // for instance the uncompressed size for each block is not included in the encoded version. + enc, err := to.chunk.Bytes() require.Nil(t, err) + to.chunk = nil - for i, to := range backAgain { - // test the encoding directly as the substructure may change. - // for instance the uncompressed size for each block is not included in the encoded version. - enc, err := to.chunk.Bytes() - require.Nil(t, err) - to.chunk = nil - - matched := from[i] - exp, err := matched.chunk.Bytes() - require.Nil(t, err) - matched.chunk = nil + matched := from[i] + exp, err := matched.chunk.Bytes() + require.Nil(t, err) + matched.chunk = nil - require.Equal(t, exp, enc) - require.Equal(t, matched, to) + require.Equal(t, exp, enc) + require.Equal(t, matched, to) - } + } - }) - } + }) } } } func Test_EncodingCheckpoint(t *testing.T) { conf := dummyConf() - c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, conf.BlockSize, conf.TargetChunkSize) require.Nil(t, c.Append(&logproto.Entry{ Timestamp: time.Unix(1, 0), Line: "hi there", diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 14d679eb71..f20090bd78 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -124,7 +124,7 @@ func buildChunkDecs(t testing.TB) []*chunkDesc { for i := range res { res[i] = &chunkDesc{ closed: true, - chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), + chunk: chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, dummyConf().BlockSize, dummyConf().TargetChunkSize), } fillChunk(t, res[i].chunk) require.NoError(t, res[i].chunk.Close()) diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index 5068aa0eff..705119984a 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -179,7 +179,7 @@ func TestStreamIterator(t *testing.T) { new func() *chunkenc.MemChunk }{ {"gzipChunk", func() *chunkenc.MemChunk { - return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + return chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { diff --git a/pkg/ingester/wal/encoding.go b/pkg/ingester/wal/encoding.go index 0a3f6a40a4..f207024dbc 100644 --- a/pkg/ingester/wal/encoding.go +++ b/pkg/ingester/wal/encoding.go @@ -32,7 +32,7 @@ const ( // The current type of Entries that this distribution writes. // Loki can read in a backwards compatible manner, but will write the newest variant. // TODO: Change to WALRecordEntriesV3? -const CurrentEntriesRec = WALRecordEntriesV2 +const CurrentEntriesRec = WALRecordEntriesV3 // Record is a struct combining the series and samples record. type Record struct { diff --git a/pkg/logql/metrics.go b/pkg/logql/metrics.go index 13836f9491..943f1316e7 100644 --- a/pkg/logql/metrics.go +++ b/pkg/logql/metrics.go @@ -129,6 +129,7 @@ func RecordRangeAndInstantQueryMetrics( "returned_lines", returnedLines, "throughput", strings.Replace(humanize.Bytes(uint64(stats.Summary.BytesProcessedPerSecond)), " ", "", 1), "total_bytes", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalBytesProcessed)), " ", "", 1), + "total_bytes_non_indexed_labels", strings.Replace(humanize.Bytes(uint64(stats.Summary.TotalNonIndexedLabelsBytesProcessed)), " ", "", 1), "lines_per_second", stats.Summary.LinesProcessedPerSecond, "total_lines", stats.Summary.TotalLinesProcessed, "post_filter_lines", stats.Summary.TotalPostFilterLines, diff --git a/pkg/storage/hack/main.go b/pkg/storage/hack/main.go index 34555a27ae..b9802d1678 100644 --- a/pkg/storage/hack/main.go +++ b/pkg/storage/hack/main.go @@ -91,7 +91,7 @@ func fillStore(cm storage.ClientMetrics) error { labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := client.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncLZ4_4M, chunkenc.DefaultHeadBlockFmt, 262144, 1572864) for ts := start.UnixNano(); ts < start.UnixNano()+time.Hour.Nanoseconds(); ts = ts + time.Millisecond.Nanoseconds() { entry := &logproto.Entry{ Timestamp: time.Unix(0, ts), @@ -114,7 +114,7 @@ func fillStore(cm storage.ClientMetrics) error { if flushCount >= maxChunks { return } - chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.UnorderedHeadBlockFmt, 262144, 1572864) + chunkEnc = chunkenc.NewMemChunk(chunkenc.EncLZ4_64k, chunkenc.DefaultHeadBlockFmt, 262144, 1572864) } } }(i) diff --git a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go index 15dd3e48e0..e3e4479343 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go @@ -216,12 +216,13 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { require.NoError(t, chunkEnc.Append(&logproto.Entry{ - Timestamp: ts.Time(), - Line: ts.String(), + Timestamp: ts.Time(), + Line: ts.String(), + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", ts.String())), })) } @@ -522,18 +523,15 @@ func TestChunkRewriter(t *testing.T) { for _, interval := range expectedChunks[i] { for curr := interval.Start; curr <= interval.End; curr = curr.Add(time.Minute) { - // Test ready to pass/fail when we change the default chunk and head format. - var nonIndexedLabels []logproto.LabelAdapter - if chunkenc.DefaultChunkFormat == 4 && chunkenc.DefaultHeadBlockFmt == chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt { - nonIndexedLabels = logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", curr.String())) - } + expectedNonIndexedLabels := labels.FromStrings("foo", curr.String()) require.True(t, newChunkItr.Next()) require.Equal(t, logproto.Entry{ Timestamp: curr.Time(), Line: curr.String(), - NonIndexedLabels: nonIndexedLabels, + NonIndexedLabels: logproto.FromLabelsToLabelAdapters(expectedNonIndexedLabels), }, newChunkItr.Entry()) + require.Equal(t, expectedNonIndexedLabels.String(), newChunkItr.Labels()) } } diff --git a/pkg/storage/stores/series_store_write_test.go b/pkg/storage/stores/series_store_write_test.go index 5de667f241..97a050c6ab 100644 --- a/pkg/storage/stores/series_store_write_test.go +++ b/pkg/storage/stores/series_store_write_test.go @@ -82,7 +82,7 @@ func TestChunkWriter_PutOne(t *testing.T) { }, } - memchk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + memchk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0) chk := chunk.NewChunk("fake", model.Fingerprint(0), []labels.Label{{Name: "foo", Value: "bar"}}, chunkenc.NewFacade(memchk, 0, 0), 100, 400) for name, tc := range map[string]struct { diff --git a/pkg/storage/stores/shipper/index/compactor/util.go b/pkg/storage/stores/shipper/index/compactor/util.go index b26475b445..2cfc693d04 100644 --- a/pkg/storage/stores/shipper/index/compactor/util.go +++ b/pkg/storage/stores/shipper/index/compactor/util.go @@ -31,7 +31,7 @@ func createChunk(t testing.TB, userID string, lbs labels.Labels, from model.Time labelsBuilder.Set(labels.MetricName, "logs") metric := labelsBuilder.Labels() fp := ingesterclient.Fingerprint(lbs) - chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.UnorderedHeadBlockFmt, blockSize, targetSize) + chunkEnc := chunkenc.NewMemChunk(chunkenc.EncSnappy, chunkenc.DefaultHeadBlockFmt, blockSize, targetSize) for ts := from; !ts.After(through); ts = ts.Add(1 * time.Minute) { require.NoError(t, chunkEnc.Append(&logproto.Entry{ diff --git a/pkg/storage/util_test.go b/pkg/storage/util_test.go index 20e52fb6f6..6a2f8119b3 100644 --- a/pkg/storage/util_test.go +++ b/pkg/storage/util_test.go @@ -105,7 +105,7 @@ func newChunk(stream logproto.Stream) chunk.Chunk { lbs = builder.Labels() } from, through := loki_util.RoundToMilliseconds(stream.Entries[0].Timestamp, stream.Entries[len(stream.Entries)-1].Timestamp) - chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.UnorderedHeadBlockFmt, 256*1024, 0) + chk := chunkenc.NewMemChunk(chunkenc.EncGZIP, chunkenc.DefaultHeadBlockFmt, 256*1024, 0) for _, e := range stream.Entries { _ = chk.Append(&e) }