brief: adds page support for chunks within a series. This lets us do a
few optimizations, most notably skipping chunk ranges that don't overlap
with our query bounds. It also allows us to use aggregated stats for
pages when computing `Stats` calls that completely overlap all chunks in
a page.

teaser:
```
pkg: github.com/grafana/loki/pkg/storage/stores/tsdb/index
BenchmarkChunkStats/version_2/2_chunks-10         	10121240	       109.4 ns/op	      24 B/op	       1 allocs/op
BenchmarkChunkStats/version_3/2_chunks-10         	27591069	        43.51 ns/op	       0 B/op	       0 allocs/op
BenchmarkChunkStats/version_2/4_chunks-10         	 9410245	       128.7 ns/op	      24 B/op	       1 allocs/op
BenchmarkChunkStats/version_3/4_chunks-10         	21808070	        54.44 ns/op	       0 B/op	       0 allocs/op
BenchmarkChunkStats/version_2/10_chunks-10        	 6048235	       201.3 ns/op	      24 B/op	       1 allocs/op
BenchmarkChunkStats/version_3/10_chunks-10        	 9517605	       125.1 ns/op	       0 B/op	       0 allocs/op
BenchmarkChunkStats/version_2/100_chunks-10       	 1000000	      1081 ns/op	      24 B/op	       1 allocs/op
BenchmarkChunkStats/version_3/100_chunks-10       	 1669972	       715.4 ns/op	     528 B/op	       3 allocs/op
BenchmarkChunkStats/version_2/1000_chunks-10      	  118125	     10165 ns/op	      24 B/op	       1 allocs/op
BenchmarkChunkStats/version_3/1000_chunks-10      	  570576	      2125 ns/op	    4816 B/op	       6 allocs/op
BenchmarkChunkStats/version_2/10000_chunks-10     	   10000	    117447 ns/op	  123014 B/op	       3 allocs/op
BenchmarkChunkStats/version_3/10000_chunks-10     	   74524	     16225 ns/op	   48601 B/op	       9 allocs/op
BenchmarkChunkStats/version_2/100000_chunks-10    	     842	   1240380 ns/op	 3211679 B/op	      13 allocs/op
BenchmarkChunkStats/version_3/100000_chunks-10    	    8494	    141169 ns/op	  516530 B/op	      13 allocs/op

```

```
pkg: github.com/grafana/loki/pkg/storage/stores/tsdb/index
BenchmarkReadChunks/version_2/2_chunks-10         	13673050	        80.52 ns/op	     172 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/2_chunks-10         	25713412	        64.06 ns/op	     201 B/op	       0 allocs/op
BenchmarkReadChunks/version_2/4_chunks-10         	12302040	        96.56 ns/op	     373 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/4_chunks-10         	20512030	        84.93 ns/op	     346 B/op	       0 allocs/op
BenchmarkReadChunks/version_2/10_chunks-10        	 6974947	       168.7 ns/op	     490 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/10_chunks-10        	 7436360	       161.0 ns/op	     632 B/op	       0 allocs/op
BenchmarkReadChunks/version_2/50_chunks-10        	 2038724	       599.3 ns/op	    2034 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/50_chunks-10        	 2251822	       619.8 ns/op	    1926 B/op	       0 allocs/op
BenchmarkReadChunks/version_2/100_chunks-10       	  946430	      1092 ns/op	    4132 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/100_chunks-10       	 1467297	      1000 ns/op	    3308 B/op	       1 allocs/op
BenchmarkReadChunks/version_2/150_chunks-10       	  674575	      1721 ns/op	    5767 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/150_chunks-10       	 1000000	      1373 ns/op	    6171 B/op	       1 allocs/op
BenchmarkReadChunks/version_2/1000_chunks-10      	  109153	     10554 ns/op	   33063 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/1000_chunks-10      	  158086	      6963 ns/op	   32829 B/op	       1 allocs/op
BenchmarkReadChunks/version_2/10000_chunks-10     	    9043	    114121 ns/op	  345258 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/10000_chunks-10     	   22083	     79896 ns/op	  435493 B/op	       1 allocs/op
BenchmarkReadChunks/version_2/100000_chunks-10    	    1039	   1103621 ns/op	 3479873 B/op	       0 allocs/op
BenchmarkReadChunks/version_3/100000_chunks-10    	    2101	   1155419 ns/op	 4680266 B/op	       1 allocs/op
```

---------

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
pull/9047/head
Owen Diehl 2 years ago committed by GitHub
parent 3d017a4b25
commit dbbadafca2
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      pkg/logproto/extensions.go
  2. 4
      pkg/storage/stores/index/stats/stats.go
  3. 24
      pkg/storage/stores/tsdb/head_read.go
  4. 115
      pkg/storage/stores/tsdb/index/chunk.go
  5. 345
      pkg/storage/stores/tsdb/index/chunk_test.go
  6. 513
      pkg/storage/stores/tsdb/index/index.go
  7. 5
      pkg/storage/stores/tsdb/index/index_test.go
  8. 30
      pkg/storage/stores/tsdb/index/pool.go
  9. 2
      pkg/storage/stores/tsdb/index_client.go
  10. 4
      pkg/storage/stores/tsdb/index_client_test.go
  11. 3
      pkg/storage/stores/tsdb/querier.go
  12. 98
      pkg/storage/stores/tsdb/single_file_index.go
  13. 4
      pkg/util/encoding/encoding.go

