fix(blooms): bloom-gw chunk merging improvements (#12162)

Owen Diehl 1 year ago committed by GitHub
parent e71964cca4
commit f4b2c5d69b
Changed files:
  1. pkg/bloomgateway/bloomgateway.go (228 lines changed)
  2. pkg/bloomgateway/bloomgateway_test.go (351 lines changed)
  3. pkg/bloomgateway/metrics.go (35 lines changed)
  4. pkg/storage/bloom/v1/index.go (2 lines changed)

@@ -44,7 +44,6 @@ package bloomgateway
import (
"context"
"fmt"
"sort"
"sync"
"time"
@@ -229,7 +228,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}, nil
}
var numSeries int
seriesByDay := partitionRequest(req)
// no tasks --> empty response
@@ -240,11 +238,16 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
}
tasks := make([]Task, 0, len(seriesByDay))
responses := make([][]v1.Output, 0, len(seriesByDay))
for _, seriesForDay := range seriesByDay {
task, err := NewTask(ctx, tenantID, seriesForDay, filters)
if err != nil {
return nil, err
}
// TODO(owen-d): include capacity in constructor?
task.responses = responsesPool.Get(len(seriesForDay.series))
level.Debug(g.logger).Log(
"msg", "created task for day",
"task", task.ID,
@@ -254,7 +257,6 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
"filters", JoinFunc(filters, ";", func(e syntax.LineFilterExpr) string { return e.String() }),
)
tasks = append(tasks, task)
numSeries += len(seriesForDay.series)
}
g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())
@@ -272,13 +274,19 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
// When enqueuing, we also add the task to the pending tasks
_ = g.pendingTasks.Inc()
})
// TODO(owen-d): use `concurrency` lib, bound parallelism
go g.consumeTask(ctx, task, tasksCh)
}
responses := responsesPool.Get(numSeries)
defer responsesPool.Put(responses)
remaining := len(tasks)
preFilterSeries := len(req.Refs)
var preFilterChunks, postFilterChunks int
for _, series := range req.Refs {
preFilterChunks += len(series.Refs)
}
for remaining > 0 {
select {
case <-ctx.Done():
@@ -288,20 +296,36 @@ func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunk
if task.Err() != nil {
return nil, errors.Wrap(task.Err(), "request failed")
}
responses = append(responses, task.responses...)
responses = append(responses, task.responses)
remaining--
}
}
preFilterSeries := len(req.Refs)
filtered := filterChunkRefs(req, responses)
// TODO(chaudum): Don't wait for all responses before starting to filter chunks.
filtered := g.processResponses(req, responses)
// free up the responses
for _, resp := range responses {
responsesPool.Put(resp)
}
postFilterSeries := len(req.Refs)
postFilterSeries := len(filtered)
level.Info(logger).Log("msg", "return filtered chunk refs", "pre_filter_series", preFilterSeries, "post_filter_series", postFilterSeries, "filtered_chunks", filtered)
return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
for _, group := range req.Refs {
postFilterChunks += len(group.Refs)
}
g.metrics.requestedSeries.Observe(float64(preFilterSeries))
g.metrics.filteredSeries.Observe(float64(preFilterSeries - postFilterSeries))
g.metrics.requestedChunks.Observe(float64(preFilterChunks))
g.metrics.filteredChunks.Observe(float64(preFilterChunks - postFilterChunks))
level.Info(logger).Log(
"msg", "return filtered chunk refs",
"requested_series", preFilterSeries,
"filtered_series", preFilterSeries-postFilterSeries,
"requested_chunks", preFilterChunks,
"filtered_chunks", preFilterChunks-postFilterChunks,
)
return &logproto.FilterChunkRefResponse{ChunkRefs: filtered}, nil
}
// consumeTask receives v1.Output yielded from the block querier on the task's
@@ -317,11 +341,9 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
select {
case <-ctx.Done():
level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
g.metrics.chunkRemovals.WithLabelValues("dropped").Add(float64(res.Removals.Len()))
default:
level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
task.responses = append(task.responses, res)
g.metrics.chunkRemovals.WithLabelValues("accepted").Add(float64(res.Removals.Len()))
}
}
@@ -334,68 +356,142 @@ func (g *Gateway) consumeTask(ctx context.Context, task Task, tasksCh chan<- Tas
}
}
func (g *Gateway) processResponses(req *logproto.FilterChunkRefRequest, responses []v1.Output) (filtered int) {
for _, o := range responses {
if o.Removals.Len() == 0 {
continue
}
filtered += g.removeNotMatchingChunks(req, o)
// merges a list of responses via a heap. The same fingerprints and chunks can be present in multiple responses,
// but each response must be ordered by fingerprint
func orderedResponsesByFP(responses [][]v1.Output) v1.Iterator[v1.Output] {
if len(responses) == 0 {
return v1.NewEmptyIter[v1.Output]()
}
if len(responses) == 1 {
return v1.NewSliceIter(responses[0])
}
itrs := make([]v1.PeekingIterator[v1.Output], 0, len(responses))
for _, r := range responses {
itrs = append(itrs, v1.NewPeekingIter(v1.NewSliceIter(r)))
}
return
return v1.NewHeapIterator[v1.Output](
func(o1, o2 v1.Output) bool { return o1.Fp <= o2.Fp },
itrs...,
)
}
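
// --- Illustrative sketch, not part of this commit ---
// orderedResponsesByFP above performs a heap-based k-way merge of per-day
// response slices, each already sorted by fingerprint; duplicates survive the
// merge and are collapsed by the dedup pass further down. The standalone
// example below shows the same idea with the standard library's
// container/heap and plain uint64 fingerprints. All names here are
// hypothetical; the v1 iterator types are deliberately not used.
package main

import (
	"container/heap"
	"fmt"
)

// cursor points at the next unread element of one sorted slice.
type cursor struct {
	vals []uint64
	idx  int
}

// cursorHeap orders cursors by their current value, smallest first.
type cursorHeap []*cursor

func (h cursorHeap) Len() int            { return len(h) }
func (h cursorHeap) Less(i, j int) bool  { return h[i].vals[h[i].idx] <= h[j].vals[h[j].idx] }
func (h cursorHeap) Swap(i, j int)       { h[i], h[j] = h[j], h[i] }
func (h *cursorHeap) Push(x interface{}) { *h = append(*h, x.(*cursor)) }
func (h *cursorHeap) Pop() interface{} {
	old := *h
	n := len(old)
	c := old[n-1]
	*h = old[:n-1]
	return c
}

// mergeSorted yields the union of all inputs in ascending order,
// keeping duplicates (deduplication would happen in a later pass).
func mergeSorted(inputs ...[]uint64) []uint64 {
	h := &cursorHeap{}
	for _, in := range inputs {
		if len(in) > 0 {
			*h = append(*h, &cursor{vals: in})
		}
	}
	heap.Init(h)
	var out []uint64
	for h.Len() > 0 {
		c := (*h)[0]
		out = append(out, c.vals[c.idx])
		c.idx++
		if c.idx == len(c.vals) {
			heap.Pop(h) // this slice is exhausted
		} else {
			heap.Fix(h, 0) // re-establish heap order for the advanced cursor
		}
	}
	return out
}

func main() {
	// Two "days" report overlapping fingerprints; the merge keeps global order.
	fmt.Println(mergeSorted([]uint64{1, 3, 5}, []uint64{1, 2, 5, 8}))
	// Output: [1 1 2 3 5 5 8]
}
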
func (g *Gateway) removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output) (filtered int) {
// TODO(owen-d): improve perf. This can be faster with a more specialized impl
// NB(owen-d): `req` is mutated in place for performance, but `responses` is not
func filterChunkRefs(req *logproto.FilterChunkRefRequest, responses [][]v1.Output) []*logproto.GroupedChunkRefs {
res := make([]*logproto.GroupedChunkRefs, 0, len(req.Refs))
// dedupe outputs, merging the same series.
// This returns an Iterator[v1.Output]
dedupedResps := v1.NewDedupingIter[v1.Output, v1.Output](
// eq
func(o1, o2 v1.Output) bool {
return o1.Fp == o2.Fp
},
// from
v1.Identity[v1.Output],
// merge two removal sets for the same series, deduping
func(o1, o2 v1.Output) v1.Output {
res := v1.Output{Fp: o1.Fp}
var chks v1.ChunkRefs
var i, j int
for i < len(o1.Removals) && j < len(o2.Removals) {
a, b := o1.Removals[i], o2.Removals[j]
if a == b {
chks = append(chks, a)
i++
j++
continue
}
if a.Less(b) {
chks = append(chks, a)
i++
continue
}
chks = append(chks, b)
j++
}
// binary search index of fingerprint
// TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets.
// If all requested chunks are in both days, the first day could technically remove _all_ chunks from consideration.
// The sort.Search for the _next_ chunk would return an index where fingerprint is greater than the target fingerprint.
idx := sort.Search(len(req.Refs), func(i int) bool {
return req.Refs[i].Fingerprint >= uint64(res.Fp)
})
if i < len(o1.Removals) {
chks = append(chks, o1.Removals[i:]...)
}
if j < len(o2.Removals) {
chks = append(chks, o2.Removals[j:]...)
}
// fingerprint not found
if idx >= len(req.Refs) {
level.Error(g.logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
return
res.Removals = chks
return res
},
v1.NewPeekingIter(orderedResponsesByFP(responses)),
)
// Iterate through the requested and filtered series/chunks,
// removing chunks that were filtered out.
var next bool
var at v1.Output
if next = dedupedResps.Next(); next {
at = dedupedResps.At()
}
// if all chunks of a fingerprint are removed
// then remove the whole group from the response
// TODO(owen-d): there's a bug here because the same fingerprint and chunks can exist over multiple day buckets.
// A later day bucket could happen to request removals with len=remaining, but whose chunk references were
// partially removed in an earlier round. Just checking the length here could cause us to discard chunks
// that shouldn't be.
if len(req.Refs[idx].Refs) == res.Removals.Len() {
filtered += len(req.Refs[idx].Refs)
req.Refs[idx] = nil // avoid leaking pointer
// TODO(owen-d): this is O(n^2);
// use more specialized data structure that doesn't reslice
req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
return
}
for i := 0; i < len(req.Refs); i++ {
// we've hit the end of the removals -- append the rest of the
// requested series and return
if !next {
res = append(res, req.Refs[i:]...)
return res
}
for i := range res.Removals {
toRemove := res.Removals[i]
for j := 0; j < len(req.Refs[idx].Refs); j++ {
if req.Refs[idx].Refs[j] == nil {
continue
}
// the current series had no removals
cur := req.Refs[i]
if cur.Fingerprint < uint64(at.Fp) {
res = append(res, cur)
continue
}
// TODO(owen-d): These should check start/end/checksum, not just checksum.
if logproto.ShortRef(toRemove) == *req.Refs[idx].Refs[j] {
filtered += 1
// the current series had removals. No need to check for equality
// b/c removals must be present in input
filterChunkRefsForSeries(cur, at.Removals)
if len(cur.Refs) > 0 {
res = append(res, cur)
}
// TODO(owen-d): usually not a problem (n is small), but I've seen some series have
// many thousands of chunks per day, so would be good to not reslice.
// See `labels.NewBuilder()` for an example
req.Refs[idx].Refs[j] = nil // avoid leaking pointer
req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
j-- // since we removed the current item at index, we have to redo the same index
}
// advance removals
if next = dedupedResps.Next(); next {
at = dedupedResps.At()
}
}
return
return res
}
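
// --- Illustrative sketch, not part of this commit ---
// The merge function handed to v1.NewDedupingIter above combines the removal
// sets of two outputs that share a fingerprint: a sorted union that emits a
// common chunk only once. Below is the same walk on plain uint32 checksums,
// with hypothetical names and no v1 types.
package main

import "fmt"

// unionSorted merges two ascending slices, emitting each common element once.
func unionSorted(a, b []uint32) []uint32 {
	out := make([]uint32, 0, len(a)+len(b))
	var i, j int
	for i < len(a) && j < len(b) {
		switch {
		case a[i] == b[j]:
			out = append(out, a[i]) // present in both: keep one copy
			i++
			j++
		case a[i] < b[j]:
			out = append(out, a[i])
			i++
		default:
			out = append(out, b[j])
			j++
		}
	}
	// append whichever side still has elements left
	out = append(out, a[i:]...)
	out = append(out, b[j:]...)
	return out
}

func main() {
	fmt.Println(unionSorted([]uint32{1, 2, 4}, []uint32{2, 3, 4, 9}))
	// Output: [1 2 3 4 9]
}
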
// mutates cur
func filterChunkRefsForSeries(cur *logproto.GroupedChunkRefs, removals v1.ChunkRefs) {
// use same backing array to avoid allocations
res := cur.Refs[:0]
var i, j int
for i < len(cur.Refs) && j < len(removals) {
if (*v1.ChunkRef)(cur.Refs[i]).Less(removals[j]) {
// chunk was not removed
res = append(res, cur.Refs[i])
i++
} else {
// Since all removals must exist in the series, we can assume that if the removal
// is not less, it must be equal to the current chunk (a match). Skip this chunk.
i++
j++
}
}
if i < len(cur.Refs) {
res = append(res, cur.Refs[i:]...)
}
cur.Refs = cur.Refs[:len(res)]
}
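
// --- Illustrative sketch, not part of this commit ---
// filterChunkRefsForSeries above walks two sorted lists in lockstep and drops
// every chunk found in the removal set, reusing the input's backing array so
// no new slice is allocated. The same pattern on plain uint32 checksums,
// with hypothetical names:
package main

import "fmt"

// removeSorted deletes every element of removals from refs in place and
// returns the shortened slice. Both inputs must be ascending, and every
// removal is assumed to be present in refs.
func removeSorted(refs, removals []uint32) []uint32 {
	out := refs[:0] // reuse the backing array to avoid an allocation
	var i, j int
	for i < len(refs) && j < len(removals) {
		if refs[i] < removals[j] {
			out = append(out, refs[i]) // kept: not in the removal set
			i++
		} else {
			// removals are a subset of refs, so a non-smaller ref must match
			i++
			j++
		}
	}
	out = append(out, refs[i:]...) // everything after the last removal survives
	return out
}

func main() {
	fmt.Println(removeSorted([]uint32{0, 1, 2, 3, 4}, []uint32{0, 2, 4}))
	// Output: [1 3]
}
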

@@ -4,6 +4,7 @@ import (
"context"
"fmt"
"math/rand"
"sync"
"testing"
"time"
@@ -16,6 +17,7 @@ import (
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
@@ -411,65 +413,312 @@ func TestBloomGateway_FilterChunkRefs(t *testing.T) {
})
}
func TestBloomGateway_RemoveNotMatchingChunks(t *testing.T) {
g := &Gateway{
logger: log.NewNopLogger(),
func TestFilterChunkRefsForSeries(t *testing.T) {
mkInput := func(xs []uint32) *logproto.GroupedChunkRefs {
out := &logproto.GroupedChunkRefs{Refs: make([]*logproto.ShortRef, len(xs))}
for i, x := range xs {
out.Refs[i] = &logproto.ShortRef{Checksum: x}
}
return out
}
t.Run("removing chunks partially", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x3},
{Checksum: 0x4},
{Checksum: 0x5},
}},
},
mkRemovals := func(xs []uint32) v1.ChunkRefs {
out := make(v1.ChunkRefs, len(xs))
for i, x := range xs {
out[i] = v1.ChunkRef{Checksum: x}
}
res := v1.Output{
Fp: 0x00, Removals: v1.ChunkRefs{
{Checksum: 0x2},
{Checksum: 0x4},
},
return out
}
for _, tc := range []struct {
desc string
input, removals, expected []uint32
}{
{
desc: "no matches",
input: []uint32{0, 1},
expected: []uint32{0, 1},
},
{
desc: "remove all",
input: []uint32{0, 1, 2, 3, 4},
removals: []uint32{0, 1, 2, 3, 4},
expected: []uint32{},
},
{
desc: "remove every other",
input: []uint32{0, 1, 2, 3, 4},
removals: []uint32{0, 2, 4},
expected: []uint32{1, 3},
},
{
desc: "remove middle section",
input: []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9},
removals: []uint32{3, 4, 5},
expected: []uint32{0, 1, 2, 6, 7, 8, 9},
},
} {
t.Run(tc.desc, func(t *testing.T) {
input := mkInput(tc.input)
expected := mkInput(tc.expected)
filterChunkRefsForSeries(input, mkRemovals(tc.removals))
require.Equal(t, expected, input)
})
}
}
func TestFilterChunkRefs(t *testing.T) {
mkInput := func(nSeries, chunksPerSeries int) *logproto.FilterChunkRefRequest {
res := &logproto.FilterChunkRefRequest{}
refs := make([]*logproto.GroupedChunkRefs, nSeries)
for i := range refs {
chks := make([]*logproto.ShortRef, chunksPerSeries)
for j := range chks {
chks[j] = &logproto.ShortRef{Checksum: uint32(j)}
}
refs[i] = &logproto.GroupedChunkRefs{
Fingerprint: uint64(i),
Refs: chks,
}
}
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x3},
{Checksum: 0x5},
}},
},
res.Refs = refs
return res
}
type instruction struct {
fp uint64
checksums []uint32
}
mkRemovals := func(xs [][]instruction) [][]v1.Output {
out := make([][]v1.Output, len(xs))
for i, x := range xs {
out[i] = make([]v1.Output, len(x))
for j, c := range x {
out[i][j] = v1.Output{
Fp: model.Fingerprint(c.fp),
Removals: make(v1.ChunkRefs, len(c.checksums)),
}
for k, chk := range c.checksums {
out[i][j].Removals[k] = v1.ChunkRef{Checksum: chk}
}
}
}
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 2, n)
require.Equal(t, expected, req)
})
return out
}
t.Run("removing all chunks removed fingerprint ref", func(t *testing.T) {
req := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{
{Fingerprint: 0x00, Tenant: "fake", Refs: []*logproto.ShortRef{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x3},
}},
},
mkResult := func(xs []instruction) *logproto.FilterChunkRefRequest {
out := &logproto.FilterChunkRefRequest{Refs: make([]*logproto.GroupedChunkRefs, len(xs))}
for i, x := range xs {
out.Refs[i] = &logproto.GroupedChunkRefs{
Fingerprint: x.fp,
Refs: make([]*logproto.ShortRef, len(x.checksums)),
}
for j, c := range x.checksums {
out.Refs[i].Refs[j] = &logproto.ShortRef{Checksum: c}
}
}
res := v1.Output{
Fp: 0x00, Removals: v1.ChunkRefs{
{Checksum: 0x1},
{Checksum: 0x2},
{Checksum: 0x2},
return out
}
for _, tc := range []struct {
desc string
input *logproto.FilterChunkRefRequest
removals [][]instruction
expected *logproto.FilterChunkRefRequest
}{
{
desc: "no removals",
input: mkInput(2, 2),
expected: mkInput(2, 2),
},
{
desc: "remove all",
input: mkInput(2, 2),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{0, 1}},
{fp: 1, checksums: []uint32{0, 1}},
},
},
expected: mkInput(0, 0),
},
{
desc: "remove every other series",
input: mkInput(4, 2),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{0, 1}},
{fp: 2, checksums: []uint32{0, 1}},
},
},
expected: mkResult([]instruction{
{fp: 1, checksums: []uint32{0, 1}},
{fp: 3, checksums: []uint32{0, 1}},
}),
},
{
desc: "remove the last chunk for each series",
input: mkInput(4, 2),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{1}},
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{1}},
{fp: 3, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0}},
{fp: 1, checksums: []uint32{0}},
{fp: 2, checksums: []uint32{0}},
{fp: 3, checksums: []uint32{0}},
}),
},
{
desc: "remove the middle chunk for every other series",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0, 2}},
{fp: 1, checksums: []uint32{0, 1, 2}},
{fp: 2, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
},
{
desc: "remove the first chunk of the last series",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 3, checksums: []uint32{0}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0, 1, 2}},
{fp: 1, checksums: []uint32{0, 1, 2}},
{fp: 2, checksums: []uint32{0, 1, 2}},
{fp: 3, checksums: []uint32{1, 2}},
}),
},
{
desc: "duplicate removals",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{0, 1}},
{fp: 0, checksums: []uint32{0, 1, 2}},
{fp: 1, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 1, checksums: []uint32{0, 2}},
{fp: 2, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
},
{
desc: "middle duplicates across 2 days",
input: mkInput(4, 3),
removals: [][]instruction{
{
{fp: 0, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{1}},
},
{
{fp: 0, checksums: []uint32{1}},
{fp: 2, checksums: []uint32{1}},
},
},
expected: mkResult([]instruction{
{fp: 0, checksums: []uint32{0, 2}},
{fp: 1, checksums: []uint32{0, 1, 2}},
{fp: 2, checksums: []uint32{0, 2}},
{fp: 3, checksums: []uint32{0, 1, 2}},
}),
},
} {
t.Run(tc.desc, func(t *testing.T) {
res := filterChunkRefs(tc.input, mkRemovals(tc.removals))
require.Equal(t, tc.expected.Refs, res)
})
}
}
func BenchmarkFilterChunkRefs(b *testing.B) {
nSeries := 1024
chunksPerSeries := 10
mkInput := func() *logproto.FilterChunkRefRequest {
res := &logproto.FilterChunkRefRequest{}
refs := make([]*logproto.GroupedChunkRefs, nSeries)
for i := range refs {
chks := make([]*logproto.ShortRef, chunksPerSeries)
for j := range chks {
chks[j] = &logproto.ShortRef{Checksum: uint32(j)}
}
refs[i] = &logproto.GroupedChunkRefs{
Fingerprint: uint64(i),
Refs: chks,
}
}
expected := &logproto.FilterChunkRefRequest{
Refs: []*logproto.GroupedChunkRefs{},
res.Refs = refs
return res
}
// responses aren't mutated, so we add a pool to mitigate the alloc
// effect on the benchmark
var responseP sync.Pool
mkOutputs := func() *[]v1.Output {
// remove half the chunks from half the series, so 25% of the volume
outputs := make([]v1.Output, nSeries/2)
for i := range outputs {
output := v1.Output{
Fp: model.Fingerprint(i * 2),
}
for j := 0; j < chunksPerSeries/2; j++ {
output.Removals = append(output.Removals, v1.ChunkRef{Checksum: uint32(j * 2)})
}
outputs[i] = output
}
n := g.removeNotMatchingChunks(req, res)
require.Equal(t, 3, n)
require.Equal(t, expected, req)
})
return &outputs
}
responseP.New = func() interface{} {
return mkOutputs()
}
// Add comparison functions here to bench side by side
for _, tc := range []struct {
desc string
f func(req *logproto.FilterChunkRefRequest, responses []v1.Output)
}{
{
desc: "filterChunkRefs",
f: func(req *logproto.FilterChunkRefRequest, responses []v1.Output) {
filterChunkRefs(req, [][]v1.Output{responses})
},
},
} {
b.Run(tc.desc, func(b *testing.B) {
for i := 0; i < b.N; i++ {
req := mkInput()
ptr := responseP.Get().(*[]v1.Output)
resps := *ptr
tc.f(req, resps)
responseP.Put(ptr)
}
})
}
}
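
// --- Illustrative sketch, not part of this commit ---
// As the comment in the benchmark notes, the response slices are not mutated,
// so they are drawn from a sync.Pool to keep fixture allocation from skewing
// the measurement. The minimal pattern, with hypothetical names:
package main

import (
	"fmt"
	"sync"
)

func main() {
	pool := sync.Pool{
		// New builds a fresh value only when the pool has none to reuse.
		New: func() interface{} {
			buf := make([]int, 0, 1024)
			return &buf
		},
	}

	ptr := pool.Get().(*[]int)        // reuse a previously returned slice if available
	*ptr = append((*ptr)[:0], 1, 2, 3) // work with the pooled buffer
	fmt.Println(len(*ptr))
	pool.Put(ptr) // hand it back for the next iteration
}
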

@@ -14,7 +14,10 @@ type metrics struct {
type serverMetrics struct {
inflightRequests prometheus.Summary
chunkRemovals *prometheus.CounterVec
requestedSeries prometheus.Histogram
filteredSeries prometheus.Histogram
requestedChunks prometheus.Histogram
filteredChunks prometheus.Histogram
}
func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
@@ -35,12 +38,34 @@ func newServerMetrics(registerer prometheus.Registerer, namespace, subsystem str
MaxAge: time.Minute,
AgeBuckets: 6,
}),
chunkRemovals: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
requestedSeries: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "chunk_removals_total",
Help: "Total amount of removals received from the block querier partitioned by state. The state 'accepted' means that the removals are processed, the state 'dropped' means that the removals were received after the task context was done (e.g. client timeout, etc).",
}, []string{"state"}),
Name: "requested_series",
Help: "Total amount of series refs sent to bloom-gateway for querying",
Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10),
}),
filteredSeries: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "filtered_series",
Help: "Total amount of series refs filtered by bloom-gateway",
Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10),
}),
requestedChunks: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "requested_chunks",
Help: "Total amount of chunk refs sent to bloom-gateway for querying",
Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10),
}),
filteredChunks: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
Namespace: namespace,
Subsystem: subsystem,
Name: "filtered_chunks",
Help: "Total amount of chunk refs filtered by bloom-gateway",
Buckets: prometheus.ExponentialBucketsRange(1, 100e3, 10),
}),
}
}
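
// --- Illustrative sketch, not part of this commit ---
// The four new histograms above share exponentially spaced buckets spanning
// 1 to 100k. This small program just prints what
// prometheus.ExponentialBucketsRange(1, 100e3, 10) produces, to show the
// resolution the metrics record at. It assumes
// github.com/prometheus/client_golang is available on the module path.
package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// 10 bucket upper bounds, growing by a constant factor from 1 to 100,000.
	for _, b := range prometheus.ExponentialBucketsRange(1, 100e3, 10) {
		fmt.Printf("%.1f ", b)
	}
	fmt.Println()
}
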

@@ -4,6 +4,7 @@ import (
"bytes"
"fmt"
"io"
"sort"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
@@ -364,6 +365,7 @@ func (s *SeriesWithOffset) Encode(
previousFp model.Fingerprint,
previousOffset BloomOffset,
) (model.Fingerprint, BloomOffset) {
sort.Sort(s.Chunks) // ensure order
// delta encode fingerprint
enc.PutBE64(uint64(s.Fingerprint - previousFp))
// delta encode offsets
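
// --- Illustrative sketch, not part of this commit ---
// The hunk above sorts s.Chunks before encoding ("ensure order"); the
// surrounding comments show the encoder writing values as deltas from the
// previous ones, a scheme that only round-trips when inputs are ascending.
// A standalone example of delta encoding/decoding with plain uint64s
// (hypothetical names):
package main

import (
	"fmt"
	"sort"
)

func deltaEncode(vals []uint64) []uint64 {
	sort.Slice(vals, func(i, j int) bool { return vals[i] < vals[j] }) // ensure order
	out := make([]uint64, len(vals))
	var prev uint64
	for i, v := range vals {
		out[i] = v - prev // small, compressible deltas once sorted
		prev = v
	}
	return out
}

func deltaDecode(deltas []uint64) []uint64 {
	out := make([]uint64, len(deltas))
	var acc uint64
	for i, d := range deltas {
		acc += d
		out[i] = acc
	}
	return out
}

func main() {
	deltas := deltaEncode([]uint64{30, 10, 25})
	fmt.Println(deltas)              // [10 15 5]
	fmt.Println(deltaDecode(deltas)) // [10 25 30]
}
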
