Fix bugs in non-indexed-labels (#10142)

**What this PR does / why we need it**:

This PR fixes some bugs discovered while setting the default chunk
format to _V4_ and the default head format to _unordered with
non-indexed labels_.

Note that this PR doesn't change the defaults, but fixes some bugs that
would prevent us from changing them in the future.

The bugs it fixes are:

- An index out-of-range panic when the symbolizer's label table is empty
and we do a `Lookup`. **Test added**.
- When applying retention, new chunks wouldn't contain non-indexed
labels (caused by https://github.com/grafana/loki/pull/10090); see the
sketch below. **Test added**.
- New chunks would fail to serialize non-indexed labels when applying
retention because the head format would be the [dummy one (ordered)][1]
[(used here via `c.headFmt`)][2]. **Cannot test unless we change the
default chunk/head format** but tested manually.
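
For illustration, here is a minimal sketch of the copy path these fixes unblock. The `Iterator` signature and `iter.WithKeepNonIndexedLabels()` come from this PR; the `copyEntries` helper and its package placement are hypothetical:

```go
package retention // hypothetical placement, for illustration only

import (
	"context"
	"time"

	"github.com/prometheus/prometheus/model/labels"

	"github.com/grafana/loki/pkg/chunkenc"
	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
)

// copyEntries copies entries from one chunk into another, mirroring what
// MemChunk.Rebound does after this PR. Without iter.WithKeepNonIndexedLabels(),
// Entry.NonIndexedLabels would stay empty and the labels would be lost.
func copyEntries(src, dst chunkenc.Chunk, start, end time.Time) error {
	itr, err := src.Iterator(context.Background(), start, end.Add(time.Millisecond),
		logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}),
		iter.WithKeepNonIndexedLabels())
	if err != nil {
		return err
	}
	for itr.Next() {
		entry := itr.Entry() // NonIndexedLabels is populated because of the option
		if err := dst.Append(&entry); err != nil {
			return err
		}
	}
	return itr.Error()
}
```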

**Checklist**
- [ ] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [ ] Documentation added
- [ ] Tests updated
- [ ] `CHANGELOG.md` updated
- [ ] If the change is worth mentioning in the release notes, add
`add-to-release-notes` label
- [ ] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/setup/upgrade/_index.md`
- [ ] For Helm chart changes bump the Helm chart version in
`production/helm/loki/Chart.yaml` and update
`production/helm/loki/CHANGELOG.md` and
`production/helm/loki/README.md`. [Example
PR](d10549e3ec)

[1]: 457f2e6ec0/pkg/chunkenc/memchunk.go (L375)
[2]: 24fd566833/pkg/chunkenc/memchunk.go (L1088-L1095)
Salva Corts committed 48660d5cd5 (parent c795565ef7)
Changed files (lines changed):

- pkg/chunkenc/dumb_chunk.go (2)
- pkg/chunkenc/interface.go (4)
- pkg/chunkenc/memchunk.go (47)
- pkg/chunkenc/memchunk_test.go (58)
- pkg/chunkenc/symbols.go (2)
- pkg/chunkenc/symbols_test.go (12)
- pkg/chunkenc/unordered.go (28)
- pkg/iter/entry_iterator.go (12)
- pkg/storage/lazy_chunk_test.go (2)
- pkg/storage/stores/indexshipper/compactor/retention/retention_test.go (14)

@@ -72,7 +72,7 @@ func (c *dumbChunk) Encoding() Encoding { return EncNone }
 
 // Returns an iterator that goes from _most_ recent to _least_ recent (ie,
 // backwards).
-func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) {
+func (c *dumbChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline, _ ...iter.EntryIteratorOption) (iter.EntryIterator, error) {
 	i := sort.Search(len(c.entries), func(i int) bool {
 		return !from.After(c.entries[i].Timestamp)
 	})

@@ -129,7 +129,7 @@ type Chunk interface {
 	Bounds() (time.Time, time.Time)
 	SpaceFor(*logproto.Entry) bool
 	Append(*logproto.Entry) error
-	Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error)
+	Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) (iter.EntryIterator, error)
 	SampleIterator(ctx context.Context, from, through time.Time, extractor log.StreamSampleExtractor) iter.SampleIterator
 	// Returns the list of blocks in the chunks.
 	Blocks(mintT, maxtT time.Time) []Block
@@ -158,7 +158,7 @@ type Block interface {
 	// Entries is the amount of entries in the block.
 	Entries() int
 	// Iterator returns an entry iterator for the block.
-	Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator
+	Iterator(ctx context.Context, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator
 	// SampleIterator returns a sample iterator for the block.
 	SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator
 }
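
Since the new parameter is variadic, the interface change is backward compatible: existing call sites compile unchanged. A quick sketch, assuming it lived inside package `chunkenc`; the `iterateBothWays` helper is hypothetical, while `Chunk`, `iter.WithKeepNonIndexedLabels`, and `logproto.FORWARD` are from this diff:

```go
package chunkenc

import (
	"context"
	"time"

	"github.com/grafana/loki/pkg/iter"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql/log"
)

// iterateBothWays is a hypothetical helper showing both call forms.
func iterateBothWays(ctx context.Context, chk Chunk, from, through time.Time, pipeline log.StreamPipeline) error {
	// Old-style call: zero options, identical behavior to before this change.
	if _, err := chk.Iterator(ctx, from, through, logproto.FORWARD, pipeline); err != nil {
		return err
	}
	// New-style call: returned entries keep their non-indexed labels.
	_, err := chk.Iterator(ctx, from, through, logproto.FORWARD, pipeline, iter.WithKeepNonIndexedLabels())
	return err
}
```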

@@ -927,7 +927,7 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
 }
 
 // Iterator implements Chunk.
-func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
+func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, direction logproto.Direction, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) (iter.EntryIterator, error) {
 	mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
 	blockItrs := make([]iter.EntryIterator, 0, len(c.blocks)+1)
@@ -954,7 +954,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 		}
 		lastMax = b.maxt
-		blockItrs = append(blockItrs, encBlock{c.encoding, c.format, c.symbolizer, b}.Iterator(ctx, pipeline))
+		blockItrs = append(blockItrs, encBlock{c.encoding, c.format, c.symbolizer, b}.Iterator(ctx, pipeline, options...))
 	}
 
 	if !c.head.IsEmpty() {
@@ -962,7 +962,7 @@ func (c *MemChunk) Iterator(ctx context.Context, mintT, maxtT time.Time, directi
 		if from < lastMax {
 			ordered = false
 		}
-		headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline)
+		headIterator = c.head.Iterator(ctx, direction, mint, maxt, pipeline, options...)
 	}
 
 	if direction == logproto.FORWARD {
@@ -1077,21 +1077,29 @@ func (c *MemChunk) Blocks(mintT, maxtT time.Time) []Block {
 // Rebound builds a smaller chunk with logs having timestamp from start and end(both inclusive)
 func (c *MemChunk) Rebound(start, end time.Time, filter filter.Func) (Chunk, error) {
 	// add a millisecond to end time because the Chunk.Iterator considers end time to be non-inclusive.
-	itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+	itr, err := c.Iterator(context.Background(), start, end.Add(time.Millisecond), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}), iter.WithKeepNonIndexedLabels())
 	if err != nil {
 		return nil, err
 	}
 
+	// If the head format is not explicitly set, use the default.
+	// This will be the most common case for chunks read from storage since
+	// they have a dummy head block.
+	headFmt := c.headFmt
+	if headFmt < OrderedHeadBlockFmt {
+		headFmt = DefaultHeadBlockFmt
+	}
+
 	var newChunk *MemChunk
 	// as close as possible, respect the block/target sizes specified. However,
 	// if the blockSize is not set, use reasonable defaults.
 	if c.blockSize > 0 {
-		newChunk = NewMemChunk(c.Encoding(), c.headFmt, c.blockSize, c.targetSize)
+		newChunk = NewMemChunk(c.Encoding(), headFmt, c.blockSize, c.targetSize)
 	} else {
 		// Using defaultBlockSize for target block size.
 		// The alternative here could be going over all the blocks and using the size of the largest block as target block size but I(Sandeep) feel that it is not worth the complexity.
 		// For target chunk size I am using compressed size of original chunk since the newChunk should anyways be lower in size than that.
-		newChunk = NewMemChunk(c.Encoding(), c.headFmt, defaultBlockSize, c.CompressedSize())
+		newChunk = NewMemChunk(c.Encoding(), headFmt, defaultBlockSize, c.CompressedSize())
	}
 
 	for itr.Next() {
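
Why the `headFmt < OrderedHeadBlockFmt` guard works: `HeadBlockFmt` is an ordered enum, and a chunk deserialized from storage carries a placeholder head format below `OrderedHeadBlockFmt` because its head block is the dummy one, which is what made the rewritten chunk unable to serialize non-indexed labels (bug 3 in the description). The same logic as a standalone sketch (hypothetical helper inside package `chunkenc`):

```go
package chunkenc

// effectiveHeadFmt is a hypothetical helper restating the guard added to
// Rebound above: chunks read from storage have a dummy head block whose
// format sorts below OrderedHeadBlockFmt, so fall back to the default.
func effectiveHeadFmt(c *MemChunk) HeadBlockFmt {
	if c.headFmt < OrderedHeadBlockFmt {
		return DefaultHeadBlockFmt
	}
	return c.headFmt
}
```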
@@ -1126,11 +1134,11 @@ type encBlock struct {
 	block
 }
 
-func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline) iter.EntryIterator {
+func (b encBlock) Iterator(ctx context.Context, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator {
 	if len(b.b) == 0 {
 		return iter.NoopIterator
 	}
-	return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer)
+	return newEntryIterator(ctx, getReaderPool(b.enc), b.b, pipeline, b.format, b.symbolizer, options...)
 }
 
 func (b encBlock) SampleIterator(ctx context.Context, extractor log.StreamSampleExtractor) iter.SampleIterator {
@@ -1156,7 +1164,7 @@ func (b block) MaxTime() int64 {
 	return b.maxt
 }
 
-func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline) iter.EntryIterator {
+func (hb *headBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline, _ ...iter.EntryIteratorOption) iter.EntryIterator {
 	if hb.IsEmpty() || (maxt < hb.mint || hb.maxt < mint) {
 		return iter.NoopIterator
 	}
@@ -1559,16 +1567,23 @@ func (si *bufferedIterator) close() {
 	si.origBytes = nil
 }
 
-func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer) iter.EntryIterator {
-	return &entryBufferedIterator{
+func newEntryIterator(ctx context.Context, pool ReaderPool, b []byte, pipeline log.StreamPipeline, format byte, symbolizer *symbolizer, options ...iter.EntryIteratorOption) iter.EntryIterator {
+	entryIter := &entryBufferedIterator{
 		bufferedIterator: newBufferedIterator(ctx, pool, b, format, symbolizer),
 		pipeline:         pipeline,
 	}
+
+	for _, opt := range options {
+		opt(&entryIter.iterOptions)
+	}
+
+	return entryIter
 }
 
 type entryBufferedIterator struct {
 	*bufferedIterator
-	pipeline log.StreamPipeline
+	pipeline    log.StreamPipeline
+	iterOptions iter.EntryIteratorOptions
 
 	cur        logproto.Entry
 	currLabels log.LabelsResult
@@ -1593,8 +1608,12 @@ func (e *entryBufferedIterator) Next() bool {
 		e.currLabels = lbs
 		e.cur.Timestamp = time.Unix(0, e.currTs)
 		e.cur.Line = string(newLine)
-		// There is no need to send back the non-indexed labels, as they are already part of the labels results
-		// e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels)
+		// Most of the time, there is no need to send back the non-indexed labels, as they are already part of the labels results.
+		// Still it might be needed for example when appending entries from one chunk into another one.
+		if e.iterOptions.KeepNonIndexedLabels {
+			e.cur.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(e.currNonIndexedLabels)
+		}
 		return true
 	}
 	return false

@@ -1817,3 +1817,61 @@ func TestMemChunk_IteratorWithNonIndexedLabels(t *testing.T) {
 		})
 	}
 }
+
+func TestMemChunk_IteratorOptions(t *testing.T) {
+	chk := newMemChunkWithFormat(chunkFormatV4, EncNone, UnorderedWithNonIndexedLabelsHeadBlockFmt, testBlockSize, testTargetSize)
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(0, "0", logproto.FromLabelsToLabelAdapters(
+		labels.FromStrings("a", "0"),
+	))))
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(1, "1", logproto.FromLabelsToLabelAdapters(
+		labels.FromStrings("a", "1"),
+	))))
+	require.NoError(t, chk.cut())
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(2, "2", logproto.FromLabelsToLabelAdapters(
+		labels.FromStrings("a", "2"),
+	))))
+	require.NoError(t, chk.Append(logprotoEntryWithNonIndexedLabels(3, "3", logproto.FromLabelsToLabelAdapters(
+		labels.FromStrings("a", "3"),
+	))))
+
+	for _, tc := range []struct {
+		name                   string
+		options                []iter.EntryIteratorOption
+		expectNonIndexedLabels bool
+	}{
+		{
+			name:                   "No options",
+			expectNonIndexedLabels: false,
+		},
+		{
+			name: "WithKeepNonIndexedLabels",
+			options: []iter.EntryIteratorOption{
+				iter.WithKeepNonIndexedLabels(),
+			},
+			expectNonIndexedLabels: true,
+		},
+	} {
+		t.Run(tc.name, func(t *testing.T) {
+			it, err := chk.Iterator(context.Background(), time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, noopStreamPipeline, tc.options...)
+			require.NoError(t, err)
+
+			var idx int64
+			for it.Next() {
+				expectedLabels := labels.FromStrings("a", fmt.Sprintf("%d", idx))
+
+				expectedEntry := logproto.Entry{
+					Timestamp: time.Unix(0, idx),
+					Line:      fmt.Sprintf("%d", idx),
+				}
+				if tc.expectNonIndexedLabels {
+					expectedEntry.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(expectedLabels)
+				}
+
+				require.Equal(t, expectedEntry, it.Entry())
+				require.Equal(t, expectedLabels.String(), it.Labels())
+				idx++
+			}
+		})
+	}
+}

@@ -109,7 +109,7 @@ func (s *symbolizer) lookup(idx uint32) string {
 		defer s.mtx.RUnlock()
 	}
 
-	if idx > uint32(len(s.labels)-1) {
+	if idx >= uint32(len(s.labels)) {
 		return ""
 	}
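
The old check underflowed on an empty symbolizer: `len(s.labels)-1` evaluates to `-1`, whose `uint32` conversion wraps to `math.MaxUint32`, so the guard never fired and the subsequent slice index panicked. A runnable sketch of the wraparound (standalone demo, not Loki code):

```go
package main

import "fmt"

func main() {
	var labelTable []string // empty, like a symbolizer that recorded no labels
	idx := uint32(0)

	// Old check: len(labelTable)-1 == -1, which wraps to 4294967295 as a
	// uint32, so the bound check is false and labelTable[idx] would panic.
	fmt.Println(idx > uint32(len(labelTable)-1)) // false

	// New check: compares against the unsigned length directly.
	fmt.Println(idx >= uint32(len(labelTable))) // true -> return "" safely
}
```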

@@ -134,6 +134,18 @@ func TestSymbolizer(t *testing.T) {
 				require.Equal(t, labels, s.Lookup(symbols))
 			}
 
+			// Test that Lookup returns empty labels if no symbols are provided.
+			if len(tc.labelsToAdd) == 0 {
+				ret := s.Lookup([]symbol{
+					{
+						Name:  0,
+						Value: 0,
+					},
+				})
+				require.Equal(t, "", ret[0].Name)
+				require.Equal(t, "", ret[0].Value)
+			}
+
 			require.Equal(t, tc.expectedNumLabels, len(s.labels))
 			require.Equal(t, tc.expectedCheckpointSize, s.CheckpointSize())
 			require.Equal(t, tc.expectedUncompressedSize, s.UncompressedSize())

@@ -41,6 +41,7 @@ type HeadBlock interface {
 		mint,
 		maxt int64,
 		pipeline log.StreamPipeline,
+		options ...iter.EntryIteratorOption,
 	) iter.EntryIterator
 	SampleIterator(
 		ctx context.Context,
@@ -243,13 +244,12 @@ func (hb *unorderedHeadBlock) forEntries(
 	return nil
 }
 
-func (hb *unorderedHeadBlock) Iterator(
-	ctx context.Context,
-	direction logproto.Direction,
-	mint,
-	maxt int64,
-	pipeline log.StreamPipeline,
-) iter.EntryIterator {
+func (hb *unorderedHeadBlock) Iterator(ctx context.Context, direction logproto.Direction, mint, maxt int64, pipeline log.StreamPipeline, options ...iter.EntryIteratorOption) iter.EntryIterator {
+	var iterOptions iter.EntryIteratorOptions
+	for _, option := range options {
+		option(&iterOptions)
+	}
+
 	// We are doing a copy everytime, this is because b.entries could change completely,
 	// the alternate would be that we allocate a new b.entries everytime we cut a block,
 	// but the tradeoff is that queries to near-realtime data would be much lower than
@@ -278,12 +278,18 @@ func (hb *unorderedHeadBlock) Iterator(
 				streams[labels] = stream
 			}
 
-			stream.Entries = append(stream.Entries, logproto.Entry{
+			entry := logproto.Entry{
 				Timestamp: time.Unix(0, ts),
 				Line:      newLine,
-				// There is no need to send back the non-indexed labels, as they are already part of the labels results
-				// NonIndexedLabels: logproto.FromLabelsToLabelAdapters(hb.symbolizer.Lookup(nonIndexedLabelsSymbols)),
-			})
+			}
+
+			// Most of the time, there is no need to send back the non-indexed labels, as they are already part of the labels results.
+			// Still it might be needed for example when appending entries from one chunk into another one.
+			if iterOptions.KeepNonIndexedLabels {
+				entry.NonIndexedLabels = logproto.FromLabelsToLabelAdapters(hb.symbolizer.Lookup(nonIndexedLabelsSymbols))
+			}
+
+			stream.Entries = append(stream.Entries, entry)
 			return nil
 		},
 	)

@@ -19,6 +19,18 @@ type EntryIterator interface {
 	Entry() logproto.Entry
 }
 
+type EntryIteratorOptions struct {
+	KeepNonIndexedLabels bool
+}
+
+type EntryIteratorOption func(*EntryIteratorOptions)
+
+func WithKeepNonIndexedLabels() EntryIteratorOption {
+	return func(o *EntryIteratorOptions) {
+		o.KeepNonIndexedLabels = true
+	}
+}
+
 // streamIterator iterates over entries in a stream.
 type streamIterator struct {
 	i int
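
This is the standard functional-options pattern: each `EntryIteratorOption` mutates a shared `EntryIteratorOptions` value at iterator construction time, exactly as `newEntryIterator` and `unorderedHeadBlock.Iterator` do above. A minimal usage sketch from a consumer's point of view (the `fold` helper is hypothetical):

```go
package main

import "github.com/grafana/loki/pkg/iter"

// fold applies each option to a zero-value options struct, the same way
// newEntryIterator and unorderedHeadBlock.Iterator do internally.
func fold(options ...iter.EntryIteratorOption) iter.EntryIteratorOptions {
	var opts iter.EntryIteratorOptions
	for _, o := range options {
		o(&opts)
	}
	return opts
}

func main() {
	opts := fold(iter.WithKeepNonIndexedLabels())
	println(opts.KeepNonIndexedLabels) // true
}
```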

@@ -178,7 +178,7 @@ func (fakeBlock) Entries() int { return 0 }
 func (fakeBlock) Offset() int { return 0 }
 func (f fakeBlock) MinTime() int64 { return f.mint }
 func (f fakeBlock) MaxTime() int64 { return f.maxt }
-func (fakeBlock) Iterator(context.Context, log.StreamPipeline) iter.EntryIterator {
+func (fakeBlock) Iterator(context.Context, log.StreamPipeline, ...iter.EntryIteratorOption) iter.EntryIterator {
 	return nil
 }

@@ -22,6 +22,7 @@ import (
 	"github.com/grafana/loki/pkg/chunkenc"
 	ingesterclient "github.com/grafana/loki/pkg/ingester/client"
+	"github.com/grafana/loki/pkg/iter"
 	"github.com/grafana/loki/pkg/logproto"
 	"github.com/grafana/loki/pkg/logql/log"
 	"github.com/grafana/loki/pkg/storage/chunk"
@@ -516,15 +517,22 @@ func TestChunkRewriter(t *testing.T) {
 			require.Equal(t, expectedChunks[i][len(expectedChunks[i])-1].End, chunks[i].Through)
 			lokiChunk := chunks[i].Data.(*chunkenc.Facade).LokiChunk()
-			newChunkItr, err := lokiChunk.Iterator(context.Background(), chunks[i].From.Time(), chunks[i].Through.Add(time.Minute).Time(), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}))
+			newChunkItr, err := lokiChunk.Iterator(context.Background(), chunks[i].From.Time(), chunks[i].Through.Add(time.Minute).Time(), logproto.FORWARD, log.NewNoopPipeline().ForStream(labels.Labels{}), iter.WithKeepNonIndexedLabels())
 			require.NoError(t, err)
 
 			for _, interval := range expectedChunks[i] {
 				for curr := interval.Start; curr <= interval.End; curr = curr.Add(time.Minute) {
+					// Test ready to pass/fail when we change the default chunk and head format.
+					var nonIndexedLabels []logproto.LabelAdapter
+					if chunkenc.DefaultChunkFormat == 4 && chunkenc.DefaultHeadBlockFmt == chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt {
+						nonIndexedLabels = logproto.FromLabelsToLabelAdapters(labels.FromStrings("foo", curr.String()))
+					}
+
 					require.True(t, newChunkItr.Next())
 					require.Equal(t, logproto.Entry{
-						Timestamp: curr.Time(),
-						Line:      curr.String(),
+						Timestamp:        curr.Time(),
+						Line:             curr.String(),
+						NonIndexedLabels: nonIndexedLabels,
 					}, newChunkItr.Entry())
 				}
 			}