@ -37,10 +37,10 @@ func (m *IndexStatsResponse) AddStream(_ model.Fingerprint) {
}
// Safe for concurrent use
func (m *IndexStatsResponse) AddChunk(_ model.Fingerprint, chk index.ChunkMeta) {
atomic.AddUint64(&m.Chunks, 1)
atomic.AddUint64(&m.Bytes, uint64(chk.KB<<10))
atomic.AddUint64(&m.Entries, uint64(chk.Entries))
// AddChunkStats folds a batch of pre-aggregated chunk statistics into the
// response. Safe for concurrent use: each counter is updated atomically
// (the three updates are independent, so ordering is irrelevant).
func (m *IndexStatsResponse) AddChunkStats(s index.ChunkStats) {
	atomic.AddUint64(&m.Entries, s.Entries)
	atomic.AddUint64(&m.Bytes, s.KB<<10) // stats are tracked in KB; convert to bytes
	atomic.AddUint64(&m.Chunks, s.Chunks)
}
func (m *IndexStatsResponse) Stats() IndexStatsResponse {

@ -96,8 +96,8 @@ func (b *Blooms) AddStream(fp model.Fingerprint) {
})
}
func (b *Blooms) AddChunk(fp model.Fingerprint, chk index.ChunkMeta) {
b.stats.AddChunk(fp, chk)
// AddChunkStats forwards pre-aggregated chunk statistics to the embedded
// stats accumulator; the bloom filters themselves are not touched.
func (b *Blooms) AddChunkStats(s index.ChunkStats) {
	b.stats.AddChunkStats(s)
}
func (b *Blooms) add(filter *bloom.BloomFilter, key []byte, update func()) {

@ -146,6 +146,30 @@ func (h *headIndexReader) Series(ref storage.SeriesRef, from int64, through int6
return s.fp, nil
}
// ChunkStats looks up the series for ref, copies its labels into lbls, and
// returns the series fingerprint together with chunk statistics aggregated
// over chunks overlapping [from, through].
func (h *headIndexReader) ChunkStats(ref storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, index.ChunkStats, error) {
	series := h.head.series.getByID(uint64(ref))
	if series == nil {
		h.head.metrics.seriesNotFound.Inc()
		return 0, index.ChunkStats{}, storage.ErrNotFound
	}
	// Reuse the caller-supplied slice's capacity for the label copy.
	*lbls = append((*lbls)[:0], series.ls...)

	bounds := newBounds(model.Time(from), model.Time(through))
	var stats index.ChunkStats

	// Hold the series lock while iterating its chunk list.
	series.Lock()
	for i := range series.chks {
		chk := series.chks[i]
		if Overlap(chk, bounds) {
			stats.AddChunk(&chk, from, through)
		}
	}
	series.Unlock()

	return series.fp, stats, nil
}
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (h *headIndexReader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
memSeries := h.head.series.getByID(uint64(id))

@ -4,6 +4,9 @@ import (
"sort"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util/encoding"
"github.com/grafana/loki/pkg/util/math"
)
// Meta holds information about a chunk of data.
@ -154,3 +157,115 @@ func (c ChunkMetas) Drop(chk ChunkMeta) (ChunkMetas, bool) {
return c[:j+copy(c[j:], c[j+1:])], true
}
// chunkPageMarker summarizes one page of chunks within a series so readers
// can skip or wholesale-aggregate pages without decoding individual chunks.
//
// Some of these fields can realistically be 32bit, but
// this gives us a lot of wiggle room and they're already
// encoded only for every n-th chunk based on `ChunkPageSize`
type chunkPageMarker struct {
	// ChunksInPage denotes the number of chunks
	// in the page
	ChunksInPage int

	// KB, Entries are the aggregate KB and entry counts
	// summed across all chunks in the page (see combine)
	KB, Entries uint32

	// byte offset where this page's first chunk starts, relative
	// to the start of the chunk data for this series
	Offset int

	// bounds associated with this page: min of all chunk MinTimes,
	// max of all chunk MaxTimes
	MinTime, MaxTime int64

	// internal field to test if MinTime has been set since the zero
	// value is valid and I don't want to use a pointer. This is only
	// used during encoding.
	minTimeSet bool
}
// subsetOf reports whether the page's entire time span lies within the
// query range [from, through], meaning every chunk in the page is covered
// and the page's aggregated stats can be used directly.
func (m *chunkPageMarker) subsetOf(from, through int64) bool {
	return m.MinTime >= from && m.MaxTime <= through
}
// combine folds chunk c into the marker's running totals and time bounds.
func (m *chunkPageMarker) combine(c ChunkMeta) {
	m.KB, m.Entries = m.KB+c.KB, m.Entries+c.Entries

	// The zero value is a legal timestamp, so a separate flag marks
	// whether MinTime has been seeded yet.
	if !m.minTimeSet || c.MinTime < m.MinTime {
		m.minTimeSet = true
		m.MinTime = c.MinTime
	}
	if m.MaxTime < c.MaxTime {
		m.MaxTime = c.MaxTime
	}
}
// encode serializes the marker into e: chunk count, aggregate KB and
// entries, the byte offset of the page's first chunk, and the page's time
// bounds (MaxTime delta-encoded against MinTime). Field order must stay in
// sync with decode.
func (m *chunkPageMarker) encode(e *encoding.Encbuf, offset int, chunksRemaining int) {
	// put chunks, kb, entries, offset, mintime, maxtime
	e.PutUvarint(chunksRemaining)
	e.PutBE32(m.KB)
	e.PutBE32(m.Entries)
	e.PutUvarint(offset)
	e.PutVarint64(m.MinTime)
	e.PutVarint64(m.MaxTime - m.MinTime) // delta-encoded
}
// clear resets the marker to its zero value so it can accumulate the next
// page during encoding.
func (m *chunkPageMarker) clear() {
	*m = chunkPageMarker{}
}
// decode reads a marker previously written by encode; field order must
// mirror encode exactly. MaxTime is reconstructed from its delta against
// MinTime.
func (m *chunkPageMarker) decode(d *encoding.Decbuf) {
	m.ChunksInPage = d.Uvarint()
	m.KB = d.Be32()
	m.Entries = d.Be32()
	m.Offset = d.Uvarint()
	m.MinTime = d.Varint64()
	m.MaxTime = m.MinTime + d.Varint64()
}
// Chunks per page. This can be inferred in the data format
// via the ChunksRemaining field
// and can thus be changed without needing a
// new tsdb version
const ChunkPageSize = 16
// Minimum number of chunks present to use page based lookup
// instead of linear scan which performs better at lower n-values.
const DefaultMaxChunksToBypassMarkerLookup = 64
type chunkPageMarkers []chunkPageMarker
// ChunkStats holds aggregated statistics — chunk count, size in KB, and
// entry count — accumulated over a queried time range.
type ChunkStats struct {
	Chunks, KB, Entries uint64
}
// addRaw accumulates pre-computed counts into the stats without any
// proportional adjustment.
func (cs *ChunkStats) addRaw(chunks int, kb, entries uint32) {
	cs.Entries += uint64(entries)
	cs.KB += uint64(kb)
	cs.Chunks += uint64(chunks)
}
// AddChunk accumulates chk's stats into cs, pro-rating KB and entries by
// the fraction of the chunk's time span that overlaps [from, through].
//
// Assuming entries and bytes are evenly distributed in the chunk,
// We will take the proportional number of entries and number of bytes
// if (chk.MinTime < from) and/or (chk.MaxTime > through).
//
//	MinTime  From           Through  MaxTime
//	┌────────┬─────────────────┬────────┐
//	│        *      Chunk      *        │
//	└────────┴─────────────────┴────────┘
//	 ▲   A   |        C        |   B   ▲
//	 └─────────────────────────────────┘
//	          T = MinTime - MaxTime
//
// We want to get the percentage of time that fits into C
// to use it as a factor to get the amount of bytes and entries
// factor = C = (T - (A + B)) / T = (chunkTime - (leadingTime + trailingTime)) / chunkTime
func (cs *ChunkStats) AddChunk(chk *ChunkMeta, from, through int64) {
	chunkTime := chk.MaxTime - chk.MinTime

	// Guard against zero-duration chunks (MinTime == MaxTime): the factor
	// below would be 0/0 = NaN and uint32(NaN) is implementation-defined.
	// An instant chunk that reached this point overlaps the query, so count
	// it in full.
	if chunkTime == 0 {
		cs.addRaw(1, chk.KB, chk.Entries)
		return
	}

	leadingTime := math.Max64(0, from-chk.MinTime)
	trailingTime := math.Max64(0, chk.MaxTime-through)
	factor := float32(chunkTime-(leadingTime+trailingTime)) / float32(chunkTime)

	kb := uint32(float32(chk.KB) * factor)
	entries := uint32(float32(chk.Entries) * factor)

	cs.addRaw(1, kb, entries)
}

@ -2,9 +2,13 @@ package index
import (
"fmt"
"math"
"testing"
tsdb_enc "github.com/prometheus/prometheus/tsdb/encoding"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/util/encoding"
)
// Test all sort variants
@ -367,3 +371,344 @@ func TestChunkMetas_Drop(t *testing.T) {
})
}
}
// TestChunkPageMarkerEncodeDecode round-trips a chunkPageMarker through
// encode and decode and asserts the result is identical to the input.
func TestChunkPageMarkerEncodeDecode(t *testing.T) {
	in := chunkPageMarker{
		ChunksInPage: 1,
		KB:           2,
		Entries:      3,
		Offset:       4,
		MinTime:      5,
		MaxTime:      6,
	}

	// Serialize; chunksRemaining=1 so the decoded ChunksInPage matches.
	enc := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
	in.encode(&enc, 4, 1)

	// Deserialize from the raw bytes.
	raw := RealByteSlice(enc.Get())
	dec := encoding.DecWrap(tsdb_enc.NewDecbufRaw(raw, raw.Len()))
	var out chunkPageMarker
	out.decode(&dec)

	require.Equal(t, in, out)
}
// mkChks builds n synthetic ChunkMetas via chkFrom. Returns nil for n <= 0.
func mkChks(n int) []ChunkMeta {
	var out []ChunkMeta
	for i := 0; i < n; i++ {
		out = append(out, chkFrom(i))
	}
	return out
}
// chkFrom derives a deterministic ChunkMeta from index i: a unit-length
// time span [i, i+1] with checksum, KB, and entry count all equal to i.
func chkFrom(i int) ChunkMeta {
	meta := ChunkMeta{
		MinTime: int64(i),
		MaxTime: int64(i) + 1,
	}
	meta.Checksum = uint32(i)
	meta.KB = uint32(i)
	meta.Entries = uint32(i)
	return meta
}
// TestChunkEncodingRoundTrip encodes chunk lists with addChunks and decodes
// them back with readChunks across index versions, chunk counts, and page
// sizes (including sizes that don't evenly divide the chunk count),
// asserting the decoded metas equal the originals.
func TestChunkEncodingRoundTrip(t *testing.T) {
	for _, version := range []int{
		FormatV2,
		FormatV3,
	} {
		for _, nChks := range []int{
			0,
			8,
		} {
			for _, pageSize := range []int{
				4,
				5,
				8,
				10,
				ChunkPageSize,
			} {
				t.Run(fmt.Sprintf("version %d nChks %d pageSize %d", version, nChks, pageSize), func(t *testing.T) {
					chks := mkChks(nChks)
					var w Writer
					w.Version = version
					primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
					scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
					w.addChunks(chks, &primary, &scratch, pageSize)
					decbuf := encoding.DecWrap(tsdb_enc.Decbuf{B: primary.Get()})
					dec := newDecoder(nil, 0)
					dst := []ChunkMeta{}
					// unbounded time range: decode everything
					require.Nil(t, dec.readChunks(version, &decbuf, 0, 0, math.MaxInt64, &dst))
					if len(chks) == 0 {
						// compare lengths to sidestep nil vs empty-slice differences
						require.Equal(t, 0, len(dst))
					} else {
						require.Equal(t, chks, dst)
					}
				})
			}
		}
	}
}
// TestSearchWithPageMarkers exercises the v3 reader's page-marker search
// path (readChunksV3) over ordered and semi-ordered chunk lists, asserting
// that exactly the chunks overlapping [mint, maxt] are returned. Note the
// expectations show bounds are inclusive: e.g. chunk [1,2] is returned for
// mint=2.
func TestSearchWithPageMarkers(t *testing.T) {
	for _, pageSize := range []int{
		2,
		10,
	} {
		for _, tc := range []struct {
			desc       string
			chks, exp  []ChunkMeta
			mint, maxt int64
		}{
			{
				desc: "half time range",
				chks: []ChunkMeta{
					{MinTime: 0, MaxTime: 1},
					{MinTime: 1, MaxTime: 2},
					{MinTime: 2, MaxTime: 3},
					{MinTime: 3, MaxTime: 4},
					{MinTime: 4, MaxTime: 5},
				},
				mint: 2,
				maxt: 4,
				exp: []ChunkMeta{
					{MinTime: 1, MaxTime: 2},
					{MinTime: 2, MaxTime: 3},
					{MinTime: 3, MaxTime: 4},
				},
			},
			{
				desc: "no chunks in time range",
				chks: []ChunkMeta{
					{MinTime: 0, MaxTime: 1},
					{MinTime: 1, MaxTime: 2},
					{MinTime: 2, MaxTime: 3},
					{MinTime: 3, MaxTime: 4},
					{MinTime: 4, MaxTime: 5},
				},
				mint: 6,
				maxt: 7,
				exp:  []ChunkMeta{},
			},
			{
				desc: "all chunks within time range",
				chks: []ChunkMeta{
					{MinTime: 1, MaxTime: 2},
					{MinTime: 2, MaxTime: 3},
					{MinTime: 3, MaxTime: 4},
				},
				mint: 1,
				maxt: 4,
				exp: []ChunkMeta{
					{MinTime: 1, MaxTime: 2},
					{MinTime: 2, MaxTime: 3},
					{MinTime: 3, MaxTime: 4},
				},
			},
			{
				// chunks sorted by MinTime but with overlapping MaxTimes,
				// so matches are not contiguous
				desc: "semi ordered chunks",
				chks: []ChunkMeta{
					{MinTime: 5, MaxTime: 50},
					{MinTime: 10, MaxTime: 20},
					{MinTime: 11, MaxTime: 20},
					{MinTime: 12, MaxTime: 20},
					{MinTime: 13, MaxTime: 20},
					{MinTime: 15, MaxTime: 30},
				},
				mint: 25,
				maxt: 30,
				exp: []ChunkMeta{
					{MinTime: 5, MaxTime: 50},
					{MinTime: 15, MaxTime: 30},
				},
			},
		} {
			t.Run(fmt.Sprintf("%s-pagesize-%d", tc.desc, pageSize), func(t *testing.T) {
				var w Writer
				w.Version = FormatV3
				primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				w.addChunks(tc.chks, &primary, &scratch, pageSize)
				decbuf := encoding.DecWrap(tsdb_enc.Decbuf{B: primary.Get()})
				dec := newDecoder(nil, 0)
				dst := []ChunkMeta{}
				require.Nil(t, dec.readChunksV3(&decbuf, tc.mint, tc.maxt, &dst))
				require.Equal(t, tc.exp, dst)
			})
		}
	}
}
// TestDecoderChunkStats verifies readChunkStats across versions and page
// sizes: full-range and overlapping queries count every chunk in full,
// while partial overlaps pro-rate KB/entries (chunks overlapping by a
// single nanosecond contribute a factor that truncates to zero).
func TestDecoderChunkStats(t *testing.T) {
	for _, pageSize := range []int{2, 10} {
		for _, version := range []int{
			FormatV2,
			FormatV3,
		} {
			for _, tc := range []struct {
				desc          string
				chks          []ChunkMeta
				from, through int64
				exp           ChunkStats
			}{
				{
					desc: "full range",
					chks: []ChunkMeta{
						{MinTime: 0, MaxTime: 1, KB: 1, Entries: 1},
						{MinTime: 1, MaxTime: 2, KB: 1, Entries: 1},
						{MinTime: 2, MaxTime: 3, KB: 1, Entries: 1},
						{MinTime: 3, MaxTime: 4, KB: 1, Entries: 1},
						{MinTime: 4, MaxTime: 5, KB: 1, Entries: 1},
					},
					from:    0,
					through: 5,
					exp: ChunkStats{
						Chunks:  5,
						KB:      5,
						Entries: 5,
					},
				},
				{
					desc: "overlapping",
					chks: []ChunkMeta{
						{MinTime: 0, MaxTime: 1, KB: 1, Entries: 1},
						{MinTime: 1, MaxTime: 4, KB: 1, Entries: 1},
						{MinTime: 2, MaxTime: 3, KB: 1, Entries: 1},
						{MinTime: 3, MaxTime: 5, KB: 1, Entries: 1},
						{MinTime: 4, MaxTime: 5, KB: 1, Entries: 1},
					},
					from:    0,
					through: 5,
					exp: ChunkStats{
						Chunks:  5,
						KB:      5,
						Entries: 5,
					},
				},
				{
					desc: "middle",
					chks: []ChunkMeta{
						{MinTime: 0, MaxTime: 1, KB: 1, Entries: 1},
						{MinTime: 1, MaxTime: 2, KB: 1, Entries: 1},
						{MinTime: 2, MaxTime: 3, KB: 1, Entries: 1},
						{MinTime: 3, MaxTime: 4, KB: 1, Entries: 1},
						{MinTime: 5, MaxTime: 6, KB: 1, Entries: 1},
					},
					from:    2,
					through: 4,
					exp: ChunkStats{
						// technically the 2nd chunk overlaps, but we don't add
						// any of its stats as it's only 1 nanosecond in the range
						// and thus gets integer-divisioned to 0
						Chunks:  3,
						KB:      2,
						Entries: 2,
					},
				},
				{
					desc: "middle with complete overlaps",
					chks: []ChunkMeta{
						// for example with pageSize=2
						{MinTime: 0, MaxTime: 1, KB: 1, Entries: 1},
						{MinTime: 1, MaxTime: 2, KB: 1, Entries: 1},
						{MinTime: 2, MaxTime: 3, KB: 1, Entries: 1},
						{MinTime: 3, MaxTime: 4, KB: 1, Entries: 1}, // partial overlap
						{MinTime: 4, MaxTime: 5, KB: 1, Entries: 1}, // full overlap
						{MinTime: 5, MaxTime: 6, KB: 1, Entries: 1},
						{MinTime: 6, MaxTime: 7, KB: 1, Entries: 1}, // full overlap
						{MinTime: 7, MaxTime: 8, KB: 1, Entries: 1},
						{MinTime: 8, MaxTime: 9, KB: 1, Entries: 1}, // partial overlap
						{MinTime: 9, MaxTime: 10, KB: 1, Entries: 1},
					},
					from:    4,
					through: 9,
					exp: ChunkStats{
						// same deal with previous case, 1ns overlap isn't enough
						// to include its data
						Chunks:  6,
						KB:      5,
						Entries: 5,
					},
				},
			} {
				t.Run(fmt.Sprintf("%s_version=%d_pageSize=%d", tc.desc, version, pageSize), func(t *testing.T) {
					var w Writer
					w.Version = version
					primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
					scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
					w.addChunks(tc.chks, &primary, &scratch, pageSize)
					decbuf := encoding.DecWrap(tsdb_enc.Decbuf{B: primary.Get()})
					dec := newDecoder(nil, 0)
					stats, err := dec.readChunkStats(version, &decbuf, 1, tc.from, tc.through)
					require.Nil(t, err)
					require.Equal(t, tc.exp, stats)
				})
			}
		}
	}
}
// BenchmarkChunkStats measures readChunkStats across index versions and
// chunk counts, querying the middle 20% of the time range.
func BenchmarkChunkStats(b *testing.B) {
	for _, nChks := range []int{2, 4, 10, 100, 1000, 10000, 100000} {
		chks := mkChks(nChks)
		// Only request the middle 20% of chunks.
		from, through := int64(nChks*40/100), int64(nChks*60/100)
		for _, version := range []int{FormatV2, FormatV3} {
			b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
				var w Writer
				w.Version = version
				primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				w.addChunks(chks, &primary, &scratch, ChunkPageSize)
				dec := newDecoder(nil, DefaultMaxChunksToBypassMarkerLookup)

				// Exclude the (potentially very large, up to 100k chunks)
				// encoding setup above from the timed region.
				b.ReportAllocs()
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					decbuf := encoding.DecWrap(tsdb_enc.Decbuf{B: primary.Get()})
					_, _ = dec.readChunkStats(version, &decbuf, 1, from, through)
				}
			})
		}
	}
}
// BenchmarkReadChunks measures full chunk decoding (readChunks) across
// index versions and chunk counts, querying the middle 20% of the time
// range. The destination slice comes from ChunkMetasPool so steady-state
// allocations reflect the decoder, not the result buffer.
func BenchmarkReadChunks(b *testing.B) {
	for _, nChks := range []int{2, 4, 10, 50, 100, 150, 1000, 10000, 100000} {
		chks := mkChks(nChks)
		res := ChunkMetasPool.Get()
		// Only request the middle 20% of chunks.
		from, through := int64(nChks*40/100), int64(nChks*60/100)
		for _, version := range []int{FormatV2, FormatV3} {
			b.Run(fmt.Sprintf("version %d/%d chunks", version, nChks), func(b *testing.B) {
				var w Writer
				w.Version = version
				primary := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				scratch := encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0)})
				w.addChunks(chks, &primary, &scratch, ChunkPageSize)
				dec := newDecoder(nil, DefaultMaxChunksToBypassMarkerLookup)

				// Exclude the (potentially very large) encoding setup above
				// from the timed region.
				b.ReportAllocs()
				b.ResetTimer()
				for i := 0; i < b.N; i++ {
					decbuf := encoding.DecWrap(tsdb_enc.Decbuf{B: primary.Get()})
					_ = dec.readChunks(version, &decbuf, 1, from, through, &res)
				}
			})
		}
	}
}

