chore: Split indexing into distinct steps (#19987)

pull/20054/head
benclive authored 1 month ago, committed by GitHub
parent 2ce207fbdb
commit 044ded2e29
Changed files:
  1. pkg/dataobj/index/builder_test.go (3 changed lines)
  2. pkg/dataobj/index/calculate.go (102 changed lines)
  3. pkg/dataobj/index/column_values.go (55 changed lines)
  4. pkg/dataobj/index/stream_statistics.go (29 changed lines)

@@ -76,12 +76,14 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
 		"loki.metastore-events": {0, 1, 2},
 	})
 	revokedProcessed := make(chan struct{}, 1)
+	trigger := make(chan struct{})
 	go func() {
+		<-trigger
 		builder.handlePartitionsRevoked(ctx, nil, map[string][]int32{
 			"loki.metastore-events": {1},
 		})
 		revokedProcessed <- struct{}{}
 	}()
 	// Trigger the revocation of a partition, but only after we've processed a couple of records.
@@ -101,6 +103,7 @@ func TestIndexBuilder_PartitionRevocation(t *testing.T) {
 	}
 	// Verify that the partition was revoked.
 	<-revokedProcessed
+	require.Equal(t, 2, len(builder.partitionStates))
 	require.Nil(t, builder.partitionStates[1])
 }
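The test gates the revocation goroutine on the new trigger channel so the revocation cannot race ahead of record processing. A minimal standalone sketch of this gating pattern (names here are illustrative, not from the Loki codebase):

	trigger := make(chan struct{})
	done := make(chan struct{}, 1)
	go func() {
		<-trigger // block until the test is ready for the action to run
		// ... perform the action under test ...
		done <- struct{}{}
	}()
	// ... process a couple of records first ...
	close(trigger) // release the goroutine
	<-done         // wait for the action to complete before asserting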

@@ -7,12 +7,9 @@ import (
 	"io"
 	"runtime"
 	"sync"
-	"time"
-	"github.com/bits-and-blooms/bloom/v3"
 	"github.com/go-kit/log"
 	"github.com/go-kit/log/level"
-	"github.com/prometheus/prometheus/model/labels"
 	"golang.org/x/sync/errgroup"
 	"github.com/grafana/loki/v3/pkg/dataobj"
@@ -22,6 +19,33 @@ import (
 	"github.com/grafana/loki/v3/pkg/dataobj/sections/streams"
 )
+
+type logsIndexCalculation interface {
+	// Prepare is called before the first batch of logs is processed, in order to initialize any state.
+	Prepare(ctx context.Context, section *dataobj.Section, stats logs.Stats) error
+	// ProcessBatch is called for each batch of log records.
+	// Implementations can assume exclusive access to the builder via the calculation context. They must not retain references to it after the call returns.
+	ProcessBatch(ctx context.Context, context *logsCalculationContext, batch []logs.Record) error
+	// Flush is called after all logs in a section have been processed.
+	// Implementations can assume exclusive access to the builder via the calculation context. They must not retain references to it after the call returns.
+	Flush(ctx context.Context, context *logsCalculationContext) error
+}
+
+type logsCalculationContext struct {
+	tenantID       string
+	objectPath     string
+	sectionIdx     int64
+	streamIDLookup map[int64]int64
+	builder        *indexobj.Builder
+}
+
+// These steps are applied to all logs; a fresh set is used for each section.
+func getLogsCalculationSteps() []logsIndexCalculation {
+	return []logsIndexCalculation{
+		&streamStatisticsCalculation{},
+		&columnValuesCalculation{},
+	}
+}
+
 // Calculator is used to calculate the indexes for a logs object and write them to the builder.
 // It reads data from the logs object in order to build bloom filters and per-section stream metadata.
 type Calculator struct {
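The interface above is the heart of the split: each index computation now lives behind Prepare/ProcessBatch/Flush. As a sketch of how a further step would plug in (lineCountCalculation is hypothetical and not part of this change):

	// A hypothetical extra step, shown only to illustrate the lifecycle.
	type lineCountCalculation struct {
		lines int64
	}

	func (c *lineCountCalculation) Prepare(_ context.Context, _ *dataobj.Section, _ logs.Stats) error {
		c.lines = 0 // reset per section; a fresh instance is created per section anyway
		return nil
	}

	func (c *lineCountCalculation) ProcessBatch(_ context.Context, _ *logsCalculationContext, batch []logs.Record) error {
		c.lines += int64(len(batch)) // the caller holds the builder mutex during this call
		return nil
	}

	func (c *lineCountCalculation) Flush(_ context.Context, _ *logsCalculationContext) error {
		// A real step would write its result to context.builder here.
		return nil
	}

Adding such a step would then be a one-line change in getLogsCalculationSteps.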
@@ -142,14 +166,6 @@ func (c *Calculator) processStreamsSection(ctx context.Context, section *dataobj
 // processLogsSection reads information from the logs section in order to build index information in the c.indexobjBuilder.
 func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.Logger, objectPath string, section *dataobj.Section, sectionIdx int64, streamIDLookup map[int64]int64) error {
 	logsBuf := make([]logs.Record, 8192)
-	type logInfo struct {
-		objectPath string
-		sectionIdx int64
-		streamID   int64
-		timestamp  time.Time
-		length     int64
-	}
-	logsInfo := make([]logInfo, len(logsBuf))
 	logsSection, err := logs.Open(ctx, section)
 	if err != nil {
@@ -164,22 +180,24 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
 		return fmt.Errorf("failed to read log section stats: %w", err)
 	}
-	columnBloomBuilders := make(map[string]*bloom.BloomFilter)
-	columnIndexes := make(map[string]int64)
-	for _, column := range stats.Columns {
-		logsType, _ := logs.ParseColumnType(column.Type)
-		if logsType != logs.ColumnTypeMetadata {
-			continue
-		}
-		columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
-		columnIndexes[column.Name] = column.ColumnIndex
-	}
-	// Read the whole logs section to extract all the column values.
-	cnt := 0
+	calculationContext := &logsCalculationContext{
+		tenantID:       tenantID,
+		objectPath:     objectPath,
+		sectionIdx:     sectionIdx,
+		streamIDLookup: streamIDLookup,
+		builder:        c.indexobjBuilder,
+	}
+	calculationSteps := getLogsCalculationSteps()
+	for _, calculation := range calculationSteps {
+		if err := calculation.Prepare(ctx, section, stats); err != nil {
+			return fmt.Errorf("failed to prepare calculation: %w", err)
+		}
+	}
 	// TODO(benclive): Switch to a columnar reader instead of a row-based one.
 	// This is also likely to be more performant, especially if we don't need to read the whole log line.
 	// Note: the source object would need a new column storing just the length to avoid reading the log line itself.
+	cnt := 0
 	rowReader := logs.NewRowReader(logsSection)
 	for {
 		n, err := rowReader.Read(ctx, logsBuf)
@@ -190,43 +208,25 @@ func (c *Calculator) processLogsSection(ctx context.Context, sectionLogger log.L
 			break
 		}
-		for i, log := range logsBuf[:n] {
-			cnt++
-			log.Metadata.Range(func(md labels.Label) {
-				columnBloomBuilders[md.Name].Add([]byte(md.Value))
-			})
-			logsInfo[i].objectPath = objectPath
-			logsInfo[i].sectionIdx = sectionIdx
-			logsInfo[i].streamID = log.StreamID
-			logsInfo[i].timestamp = log.Timestamp
-			logsInfo[i].length = int64(len(log.Line))
-		}
 		// Lock the mutex once per read for perf reasons.
+		cnt += n
 		c.builderMtx.Lock()
-		for _, log := range logsInfo[:n] {
-			err = c.indexobjBuilder.ObserveLogLine(tenantID, log.objectPath, log.sectionIdx, log.streamID, streamIDLookup[log.streamID], log.timestamp, log.length)
-			if err != nil {
+		for _, calculation := range calculationSteps {
+			if err := calculation.ProcessBatch(ctx, calculationContext, logsBuf[:n]); err != nil {
 				c.builderMtx.Unlock()
-				return fmt.Errorf("failed to observe log line: %w", err)
+				return fmt.Errorf("failed to process batch: %w", err)
 			}
 		}
 		c.builderMtx.Unlock()
 	}
-	// Write the indexes (bloom filters) to the new index object.
-	for columnName, bloom := range columnBloomBuilders {
-		bloomBytes, err := bloom.MarshalBinary()
-		if err != nil {
-			return fmt.Errorf("failed to marshal bloom filter: %w", err)
-		}
-		c.builderMtx.Lock()
-		err = c.indexobjBuilder.AppendColumnIndex(tenantID, objectPath, sectionIdx, columnName, columnIndexes[columnName], bloomBytes)
-		c.builderMtx.Unlock()
-		if err != nil {
-			return fmt.Errorf("failed to append column index: %w", err)
-		}
-	}
+	c.builderMtx.Lock()
+	for _, calculation := range calculationSteps {
+		if err := calculation.Flush(ctx, calculationContext); err != nil {
+			c.builderMtx.Unlock()
+			return fmt.Errorf("failed to flush calculation results: %w", err)
+		}
+	}
+	c.builderMtx.Unlock()
 	level.Info(sectionLogger).Log("msg", "finished processing logs section", "rowsProcessed", cnt)
 	return nil

@@ -0,0 +1,55 @@
+package index
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/bits-and-blooms/bloom/v3"
+	"github.com/prometheus/prometheus/model/labels"
+
+	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
+)
+
+type columnValuesCalculation struct {
+	columnBloomBuilders map[string]*bloom.BloomFilter
+	columnIndexes       map[string]int64
+}
+
+func (c *columnValuesCalculation) Prepare(_ context.Context, _ *dataobj.Section, stats logs.Stats) error {
+	c.columnBloomBuilders = make(map[string]*bloom.BloomFilter)
+	c.columnIndexes = make(map[string]int64)
+	for _, column := range stats.Columns {
+		logsType, _ := logs.ParseColumnType(column.Type)
+		if logsType != logs.ColumnTypeMetadata {
+			continue
+		}
+		c.columnBloomBuilders[column.Name] = bloom.NewWithEstimates(uint(column.Cardinality), 1.0/128.0)
+		c.columnIndexes[column.Name] = column.ColumnIndex
+	}
+	return nil
+}
+
+func (c *columnValuesCalculation) ProcessBatch(_ context.Context, _ *logsCalculationContext, batch []logs.Record) error {
+	for _, log := range batch {
+		log.Metadata.Range(func(md labels.Label) {
+			c.columnBloomBuilders[md.Name].Add([]byte(md.Value))
+		})
+	}
+	return nil
+}
+
+func (c *columnValuesCalculation) Flush(_ context.Context, context *logsCalculationContext) error {
+	for columnName, bloom := range c.columnBloomBuilders {
+		bloomBytes, err := bloom.MarshalBinary()
+		if err != nil {
+			return fmt.Errorf("failed to marshal bloom filter: %w", err)
+		}
+		err = context.builder.AppendColumnIndex(context.tenantID, context.objectPath, context.sectionIdx, columnName, c.columnIndexes[columnName], bloomBytes)
+		if err != nil {
+			return fmt.Errorf("failed to append column index: %w", err)
+		}
+	}
+	return nil
+}
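For context on why Flush serializes the blooms with MarshalBinary: a read path can later rebuild the filter and probe it before deciding whether to scan a section. A sketch of that counterpart, assuming bloomBytes has already been fetched from the index object (the lookup itself is not part of this change):

	var filter bloom.BloomFilter
	if err := filter.UnmarshalBinary(bloomBytes); err != nil {
		return fmt.Errorf("failed to unmarshal bloom filter: %w", err)
	}
	// With the 1/128 false-positive rate chosen in Prepare, Test may report
	// true for values the section does not contain, but never false for
	// values it does contain.
	if filter.Test([]byte("some-metadata-value")) {
		// the section may contain this value; fall through and read it
	}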

@@ -0,0 +1,29 @@
+package index
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/grafana/loki/v3/pkg/dataobj"
+	"github.com/grafana/loki/v3/pkg/dataobj/sections/logs"
+)
+
+type streamStatisticsCalculation struct{}
+
+func (c *streamStatisticsCalculation) Prepare(_ context.Context, _ *dataobj.Section, _ logs.Stats) error {
+	return nil
+}
+
+func (c *streamStatisticsCalculation) ProcessBatch(_ context.Context, context *logsCalculationContext, batch []logs.Record) error {
+	for _, log := range batch {
+		err := context.builder.ObserveLogLine(context.tenantID, context.objectPath, context.sectionIdx, log.StreamID, context.streamIDLookup[log.StreamID], log.Timestamp, int64(len(log.Line)))
+		if err != nil {
+			return fmt.Errorf("failed to observe log line: %w", err)
+		}
+	}
+	return nil
+}
+
+func (c *streamStatisticsCalculation) Flush(_ context.Context, _ *logsCalculationContext) error {
+	return nil
+}
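ObserveLogLine delegates the actual bookkeeping to indexobj.Builder, so this step needs no Flush of its own. As a rough illustration only (an assumption about the shape of such statistics, not the builder's real internals), a per-stream aggregate fed by the same inputs might look like:

	// Hypothetical per-stream aggregate; assumes "time" is imported.
	type streamAggregate struct {
		minTimestamp time.Time
		maxTimestamp time.Time
		totalBytes   int64
		rows         int64
	}

	func (a *streamAggregate) observe(ts time.Time, lineLength int64) {
		if a.rows == 0 || ts.Before(a.minTimestamp) {
			a.minTimestamp = ts
		}
		if ts.After(a.maxTimestamp) {
			a.maxTimestamp = ts
		}
		a.totalBytes += lineLength
		a.rows++
	}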