diff --git a/pkg/chunkenc/dumb_chunk.go b/pkg/chunkenc/dumb_chunk.go index 793e0b7518..2aa7dee955 100644 --- a/pkg/chunkenc/dumb_chunk.go +++ b/pkg/chunkenc/dumb_chunk.go @@ -72,7 +72,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone } // Returns an iterator that goes from _most_ recent to _least_ recent (ie, // backwards). -func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) { +func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline, _ ...iter.EntryIteratorOption) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go index 0985f4a883..9a066402be 100644 --- a/pkg/chunkenc/interface.go +++ b/pkg/chunkenc/interface.go @@ -129,7 +129,7 @@ type Chunk interface { Bounds() (time.Time, time.Time) SpaceFor(*logproto.Entry) bool Append(*logproto.Entry) error - Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) + Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) (iter.EntryIterator, error) SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator // Returns the list of blocks in the chunks. Blocks(mintT, maxtT time.Time) []Block @@ -158,7 +158,7 @@ type Block interface { // Entries is the amount of entries in the block. Entries() int // Iterator returns an entry iterator for the block. - Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator + Iterator(ctx context.Context, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator // SampleIterator returns a sample iterator for the block. SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index bec6e2f87f..fd12177a80 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -927,7 +927,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) { } // Iterator implements Chunk. -func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) { +func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) (iter.EntryIterator, error) { mint, maxt := mintT.UnixNano(), maxtT.UnixNano() blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1) @@ -954,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi } lastMax = b.maxt - blockItrs = append(blockItrs, encBlock{c.encoding, c.format, c.symbolizer, b}.Iterator(ctx, pipeline)) + blockItrs = append(blockItrs, encBlock{c.encoding, c.format, c.symbolizer, b}.Iterator(ctx, pipeline, options...)) } if !c.head.IsEmpty() { @@ -962,7 +962,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi if from < lastMax { ordered = false } - headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline) + headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline, options...) } if direction == logproto.FORWARD { @@ -1077,21 +1077,29 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block { // Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive) func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) { // add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive. - itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}), iter.WithKeepNonIndexedLabels()) if err != nil { return nil, err } + // If the head format is not explicitly set, use the default. + // This will be the most common case for chunks read from storage since + // they have a dummy head block. + headFmt := c.headFmt + if headFmt < OrderedHeadBlockFmt { + headFmt = DefaultHeadBlockFmt + } + var newChunk *MemChunk // as close as possible, respect the block/target sizes specified. However, // if the blockSize is not set, use reasonable defaults. if c.blockSize > 0 { - newChunk = NewMemChunk(c.Encoding(), c.headFmt, c.blockSize, c.targetSize) + newChunk = NewMemChunk(c.Encoding(), headFmt, c.blockSize, c.targetSize) } else { // Using defaultBlockSize for target block size. // The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity. // For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that. - newChunk = NewMemChunk(c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize()) + newChunk = NewMemChunk(c.Encoding(), headFmt, defaultBlockSize, c.CompressedSize()) } for itr.Next() { @@ -1126,11 +1134,11 @@ type encBlock struct { block } -func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator { +func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator { if len(b.b) == 0 { return iter.NoopIterator } - return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer) + return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...) } func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator { @@ -1156,7 +1164,7 @@ func (b block) MaxTime() int64 { return b.maxt } -func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator { +func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline, _ ...iter.EntryIteratorOption) iter.EntryIterator { if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) { return iter.NoopIterator } @@ -1559,16 +1567,23 @@ func (si *bufferedIterator) close() { si.origBytes = nil } -func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator { - return &entryBufferedIterator{ +func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer, options ...iter.EntryIteratorOption) iter.EntryIterator { + entryIter := &entryBufferedIterator{ bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer), pipeline: pipeline, } + + for _, opt := range options { + opt(&entryIter.iterOptions) + } + + return entryIter } type entryBufferedIterator struct { *bufferedIterator - pipeline log.StreamPipeline + pipeline log.StreamPipeline + iterOptions iter.EntryIteratorOptions cur logproto.Entry currLabels log.LabelsResult @@ -1593,8 +1608,12 @@ func (e *entryBufferedIterator) Next() bool { e.currLabels = lbs e.cur.Timestamp = time.Unix(0, e.currTs) e.cur.Line = string(newLine) - // There is no need to send back the non-indexed labels, as they are already part of the labels results - // e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels) + + // Most of the time, there is no need to send back the non-indexed labels, as they are already part of the labels results. + // Still it might be needed for example when appending entries from one chunk into another one. + if e.iterOptions.KeepNonIndexedLabels { + e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels) + } return true } return false diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index 51ccdc5bda..8037a85e41 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -1817,3 +1817,61 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) { }) } } + +func TestMemChunk_IteratorOptions(t *testing.T) { + chk := newMemChunkWithFormat(chunkFormatV4, EncNone, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize) + require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(0, "0", logproto.FromLabelsToLabelAdapters( + labels.FromStrings("a", "0"), + )))) + require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(1, "1", logproto.FromLabelsToLabelAdapters( + labels.FromStrings("a", "1"), + )))) + require.NoError(t, chk.cut()) + require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(2, "2", logproto.FromLabelsToLabelAdapters( + labels.FromStrings("a", "2"), + )))) + require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(3, "3", logproto.FromLabelsToLabelAdapters( + labels.FromStrings("a", "3"), + )))) + + for _, tc := range []struct { + name string + options []iter.EntryIteratorOption + expectNonIndexedLabels bool + }{ + { + name: "No options", + expectNonIndexedLabels: false, + }, + { + name: "WithKeepNonIndexedLabels", + options: []iter.EntryIteratorOption{ + iter.WithKeepNonIndexedLabels(), + }, + + expectNonIndexedLabels: true, + }, + } { + t.Run(tc.name, func(t *testing.T) { + it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline, tc.options...) + require.NoError(t, err) + + var idx int64 + for it.Next() { + expectedLabels := labels.FromStrings("a", fmt.Sprintf("%d", idx)) + expectedEntry := logproto.Entry{ + Timestamp: time.Unix(0, idx), + Line: fmt.Sprintf("%d", idx), + } + + if tc.expectNonIndexedLabels { + expectedEntry.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(expectedLabels) + } + + require.Equal(t, expectedEntry, it.Entry()) + require.Equal(t, expectedLabels.String(), it.Labels()) + idx++ + } + }) + } +} diff --git a/pkg/chunkenc/symbols.go b/pkg/chunkenc/symbols.go index 94d04d863f..1fa063fba9 100644 --- a/pkg/chunkenc/symbols.go +++ b/pkg/chunkenc/symbols.go @@ -109,7 +109,7 @@ func (s *symbolizer) lookup(idx uint32) string { defer s.mtx.RUnlock() } - if idx > uint32(len(s.labels)-1) { + if idx >= uint32(len(s.labels)) { return "" } diff --git a/pkg/chunkenc/symbols_test.go b/pkg/chunkenc/symbols_test.go index 661a042974..244acff9ce 100644 --- a/pkg/chunkenc/symbols_test.go +++ b/pkg/chunkenc/symbols_test.go @@ -134,6 +134,18 @@ func TestSymbolizer(t *testing.T) { require.Equal(t, labels, s.Lookup(symbols)) } + // Test that Lookup returns empty labels if no symbols are provided. + if len(tc.labelsToAdd) == 0 { + ret := s.Lookup([]symbol{ + { + Name: 0, + Value: 0, + }, + }) + require.Equal(t, "", ret[0].Name) + require.Equal(t, "", ret[0].Value) + } + require.Equal(t, tc.expectedNumLabels, len(s.labels)) require.Equal(t, tc.expectedCheckpointSize, s.CheckpointSize()) require.Equal(t, tc.expectedUncompressedSize, s.UncompressedSize()) diff --git a/pkg/chunkenc/unordered.go b/pkg/chunkenc/unordered.go index f881fb2b32..e2d16707d6 100644 --- a/pkg/chunkenc/unordered.go +++ b/pkg/chunkenc/unordered.go @@ -41,6 +41,7 @@ type HeadBlock interface { mint, maxt int64, pipeline log.StreamPipeline, + options ...iter.EntryIteratorOption, ) iter.EntryIterator SampleIterator( ctx context.Context, @@ -243,13 +244,12 @@ func (hb *unorderedHeadBlock) forEntries( return nil } -func (hb *unorderedHeadBlock) Iterator( - ctx context.Context, - direction logproto.Direction, - mint, - maxt int64, - pipeline log.StreamPipeline, -) iter.EntryIterator { +func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator { + var iterOptions iter.EntryIteratorOptions + for _, option := range options { + option(&iterOptions) + } + // We are doing a copy everytime, this is because b.entries could change completely, // the alternate would be that we allocate a new b.entries everytime we cut a block, // but the tradeoff is that queries to near-realtime data would be much lower than @@ -278,12 +278,18 @@ func (hb *unorderedHeadBlock) Iterator( streams[labels] = stream } - stream.Entries = append(stream.Entries, logproto.Entry{ + entry := logproto.Entry{ Timestamp: time.Unix(0, ts), Line: newLine, - // There is no need to send back the non-indexed labels, as they are already part of the labels results - // NonIndexedLabels: logproto.FromLabelsToLabelAdapters(hb.symbolizer.Lookup(nonIndexedLabelsSymbols)), - }) + } + + // Most of the time, there is no need to send back the non-indexed labels, as they are already part of the labels results. + // Still it might be needed for example when appending entries from one chunk into another one. + if iterOptions.KeepNonIndexedLabels { + entry.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(hb.symbolizer.Lookup(nonIndexedLabelsSymbols)) + } + + stream.Entries = append(stream.Entries, entry) return nil }, ) diff --git a/pkg/iter/entry_iterator.go b/pkg/iter/entry_iterator.go index fa67da6a3b..ef4cf55bb1 100644 --- a/pkg/iter/entry_iterator.go +++ b/pkg/iter/entry_iterator.go @@ -19,6 +19,18 @@ type EntryIterator interface { Entry() logproto.Entry } +type EntryIteratorOptions struct { + KeepNonIndexedLabels bool +} + +type EntryIteratorOption func(*EntryIteratorOptions) + +func WithKeepNonIndexedLabels() EntryIteratorOption { + return func(o *EntryIteratorOptions) { + o.KeepNonIndexedLabels = true + } +} + // streamIterator iterates over entries in a stream. type streamIterator struct { i int diff --git a/pkg/storage/lazy_chunk_test.go b/pkg/storage/lazy_chunk_test.go index 3962deaaa3..5b712f2717 100644 --- a/pkg/storage/lazy_chunk_test.go +++ b/pkg/storage/lazy_chunk_test.go @@ -178,7 +178,7 @@ func (fakeBlock) Entries() int { return 0 } func (fakeBlock) Offset() int { return 0 } func (f fakeBlock) MinTime() int64 { return f.mint } func (f fakeBlock) MaxTime() int64 { return f.maxt } -func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterator { +func (fakeBlock) Iterator(context.Context, log.StreamPipeline, ...iter.EntryIteratorOption) iter.EntryIterator { return nil } diff --git a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go index 287fed0736..15dd3e48e0 100644 --- a/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go +++ b/pkg/storage/stores/indexshipper/compactor/retention/retention_test.go @@ -22,6 +22,7 @@ import ( "github.com/grafana/loki/pkg/chunkenc" ingesterclient "github.com/grafana/loki/pkg/ingester/client" + "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" "github.com/grafana/loki/pkg/storage/chunk" @@ -516,15 +517,22 @@ func TestChunkRewriter(t *testing.T) { require.Equal(t, expectedChunks[i][len(expectedChunks[i])-1].End, chunks[i].Through) lokiChunk := chunks[i].Data.(*chunkenc.Facade).LokiChunk() - newChunkItr, err := lokiChunk.Iterator(context.Background(), chunks[i].From.Time(), chunks[i].Through.Add(time.Minute).Time(), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{})) + newChunkItr, err := lokiChunk.Iterator(context.Background(), chunks[i].From.Time(), chunks[i].Through.Add(time.Minute).Time(), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}), iter.WithKeepNonIndexedLabels()) require.NoError(t, err) for _, interval := range expectedChunks[i] { for curr := interval.Start; curr <= interval.End; curr = curr.Add(time.Minute) { + // Test ready to pass/fail when we change the default chunk and head format. + var nonIndexedLabels []logproto.LabelAdapter + if chunkenc.DefaultChunkFormat == 4 && chunkenc.DefaultHeadBlockFmt == chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt { + nonIndexedLabels = logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", curr.String())) + } + require.True(t, newChunkItr.Next()) require.Equal(t, logproto.Entry{ - Timestamp: curr.Time(), - Line: curr.String(), + Timestamp: curr.Time(), + Line: curr.String(), + NonIndexedLabels: nonIndexedLabels, }, newChunkItr.Entry()) } }