@ -51,6 +51,9 @@ const (
FormatV1 = 1
// FormatV2 represents 2 version of index.
FormatV2 = 2
// FormatV3 represents 3 version of index. It adds support for
// paging through batches of chunks within a series
FormatV3 = 3
IndexFilename = "index"
@ -58,6 +61,9 @@ const (
fingerprintInterval = 1 << 10
millisecondsInHour = int64(time.Hour / time.Millisecond)
// The format that will be written by this process
LiveFormat = FormatV2
)
type indexWriterStage uint8
@ -209,8 +215,7 @@ func NewTOCFromByteSlice(bs ByteSlice) (*TOC, error) {
}, nil
}
// NewWriter returns a new Writer to the given filename. It serializes data in format version 2.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
func NewWriterWithVersion(ctx context.Context, version int, fn string) (*Writer, error) {
dir := filepath.Dir(fn)
df, err := fileutil.OpenDir(dir)
@ -243,11 +248,12 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
}
iw := &Writer{
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
stage: idxStageNone,
Version: version,
ctx: ctx,
f: f,
fP: fP,
fPO: fPO,
stage: idxStageNone,
// Reusable memory.
buf1: encoding.EncWrap(tsdb_enc.Encbuf{B: make([]byte, 0, 1<<22)}),
@ -263,6 +269,11 @@ func NewWriter(ctx context.Context, fn string) (*Writer, error) {
return iw, nil
}
// NewWriter returns a new Writer to the given filename. It serializes data
// according to the package-level `LiveFormat` version; use
// NewWriterWithVersion to pick an explicit format.
func NewWriter(ctx context.Context, fn string) (*Writer, error) {
	return NewWriterWithVersion(ctx, LiveFormat, fn)
}
func (w *Writer) write(bufs ...[]byte) error {
return w.f.Write(bufs...)
}
@ -435,7 +446,7 @@ func (w *Writer) ensureStage(s indexWriterStage) error {
func (w *Writer) writeMeta() error {
w.buf1.Reset()
w.buf1.PutBE32(MagicIndex)
w.buf1.PutByte(FormatV2)
w.buf1.PutByte(byte(w.Version))
return w.write(w.buf1.Get())
}
@ -507,54 +518,135 @@ func (w *Writer) AddSeries(ref storage.SeriesRef, lset labels.Labels, fp model.F
w.buf2.PutUvarint32(valueIndex)
}
w.buf2.PutUvarint(len(chunks))
w.addChunks(chunks, &w.buf2, &w.buf1, ChunkPageSize)
w.buf1.Reset()
w.buf1.PutUvarint(w.buf2.Len())
w.buf2.PutHash(w.crc32)
w.lastSeries = append(w.lastSeries[:0], lset...)
w.lastSeriesHash = labelHash
w.lastRef = ref
if ref%fingerprintInterval == 0 {
// series references are the 16-byte aligned offsets
// Do NOT ask me how long I debugged this particular bit >:O
sRef := w.f.pos / 16
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
}
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
return errors.Wrap(err, "write series data")
}
return nil
}
// addChunks serializes chunk metadata into primary using the encoding
// dictated by the writer's format version; scratch is reusable work space
// for the v3 paged layout.
func (w *Writer) addChunks(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, pageSize int) {
	if w.Version <= FormatV2 {
		w.addChunksPriorV3(chunks, primary, scratch)
		return
	}
	w.addChunksV3(chunks, primary, scratch, pageSize)
}
func (w *Writer) addChunksPriorV3(chunks []ChunkMeta, primary, _ *encoding.Encbuf) {
primary.PutUvarint(len(chunks))
if len(chunks) > 0 {
c := chunks[0]
w.toc.Metadata.EnsureBounds(c.MinTime, c.MaxTime)
w.buf2.PutVarint64(c.MinTime)
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
w.buf2.PutUvarint32(c.KB)
w.buf2.PutUvarint32(c.Entries)
w.buf2.PutBE32(c.Checksum)
primary.PutVarint64(c.MinTime)
primary.PutUvarint64(uint64(c.MaxTime - c.MinTime))
primary.PutUvarint32(c.KB)
primary.PutUvarint32(c.Entries)
primary.PutBE32(c.Checksum)
t0 := c.MaxTime
for _, c := range chunks[1:] {
w.toc.Metadata.EnsureBounds(c.MinTime, c.MaxTime)
// Encode the diff against previous chunk as varint
// instead of uvarint because chunks may overlap
w.buf2.PutVarint64(c.MinTime - t0)
w.buf2.PutUvarint64(uint64(c.MaxTime - c.MinTime))
w.buf2.PutUvarint32(c.KB)
w.buf2.PutUvarint32(c.Entries)
primary.PutVarint64(c.MinTime - t0)
primary.PutUvarint64(uint64(c.MaxTime - c.MinTime))
primary.PutUvarint32(c.KB)
primary.PutUvarint32(c.Entries)
t0 = c.MaxTime
w.buf2.PutBE32(c.Checksum)
primary.PutBE32(c.Checksum)
}
}
}
w.buf1.Reset()
w.buf1.PutUvarint(w.buf2.Len())
func (w *Writer) addChunksV3(chunks []ChunkMeta, primary, scratch *encoding.Encbuf, chunkPageSize int) {
scratch.Reset()
w.buf2.PutHash(w.crc32)
primary.PutUvarint(len(chunks))
// placeholder for how long the markers section is so it can be skipped when there are few chunks present
markersLnOffset := primary.Len()
primary.PutBE32(0)
w.lastSeries = append(w.lastSeries[:0], lset...)
w.lastSeriesHash = labelHash
w.lastRef = ref
markersStart := primary.Len()
if ref%fingerprintInterval == 0 {
// series references are the 16-byte aligned offsets
// Do NOT ask me how long I debugged this particular bit >:O
sRef := w.f.pos / 16
w.fingerprintOffsets = append(w.fingerprintOffsets, [2]uint64{sRef, labelHash})
nMarkers := len(chunks) / chunkPageSize
if len(chunks)%chunkPageSize != 0 {
nMarkers++
}
primary.PutUvarint(nMarkers)
chunksStart := scratch.Len()
markerOffset := 0 // start of the current marker page
if len(chunks) > 0 {
var t0 int64
var pageMarker chunkPageMarker
for i, c := range chunks {
pageMarker.combine(c)
w.toc.Metadata.EnsureBounds(c.MinTime, c.MaxTime)
// Encode the diff against previous chunk as varint
// instead of uvarint because chunks may overlap
scratch.PutVarint64(c.MinTime - t0)
scratch.PutUvarint64(uint64(c.MaxTime - c.MinTime))
scratch.PutUvarint32(c.KB)
scratch.PutUvarint32(c.Entries)
t0 = c.MaxTime
scratch.PutBE32(c.Checksum)
// test if this is the last chunk in the page
if i%chunkPageSize == chunkPageSize-1 {
pageMarker.encode(primary, markerOffset, chunkPageSize)
pageMarker.clear()
markerOffset = scratch.Len() - chunksStart
}
}
if rem := len(chunks) % chunkPageSize; rem != 0 {
// write partial page at the end
pageMarker.encode(primary, markerOffset, rem)
}
// now that we're done, we have two buffers:
// 1. scratch: the actual chunk data
// 2. primary: the chunk page markers
// so it's time to combine them
// first, write the length of the markers section
markersLn := primary.Len() - markersStart
diff := markersLnOffset - primary.Len()
primary.Skip(diff)
primary.PutBE32(uint32(markersLn))
// -4 for the length of the u32 field we just wrote
primary.Skip(-diff - 4)
if err := w.write(w.buf1.Get(), w.buf2.Get()); err != nil {
return errors.Wrap(err, "write series data")
}
return nil
primary.PutBytes(scratch.Get())
}
func (w *Writer) startSymbols() error {
@ -616,7 +708,7 @@ func (w *Writer) finishSymbols() error {
}
// Load in the symbol table efficiently for the rest of the index writing.
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), FormatV2, int(w.toc.Symbols))
w.symbols, err = NewSymbols(RealByteSlice(w.symbolFile.Bytes()), w.Version, int(w.toc.Symbols))
if err != nil {
return errors.Wrap(err, "read symbols")
}
@ -1240,7 +1332,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
}
r.version = int(r.b.Range(4, 5)[0])
if r.version != FormatV1 && r.version != FormatV2 {
if r.version != FormatV1 && r.version != FormatV2 && r.version != FormatV3 {
return nil, errors.Errorf("unknown index file version %d", r.version)
}
@ -1332,10 +1424,7 @@ func newReader(b ByteSlice, c io.Closer) (*Reader, error) {
return nil, errors.Wrap(err, "loading fingerprint offsets")
}
r.dec = &Decoder{
LookupSymbol: r.lookupSymbol,
chunksSample: map[storage.SeriesRef]*chunkSamples{},
}
r.dec = newDecoder(r.lookupSymbol, DefaultMaxChunksToBypassMarkerLookup)
return r, nil
}
@ -1420,7 +1509,7 @@ func (s Symbols) Lookup(o uint32) (string, error) {
B: s.bs.Range(0, s.bs.Len()),
})
if s.version == FormatV2 {
if s.version >= FormatV2 {
if int(o) >= s.seen {
return "", errors.Errorf("unknown symbol offset %d", o)
}
@ -1476,7 +1565,7 @@ func (s Symbols) ReverseLookup(sym string) (uint32, error) {
if lastSymbol != sym {
return 0, errors.Errorf("unknown symbol %q", sym)
}
if s.version == FormatV2 {
if s.version >= FormatV2 {
return uint32(res), nil
}
return uint32(s.bs.Len() - lastLen), nil
@ -1666,9 +1755,9 @@ func (r *Reader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
offsetsMap := make(map[uint32]struct{})
for _, id := range ids {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if r.version >= FormatV2 {
offset = id * 16
}
@ -1705,9 +1794,9 @@ func (r *Reader) LabelNamesFor(ids ...storage.SeriesRef) ([]string, error) {
// LabelValueFor returns label value for the given label name in the series referred to by ID.
func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if r.version >= FormatV2 {
offset = id * 16
}
d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable))
@ -1731,9 +1820,9 @@ func (r *Reader) LabelValueFor(id storage.SeriesRef, label string) (string, erro
// Series reads the series with the given ID and writes its labels and chunks into lbls and chks.
func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
offset := id
// In version 2 series IDs are no longer exact references but series are 16-byte padded
// In version 2+ series IDs are no longer exact references but series are 16-byte padded
// and the ID is the multiple of 16 of the actual position.
if r.version == FormatV2 {
if r.version >= FormatV2 {
offset = id * 16
}
d := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable))
@ -1741,13 +1830,28 @@ func (r *Reader) Series(id storage.SeriesRef, from int64, through int64, lbls *l
return 0, d.Err()
}
fprint, err := r.dec.Series(d.Get(), id, from, through, lbls, chks)
fprint, err := r.dec.Series(r.version, d.Get(), id, from, through, lbls, chks)
if err != nil {
return 0, errors.Wrap(err, "read series")
}
return fprint, nil
}
// ChunkStats locates the series entry for id, fills lbls with its labels,
// and returns the series fingerprint plus aggregated chunk statistics for
// the [from, through] range.
func (r *Reader) ChunkStats(id storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
	offset := id
	// From v2 onward, series IDs are not byte offsets: entries are 16-byte
	// padded and the ID is the position divided by 16.
	if r.version >= FormatV2 {
		offset = 16 * id
	}

	buf := encoding.DecWrap(tsdb_enc.NewDecbufUvarintAt(r.b, int(offset), castagnoliTable))
	if err := buf.Err(); err != nil {
		return 0, ChunkStats{}, err
	}

	return r.dec.ChunkStats(r.version, buf.Get(), id, from, through, lbls)
}
func (r *Reader) Postings(name string, shard *ShardAnnotation, values ...string) (Postings, error) {
if r.version == FormatV1 {
e, ok := r.postingsV1[name]
@ -1941,14 +2045,25 @@ func (c *chunkSamples) getChunkSampleForQueryStarting(ts int64) *chunkSample {
return &c.chunks[i]
}
// Decoder provides decoding methods for the v1 and v2 index file format.
//
// Decoder provides decoding methods
// It currently does not contain decoding methods for all entry types but can be extended
// by them if there's demand.
type Decoder struct {
LookupSymbol func(uint32) (string, error)
chunksSample map[storage.SeriesRef]*chunkSamples
chunksSampleMtx sync.RWMutex
LookupSymbol func(uint32) (string, error)
chunksSample map[storage.SeriesRef]*chunkSamples // used prior to v3
maxChunksToBypassMarkerLookup int
chunksSampleMtx sync.RWMutex // used prior to v3
}
// newDecoder constructs a Decoder. lookupSymbol resolves symbol-table
// references to strings; maxChunksToBypassMarkerLookup is the chunk-count
// threshold below which readers scan chunks linearly instead of consulting
// page markers. The chunk-sample cache (used prior to v3) starts empty.
func newDecoder(
	lookupSymbol func(uint32) (string, error),
	maxChunksToBypassMarkerLookup int,
) *Decoder {
	return &Decoder{
		LookupSymbol:                  lookupSymbol,
		maxChunksToBypassMarkerLookup: maxChunksToBypassMarkerLookup,
		chunksSample:                  map[storage.SeriesRef]*chunkSamples{},
	}
}
// Postings returns a postings list for b and its number of elements.
@ -2088,10 +2203,11 @@ func buildChunkSamples(d encoding.Decbuf, numChunks int, info *chunkSamples) err
return d.Err()
}
// Series decodes a series entry from the given byte slice into lset and chks.
func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
func (dec *Decoder) prepSeries(b []byte, lbls *labels.Labels, chks *[]ChunkMeta) (*encoding.Decbuf, uint64, error) {
*lbls = (*lbls)[:0]
*chks = (*chks)[:0]
if chks != nil {
*chks = (*chks)[:0]
}
d := encoding.DecWrap(tsdb_enc.Decbuf{B: b})
@ -2103,42 +2219,277 @@ func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, th
lvo := uint32(d.Uvarint())
if d.Err() != nil {
return 0, errors.Wrap(d.Err(), "read series label offsets")
return nil, 0, errors.Wrap(d.Err(), "read series label offsets")
}
ln, err := dec.LookupSymbol(lno)
if err != nil {
return 0, errors.Wrap(err, "lookup label name")
return nil, 0, errors.Wrap(err, "lookup label name")
}
lv, err := dec.LookupSymbol(lvo)
if err != nil {
return 0, errors.Wrap(err, "lookup label value")
return nil, 0, errors.Wrap(err, "lookup label value")
}
*lbls = append(*lbls, labels.Label{Name: ln, Value: lv})
}
return &d, fprint, nil
}
// ChunkStats decodes the series entry in b, fills lbls with its labels,
// and returns the fingerprint along with chunk statistics aggregated over
// [from, through]. Passing nil chunks to prepSeries skips chunk decoding.
func (dec *Decoder) ChunkStats(version int, b []byte, seriesRef storage.SeriesRef, from, through int64, lbls *labels.Labels) (uint64, ChunkStats, error) {
	buf, fp, err := dec.prepSeries(b, lbls, nil)
	if err != nil {
		return 0, ChunkStats{}, err
	}

	stats, err := dec.readChunkStats(version, buf, seriesRef, from, through)
	return fp, stats, err
}
// readChunkStats dispatches stats aggregation on the index format version:
// v3+ entries carry page markers with pre-aggregated stats, older formats
// require iterating chunk metas.
func (dec *Decoder) readChunkStats(version int, d *encoding.Decbuf, seriesRef storage.SeriesRef, from, through int64) (ChunkStats, error) {
	switch {
	case version > FormatV2:
		return dec.readChunkStatsV3(d, from, through)
	default:
		return dec.readChunkStatsPriorV3(d, seriesRef, from, through)
	}
}
func (dec *Decoder) readChunkStatsV3(d *encoding.Decbuf, from, through int64) (res ChunkStats, err error) {
nChunks := d.Uvarint()
markersLn := int(d.Be32()) // markersLn
startMarkers := d.Len()
if nChunks < dec.maxChunksToBypassMarkerLookup {
d.Skip(markersLn)
return dec.accumulateChunkStats(d, nChunks, from, through)
}
nMarkers := d.Uvarint()
relevantPages := chunkPageMarkersPool.Get(nMarkers)
defer chunkPageMarkersPool.Put(relevantPages)
for i := 0; i < nMarkers; i++ {
var marker chunkPageMarker
marker.decode(d)
if overlap(from, through, marker.MinTime, marker.MaxTime) {
relevantPages = append(relevantPages, marker)
} else if marker.MinTime >= through {
break
}
}
if d.Err() != nil {
return res, errors.Wrap(d.Err(), "read chunk markers")
}
// guaranteed to have no matching chunks
if len(relevantPages) == 0 {
return ChunkStats{}, nil
}
// consume rest of markers, if any
d.Skip(markersLn - (startMarkers - d.Len()))
// length of buffer at beginning of chunks,
// later used to incrementally skip pages
initialLn := d.Len()
for markerIdx := 0; markerIdx < len(relevantPages); markerIdx++ {
curMarker := relevantPages[markerIdx]
if curMarker.subsetOf(from, through) {
// use aggregated stats for this page
res.addRaw(curMarker.ChunksInPage, curMarker.KB, curMarker.Entries)
continue
}
// skip to the offset of the page, adjusting for where we currently
// are, since offsets are relative to the start
d.Skip(curMarker.Offset - (initialLn - d.Len()))
// page partially overlaps -- need to check chunks individually
var prevMaxT int64
for i := 0; i < curMarker.ChunksInPage; i++ {
chunkMeta := &ChunkMeta{}
var err error
if i == 0 {
// need to force the min-time because chunks are indexed
// with delta-encoded min-times relative to the prior chunk,
// but this doesn't reset at page boundaries
// (maybe it should for more ergonomic programming).
// instead, we can just force the min-time to the page's min-time
err = readChunkMetaWithForcedMintime(d, curMarker.MinTime, chunkMeta, true)
} else {
err = readChunkMeta(d, prevMaxT, chunkMeta)
}
if err != nil {
return res, errors.Wrap(d.Err(), "read meta for chunk")
}
prevMaxT = chunkMeta.MaxTime
if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
// add to stats
res.AddChunk(chunkMeta, from, through)
} else if chunkMeta.MinTime >= through {
break
}
}
}
return res, d.Err()
}
// accumulateChunkStats linearly decodes nChunks chunk metas from d, adding
// every chunk overlapping [from, through) to the result. Iteration stops
// early once a decoded chunk's MinTime is at/after through.
func (dec *Decoder) accumulateChunkStats(d *encoding.Decbuf, nChunks int, from, through int64) (res ChunkStats, err error) {
	var prevMaxT int64
	chunkMeta := &ChunkMeta{}
	for i := 0; i < nChunks; i++ {
		if err := readChunkMeta(d, prevMaxT, chunkMeta); err != nil {
			// wrap err itself, not d.Err(): errors.Wrap(nil, ...) returns nil
			// and would silently swallow a failure d has not recorded
			return res, errors.Wrap(err, "read meta for chunk")
		}
		prevMaxT = chunkMeta.MaxTime

		if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
			// add to stats
			res.AddChunk(chunkMeta, from, through)
		} else if chunkMeta.MinTime >= through {
			break
		}
	}
	return res, d.Err()
}
// readChunkStatsPriorV3 computes chunk stats for pre-v3 index versions,
// which carry no page markers: the chunk metas are materialized (into a
// pooled buffer) and aggregated one by one.
func (dec *Decoder) readChunkStatsPriorV3(d *encoding.Decbuf, seriesRef storage.SeriesRef, from, through int64) (res ChunkStats, err error) {
	// prior to v3, chunks needed iteration for stats aggregation
	chks := ChunkMetasPool.Get()
	// recycle via closure so that a slice regrown by readChunks is the one
	// returned to the pool; a plain `defer Put(chks)` would capture the
	// pre-growth slice header at defer time
	defer func() {
		ChunkMetasPool.Put(chks)
	}()

	err = dec.readChunks(FormatV2, d, seriesRef, from, through, &chks)
	if err != nil {
		return ChunkStats{}, err
	}

	for i := range chks {
		// index instead of ranging by value to avoid copying each ChunkMeta
		chk := &chks[i]
		if overlap(from, through, chk.MinTime, chk.MaxTime) {
			res.AddChunk(chk, from, through)
		} else if chk.MinTime >= through {
			break
		}
	}

	return res, nil
}
// Series decodes the series entry in b: the label set goes into lbls and the
// chunk metas overlapping [from, through) into chks. The returned uint64 is
// the series fingerprint.
func (dec *Decoder) Series(version int, b []byte, seriesRef storage.SeriesRef, from int64, through int64, lbls *labels.Labels, chks *[]ChunkMeta) (uint64, error) {
	d, fp, err := dec.prepSeries(b, lbls, chks)
	if err != nil {
		return 0, err
	}

	// chunk layout differs per index format version
	if chunkErr := dec.readChunks(version, d, seriesRef, from, through, chks); chunkErr != nil {
		return 0, chunkErr
	}

	return fp, nil
}
// readChunks dispatches chunk decoding on the index format version: entries
// written at v3+ carry page markers, older versions rely on chunk samples.
func (dec *Decoder) readChunks(version int, d *encoding.Decbuf, seriesRef storage.SeriesRef, from int64, through int64, chks *[]ChunkMeta) error {
	if version <= FormatV2 {
		return dec.readChunksPriorV3(d, seriesRef, from, through, chks)
	}
	return dec.readChunksV3(d, from, through, chks)
}
// readChunksV3 decodes the chunk metas of a v3+ series entry into chks,
// keeping only chunks overlapping [from, through). Page markers are used to
// jump directly to the first relevant page unless the series has too few
// chunks for the marker lookup to pay off.
func (dec *Decoder) readChunksV3(d *encoding.Decbuf, from int64, through int64, chks *[]ChunkMeta) error {
	nChunks := d.Uvarint()
	chunksRemaining := nChunks

	markersLn := int(d.Be32()) // markersLn

	// variables must be declared before goto, allowing us to skip
	// using chunk pages when the chunk count is small
	var (
		nMarkers int
		marker   chunkPageMarker
		// set when we jump into a page via its marker: min-time deltas do
		// not reset at page boundaries (see readChunkStatsV3), so the first
		// chunk after a jump must have its min-time forced to the page's
		// min-time rather than delta-decoded against prevMaxT.
		forceMintime bool
		prevMaxT     int64
	)

	startMarkers := d.Len()

	if nChunks < dec.maxChunksToBypassMarkerLookup {
		d.Skip(markersLn)
		goto iterate
	}

	nMarkers = d.Uvarint()
	for i := 0; i < nMarkers; i++ {
		marker.decode(d)
		if overlap(from, through, marker.MinTime, marker.MaxTime) {
			d.Skip(markersLn - (startMarkers - d.Len())) // skip the rest of markers
			d.Skip(marker.Offset)                        // skip to the desired chunks
			forceMintime = true
			goto iterate
		}
		// this page can't match: discount its chunks so the iteration below
		// ends at the data's end. (previously this subtracted from a
		// zero-valued `marker` instead of the decoded one, leaving
		// chunksRemaining too large and over-reading past the buffer)
		chunksRemaining -= marker.ChunksInPage
	}

	if d.Err() != nil {
		return errors.Wrap(d.Err(), "read chunk markers")
	}
	// no page overlaps the query bounds
	return nil

iterate:
	for i := 0; i < chunksRemaining; i++ {
		chunkMeta := &ChunkMeta{}

		var err error
		if i == 0 && forceMintime {
			err = readChunkMetaWithForcedMintime(d, marker.MinTime, chunkMeta, true)
		} else {
			err = readChunkMeta(d, prevMaxT, chunkMeta)
		}
		if err != nil {
			// wrap err itself, not d.Err(): errors.Wrapf(nil, ...) returns
			// nil and would silently swallow a failure d has not recorded
			return errors.Wrapf(err, "read meta for chunk %d", nChunks-chunksRemaining+i)
		}
		prevMaxT = chunkMeta.MaxTime

		if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
			*chks = append(*chks, *chunkMeta)
		} else if chunkMeta.MinTime >= through {
			break
		}
	}

	return d.Err()
}
func (dec *Decoder) readChunksPriorV3(d *encoding.Decbuf, seriesRef storage.SeriesRef, from int64, through int64, chks *[]ChunkMeta) error {
// Read the chunks meta data.
k = d.Uvarint()
k := d.Uvarint()
if k == 0 {
return 0, d.Err()
return d.Err()
}
chunksSample, err := dec.getOrCreateChunksSample(encoding.DecWrap(tsdb_enc.Decbuf{B: d.Get()}), seriesRef, k)
if err != nil {
return 0, err
return err
}
cs := chunksSample.getChunkSampleForQueryStarting(from)
if cs == nil {
return fprint, nil
return nil
}
d.Skip(cs.offset)
chunkMeta := &ChunkMeta{}
if err := readChunkMeta(&d, cs.prevChunkMaxt, chunkMeta); err != nil {
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
if err := readChunkMeta(d, cs.prevChunkMaxt, chunkMeta); err != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
}
if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
@ -2147,26 +2498,34 @@ func (dec *Decoder) Series(b []byte, seriesRef storage.SeriesRef, from int64, th
t0 := chunkMeta.MaxTime
for i := cs.idx + 1; i < k; i++ {
if err := readChunkMeta(&d, t0, chunkMeta); err != nil {
return 0, errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
if err := readChunkMeta(d, t0, chunkMeta); err != nil {
return errors.Wrapf(d.Err(), "read meta for chunk %d", cs.idx)
}
t0 = chunkMeta.MaxTime
if !overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
continue
}
if chunkMeta.MinTime >= through {
if overlap(from, through, chunkMeta.MinTime, chunkMeta.MaxTime) {
*chks = append(*chks, *chunkMeta)
} else if chunkMeta.MinTime >= through {
break
}
*chks = append(*chks, *chunkMeta)
}
return fprint, d.Err()
return d.Err()
}
// readChunkMeta decodes a single chunk meta from d into chunkMeta. The min
// time is stored as a (possibly negative) delta against the previous chunk's
// max time. (a stale duplicate `chunkMeta.MinTime = d.Varint64() + ...` line
// was removed: it consumed the varint twice, corrupting every field after)
func readChunkMeta(d *encoding.Decbuf, prevChunkMaxt int64, chunkMeta *ChunkMeta) error {
	// Decode the diff against previous chunk as varint
	// instead of uvarint because chunks may overlap
	mint := d.Varint64() + prevChunkMaxt
	return readChunkMetaWithForcedMintime(d, mint, chunkMeta, false)
}
func readChunkMetaWithForcedMintime(d *encoding.Decbuf, mint int64, chunkMeta *ChunkMeta, decodeMinT bool) error {
if decodeMinT {
// skip the mint delta since we're forcing, but still need to
// remove the bytes from our buffer
d.Varint64()
}
chunkMeta.MinTime = mint
chunkMeta.MaxTime = int64(d.Uvarint64()) + chunkMeta.MinTime
chunkMeta.KB = uint32(d.Uvarint())
chunkMeta.Entries = uint32(d.Uvarint64())
@ -2183,6 +2542,8 @@ func yoloString(b []byte) string {
return *((*string)(unsafe.Pointer(&b)))
}
// overlap reports whether the chunk span [chkFrom, chkThrough] intersects
// the query range [from, through). (a stale pre-refactor signature/body for
// this function was removed)
// note: chkThrough is inclusive as it represents the last
// sample timestamp in the chunk, whereas through is exclusive
func overlap(from, through, chkFrom, chkThrough int64) bool {
	return from <= chkThrough && through > chkFrom
}

@ -29,12 +29,13 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/goleak"
"github.com/grafana/loki/pkg/util/encoding"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/storage"
tsdb_enc "github.com/prometheus/prometheus/tsdb/encoding"
"github.com/prometheus/prometheus/util/testutil"
"github.com/grafana/loki/pkg/util/encoding"
)
func TestMain(m *testing.M) {
@ -720,7 +721,7 @@ func TestDecoder_ChunkSamples(t *testing.T) {
},
} {
t.Run(name, func(t *testing.T) {
iw, err := NewWriter(context.Background(), filepath.Join(dir, name))
iw, err := NewWriterWithVersion(context.Background(), FormatV2, filepath.Join(dir, name))
require.NoError(t, err)
syms := []string{}

@ -1,8 +1,20 @@
package index
import "sync"
import (
"sync"
var ChunkMetasPool PoolChunkMetas
"github.com/prometheus/prometheus/util/pool"
)
var (
ChunkMetasPool PoolChunkMetas
chunkPageMarkersPool = poolChunkPageMarkers{
// pools of lengths 64->1024
pool: pool.New(64, 1024, 2, func(sz int) interface{} {
return make(chunkPageMarkers, 0, sz)
}),
}
)
type PoolChunkMetas struct {
pool sync.Pool
@ -20,3 +32,17 @@ func (p *PoolChunkMetas) Put(xs []ChunkMeta) {
//nolint:staticcheck
p.pool.Put(xs)
}
// poolChunkPageMarkers wraps a bucketed pool of chunkPageMarkers slices.
type poolChunkPageMarkers struct {
	pool *pool.Pool
}

// Get returns a zero-length pooled chunkPageMarkers slice from the bucket
// sized for sz entries (presumably cap >= sz per util/pool bucketing —
// confirm against pool.New semantics).
func (p *poolChunkPageMarkers) Get(sz int) chunkPageMarkers {
	return p.pool.Get(sz).(chunkPageMarkers)
}

// Put truncates xs to length zero and returns it to the pool for reuse.
func (p *poolChunkPageMarkers) Put(xs chunkPageMarkers) {
	xs = xs[:0]
	//nolint:staticcheck
	p.pool.Put(xs)
}

@ -42,7 +42,7 @@ func DefaultIndexClientOptions() IndexClientOptions {
// IndexStatsAccumulator collects index statistics during a query.
type IndexStatsAccumulator interface {
	// AddStream records that a stream with the given fingerprint matched.
	AddStream(fp model.Fingerprint)
	// AddChunk records an individual chunk's metadata.
	AddChunk(fp model.Fingerprint, chk index.ChunkMeta)
	// AddChunkStats merges pre-aggregated per-series chunk stats.
	AddChunkStats(s index.ChunkStats)
	// Stats returns the accumulated totals.
	Stats() stats.Stats
}

@ -178,7 +178,9 @@ func TestIndexClient_Stats(t *testing.T) {
Start: indexStartToday + 50,
End: indexStartToday + 60,
},
expectedNumChunks: 1, // end time not inclusive
// end time is inclusive because chunks are indexed by their start and end, although the chunk's stats contributions get reduced to zero
// in integer division due to 1 nanosecond overlap
expectedNumChunks: 2,
expectedNumEntries: 10,
expectedNumStreams: 1,
expectedNumBytes: 10 * 1024,

@ -68,6 +68,9 @@ type IndexReader interface {
// Returns storage.ErrNotFound if the ref does not resolve to a known series.
Series(ref storage.SeriesRef, from int64, through int64, lset *labels.Labels, chks *[]index.ChunkMeta) (uint64, error)
// ChunkStats returns the stats for the chunks in the given series.
ChunkStats(ref storage.SeriesRef, from, through int64, lset *labels.Labels) (uint64, index.ChunkStats, error)
// LabelNames returns all the unique label names present in the index in sorted order.
LabelNames(matchers ...*labels.Matcher) ([]string, error)

@ -11,7 +11,6 @@ import (
"github.com/grafana/loki/pkg/storage/chunk"
index_shipper "github.com/grafana/loki/pkg/storage/stores/indexshipper/index"
"github.com/grafana/loki/pkg/storage/stores/tsdb/index"
"github.com/grafana/loki/pkg/util"
)
// GetRawFileReaderFunc returns an io.ReadSeeker for reading raw tsdb file from disk
@ -104,10 +103,7 @@ func (i *TSDBIndex) SetChunkFilterer(chunkFilter chunk.RequestChunkFilterer) {
// fn must NOT capture it's arguments. They're reused across series iterations and returned to
// a pool after completion.
func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation, from model.Time, through model.Time, fn func(labels.Labels, model.Fingerprint, []index.ChunkMeta), matchers ...*labels.Matcher) error {
p, err := PostingsForMatchers(i.reader, shard, matchers...)
if err != nil {
return err
}
// TODO(owen-d): use pool
var ls labels.Labels
chks := ChunkMetasPool.Get()
@ -118,24 +114,41 @@ func (i *TSDBIndex) ForSeries(ctx context.Context, shard *index.ShardAnnotation,
filterer = i.chunkFilter.ForRequest(ctx)
}
for p.Next() {
hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks)
if err != nil {
return err
}
return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
for p.Next() {
hash, err := i.reader.Series(p.At(), int64(from), int64(through), &ls, &chks)
if err != nil {
return err
}
// skip series that belong to different shards
if shard != nil && !shard.Match(model.Fingerprint(hash)) {
continue
}
// skip series that belong to different shards
if shard != nil && !shard.Match(model.Fingerprint(hash)) {
continue
}
if filterer != nil && filterer.ShouldFilter(ls) {
continue
if filterer != nil && filterer.ShouldFilter(ls) {
continue
}
fn(ls, model.Fingerprint(hash), chks)
}
return p.Err()
})
}
fn(ls, model.Fingerprint(hash), chks)
// forPostings resolves the postings matching matchers (optionally restricted
// by shard) and passes the iterator to fn, returning fn's error. (a stale
// `return p.Err()` left before `return fn(p)` was removed: it made fn
// unreachable, so the callback was never invoked.)
// note: ctx, from, and through are currently unused but kept so the
// signature mirrors the other iteration helpers.
func (i *TSDBIndex) forPostings(
	ctx context.Context,
	shard *index.ShardAnnotation,
	from, through model.Time,
	matchers []*labels.Matcher,
	fn func(index.Postings) error,
) error {
	p, err := PostingsForMatchers(i.reader, shard, matchers...)
	if err != nil {
		return err
	}
	return fn(p)
}
func (i *TSDBIndex) GetChunkRefs(ctx context.Context, userID string, from, through model.Time, res []ChunkRef, shard *index.ShardAnnotation, matchers ...*labels.Matcher) ([]ChunkRef, error) {
@ -213,37 +226,36 @@ func (i *TSDBIndex) Identifier(string) SingleTenantTSDBIdentifier {
}
func (i *TSDBIndex) Stats(ctx context.Context, userID string, from, through model.Time, acc IndexStatsAccumulator, shard *index.ShardAnnotation, shouldIncludeChunk shouldIncludeChunk, matchers ...*labels.Matcher) error {
if err := i.ForSeries(ctx, shard, from, through, func(ls labels.Labels, fp model.Fingerprint, chks []index.ChunkMeta) {
var addedStream bool
for _, chk := range chks {
if shouldIncludeChunk != nil && !shouldIncludeChunk(chk) {
continue
return i.forPostings(ctx, shard, from, through, matchers, func(p index.Postings) error {
// TODO(owen-d): use pool
var ls labels.Labels
var filterer chunk.Filterer
if i.chunkFilter != nil {
filterer = i.chunkFilter.ForRequest(ctx)
}
for p.Next() {
fp, stats, err := i.reader.ChunkStats(p.At(), int64(from), int64(through), &ls)
if err != nil {
return err
}
if !addedStream {
acc.AddStream(fp)
addedStream = true
// skip series that belong to different shards
if shard != nil && !shard.Match(model.Fingerprint(fp)) {
continue
}
// Assuming entries and bytes are evenly distributed in the chunk,
// We will take the proportional number of entries and number of bytes
// We get the percentage of time that the range `from` to `through` fits into the chunk
// to use it as a factor to get the amount of bytes and entries
factor, leadingTime, trailingTime := util.GetFactorOfTime(int64(from), int64(through), chk.MinTime, chk.MaxTime)
adjustedChunkMeta := index.ChunkMeta{
Checksum: chk.Checksum,
MinTime: chk.MinTime + leadingTime,
MaxTime: chk.MinTime + trailingTime,
KB: uint32(float64(chk.KB) * factor),
Entries: uint32(float64(chk.Entries) * factor),
if filterer != nil && filterer.ShouldFilter(ls) {
continue
}
acc.AddChunk(fp, adjustedChunkMeta)
if stats.Entries > 0 {
// need to add stream
acc.AddStream(model.Fingerprint(fp))
acc.AddChunkStats(stats)
}
}
}, matchers...); err != nil {
return err
}
return p.Err()
})
return nil
}

@ -16,6 +16,10 @@ type Encbuf struct {
// PutString appends the raw bytes of s to the buffer.
func (e *Encbuf) PutString(s string) { e.B = append(e.B, s...) }

// Skip advances the buffer's length by i bytes without writing them,
// re-slicing into existing capacity (panics if the caller has not reserved
// at least i additional bytes of capacity).
func (e *Encbuf) Skip(i int) {
	e.B = e.B[:len(e.B)+i]
}
func DecWith(b []byte) (res Decbuf) {
res.B = b
return res

Loading…
Cancel
Save