feat: blockbuilder component (#14621)

Signed-off-by: Owen Diehl <ow.diehl@gmail.com>
Co-authored-by: Ashwanth Goli <iamashwanth@gmail.com>
parent d3d31f1842
commit cbdd36a412
22 changed files (lines changed in parentheses):
1. docs/sources/shared/configuration.md (51)
2. pkg/blockbuilder/controller.go (305)
3. pkg/blockbuilder/metrics.go (152)
4. pkg/blockbuilder/pipeline.go (99)
5. pkg/blockbuilder/pipeline_test.go (87)
6. pkg/blockbuilder/plan.txt (34)
7. pkg/blockbuilder/slimgester.go (811)
8. pkg/blockbuilder/storage.go (154)
9. pkg/blockbuilder/storage_test.go (37)
10. pkg/blockbuilder/tsdb.go (328)
11. pkg/kafka/partition/reader.go (33)
12. pkg/kafka/partition/reader_service.go (22)
13. pkg/loki/loki.go (9)
14. pkg/loki/modules.go (65)
15. pkg/storage/stores/shipper/indexshipper/shipper.go (5)
16. pkg/storage/stores/shipper/indexshipper/tsdb/compactor_test.go (24)
17. pkg/storage/stores/shipper/indexshipper/tsdb/identifier.go (10)
18. pkg/storage/stores/shipper/indexshipper/tsdb/index/index.go (4)
19. pkg/storage/stores/shipper/indexshipper/tsdb/index_shipper_querier.go (4)
20. pkg/storage/stores/shipper/indexshipper/tsdb/manager.go (27)
21. pkg/storage/stores/shipper/indexshipper/tsdb/manager_test.go (71)
22. pkg/storage/stores/shipper/indexshipper/tsdb/store.go (13)

@ -137,6 +137,57 @@ Pass the `-config.expand-env` flag at the command line to enable this way of set
# itself to a key value store.
[ingester: <ingester>]
block_builder:
# How many flushes can happen concurrently
# CLI flag: -blockbuilder.concurrent-flushes
[concurrent_flushes: <int> | default = 1]
# How many workers to process writes, defaults to number of available cpus
# CLI flag: -blockbuilder.concurrent-writers
[concurrent_writers: <int> | default = 1]
# The targeted _uncompressed_ size in bytes of a chunk block. When this
# threshold is exceeded the head block will be cut and compressed inside the
# chunk.
# CLI flag: -blockbuilder.chunks-block-size
[chunk_block_size: <int> | default = 256KB]
# A target _compressed_ size in bytes for chunks. This is a desired size not
# an exact size, chunks may be slightly bigger or significantly smaller if
# they get flushed for other reasons (e.g. chunk_idle_period). A value of 0
# creates chunks with a fixed 10 blocks, a non zero value will create chunks
# with a variable number of blocks to meet the target size.
# CLI flag: -blockbuilder.chunk-target-size
[chunk_target_size: <int> | default = 1536KB]
# The algorithm to use for compressing chunks. (none, gzip, lz4-64k, snappy,
# lz4-256k, lz4-1M, lz4, flate, zstd)
# CLI flag: -blockbuilder.chunk-encoding
[chunk_encoding: <string> | default = "snappy"]
# The maximum duration of a timeseries chunk in memory. If a timeseries runs
# for longer than this, the current chunk will be flushed to the store and a
# new chunk created.
# CLI flag: -blockbuilder.max-chunk-age
[max_chunk_age: <duration> | default = 2h]
# The interval at which to run.
# CLI flag: -blockbuilder.interval
[interval: <duration> | default = 10m]
backoff_config:
# Minimum delay when backing off.
# CLI flag: -blockbuilder.backoff..backoff-min-period
[min_period: <duration> | default = 100ms]
# Maximum delay when backing off.
# CLI flag: -blockbuilder.backoff..backoff-max-period
[max_period: <duration> | default = 10s]
# Number of times to backoff and retry before failing.
# CLI flag: -blockbuilder.backoff..backoff-retries
[max_retries: <int> | default = 10]
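For reference, a hedged sketch (not part of this commit) of how the block_builder settings above map onto the Go config added in pkg/blockbuilder/slimgester.go below. The flag names mirror the CLI flags listed above; exampleBlockBuilderConfig and the override values are illustrative.
package main

import (
	"flag"
	"fmt"

	"github.com/grafana/loki/v3/pkg/blockbuilder"
)

// exampleBlockBuilderConfig registers the block builder flags (which also applies
// the defaults shown above), parses a couple of overrides, then validates so the
// chunk encoding string is resolved into a codec.
func exampleBlockBuilderConfig() (blockbuilder.Config, error) {
	var cfg blockbuilder.Config
	fs := flag.NewFlagSet("example", flag.ContinueOnError)
	cfg.RegisterFlags(fs)

	if err := fs.Parse([]string{
		"-blockbuilder.chunk-encoding=snappy",
		"-blockbuilder.interval=10m",
	}); err != nil {
		return blockbuilder.Config{}, err
	}
	return cfg, cfg.Validate()
}

func main() {
	cfg, err := exampleBlockBuilderConfig()
	if err != nil {
		panic(err)
	}
	fmt.Println(cfg.Interval) // 10m0s
}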
pattern_ingester:
# Whether the pattern ingester is enabled.
# CLI flag: -pattern-ingester.enabled

@ -0,0 +1,305 @@
package blockbuilder
import (
"context"
"fmt"
"time"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/dskit/backoff"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/pkg/push"
)
// [min,max)
type Offsets struct {
Min, Max int64
}
type Job struct {
Partition int32
Offsets Offsets
}
// Interface required for interacting with queue partitions.
type PartitionController interface {
Topic() string
Partition() int32
// Returns the highest committed offset from the consumer group
HighestCommittedOffset(ctx context.Context) (int64, error)
// Returns the highest available offset in the partition
HighestPartitionOffset(ctx context.Context) (int64, error)
// Returns the earliest available offset in the partition
EarliestPartitionOffset(ctx context.Context) (int64, error)
// Commits the offset to the consumer group.
Commit(context.Context, int64) error
// Process loads batches of records at a time and sends them to the channel,
// so it's advised not to buffer the channel, for natural backpressure.
// As a convenience, it returns the last seen offset, which matches
// the final record sent on the channel.
Process(context.Context, Offsets, chan<- []AppendInput) (int64, error)
Close() error
}
// PartitionJobController loads a single job at a time, bound to a given
// * topic
// * partition
// * offset_step_len: the number of offsets each job should contain. e.g. "10" could yield a job w/ min=15, max=25
//
// At a high level, it watches a source topic/partition (where log data is ingested) and a "committed" topic/partition.
// The "committed" partition corresponds to the offsets from the source partition which have been committed to object storage.
// In essence, the following loop is performed
// 1. load the most recent record from the "committed" partition. This contains the highest msg offset in the "source" partition
// that has been committed to object storage. We'll call that $START_POS.
// 2. Create a job with `min=$START_POS+1,end=$START_POS+1+$STEP_LEN`
// 3. Sometime later when the job has been processed, we'll commit the final processed offset from the "source" partition (which
// will be <= $END_POS) to the "committed" partition.
//
// NB(owen-d): In our case, "source" is the partition containing log data
// and "committed" is the consumer group.
type PartitionJobController struct {
stepLen int64
part partition.ReaderIfc
backoff backoff.Config
decoder *kafka.Decoder
}
func NewPartitionJobController(
controller partition.ReaderIfc,
backoff backoff.Config,
) (*PartitionJobController, error) {
decoder, err := kafka.NewDecoder()
if err != nil {
return nil, err
}
return &PartitionJobController{
stepLen: 1000, // Default step length of 1000 offsets per job
part: controller,
backoff: backoff,
decoder: decoder,
}, nil
}
func (l *PartitionJobController) HighestCommittedOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchLastCommittedOffset(ctx)
},
)
}
func (l *PartitionJobController) HighestPartitionOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaEndOffset)
},
)
}
func (l *PartitionJobController) EarliestPartitionOffset(ctx context.Context) (int64, error) {
return withBackoff(
ctx,
l.backoff,
func() (int64, error) {
return l.part.FetchPartitionOffset(ctx, partition.KafkaStartOffset)
},
)
}
func (l *PartitionJobController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
l.part.SetOffsetForConsumption(offsets.Min)
var (
lastOffset = offsets.Min - 1
boff = backoff.New(ctx, l.backoff)
err error
)
for boff.Ongoing() {
var records []partition.Record
records, err = l.part.Poll(ctx)
if err != nil {
boff.Wait()
continue
}
if len(records) == 0 {
// No more records available
break
}
// Reset backoff on successful poll
boff.Reset()
converted := make([]AppendInput, 0, len(records))
for _, record := range records {
// stop at the end of the requested range; offsets beyond Max belong to a later job
if record.Offset >= offsets.Max {
break
}
lastOffset = record.Offset
stream, labels, err := l.decoder.Decode(record.Content)
if err != nil {
return 0, fmt.Errorf("failed to decode record: %w", err)
}
if len(stream.Entries) == 0 {
continue
}
converted = append(converted, AppendInput{
tenant: record.TenantID,
labels: labels,
labelsStr: stream.Labels,
entries: stream.Entries,
})
}
// send the batch once it's fully built; the unbuffered channel provides backpressure
if len(converted) > 0 {
select {
case ch <- converted:
case <-ctx.Done():
return 0, ctx.Err()
}
}
}
return lastOffset, err
}
// LoadJob(ctx) returns the next job by finding the most recent unconsumed offset in the partition
// Returns whether an applicable job exists, the job, and an error
func (l *PartitionJobController) LoadJob(ctx context.Context) (bool, Job, error) {
// Read the most recent committed offset
committedOffset, err := l.HighestCommittedOffset(ctx)
if err != nil {
return false, Job{}, err
}
earliestOffset, err := l.EarliestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}
startOffset := committedOffset + 1
if startOffset < earliestOffset {
startOffset = earliestOffset
}
highestOffset, err := l.HighestPartitionOffset(ctx)
if err != nil {
return false, Job{}, err
}
if highestOffset == committedOffset {
return false, Job{}, nil
}
// Create the job with the calculated offsets
job := Job{
Partition: l.part.Partition(),
Offsets: Offsets{
Min: startOffset,
Max: min(startOffset+l.stepLen, highestOffset),
},
}
return true, job, nil
}
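An editorial same-package sketch (not part of this commit) of how a caller might drive the controller end to end; the real driver is BlockBuilder.runOne in slimgester.go, and runJobOnce/handleBatch here are hypothetical names standing in for the appender/flush pipeline.
// runJobOnce loads the next job, streams its batches to handleBatch, and
// commits the last processed offset once everything has been handled.
func runJobOnce(ctx context.Context, ctl *PartitionJobController, handleBatch func([]AppendInput) error) error {
	exists, job, err := ctl.LoadJob(ctx)
	if err != nil || !exists {
		return err
	}

	ch := make(chan []AppendInput) // unbuffered: Process blocks until each batch is consumed

	done := make(chan error, 1)
	go func() {
		var firstErr error
		for batch := range ch {
			if firstErr == nil {
				firstErr = handleBatch(batch)
			}
			// keep draining even after an error so Process never blocks on send
		}
		done <- firstErr
	}()

	lastOffset, procErr := ctl.Process(ctx, job.Offsets, ch)
	close(ch)
	if err := <-done; err != nil {
		return err
	}
	if procErr != nil {
		return procErr
	}

	// nothing was processed; leave the committed offset unchanged
	if lastOffset < job.Offsets.Min {
		return nil
	}
	return ctl.part.Commit(ctx, lastOffset)
}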
// implement a dummy controller which can be parameterized to
// deterministically simulate partitions
type dummyPartitionController struct {
topic string
partition int32
committed int64
highest int64
numTenants int // number of unique tenants to simulate
streamsPerTenant int // number of streams per tenant
entriesPerOffset int // coefficient for entries per offset
}
// used in testing
// nolint:revive
func NewDummyPartitionController(topic string, partition int32, highest int64) *dummyPartitionController {
return &dummyPartitionController{
topic: topic,
partition: partition,
committed: 0, // always starts at zero
highest: highest,
numTenants: 2, // default number of tenants
streamsPerTenant: 2, // default streams per tenant
entriesPerOffset: 1, // default entries per offset coefficient
}
}
func (d *dummyPartitionController) Topic() string {
return d.topic
}
func (d *dummyPartitionController) Partition() int32 {
return d.partition
}
func (d *dummyPartitionController) HighestCommittedOffset(_ context.Context) (int64, error) {
return d.committed, nil
}
func (d *dummyPartitionController) HighestPartitionOffset(_ context.Context) (int64, error) {
return d.highest, nil
}
func (d *dummyPartitionController) Commit(_ context.Context, offset int64) error {
d.committed = offset
return nil
}
func (d *dummyPartitionController) Process(ctx context.Context, offsets Offsets, ch chan<- []AppendInput) (int64, error) {
for i := int(offsets.Min); i < int(offsets.Max); i++ {
batch := d.createBatch(i)
select {
case <-ctx.Done():
return int64(i - 1), ctx.Err()
case ch <- batch:
}
}
return offsets.Max - 1, nil
}
// creates (tenants*streams) inputs
func (d *dummyPartitionController) createBatch(offset int) []AppendInput {
result := make([]AppendInput, 0, d.numTenants*d.streamsPerTenant)
for i := 0; i < d.numTenants; i++ {
tenant := fmt.Sprintf("tenant-%d", i)
for j := 0; j < d.streamsPerTenant; j++ {
lbls := labels.Labels{
{Name: "stream", Value: fmt.Sprintf("stream-%d", j)},
}
entries := make([]push.Entry, d.entriesPerOffset)
for k := 0; k < d.entriesPerOffset; k++ {
entries[k] = push.Entry{
Timestamp: time.Now(),
Line: fmt.Sprintf("tenant=%d stream=%d line=%d offset=%d", i, j, k, offset),
}
}
result = append(result, AppendInput{
tenant: tenant,
labels: lbls,
labelsStr: lbls.String(),
entries: entries,
})
}
}
return result
}
func (d *dummyPartitionController) Close() error {
return nil
}

@ -0,0 +1,152 @@
package blockbuilder
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/util/constants"
)
type SlimgesterMetrics struct {
chunkUtilization prometheus.Histogram
chunkEntries prometheus.Histogram
chunkSize prometheus.Histogram
chunkCompressionRatio prometheus.Histogram
chunksPerTenant *prometheus.CounterVec
chunkSizePerTenant *prometheus.CounterVec
chunkAge prometheus.Histogram
chunkEncodeTime prometheus.Histogram
chunksFlushFailures prometheus.Counter
chunksFlushedPerReason *prometheus.CounterVec
chunkLifespan prometheus.Histogram
chunksEncoded *prometheus.CounterVec
chunkDecodeFailures *prometheus.CounterVec
flushedChunksStats *analytics.Counter
flushedChunksBytesStats *analytics.Statistics
flushedChunksLinesStats *analytics.Statistics
flushedChunksAgeStats *analytics.Statistics
flushedChunksLifespanStats *analytics.Statistics
flushedChunksUtilizationStats *analytics.Statistics
chunksCreatedTotal prometheus.Counter
samplesPerChunk prometheus.Histogram
blocksPerChunk prometheus.Histogram
chunkCreatedStats *analytics.Counter
}
func NewSlimgesterMetrics(r prometheus.Registerer) *SlimgesterMetrics {
return &SlimgesterMetrics{
chunkUtilization: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_utilization",
Help: "Distribution of stored chunk utilization (when stored).",
Buckets: prometheus.LinearBuckets(0, 0.2, 6),
}),
chunkEntries: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_entries",
Help: "Distribution of stored lines per chunk (when stored).",
Buckets: prometheus.ExponentialBuckets(200, 2, 9), // biggest bucket is 200*2^(9-1) = 51200
}),
chunkSize: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_size_bytes",
Help: "Distribution of stored chunk sizes (when stored).",
Buckets: prometheus.ExponentialBuckets(20000, 2, 10), // biggest bucket is 20000*2^(10-1) = 10,240,000 (~10.2MB)
}),
chunkCompressionRatio: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_compression_ratio",
Help: "Compression ratio of chunks (when stored).",
Buckets: prometheus.LinearBuckets(.75, 2, 10),
}),
chunksPerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunks_stored_total",
Help: "Total stored chunks per tenant.",
}, []string{"tenant"}),
chunkSizePerTenant: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_stored_bytes_total",
Help: "Total bytes stored in chunks per tenant.",
}, []string{"tenant"}),
chunkAge: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_age_seconds",
Help: "Distribution of chunk ages (when stored).",
// with default settings chunks should flush between 5 min and 12 hours
// so buckets at 1min, 5min, 10min, 30min, 1hr, 2hr, 4hr, 10hr, 12hr, 16hr
Buckets: []float64{60, 300, 600, 1800, 3600, 7200, 14400, 36000, 43200, 57600},
}),
chunkEncodeTime: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_encode_time_seconds",
Help: "Distribution of chunk encode times.",
// 10ms to 10s.
Buckets: prometheus.ExponentialBuckets(0.01, 4, 6),
}),
chunksFlushFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunks_flush_failures_total",
Help: "Total number of flush failures.",
}),
chunksFlushedPerReason: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunks_flushed_total",
Help: "Total flushed chunks per reason.",
}, []string{"reason"}),
chunkLifespan: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_bounds_hours",
Help: "Distribution of chunk end-start durations.",
// 1h -> 8hr
Buckets: prometheus.LinearBuckets(1, 1, 8),
}),
chunksEncoded: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunks_encoded_total",
Help: "The total number of chunks encoded in the ingester.",
}, []string{"user"}),
chunkDecodeFailures: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunk_decode_failures_total",
Help: "The number of freshly encoded chunks that failed to decode.",
}, []string{"user"}),
flushedChunksStats: analytics.NewCounter("slimgester_flushed_chunks"),
flushedChunksBytesStats: analytics.NewStatistics("slimgester_flushed_chunks_bytes"),
flushedChunksLinesStats: analytics.NewStatistics("slimgester_flushed_chunks_lines"),
flushedChunksAgeStats: analytics.NewStatistics(
"slimgester_flushed_chunks_age_seconds",
),
flushedChunksLifespanStats: analytics.NewStatistics(
"slimgester_flushed_chunks_lifespan_seconds",
),
flushedChunksUtilizationStats: analytics.NewStatistics(
"slimgester_flushed_chunks_utilization",
),
chunksCreatedTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: constants.Loki,
Name: "slimgester_chunks_created_total",
Help: "The total number of chunks created in the ingester.",
}),
samplesPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "slimgester",
Name: "samples_per_chunk",
Help: "The number of samples in a chunk.",
Buckets: prometheus.LinearBuckets(4096, 2048, 6),
}),
blocksPerChunk: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Namespace: constants.Loki,
Subsystem: "slimgester",
Name: "blocks_per_chunk",
Help: "The number of blocks in a chunk.",
Buckets: prometheus.ExponentialBuckets(5, 2, 6),
}),
chunkCreatedStats: analytics.NewCounter("slimgester_chunk_created"),
}
}

@ -0,0 +1,99 @@
package blockbuilder
import (
"context"
"github.com/grafana/dskit/multierror"
"golang.org/x/sync/errgroup"
)
type stage struct {
name string
parallelism int
grp *errgroup.Group
ctx context.Context
fn func(context.Context) error
cleanup func(context.Context) error // optional; will be called once the underlying group returns
}
// pipeline is a sequence of n different stages.
type pipeline struct {
ctx context.Context // base context
// we use a separate errgroup for stage dispatch/collection
// and inherit stage-specific groups from this ctx to
// propagate cancellation
grp *errgroup.Group
stages []stage
}
func newPipeline(ctx context.Context) *pipeline {
stagesGrp, ctx := errgroup.WithContext(ctx)
return &pipeline{
ctx: ctx,
grp: stagesGrp,
}
}
func (p *pipeline) AddStageWithCleanup(
name string,
parallelism int,
fn func(context.Context) error,
cleanup func(context.Context) error,
) {
grp, ctx := errgroup.WithContext(p.ctx)
p.stages = append(p.stages, stage{
name: name,
parallelism: parallelism,
fn: fn,
cleanup: cleanup,
ctx: ctx,
grp: grp,
})
}
func (p *pipeline) AddStage(
name string,
parallelism int,
fn func(context.Context) error,
) {
p.AddStageWithCleanup(name, parallelism, fn, nil)
}
func (p *pipeline) Run() error {
for i := range p.stages {
// we're using this in subsequent async closures;
// assign it directly in-loop
s := p.stages[i]
// spin up n workers for each stage using that stage's
// error group.
for j := 0; j < s.parallelism; j++ {
s.grp.Go(func() error {
return s.fn(s.ctx)
})
}
// Using the pipeline's err group, await the stage finish,
// calling any necessary cleanup fn
// NB: by using the pipeline's errgroup here, we propagate
// failures to downstream stage contexts, so once a single stage
// fails, the others will be notified.
p.grp.Go(func() error {
var errs multierror.MultiError
errs.Add(s.grp.Wait())
if s.cleanup != nil {
// NB: we use the pipeline's context for the cleanup call b/c
// the stage's context is cancelled once `Wait` returns.
// That's ok. cleanup is always called for a relevant stage
// and just needs to know if _other_ stages failed at this point
errs.Add(s.cleanup(p.ctx))
}
return errs.Err()
})
}
// finish all stages
return p.grp.Wait()
}
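A same-package usage sketch (not part of this commit) of the pipeline helper, reduced to a toy producer/consumer in the same shape runOne uses below; examplePipeline, the stage names, and the channel are illustrative.
func examplePipeline(ctx context.Context) error {
	p := newPipeline(ctx)
	ch := make(chan int) // unbuffered for natural backpressure

	// Producer: one worker feeds the channel; the cleanup closes it once the
	// stage's errgroup returns, which is what tells the consumers to stop.
	p.AddStageWithCleanup("produce", 1,
		func(ctx context.Context) error {
			for i := 0; i < 10; i++ {
				select {
				case ch <- i:
				case <-ctx.Done():
					return ctx.Err()
				}
			}
			return nil
		},
		func(_ context.Context) error {
			close(ch)
			return nil
		},
	)

	// Consumers: two workers drain the channel until it's closed.
	p.AddStage("consume", 2, func(ctx context.Context) error {
		for {
			select {
			case _, ok := <-ch:
				if !ok {
					return nil
				}
			case <-ctx.Done():
				return ctx.Err()
			}
		}
	})

	return p.Run()
}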

@ -0,0 +1,87 @@
package blockbuilder
import (
"context"
"errors"
"fmt"
"testing"
"github.com/stretchr/testify/require"
)
type testStage struct {
parallelism int
fn func(context.Context) error
cleanup func(context.Context) error
}
func TestPipeline(t *testing.T) {
tests := []struct {
name string
stages []testStage
expectedErr error
}{
{
name: "single stage success",
stages: []testStage{
{
parallelism: 1,
fn: func(_ context.Context) error {
return nil
},
},
},
},
{
name: "multiple stages success",
stages: []testStage{
{
parallelism: 2,
fn: func(_ context.Context) error {
return nil
},
},
{
parallelism: 1,
fn: func(_ context.Context) error {
return nil
},
},
},
},
{
name: "stage error propagates",
stages: []testStage{
{
parallelism: 1,
fn: func(_ context.Context) error {
return errors.New("stage error")
},
},
},
expectedErr: errors.New("stage error"),
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
p := newPipeline(context.Background())
for i, stage := range tt.stages {
if stage.cleanup != nil {
p.AddStageWithCleanup(fmt.Sprint(i), stage.parallelism, stage.fn, stage.cleanup)
} else {
p.AddStage(fmt.Sprint(i), stage.parallelism, stage.fn)
}
}
err := p.Run()
if tt.expectedErr != nil {
require.Error(t, err)
require.Equal(t, tt.expectedErr.Error(), err.Error())
} else {
require.NoError(t, err)
}
})
}
}

@ -0,0 +1,34 @@
# Purpose
blockbuilder is responsible for consuming ingested data from the queue (kafka, etc) and writing it in an optimized form to long-term storage. While this should always remain true, it can be built and iterated upon in phases. First, let's look at the simplest possible architecture:
* [interface] loads "jobs": partitions+offset ranges in kafka
* For each job, process data, building the storage format
* [interface] Upon completion (inc flushing to storage), commit work
* e.g. update consumer group processed offset in kafka
* consumes
# First Impl: Alongside existing multi-zone ingester writers
Goal: modify ingester architecture towards RF1, but don't actually write to storage yet, b/c we haven't solved coordinating interim reads/writes.
Deliverable: RF1 metrics proof
* run replicas==partitions (from ingesters)
* run every $INTERVAL (5m?),
* slim down ingester write path
* remove disk (all WALs).
* ignore limits if too complex (for now)
* /dev/null backend
# TODO improvements
* metadata store
* include offsets committed for coordination b/w ingester-readers & long term storage
* planner/scheduler+worker architecture
* shuffle sharding
# Things to solve
* limits application
* job sizing -- coordinate kafka offsets w/ underlying bytes added?
* ideally we can ask kafka for "the next 1GB" in a partition, but to do this we'd need the kafka offsets (auto-incremented integers for messages within a partition) to be derived from the message size. Right now, different batch sizes can cause kafka msgs to have very different sizes.
* idea: another set of partitions to store offsets->datasize? Sounds shitty tbh & breaks the consistency bound on writes (what if kafka acks first write but doesn't ack the second?)
* what if we stored byte counter metadata in kafka records so we could O(log(n)) seek an offset range w/ the closest $SIZE (see the sketch after this list)
* Likely a reasonable perf tradeoff as this isn't called often (only in job planner in the future).
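A hedged sketch of that byte-counter idea (not part of this commit): if every record carried the cumulative bytes written to its partition so far, picking the end offset for a roughly $SIZE job reduces to a binary search. cumBytes is a hypothetical offset-to-cumulative-bytes lookup.
// endOffsetForTargetSize returns the smallest offset in [start, high] whose
// range [start, offset] covers at least targetBytes (or high if none does).
func endOffsetForTargetSize(cumBytes func(offset int64) int64, start, high, targetBytes int64) int64 {
	base := cumBytes(start - 1) // bytes already consumed before this job
	lo, hi := start, high
	for lo < hi {
		mid := (lo + hi) / 2
		if cumBytes(mid)-base < targetBytes {
			lo = mid + 1 // not enough data yet; move right
		} else {
			hi = mid // enough data; try to shrink the range
		}
	}
	return lo
}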

@ -0,0 +1,811 @@
package blockbuilder
import (
"bytes"
"context"
"flag"
"fmt"
"math"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/storage/chunk"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/flagext"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/pkg/push"
)
const (
flushReasonFull = "full"
flushReasonMaxAge = "max_age"
onePointFiveMB = 3 << 19
)
type Config struct {
ConcurrentFlushes int `yaml:"concurrent_flushes"`
ConcurrentWriters int `yaml:"concurrent_writers"`
BlockSize flagext.ByteSize `yaml:"chunk_block_size"`
TargetChunkSize flagext.ByteSize `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
parsedEncoding compression.Codec `yaml:"-"` // placeholder for validated encoding
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
Interval time.Duration `yaml:"interval"`
Backoff backoff.Config `yaml:"backoff_config"`
}
func (cfg *Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.IntVar(&cfg.ConcurrentFlushes, prefix+"concurrent-flushes", 1, "How many flushes can happen concurrently")
f.IntVar(&cfg.ConcurrentWriters, prefix+"concurrent-writers", 1, "How many workers to process writes, defaults to number of available cpus")
_ = cfg.BlockSize.Set("256KB")
f.Var(&cfg.BlockSize, prefix+"chunks-block-size", "The targeted _uncompressed_ size in bytes of a chunk block. When this threshold is exceeded the head block will be cut and compressed inside the chunk.")
_ = cfg.TargetChunkSize.Set(fmt.Sprint(onePointFiveMB))
f.Var(&cfg.TargetChunkSize, prefix+"chunk-target-size", "A target _compressed_ size in bytes for chunks. This is a desired size not an exact size, chunks may be slightly bigger or significantly smaller if they get flushed for other reasons (e.g. chunk_idle_period). A value of 0 creates chunks with a fixed 10 blocks, a non zero value will create chunks with a variable number of blocks to meet the target size.")
f.StringVar(&cfg.ChunkEncoding, prefix+"chunk-encoding", compression.Snappy.String(), fmt.Sprintf("The algorithm to use for compressing chunks. (%s)", compression.SupportedCodecs()))
f.DurationVar(&cfg.MaxChunkAge, prefix+"max-chunk-age", 2*time.Hour, "The maximum duration of a timeseries chunk in memory. If a timeseries runs for longer than this, the current chunk will be flushed to the store and a new chunk created.")
f.DurationVar(&cfg.Interval, prefix+"interval", 10*time.Minute, "The interval at which to run.")
cfg.Backoff.RegisterFlagsWithPrefix(prefix+"backoff.", f)
}
// RegisterFlags registers flags.
func (cfg *Config) RegisterFlags(flags *flag.FlagSet) {
cfg.RegisterFlagsWithPrefix("blockbuilder.", flags)
}
func (cfg *Config) Validate() error {
enc, err := compression.ParseCodec(cfg.ChunkEncoding)
if err != nil {
return err
}
cfg.parsedEncoding = enc
return nil
}
// BlockBuilder is a slimmed-down version of the ingester, intended to
// ingest logs without WALs. Broadly, it accumulates logs into per-tenant chunks
// in the same way the existing ingester does, but without a WAL. Index (TSDB)
// creation is not an out-of-band procedure here and must be invoked directly.
// In essence, this allows us to buffer data, flushing chunks to storage as
// necessary, and then, when ready to commit, create the relevant TSDBs (one per
// period) and flush them to storage. This lets an external caller prepare a
// batch of data, build the relevant chunks+indices, ensure they're flushed, and
// then return. As long as chunk+index creation is deterministic, the operation
// is also idempotent, making retries simple and preventing duplicate data.
// It contains the following methods:
// - `Append(context.Context, logproto.PushRequest) error`
// Adds a push request to ingested data. May flush existing chunks when they're full/etc.
// - `Commit(context.Context) error`
// Serializes (cuts) any buffered data into chunks, flushes them to storage, then creates + flushes TSDB indices
// containing all chunk references. Finally, clears internal state.
type BlockBuilder struct {
services.Service
id string
cfg Config
periodicConfigs []config.PeriodConfig
metrics *SlimgesterMetrics
logger log.Logger
store stores.ChunkWriter
objStore *MultiStore
jobController *PartitionJobController
}
func NewBlockBuilder(
id string,
cfg Config,
periodicConfigs []config.PeriodConfig,
store stores.ChunkWriter,
objStore *MultiStore,
logger log.Logger,
reg prometheus.Registerer,
jobController *PartitionJobController,
) (*BlockBuilder,
error) {
i := &BlockBuilder{
id: id,
cfg: cfg,
periodicConfigs: periodicConfigs,
metrics: NewSlimgesterMetrics(reg),
logger: logger,
store: store,
objStore: objStore,
jobController: jobController,
}
i.Service = services.NewBasicService(nil, i.running, nil)
return i, nil
}
func (i *BlockBuilder) running(ctx context.Context) error {
ticker := time.NewTicker(i.cfg.Interval)
defer ticker.Stop()
// run once in beginning
select {
case <-ctx.Done():
return nil
default:
_, err := i.runOne(ctx)
if err != nil {
return err
}
}
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
skipped, err := i.runOne(ctx)
level.Info(i.logger).Log(
"msg", "completed block builder run",
"skipped", skipped,
"err", err,
)
if err != nil {
return err
}
}
}
}
// runOne performs a single run: load the next available job, build and flush its chunks and TSDB indices, then commit the last processed offset.
func (i *BlockBuilder) runOne(ctx context.Context) (skipped bool, err error) {
exists, job, err := i.jobController.LoadJob(ctx)
if err != nil {
return false, err
}
if !exists {
level.Info(i.logger).Log("msg", "no available job to process")
return true, nil
}
logger := log.With(
i.logger,
"partition", job.Partition,
"job_min_offset", job.Offsets.Min,
"job_max_offset", job.Offsets.Max,
)
level.Debug(logger).Log("msg", "beginning job")
indexer := newTsdbCreator()
appender := newAppender(i.id,
i.cfg,
i.periodicConfigs,
i.store,
i.objStore,
logger,
i.metrics,
)
var lastOffset int64
p := newPipeline(ctx)
// Pipeline stage 1: Process the job offsets and write records to inputCh
// This stage reads from the partition and feeds records into the input channel
// When complete, it stores the last processed offset and closes the channel
inputCh := make(chan []AppendInput)
p.AddStageWithCleanup(
"load records",
1,
func(ctx context.Context) error {
lastOffset, err = i.jobController.Process(ctx, job.Offsets, inputCh)
return err
},
func(ctx context.Context) error {
level.Debug(logger).Log(
"msg", "finished loading records",
"ctx_error", ctx.Err(),
)
close(inputCh)
return nil
},
)
// Stage 2: Process input records and generate chunks
// This stage receives AppendInput batches, appends them to appropriate instances,
// and forwards any cut chunks to the chunks channel for flushing.
// ConcurrentWriters workers process inputs in parallel to maximize throughput.
flush := make(chan *chunk.Chunk)
p.AddStageWithCleanup(
"appender",
i.cfg.ConcurrentWriters,
func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case inputs, ok := <-inputCh:
// inputs are finished; we're done
if !ok {
return nil
}
for _, input := range inputs {
cut, err := appender.Append(ctx, input)
if err != nil {
level.Error(logger).Log("msg", "failed to append records", "err", err)
return err
}
for _, chk := range cut {
select {
case <-ctx.Done():
return ctx.Err()
case flush <- chk:
}
}
}
}
}
},
func(ctx context.Context) (err error) {
defer func() {
level.Debug(logger).Log(
"msg", "finished appender",
"err", err,
"ctx_error", ctx.Err(),
)
}()
defer close(flush)
// once we're done appending, cut all remaining chunks.
chks, err := appender.CutRemainingChunks(ctx)
if err != nil {
return err
}
for _, chk := range chks {
select {
case <-ctx.Done():
return ctx.Err()
case flush <- chk:
}
}
return nil
},
)
// Stage 3: Flush chunks to storage
// This stage receives chunks from the chunks channel and flushes them to storage
// using ConcurrentFlushes workers for parallel processing
p.AddStage(
"flusher",
i.cfg.ConcurrentFlushes,
func(ctx context.Context) error {
for {
select {
case <-ctx.Done():
return ctx.Err()
case chk, ok := <-flush:
if !ok {
return nil
}
if _, err := withBackoff(
ctx,
i.cfg.Backoff, // retry forever
func() (res struct{}, err error) {
err = i.store.PutOne(ctx, chk.From, chk.Through, *chk)
if err != nil {
i.metrics.chunksFlushFailures.Inc()
return
}
appender.reportFlushedChunkStatistics(chk)
// write flushed chunk to index
approxKB := math.Round(float64(chk.Data.UncompressedSize()) / float64(1<<10))
meta := index.ChunkMeta{
Checksum: chk.ChunkRef.Checksum,
MinTime: int64(chk.ChunkRef.From),
MaxTime: int64(chk.ChunkRef.Through),
KB: uint32(approxKB),
Entries: uint32(chk.Data.Entries()),
}
err = indexer.Append(chk.UserID, chk.Metric, chk.ChunkRef.Fingerprint, index.ChunkMetas{meta})
return
},
); err != nil {
return err
}
}
}
},
)
err = p.Run()
level.Debug(logger).Log(
"msg", "finished chunk creation",
"err", err,
)
if err != nil {
return false, err
}
var (
nodeName = i.id
tableRanges = config.GetIndexStoreTableRanges(types.TSDBType, i.periodicConfigs)
)
built, err := indexer.create(ctx, nodeName, tableRanges)
if err != nil {
return false, err
}
for _, db := range built {
u := newUploader(i.objStore)
if err := u.Put(ctx, db); err != nil {
level.Error(util_log.Logger).Log(
"msg", "failed to upload tsdb",
"path", db.id.Path(),
)
return false, err
}
level.Debug(logger).Log(
"msg", "uploaded tsdb",
"name", db.id.Name(),
)
}
if lastOffset <= job.Offsets.Min {
return false, nil
}
if err = i.jobController.part.Commit(ctx, lastOffset); err != nil {
level.Error(logger).Log(
"msg", "failed to commit offset",
"last_offset", lastOffset,
"err", err,
)
return false, err
}
// log success
level.Info(logger).Log(
"msg", "successfully processed and committed batch",
"last_offset", lastOffset,
)
return false, nil
}
type Appender struct {
id string
cfg Config
periodicConfigs []config.PeriodConfig
metrics *SlimgesterMetrics
logger log.Logger
instances map[string]*instance
instancesMtx sync.RWMutex
store stores.ChunkWriter
objStore *MultiStore
}
// Appender is a single-use construct for building chunks
// from a set of records. It's an independent struct to ensure its
// state is not reused across jobs.
func newAppender(
id string,
cfg Config,
periodicConfigs []config.PeriodConfig,
store stores.ChunkWriter,
objStore *MultiStore,
logger log.Logger,
metrics *SlimgesterMetrics,
) *Appender {
return &Appender{
id: id,
cfg: cfg,
periodicConfigs: periodicConfigs,
metrics: metrics,
logger: logger,
instances: make(map[string]*instance),
store: store,
objStore: objStore,
}
}
// reportFlushedChunkStatistics calculates overall statistics of flushed chunks without compromising the flush process.
func (w *Appender) reportFlushedChunkStatistics(
ch *chunk.Chunk,
) {
byt, err := ch.Encoded()
if err != nil {
level.Error(w.logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
return
}
sizePerTenant := w.metrics.chunkSizePerTenant.WithLabelValues(ch.UserID)
countPerTenant := w.metrics.chunksPerTenant.WithLabelValues(ch.UserID)
reason := flushReasonFull
from, through := ch.From.Time(), ch.Through.Time()
if through.Sub(from) > w.cfg.MaxChunkAge {
reason = flushReasonMaxAge
}
w.metrics.chunksFlushedPerReason.WithLabelValues(reason).Add(1)
compressedSize := float64(len(byt))
uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)
if ok && compressedSize > 0 {
w.metrics.chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
}
utilization := ch.Data.Utilization()
w.metrics.chunkUtilization.Observe(utilization)
numEntries := ch.Data.Entries()
w.metrics.chunkEntries.Observe(float64(numEntries))
w.metrics.chunkSize.Observe(compressedSize)
sizePerTenant.Add(compressedSize)
countPerTenant.Inc()
w.metrics.chunkAge.Observe(time.Since(from).Seconds())
w.metrics.chunkLifespan.Observe(through.Sub(from).Hours())
w.metrics.flushedChunksBytesStats.Record(compressedSize)
w.metrics.flushedChunksLinesStats.Record(float64(numEntries))
w.metrics.flushedChunksUtilizationStats.Record(utilization)
w.metrics.flushedChunksAgeStats.Record(time.Since(from).Seconds())
w.metrics.flushedChunksLifespanStats.Record(through.Sub(from).Seconds())
w.metrics.flushedChunksStats.Inc(1)
}
func (w *Appender) CutRemainingChunks(ctx context.Context) ([]*chunk.Chunk, error) {
var chunks []*chunk.Chunk
w.instancesMtx.Lock()
defer w.instancesMtx.Unlock()
for _, inst := range w.instances {
// wrap in anonymous fn to make lock release more straightforward
if err := func() error {
inst.streams.mtx.Lock()
defer inst.streams.mtx.Unlock()
for _, stream := range inst.streams.byLabels {
// wrap in anonymous fn to make lock release more straightforward
if err := func() error {
stream.chunkMtx.Lock()
defer stream.chunkMtx.Unlock()
if stream.chunk != nil {
cut, err := stream.closeChunk()
if err != nil {
return err
}
encoded, err := inst.encodeChunk(ctx, stream, cut)
if err != nil {
return err
}
chunks = append(chunks, encoded)
}
return nil
}(); err != nil {
return err
}
}
return nil
}(); err != nil {
return nil, err
}
}
return chunks, nil
}
type AppendInput struct {
tenant string
// both labels & labelsStr are populated to prevent duplicating conversion work in multiple places
labels labels.Labels
labelsStr string
entries []push.Entry
}
func (w *Appender) Append(ctx context.Context, input AppendInput) ([]*chunk.Chunk, error) {
// use rlock so multiple appends can be called on the same instance.
// re-check under the write lock if it didn't exist.
w.instancesMtx.RLock()
inst, ok := w.instances[input.tenant]
w.instancesMtx.RUnlock()
if !ok {
w.instancesMtx.Lock()
inst, ok = w.instances[input.tenant]
if !ok {
inst = newInstance(w.cfg, input.tenant, w.metrics, w.periodicConfigs, w.logger)
w.instances[input.tenant] = inst
}
w.instancesMtx.Unlock()
}
closed, err := inst.Push(ctx, input)
return closed, err
}
// instance is a slimmed down version from the ingester pkg
type instance struct {
cfg Config
tenant string
buf []byte // buffer used to compute fps.
mapper *ingester.FpMapper // use of the mapper no longer needs a mutex because reading from streams is lock-free
metrics *SlimgesterMetrics
streams *streamsMap
logger log.Logger
periods []config.PeriodConfig
}
func newInstance(
cfg Config,
tenant string,
metrics *SlimgesterMetrics,
periods []config.PeriodConfig,
logger log.Logger,
) *instance {
streams := newStreamsMap()
return &instance{
cfg: cfg,
tenant: tenant,
buf: make([]byte, 0, 1024),
mapper: ingester.NewFPMapper(streams.getLabelsFromFingerprint),
metrics: metrics,
streams: streams,
logger: logger,
periods: periods,
}
}
func newStreamsMap() *streamsMap {
return &streamsMap{
byLabels: make(map[string]*stream),
byFp: make(map[model.Fingerprint]*stream),
}
}
type streamsMap struct {
// labels -> stream
byLabels map[string]*stream
byFp map[model.Fingerprint]*stream
mtx sync.RWMutex
}
// For performs an operation on an existing stream, creating it if it wasn't previously present.
func (m *streamsMap) For(
ls string,
createFn func() (*stream, error),
fn func(*stream) error,
) error {
// first use read lock in case the stream exists
m.mtx.RLock()
if s, ok := m.byLabels[ls]; ok {
err := fn(s)
m.mtx.RUnlock()
return err
}
m.mtx.RUnlock()
// Stream wasn't found, acquire write lock to create it
m.mtx.Lock()
defer m.mtx.Unlock()
// Double check it wasn't created while we were upgrading the lock
if s, ok := m.byLabels[ls]; ok {
return fn(s)
}
// Create new stream
s, err := createFn()
if err != nil {
return err
}
m.byLabels[ls] = s
m.byFp[s.fp] = s
return fn(s)
}
// Return labels associated with given fingerprint. Used by fingerprint mapper.
func (m *streamsMap) getLabelsFromFingerprint(fp model.Fingerprint) labels.Labels {
if s, ok := m.byFp[fp]; ok {
return s.ls
}
return nil
}
func (i *instance) getHashForLabels(ls labels.Labels) model.Fingerprint {
var fp uint64
fp, i.buf = ls.HashWithoutLabels(i.buf, []string(nil)...)
return i.mapper.MapFP(model.Fingerprint(fp), ls)
}
// Push will iterate over the given streams present in the PushRequest and attempt to store them.
func (i *instance) Push(
ctx context.Context,
input AppendInput,
) (closed []*chunk.Chunk, err error) {
err = i.streams.For(
input.labelsStr,
func() (*stream, error) {
fp := i.getHashForLabels(input.labels)
return newStream(fp, input.labels, i.cfg, i.metrics), nil
},
func(stream *stream) error {
xs, err := stream.Push(input.entries)
if err != nil {
return err
}
if len(xs) > 0 {
for _, x := range xs {
// encodeChunk mutates the chunk so we must pass by reference
chk, err := i.encodeChunk(ctx, stream, x)
if err != nil {
return err
}
closed = append(closed, chk)
}
}
return err
},
)
return closed, err
}
// encodeChunk encodes a chunk.Chunk.
func (i *instance) encodeChunk(ctx context.Context, stream *stream, mc *chunkenc.MemChunk) (*chunk.Chunk, error) {
if err := ctx.Err(); err != nil {
return nil, err
}
start := time.Now()
firstTime, lastTime := util.RoundToMilliseconds(mc.Bounds())
chk := chunk.NewChunk(
i.tenant, stream.fp, stream.ls,
chunkenc.NewFacade(mc, stream.blockSize, stream.targetChunkSize),
firstTime,
lastTime,
)
chunkBytesSize := mc.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
if err := chk.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize)), i.logger); err != nil {
if !errors.Is(err, chunk.ErrChunkDecode) {
return nil, fmt.Errorf("chunk encoding: %w", err)
}
i.metrics.chunkDecodeFailures.WithLabelValues(chk.UserID).Inc()
}
i.metrics.chunkEncodeTime.Observe(time.Since(start).Seconds())
i.metrics.chunksEncoded.WithLabelValues(chk.UserID).Inc()
return &chk, nil
}
type stream struct {
fp model.Fingerprint
ls labels.Labels
chunkFormat byte
codec compression.Codec
blockSize int
targetChunkSize int
chunkMtx sync.RWMutex
chunk *chunkenc.MemChunk
metrics *SlimgesterMetrics
}
func newStream(fp model.Fingerprint, ls labels.Labels, cfg Config, metrics *SlimgesterMetrics) *stream {
return &stream{
fp: fp,
ls: ls,
chunkFormat: chunkenc.ChunkFormatV4,
codec: cfg.parsedEncoding,
blockSize: cfg.BlockSize.Val(),
targetChunkSize: cfg.TargetChunkSize.Val(),
metrics: metrics,
}
}
func (s *stream) Push(entries []push.Entry) (closed []*chunkenc.MemChunk, err error) {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if s.chunk == nil {
s.chunk = s.NewChunk()
}
// bytesAdded, err := s.storeEntries(ctx, toStore, usageTracker)
for i := 0; i < len(entries); i++ {
// cut the chunk if the new addition overflows target size
if !s.chunk.SpaceFor(&entries[i]) {
cut, err := s.closeChunk()
if err != nil {
return nil, err
}
closed = append(closed, cut)
}
if _, err = s.chunk.Append(&entries[i]); err != nil {
return closed, errors.Wrap(err, "appending entry")
}
}
return closed, nil
}
func (s *stream) closeChunk() (*chunkenc.MemChunk, error) {
if err := s.chunk.Close(); err != nil {
return nil, errors.Wrap(err, "closing chunk")
}
s.metrics.samplesPerChunk.Observe(float64(s.chunk.Size()))
s.metrics.blocksPerChunk.Observe(float64(s.chunk.BlockCount()))
s.metrics.chunksCreatedTotal.Inc()
s.metrics.chunkCreatedStats.Inc(1)
// add a chunk
res := s.chunk
s.chunk = s.NewChunk()
return res, nil
}
func (s *stream) NewChunk() *chunkenc.MemChunk {
return chunkenc.NewMemChunk(
s.chunkFormat,
s.codec,
chunkenc.ChunkHeadFormatFor(s.chunkFormat),
s.blockSize,
s.targetChunkSize,
)
}
func withBackoff[T any](
ctx context.Context,
config backoff.Config,
fn func() (T, error),
) (T, error) {
var zero T
var boff = backoff.New(ctx, config)
for boff.Ongoing() {
res, err := fn()
if err != nil {
boff.Wait()
continue
}
return res, nil
}
return zero, boff.ErrCause()
}

@ -0,0 +1,154 @@
package blockbuilder
import (
"context"
"fmt"
"io"
"sort"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/client"
"github.com/grafana/loki/v3/pkg/storage/config"
)
type MultiStore struct {
stores []*storeEntry
storageConfig storage.Config
}
type storeEntry struct {
start model.Time
cfg config.PeriodConfig
objectClient client.ObjectClient
}
var _ client.ObjectClient = (*MultiStore)(nil)
func NewMultiStore(
periodicConfigs []config.PeriodConfig,
storageConfig storage.Config,
clientMetrics storage.ClientMetrics,
) (*MultiStore, error) {
store := &MultiStore{
storageConfig: storageConfig,
}
// sort by From time
sort.Slice(periodicConfigs, func(i, j int) bool {
return periodicConfigs[i].From.Time.Before(periodicConfigs[j].From.Time)
})
for _, periodicConfig := range periodicConfigs {
objectClient, err := storage.NewObjectClient(periodicConfig.ObjectType, "storage-rf1", storageConfig, clientMetrics)
if err != nil {
return nil, fmt.Errorf("creating object client for period %s: %w ", periodicConfig.From, err)
}
store.stores = append(store.stores, &storeEntry{
start: periodicConfig.From.Time,
cfg: periodicConfig,
objectClient: objectClient,
})
}
return store, nil
}
func (m *MultiStore) GetStoreFor(ts model.Time) (client.ObjectClient, error) {
// find the schema with the lowest start _after_ ts
j := sort.Search(len(m.stores), func(j int) bool {
return m.stores[j].start > ts
})
// reduce it by 1 because we want a schema with start <= ts
j--
if 0 <= j && j < len(m.stores) {
return m.stores[j].objectClient, nil
}
// should in theory never happen
return nil, fmt.Errorf("no store found for timestamp %s", ts)
}
func (m *MultiStore) GetAttributes(ctx context.Context, objectKey string) (client.ObjectAttributes, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return client.ObjectAttributes{}, err
}
return s.GetAttributes(ctx, objectKey)
}
func (m *MultiStore) ObjectExists(ctx context.Context, objectKey string) (bool, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return false, err
}
return s.ObjectExists(ctx, objectKey)
}
func (m *MultiStore) PutObject(ctx context.Context, objectKey string, object io.Reader) error {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return err
}
return s.PutObject(ctx, objectKey, object)
}
func (m *MultiStore) GetObject(ctx context.Context, objectKey string) (io.ReadCloser, int64, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, 0, err
}
return s.GetObject(ctx, objectKey)
}
func (m *MultiStore) GetObjectRange(ctx context.Context, objectKey string, off, length int64) (io.ReadCloser, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "GetObjectRange")
if sp != nil {
sp.LogKV("objectKey", objectKey, "off", off, "length", length)
}
defer sp.Finish()
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, err
}
return s.GetObjectRange(ctx, objectKey, off, length)
}
func (m *MultiStore) List(ctx context.Context, prefix string, delimiter string) ([]client.StorageObject, []client.StorageCommonPrefix, error) {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return nil, nil, err
}
return s.List(ctx, prefix, delimiter)
}
func (m *MultiStore) DeleteObject(ctx context.Context, objectKey string) error {
s, err := m.GetStoreFor(model.Now())
if err != nil {
return err
}
return s.DeleteObject(ctx, objectKey)
}
func (m *MultiStore) IsObjectNotFoundErr(err error) bool {
s, _ := m.GetStoreFor(model.Now())
if s == nil {
return false
}
return s.IsObjectNotFoundErr(err)
}
func (m *MultiStore) IsRetryableErr(err error) bool {
s, _ := m.GetStoreFor(model.Now())
if s == nil {
return false
}
return s.IsRetryableErr(err)
}
func (m *MultiStore) Stop() {
for _, s := range m.stores {
s.objectClient.Stop()
}
}

@ -0,0 +1,37 @@
package blockbuilder
import (
"os"
"testing"
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/storage"
"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
"github.com/grafana/loki/v3/pkg/storage/config"
)
var metrics *storage.ClientMetrics
func NewTestStorage(t testing.TB) (*MultiStore, error) {
if metrics == nil {
m := storage.NewClientMetrics()
metrics = &m
}
dir := t.TempDir()
t.Cleanup(func() {
os.RemoveAll(dir)
metrics.Unregister()
})
cfg := storage.Config{
FSConfig: local.FSConfig{
Directory: dir,
},
}
return NewMultiStore([]config.PeriodConfig{
{
From: config.DayTime{Time: model.Now()},
ObjectType: "filesystem",
},
}, cfg, *metrics)
}

@ -0,0 +1,328 @@
package blockbuilder
import (
"bytes"
"context"
"fmt"
"io"
"sync"
"time"
"github.com/cespare/xxhash/v2"
"github.com/go-kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/compression"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
// TsdbCreator accepts writes and builds TSDBs.
type TsdbCreator struct {
// mtx guards the tenant heads below
mtx sync.RWMutex
shards int
heads *tenantHeads
}
// newTsdbCreator creates a new TsdbCreator
func newTsdbCreator() *TsdbCreator {
m := &TsdbCreator{
shards: 1 << 5, // 32 shards
}
m.reset()
return m
}
// reset updates heads
func (m *TsdbCreator) reset() {
m.heads = newTenantHeads(m.shards)
}
// Append adds a new series for the given user
func (m *TsdbCreator) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) error {
m.mtx.RLock()
defer m.mtx.RUnlock()
// TODO(owen-d): safe to remove?
// Remove __name__="logs" as it's not needed in TSDB
b := labels.NewBuilder(ls)
b.Del(labels.MetricName)
ls = b.Labels()
// Just append to heads, no WAL needed
m.heads.Append(userID, ls, fprint, chks)
return nil
}
type chunkInfo struct {
chunkMetas index.ChunkMetas
tsdbFormat int
}
type tsdbWithID struct {
bucket model.Time
data []byte
id tsdb.Identifier
}
// create builds one TSDB per table period from the accumulated heads, then resets the creator's state
func (m *TsdbCreator) create(ctx context.Context, nodeName string, tableRanges []config.TableRange) ([]tsdbWithID, error) {
m.mtx.Lock()
defer m.mtx.Unlock()
type key struct {
bucket model.Time
prefix string
}
periods := make(map[key]*tsdb.Builder)
if err := m.heads.forAll(
func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
// chunks may overlap index period bounds, in which case they're written to multiple periods
pds := make(map[key]chunkInfo)
for _, chk := range chks {
idxBuckets := tsdb.IndexBuckets(chk.From(), chk.Through(), tableRanges)
for _, bucket := range idxBuckets {
k := key{
bucket: bucket.BucketStart,
prefix: bucket.Prefix,
}
chkinfo := pds[k]
chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk)
chkinfo.tsdbFormat = bucket.TsdbFormat
pds[k] = chkinfo
}
}
// Embed the tenant label into TSDB
lb := labels.NewBuilder(ls)
lb.Set(index.TenantLabel, user)
withTenant := lb.Labels()
// Add the chunks to all relevant builders
for pd, chkinfo := range pds {
matchingChks := chkinfo.chunkMetas
b, ok := periods[pd]
if !ok {
b = tsdb.NewBuilder(chkinfo.tsdbFormat)
periods[pd] = b
}
b.AddSeries(
withTenant,
// use the fingerprint without the added tenant label
// so queries route to the chunks which actually exist.
model.Fingerprint(fp),
matchingChks,
)
}
return nil
},
); err != nil {
level.Error(util_log.Logger).Log("err", err.Error(), "msg", "building TSDB")
return nil, err
}
now := time.Now()
res := make([]tsdbWithID, 0, len(periods))
for p, b := range periods {
level.Debug(util_log.Logger).Log(
"msg", "building tsdb for period",
"pd", p,
)
// build+move tsdb to multitenant dir
start := time.Now()
dst, data, err := b.BuildInMemory(
ctx,
func(_, _ model.Time, _ uint32) tsdb.Identifier {
return tsdb.NewPrefixedIdentifier(
tsdb.MultitenantTSDBIdentifier{
NodeName: nodeName,
Ts: now,
},
p.prefix,
"",
)
},
)
if err != nil {
return nil, err
}
level.Debug(util_log.Logger).Log(
"msg", "finished building tsdb for period",
"pd", p,
"dst", dst.Path(),
"duration", time.Since(start),
)
res = append(res, tsdbWithID{
bucket: p.bucket,
id: dst,
data: data,
})
}
m.reset()
return res, nil
}
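An editorial same-package sketch (not part of this commit) of the creator's lifecycle as the block builder uses it; buildIndexSketch is a hypothetical helper, and the tenant, labels, chunk meta values, and node name are illustrative.
// buildIndexSketch appends one series+chunk and materializes the per-period TSDBs.
func buildIndexSketch(ctx context.Context, tableRanges []config.TableRange) ([]tsdbWithID, error) {
	idx := newTsdbCreator()

	lbls := labels.FromStrings("app", "api")
	meta := index.ChunkMeta{
		Checksum: 1,
		MinTime:  0,
		MaxTime:  1000,
		KB:       64,
		Entries:  100,
	}
	if err := idx.Append("tenant-a", lbls, lbls.Hash(), index.ChunkMetas{meta}); err != nil {
		return nil, err
	}

	// One in-memory TSDB is built per table period touched by the appended
	// chunks; internal state is reset afterwards. In runOne, each result is
	// then gzipped and uploaded via newUploader(objStore).Put.
	return idx.create(ctx, "example-node", tableRanges)
}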
// tenantHeads manages per-tenant series
type tenantHeads struct {
shards int
locks []sync.RWMutex
tenants []map[string]*Head
}
func newTenantHeads(shards int) *tenantHeads {
t := &tenantHeads{
shards: shards,
locks: make([]sync.RWMutex, shards),
tenants: make([]map[string]*Head, shards),
}
for i := range t.tenants {
t.tenants[i] = make(map[string]*Head)
}
return t
}
func (t *tenantHeads) Append(userID string, ls labels.Labels, fprint uint64, chks index.ChunkMetas) {
head := t.getOrCreateTenantHead(userID)
head.Append(ls, fprint, chks)
}
func (t *tenantHeads) getOrCreateTenantHead(userID string) *Head {
idx := t.shardForTenant(userID)
mtx := &t.locks[idx]
// Fast path: return existing head
mtx.RLock()
head, ok := t.tenants[idx][userID]
mtx.RUnlock()
if ok {
return head
}
// Slow path: create new head
mtx.Lock()
defer mtx.Unlock()
head, ok = t.tenants[idx][userID]
if !ok {
head = NewHead(userID)
t.tenants[idx][userID] = head
}
return head
}
func (t *tenantHeads) shardForTenant(userID string) uint64 {
return xxhash.Sum64String(userID) & uint64(t.shards-1)
}
// forAll iterates through all series in all tenant heads
func (t *tenantHeads) forAll(fn func(user string, ls labels.Labels, fp uint64, chks index.ChunkMetas) error) error {
for i, shard := range t.tenants {
t.locks[i].RLock()
defer t.locks[i].RUnlock()
for user, tenant := range shard {
if err := tenant.forAll(func(ls labels.Labels, fp uint64, chks index.ChunkMetas) error {
return fn(user, ls, fp, chks)
}); err != nil {
return err
}
}
}
return nil
}
// Head manages series for a single tenant
type Head struct {
userID string
series map[uint64]*series
mtx sync.RWMutex
}
type series struct {
labels labels.Labels
chks []index.ChunkMeta
}
func NewHead(userID string) *Head {
return &Head{
userID: userID,
series: make(map[uint64]*series),
}
}
func (h *Head) Append(ls labels.Labels, fp uint64, chks index.ChunkMetas) {
h.mtx.Lock()
defer h.mtx.Unlock()
s, ok := h.series[fp]
if !ok {
s = &series{labels: ls}
h.series[fp] = s
}
s.chks = append(s.chks, chks...)
}
func (h *Head) forAll(fn func(ls labels.Labels, fp uint64, chks index.ChunkMetas) error) error {
h.mtx.RLock()
defer h.mtx.RUnlock()
for fp, s := range h.series {
if err := fn(s.labels, fp, s.chks); err != nil {
return err
}
}
return nil
}
type uploader struct {
store *MultiStore
}
func newUploader(store *MultiStore) *uploader {
return &uploader{store: store}
}
func (u *uploader) Put(ctx context.Context, db tsdbWithID) error {
client, err := u.store.GetStoreFor(db.bucket)
if err != nil {
return err
}
reader := bytes.NewReader(db.data)
gzipPool := compression.GetWriterPool(compression.GZIP)
buf := bytes.NewBuffer(make([]byte, 0, 1<<20))
compressedWriter := gzipPool.GetWriter(buf)
defer gzipPool.PutWriter(compressedWriter)
_, err = io.Copy(compressedWriter, reader)
if err != nil {
return err
}
err = compressedWriter.Close()
if err != nil {
return err
}
return client.PutObject(ctx, db.id.Path(), buf)
}
func buildFileName(indexName string) string {
return fmt.Sprintf("%s.gz", indexName)
}

@ -16,6 +16,10 @@ import (
"github.com/twmb/franz-go/pkg/kerr"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
)
type SpecialOffset int
@ -97,6 +101,35 @@ type Reader struct {
logger log.Logger
}
func NewReader(
cfg kafka.Config,
partitionID int32,
instanceID string,
logger log.Logger,
reg prometheus.Registerer,
) (*Reader, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
cfg,
clientMetrics,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}
// Create the reader
return newReader(
c,
cfg.Topic,
partitionID,
cfg.GetConsumerGroup(instanceID, partitionID),
logger,
reg,
), nil
}
// NewReader creates a new Reader instance
func newReader(
client *kgo.Client,

@ -15,7 +15,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/client"
)
var errWaitTargetLagDeadlineExceeded = errors.New("waiting for target lag deadline exceeded")
@ -81,27 +80,20 @@ func NewReaderService(
logger log.Logger,
reg prometheus.Registerer,
) (*ReaderService, error) {
// Create a new Kafka client for this reader
clientMetrics := client.NewReaderClientMetrics("partition-reader", reg)
c, err := client.NewReaderClient(
kafkaCfg,
clientMetrics,
log.With(logger, "component", "kafka-client"),
)
if err != nil {
return nil, fmt.Errorf("creating kafka client: %w", err)
}
// Create the reader
reader := newReader(
c,
kafkaCfg.Topic,
reader, err := NewReader(
kafkaCfg,
partitionID,
kafkaCfg.GetConsumerGroup(instanceID, partitionID),
instanceID,
logger,
reg,
)
if err != nil {
return nil, errors.Wrap(err, "creating kafka reader")
}
return newReaderServiceFromIfc(
ReaderConfig{
TargetConsumerLagAtStartup: kafkaCfg.TargetConsumerLagAtStartup,

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/blockbuilder"
"github.com/grafana/loki/v3/pkg/bloombuild"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
@ -89,6 +90,7 @@ type Config struct {
RulerStorage rulestore.Config `yaml:"ruler_storage,omitempty" doc:"hidden"`
IngesterClient ingester_client.Config `yaml:"ingester_client,omitempty"`
Ingester ingester.Config `yaml:"ingester,omitempty"`
BlockBuilder blockbuilder.Config `yaml:"block_builder,omitempty"`
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"`
@ -183,6 +185,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.OperationalConfig.RegisterFlags(f)
c.Profiling.RegisterFlags(f)
c.KafkaConfig.RegisterFlags(f)
c.BlockBuilder.RegisterFlags(f)
}
func (c *Config) registerServerFlagsWithChangedDefaultValues(fs *flag.FlagSet) {
@ -258,6 +261,9 @@ func (c *Config) Validate() error {
if err := c.Ingester.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid ingester config"))
}
if err := c.BlockBuilder.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid block_builder config"))
}
if err := c.LimitsConfig.Validate(); err != nil {
errs = append(errs, errors.Wrap(err, "CONFIG ERROR: invalid limits_config config"))
}
@ -372,6 +378,7 @@ type Loki struct {
indexGatewayRingManager *lokiring.RingManager
partitionRingWatcher *ring.PartitionRingWatcher
partitionRing *ring.PartitionInstanceRing
blockBuilder *blockbuilder.BlockBuilder
ClientMetrics storage.ClientMetrics
deleteClientMetrics *deletion.DeleteRequestClientMetrics
@ -682,6 +689,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(PatternIngesterTee, t.initPatternIngesterTee, modules.UserInvisibleModule)
mm.RegisterModule(PatternIngester, t.initPatternIngester)
mm.RegisterModule(PartitionRing, t.initPartitionRing, modules.UserInvisibleModule)
mm.RegisterModule(BlockBuilder, t.initBlockBuilder)
mm.RegisterModule(All, nil)
mm.RegisterModule(Read, nil)
@ -719,6 +727,7 @@ func (t *Loki) setupModuleManager() error {
IndexGatewayRing: {Overrides, MemberlistKV},
PartitionRing: {MemberlistKV, Server, Ring},
MemberlistKV: {Server},
BlockBuilder: {PartitionRing, Store, Server},
Read: {QueryFrontend, Querier},
Write: {Ingester, Distributor, PatternIngester},

@ -35,6 +35,7 @@ import (
"github.com/prometheus/common/model"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/blockbuilder"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
@ -47,6 +48,8 @@ import (
"github.com/grafana/loki/v3/pkg/distributor"
"github.com/grafana/loki/v3/pkg/indexgateway"
"github.com/grafana/loki/v3/pkg/ingester"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/kafka/partitionring"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
@ -134,6 +137,7 @@ const (
Analytics string = "analytics"
InitCodec string = "init-codec"
PartitionRing string = "partition-ring"
BlockBuilder string = "block-builder"
)
const (
@ -875,6 +879,12 @@ func (t *Loki) updateConfigForShipperStore() {
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
case t.Cfg.isTarget(BlockBuilder):
// Blockbuilder handles index creation independently of the shipper.
// TODO: introduce Disabled mode for boltdb shipper and set it here.
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeDisabled
default:
// All other targets use the shipper store in RW mode
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadWrite
@ -1770,6 +1780,61 @@ func (t *Loki) initPartitionRing() (services.Service, error) {
return t.partitionRingWatcher, nil
}
func (t *Loki) initBlockBuilder() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "block_builder")
// TODO(owen-d): perhaps refactor to not use the ingester config?
id := t.Cfg.Ingester.LifecyclerConfig.ID
ingestPartitionID, err := partitionring.ExtractIngesterPartitionID(id)
if err != nil {
return nil, fmt.Errorf("calculating block builder partition ID: %w", err)
}
reader, err := partition.NewReader(
t.Cfg.KafkaConfig,
ingestPartitionID,
id,
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}
controller, err := blockbuilder.NewPartitionJobController(
reader,
t.Cfg.BlockBuilder.Backoff,
)
if err != nil {
return nil, err
}
objectStore, err := blockbuilder.NewMultiStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics)
if err != nil {
return nil, err
}
bb, err := blockbuilder.NewBlockBuilder(
id,
t.Cfg.BlockBuilder,
t.Cfg.SchemaConfig.Configs,
t.Store,
objectStore,
logger,
prometheus.DefaultRegisterer,
controller,
)
if err != nil {
return nil, err
}
t.blockBuilder = bb
return t.blockBuilder, nil
}
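For orientation, the partition assignment above reduces to parsing the numeric suffix of the instance ID, mirroring how kafka ingesters are mapped to partitions. A hedged sketch (the instance name is invented, and it assumes ExtractIngesterPartitionID only inspects the trailing "-<n>"):
// Illustrative only.
if partitionID, err := partitionring.ExtractIngesterPartitionID("block-builder-3"); err == nil {
	_ = partitionID // == 3; the reader above consumes exactly that Kafka partition
}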
func (t *Loki) deleteRequestsClient(clientType string, limits limiter.CombinedLimits) (deletion.DeleteRequestsClient, error) {
if !t.supportIndexDeleteRequest() || !t.Cfg.CompactorConfig.RetentionEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil

@ -33,6 +33,9 @@ const (
ModeReadOnly = Mode("RO")
// ModeWriteOnly is to allow only write operations
ModeWriteOnly = Mode("WO")
// ModeDisabled yields a no-op shipper: index operations do nothing and never error.
// It's used by the blockbuilder, which handles index operations independently.
ModeDisabled = Mode("NO")
// FilesystemObjectStoreType holds the periodic config type for the filesystem store
FilesystemObjectStoreType = "filesystem"
@ -142,6 +145,8 @@ type indexShipper struct {
func NewIndexShipper(prefix string, cfg Config, storageClient client.ObjectClient, limits downloads.Limits,
tenantFilter downloads.TenantFilter, open index.OpenIndexFileFunc, tableRangeToHandle config.TableRange, reg prometheus.Registerer, logger log.Logger) (IndexShipper, error) {
switch cfg.Mode {
case ModeDisabled:
return Noop{}, nil
case ModeReadOnly, ModeWriteOnly, ModeReadWrite:
default:
return nil, fmt.Errorf("invalid mode: %v", cfg.Mode)
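To make the new mode concrete, a hedged sketch of the caller-visible behaviour (nil dependencies are assumed to be fine here only because the disabled branch returns before using them):
// Illustrative only.
shipper, err := NewIndexShipper("", Config{Mode: ModeDisabled}, nil, nil, nil, nil, config.TableRange{}, nil, log.NewNopLogger())
// err == nil; shipper is the package's Noop implementation, so index
// operations are accepted and silently skipped.
_, _ = shipper, err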

@ -134,8 +134,8 @@ func setupMultiTenantIndex(t *testing.T, indexFormat int, userStreams map[string
dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: "test",
ts: ts,
NodeName: "test",
Ts: ts,
},
destDir,
"",
@ -239,7 +239,7 @@ func TestCompactor_Compact(t *testing.T) {
PeriodicTableConfig: config.PeriodicTableConfig{Period: config.ObjectStorageIndexRequiredPeriod}},
Schema: "v12",
}
indexBkts := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
indexBkts := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
tableName := indexBkts[0]
lbls1 := mustParseLabels(`{foo="bar", a="b"}`)
@ -497,8 +497,8 @@ func TestCompactor_Compact(t *testing.T) {
t.Run(name, func(t *testing.T) {
tempDir := t.TempDir()
objectStoragePath := filepath.Join(tempDir, objectsStorageDirName)
tablePathInStorage := filepath.Join(objectStoragePath, tableName.prefix)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.prefix)
tablePathInStorage := filepath.Join(objectStoragePath, tableName.Prefix)
tableWorkingDirectory := filepath.Join(tempDir, workingDirName, tableName.Prefix)
require.NoError(t, util.EnsureDirectory(objectStoragePath))
require.NoError(t, util.EnsureDirectory(tablePathInStorage))
@ -551,7 +551,7 @@ func TestCompactor_Compact(t *testing.T) {
objectClient, err := local.NewFSObjectClient(local.FSConfig{Directory: objectStoragePath})
require.NoError(t, err)
_, commonPrefixes, err := objectClient.List(context.Background(), tableName.prefix, "/")
_, commonPrefixes, err := objectClient.List(context.Background(), tableName.Prefix, "/")
require.NoError(t, err)
initializedIndexSets := map[string]compactor.IndexSet{}
@ -559,19 +559,19 @@ func TestCompactor_Compact(t *testing.T) {
existingUserIndexSets := make(map[string]compactor.IndexSet, len(commonPrefixes))
for _, commonPrefix := range commonPrefixes {
userID := path.Base(string(commonPrefix))
idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
require.NoError(t, err)
existingUserIndexSets[userID] = idxSet
initializedIndexSets[userID] = idxSet
}
commonIndexSet, err := newMockIndexSet("", tableName.prefix, tableWorkingDirectory, objectClient)
commonIndexSet, err := newMockIndexSet("", tableName.Prefix, tableWorkingDirectory, objectClient)
require.NoError(t, err)
// build TableCompactor and compact the index
tCompactor := newTableCompactor(context.Background(), commonIndexSet, existingUserIndexSets, func(userID string) (compactor.IndexSet, error) {
idxSet, err := newMockIndexSet(userID, tableName.prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
idxSet, err := newMockIndexSet(userID, tableName.Prefix, filepath.Join(tableWorkingDirectory, userID), objectClient)
require.NoError(t, err)
initializedIndexSetsMtx.Lock()
@ -875,9 +875,9 @@ func setupCompactedIndex(t *testing.T) *testContext {
schemaCfg := config.SchemaConfig{
Configs: []config.PeriodConfig{periodConfig},
}
indexBuckets := indexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
indexBuckets := IndexBuckets(now, now, []config.TableRange{periodConfig.GetIndexTableNumberRange(config.DayTime{Time: now})})
tableName := indexBuckets[0]
tableInterval := retention.ExtractIntervalFromTableName(tableName.prefix)
tableInterval := retention.ExtractIntervalFromTableName(tableName.Prefix)
// shiftTableStart shift tableInterval.Start by the given amount of milliseconds.
// It is used for building chunkmetas relative to start time of the table.
shiftTableStart := func(ms int64) int64 {
@ -900,7 +900,7 @@ func setupCompactedIndex(t *testing.T) *testContext {
builder.FinalizeChunks()
return newCompactedIndex(context.Background(), tableName.prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
return newCompactedIndex(context.Background(), tableName.Prefix, buildUserID(0), t.TempDir(), periodConfig, builder)
}
expectedChunkEntries := map[string][]retention.ChunkEntry{

@ -161,13 +161,13 @@ func ParseSingleTenantTSDBPath(p string) (id SingleTenantTSDBIdentifier, ok bool
}
type MultitenantTSDBIdentifier struct {
nodeName string
ts time.Time
NodeName string
Ts time.Time
}
// Name builds the filename with format <file-creation-ts>-<nodeName>.tsdb, e.g. 1700000000-node-a.tsdb
func (id MultitenantTSDBIdentifier) Name() string {
return fmt.Sprintf("%d-%s.tsdb", id.ts.Unix(), id.nodeName)
return fmt.Sprintf("%d-%s.tsdb", id.Ts.Unix(), id.NodeName)
}
func (id MultitenantTSDBIdentifier) Path() string {
@ -200,7 +200,7 @@ func parseMultitenantTSDBNameFromBase(name string) (res MultitenantTSDBIdentifie
}
return MultitenantTSDBIdentifier{
ts: time.Unix(int64(ts), 0),
nodeName: strings.Join(xs[1:], "-"),
Ts: time.Unix(int64(ts), 0),
NodeName: strings.Join(xs[1:], "-"),
}, true
}
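A quick round trip of the naming scheme, for reference (timestamp and node name are arbitrary):
// Illustrative only.
id := MultitenantTSDBIdentifier{NodeName: "ingester-0", Ts: time.Unix(1700000000, 0)}
name := id.Name() // "1700000000-ingester-0.tsdb"
parsed, ok := parseMultitenantTSDBNameFromBase(name)
// ok == true, parsed.NodeName == "ingester-0", parsed.Ts.Unix() == 1700000000
_, _ = parsed, ok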

@ -62,6 +62,10 @@ const (
fingerprintInterval = 1 << 10
millisecondsInHour = int64(time.Hour / time.Millisecond)
// reserved; used in multitenant indices to signal the tenant. Eventually compacted away when
// single tenant indices are created.
TenantLabel = "__loki_tenant__"
)
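For context, a hedged illustration of how this reserved label is meant to appear on series in a multitenant index (the exact injection point lives in the head/builder code, not in this hunk):
// Illustrative only: the owning tenant rides along as an ordinary label and is
// compacted away when per-tenant indices are built.
ls := labels.FromStrings(TenantLabel, "tenant-a", "app", "api")
_ = ls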
type indexWriterStage uint8

@ -40,9 +40,9 @@ func (i indexIterFunc) For(_ context.Context, _ int, f func(context.Context, Ind
func (i *indexShipperQuerier) indices(ctx context.Context, from, through model.Time, user string) (Index, error) {
itr := indexIterFunc(func(f func(context.Context, Index) error) error {
// Ensure we query both per tenant and multitenant TSDBs
idxBuckets := indexBuckets(from, through, []config.TableRange{i.tableRange})
idxBuckets := IndexBuckets(from, through, []config.TableRange{i.tableRange})
for _, bkt := range idxBuckets {
if err := i.shipper.ForEachConcurrent(ctx, bkt.prefix, user, func(multitenant bool, idx shipperindex.Index) error {
if err := i.shipper.ForEachConcurrent(ctx, bkt.Prefix, user, func(multitenant bool, idx shipperindex.Index) error {
impl, ok := idx.(Index)
if !ok {
return fmt.Errorf("unexpected shipper index type: %T", idx)

@ -165,13 +165,13 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe
// chunks may overlap index period bounds, in which case they're written to multiple
pds := make(map[string]chunkInfo)
for _, chk := range chks {
idxBuckets := indexBuckets(chk.From(), chk.Through(), tableRanges)
idxBuckets := IndexBuckets(chk.From(), chk.Through(), tableRanges)
for _, bucket := range idxBuckets {
chkinfo := pds[bucket.prefix]
chkinfo := pds[bucket.Prefix]
chkinfo.chunkMetas = append(chkinfo.chunkMetas, chk)
chkinfo.tsdbFormat = bucket.tsdbFormat
pds[bucket.prefix] = chkinfo
chkinfo.tsdbFormat = bucket.TsdbFormat
pds[bucket.Prefix] = chkinfo
}
}
@ -208,8 +208,8 @@ func (m *tsdbManager) buildFromHead(heads *tenantHeads, indexShipper indexshippe
dstDir := filepath.Join(managerMultitenantDir(m.dir), fmt.Sprint(p))
dst := NewPrefixedIdentifier(
MultitenantTSDBIdentifier{
nodeName: m.nodeName,
ts: heads.start,
NodeName: m.nodeName,
Ts: heads.start,
},
dstDir,
"",
@ -300,19 +300,24 @@ func (m *tsdbManager) BuildFromWALs(t time.Time, ids []WALIdentifier, legacy boo
return nil
}
type indexInfo struct {
prefix string
tsdbFormat int
type IndexInfo struct {
BucketStart model.Time
Prefix string
TsdbFormat int
}
func indexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []indexInfo) {
func IndexBuckets(from, through model.Time, tableRanges config.TableRanges) (res []IndexInfo) {
start := from.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
end := through.Time().UnixNano() / int64(config.ObjectStorageIndexRequiredPeriod)
for cur := start; cur <= end; cur++ {
cfg := tableRanges.ConfigForTableNumber(cur)
if cfg != nil {
tsdbFormat, _ := cfg.TSDBFormat() // Ignoring error, as any valid period config should return valid format.
res = append(res, indexInfo{prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)), tsdbFormat: tsdbFormat})
res = append(res, IndexInfo{
BucketStart: model.TimeFromUnixNano(cur * int64(config.ObjectStorageIndexRequiredPeriod)),
Prefix: cfg.IndexTables.Prefix + strconv.Itoa(int(cur)),
TsdbFormat: tsdbFormat,
})
}
}
if len(res) == 0 {

@ -0,0 +1,71 @@
package tsdb
import (
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb/index"
"github.com/grafana/loki/v3/pkg/storage/types"
)
func TestIndexBuckets(t *testing.T) {
var (
day0 = model.Time(0)
day1 = day0.Add(24 * time.Hour)
day2 = day1.Add(24 * time.Hour)
periods = []config.PeriodConfig{
{
From: config.NewDayTime(day0),
Schema: "v12",
IndexType: "tsdb",
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index/",
Period: 24 * time.Hour,
},
},
},
{
From: config.NewDayTime(day2),
Schema: "v13",
IndexType: "tsdb",
IndexTables: config.IndexPeriodicTableConfig{
PeriodicTableConfig: config.PeriodicTableConfig{
Prefix: "index2/",
Period: 24 * time.Hour,
},
},
},
}
tableRanges = config.GetIndexStoreTableRanges(types.TSDBType, periods)
)
tests := []struct {
name string
from model.Time
through model.Time
expectedInfo []IndexInfo
}{
{
name: "single table range",
from: day0,
through: day2,
expectedInfo: []IndexInfo{
{BucketStart: day0, TsdbFormat: index.FormatV2, Prefix: "index/0"},
{BucketStart: day1, TsdbFormat: index.FormatV2, Prefix: "index/1"},
{BucketStart: day2, TsdbFormat: index.FormatV3, Prefix: "index2/2"},
},
},
}
for _, tc := range tests {
t.Run(tc.name, func(t *testing.T) {
res := IndexBuckets(tc.from, tc.through, tableRanges)
require.Equal(t, tc.expectedInfo, res)
})
}
}

@ -85,6 +85,13 @@ func (s *store) init(name, prefix string, indexShipperCfg indexshipper.Config, s
var indices []Index
opts := DefaultIndexClientOptions()
// early return in case index shipper is disabled.
if indexShipperCfg.Mode == indexshipper.ModeDisabled {
s.indexWriter = noopIndexWriter{}
s.Reader = NewIndexClient(NoopIndex{}, opts, limits)
return nil
}
if indexShipperCfg.Mode == indexshipper.ModeWriteOnly {
// We disable bloom filters on write nodes
// for the Stats() methods as it's of relatively little
@ -172,3 +179,9 @@ type failingIndexWriter struct{}
func (f failingIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdbindex.ChunkMetas) error {
return fmt.Errorf("index writer is not initialized due to tsdb store being initialized in read-only mode")
}
// noopIndexWriter accepts appends and silently drops them. It's used when the
// TSDB index shipper runs in ModeDisabled (e.g. the block-builder target).
type noopIndexWriter struct{}
func (f noopIndexWriter) Append(_ string, _ labels.Labels, _ uint64, _ tsdbindex.ChunkMetas) error {
return nil
}
