// loki/pkg/dataobj/consumer/partition_processor.go

package consumer

import (
	"context"
"errors"
"io"
"strconv"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/scratch"
)

// builder allows mocking of [logsobj.Builder] in tests.
type builder interface {
	Append(tenant string, stream logproto.Stream) error
	GetEstimatedSize() int
	Flush() (*dataobj.Object, io.Closer, error)
	TimeRanges() []multitenancy.TimeRange
	UnregisterMetrics(prometheus.Registerer)
	CopyAndSort(ctx context.Context, obj *dataobj.Object) (*dataobj.Object, io.Closer, error)
}

// flusher allows mocking of flushes in tests.
type flusher interface {
	FlushAsync(ctx context.Context, builder builder, startTime time.Time, offset int64, done func(error))
}
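
// A minimal sketch of a test double for the flusher interface (hypothetical,
// not part of this file): it records committed offsets and completes each
// flush synchronously, which makes flush() return immediately in tests.
//
//	type fakeFlusher struct{ offsets []int64 }
//
//	func (f *fakeFlusher) FlushAsync(_ context.Context, _ builder, _ time.Time, offset int64, done func(error)) {
//		f.offsets = append(f.offsets, offset)
//		done(nil)
//	}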

type partitionProcessor struct {
	*services.BasicService

	// Kafka client and topic/partition info
	topic     string
	partition int32

	// lastRecord contains the last record appended to the builder. It is used
	// to commit the correct offset after a flush.
	lastRecord *kgo.Record

	builder builder
	decoder *kafka.Decoder

	// Builder initialization
	builderOnce  sync.Once
	builderCfg   logsobj.BuilderConfig
	scratchStore scratch.Store

	// Idle stream handling
	idleFlushTimeout time.Duration

	// maxBuilderAge forces a flush of dataobjs that are neither full nor idle
	// but have been accumulating data for too long.
	maxBuilderAge time.Duration

	// lastModified is used to detect when the idle flush timeout has been
	// exceeded. The initial value is zero and must be reset to zero after
	// each flush.
	lastModified time.Time

	// earliestRecordTime tracks the earliest timestamp across all records
	// appended to the builder for the current object.
	earliestRecordTime time.Time

	// firstAppendTime tracks the wall-clock time of the first append to the
	// builder. It is used for the max-age check instead of
	// earliestRecordTime, which can be skewed far into the past when the
	// consumer is lagging severely.
	firstAppendTime time.Time

	// Metrics
	metrics *partitionOffsetMetrics

	// Control and coordination
	reg         prometheus.Registerer
	logger      log.Logger
	recordsChan chan *kgo.Record
	flusher     flusher
}
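
// newPartitionProcessor constructs a processor for a single Kafka
// topic/partition and wires it up as a dskit service.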
func newPartitionProcessor(
	builderCfg logsobj.BuilderConfig,
	scratchStore scratch.Store,
	idleFlushTimeout time.Duration,
	maxBuilderAge time.Duration,
	topic string,
	partition int32,
	recordsChan chan *kgo.Record,
	flusher flusher,
	logger log.Logger,
	reg prometheus.Registerer,
) *partitionProcessor {
	decoder, err := kafka.NewDecoder()
	if err != nil {
		panic(err)
	}

	reg = prometheus.WrapRegistererWith(prometheus.Labels{
		"topic":     topic,
		"partition": strconv.Itoa(int(partition)),
	}, reg)

	metrics := newPartitionOffsetMetrics()
	if err := metrics.register(reg); err != nil {
		level.Error(logger).Log("msg", "failed to register partition metrics", "err", err)
	}

	p := &partitionProcessor{
		topic:            topic,
		partition:        partition,
		logger:           logger,
		decoder:          decoder,
		reg:              reg,
		builderCfg:       builderCfg,
		scratchStore:     scratchStore,
		metrics:          metrics,
		idleFlushTimeout: idleFlushTimeout,
		maxBuilderAge:    maxBuilderAge,
		recordsChan:      recordsChan,
		flusher:          flusher,
	}
	p.BasicService = services.NewBasicService(p.starting, p.running, p.stopping)
	return p
}
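
// Hypothetical wiring sketch (not part of this file; cfg, store, f, ctx,
// logger, and the topic name are assumed for illustration): the processor is
// a dskit service, so a caller starts it and feeds it records through the
// channel it was constructed with.
//
//	records := make(chan *kgo.Record, 1024)
//	p := newPartitionProcessor(cfg, store, time.Minute, 30*time.Minute,
//		"loki.ingest", 0, records, f, logger, prometheus.DefaultRegisterer)
//	if err := services.StartAndAwaitRunning(ctx, p); err != nil {
//		// handle startup failure
//	}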

// starting implements [services.StartingFn].
func (p *partitionProcessor) starting(_ context.Context) error {
	return nil
}

// running implements [services.RunningFn].
func (p *partitionProcessor) running(ctx context.Context) error {
	return p.Run(ctx)
}

// stopping implements [services.StoppingFn].
func (p *partitionProcessor) stopping(_ error) error {
	return nil
}
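
// Run consumes records from recordsChan until the context is canceled or the
// channel is closed, flushing the builder whenever the partition has been
// idle for longer than idleFlushTimeout.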
func (p *partitionProcessor) Run(ctx context.Context) error {
	defer func() {
		level.Info(p.logger).Log("msg", "stopped partition processor")
	}()
	level.Info(p.logger).Log("msg", "started partition processor")
	for {
		select {
		case <-ctx.Done():
			level.Info(p.logger).Log("msg", "stopping partition processor, context canceled")
			// We don't return ctx.Err() here as it manifests as a service failure
			// when shutting down.
			return nil
		case record, ok := <-p.recordsChan:
			if !ok {
				level.Info(p.logger).Log("msg", "stopping partition processor, channel closed")
				return nil
			}
			p.processRecord(ctx, record)
		// This partition is idle, flush it.
		case <-time.After(p.idleFlushTimeout):
			if _, err := p.idleFlush(ctx); err != nil {
				level.Error(p.logger).Log("msg", "failed to idle flush", "err", err)
			}
		}
	}
}
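
// initBuilder lazily constructs the builder on first use, so builders and
// their metrics are only created for partitions that actually receive
// records.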
func (p *partitionProcessor) initBuilder() error {
	var initErr error
	p.builderOnce.Do(func() {
		// Dataobj builder
		builder, err := logsobj.NewBuilder(p.builderCfg, p.scratchStore)
		if err != nil {
			initErr = err
			return
		}
		if err := builder.RegisterMetrics(p.reg); err != nil {
			initErr = err
			return
		}
		p.builder = builder
	})
	return initErr
}
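
// processRecord decodes a single record and appends it to the builder. It
// flushes first if the builder has reached its maximum age, and retries the
// append after a flush if the builder is full.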
func (p *partitionProcessor) processRecord(ctx context.Context, record *kgo.Record) {
	p.metrics.processedRecords.Inc()

	// Update offset metric at the end of processing
	defer p.metrics.updateOffset(record.Offset)

	if record.Timestamp.Before(p.earliestRecordTime) || p.earliestRecordTime.IsZero() {
		p.earliestRecordTime = record.Timestamp
	}

	// Observe processing delay
	p.metrics.observeProcessingDelay(record.Timestamp)

	// Initialize builder if this is the first record
	if err := p.initBuilder(); err != nil {
		level.Error(p.logger).Log("msg", "failed to initialize builder", "err", err)
		return
	}

	tenant := string(record.Key)
	stream, err := p.decoder.DecodeWithoutLabels(record.Value)
	if err != nil {
		level.Error(p.logger).Log("msg", "failed to decode record", "err", err)
		return
	}
	p.metrics.processedBytes.Add(float64(stream.Size()))

	if p.shouldFlushDueToMaxAge() {
		p.metrics.incFlushesTotal(FlushReasonMaxAge)
		if err := p.flush(ctx); err != nil {
			p.metrics.flushFailures.Inc()
			level.Error(p.logger).Log("msg", "failed to flush and commit dataobj that reached max age", "err", err)
			return
		}
	}

	if err := p.builder.Append(tenant, stream); err != nil {
		if !errors.Is(err, logsobj.ErrBuilderFull) {
			level.Error(p.logger).Log("msg", "failed to append stream", "err", err)
			p.metrics.incAppendFailures()
			return
		}

		p.metrics.incFlushesTotal(FlushReasonBuilderFull)
		if err := p.flush(ctx); err != nil {
			p.metrics.flushFailures.Inc()
			level.Error(p.logger).Log("msg", "failed to flush and commit", "err", err)
			return
		}

		if err := p.builder.Append(tenant, stream); err != nil {
			level.Error(p.logger).Log("msg", "failed to append stream after flushing", "err", err)
			p.metrics.incAppendFailures()
		}
	}

	if p.firstAppendTime.IsZero() {
		p.firstAppendTime = time.Now()
	}

	p.lastRecord = record
	p.lastModified = time.Now()
}
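
// shouldFlushDueToMaxAge reports whether the builder has buffered data whose
// first append happened more than maxBuilderAge ago. A zero maxBuilderAge
// disables the check.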
func (p *partitionProcessor) shouldFlushDueToMaxAge() bool {
	return p.maxBuilderAge > 0 &&
		p.builder.GetEstimatedSize() > 0 &&
		!p.firstAppendTime.IsZero() &&
		time.Since(p.firstAppendTime) > p.maxBuilderAge
}

// idleFlush flushes the partition if it has exceeded the idle flush timeout.
// It returns true if the partition was flushed, false with a non-nil error
// if the partition could not be flushed, and false with a nil error if
// the partition has not exceeded the timeout.
func (p *partitionProcessor) idleFlush(ctx context.Context) (bool, error) {
	if !p.needsIdleFlush() {
		return false, nil
	}
	p.metrics.incFlushesTotal(FlushReasonIdle)
	if err := p.flush(ctx); err != nil {
		p.metrics.flushFailures.Inc()
		return false, err
	}
	return true, nil
}

// needsIdleFlush returns true if the partition has exceeded the idle timeout
// and the builder has some data buffered.
func (p *partitionProcessor) needsIdleFlush() bool {
	// This is a safety check to make sure we never flush empty data objects.
	// It should never happen that lastModified is non-zero while the builder
	// is either uninitialized or empty.
	if p.builder == nil || p.builder.GetEstimatedSize() == 0 {
		return false
	}
	if p.lastModified.IsZero() {
		return false
	}
	return time.Since(p.lastModified) > p.idleFlushTimeout
}
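
// flush hands the builder off to the flusher and blocks until the
// asynchronous flush completes, then resets the per-object state so the next
// data object starts clean.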
func (p *partitionProcessor) flush(ctx context.Context) error {
	var (
		err  error
		done = make(chan struct{})
	)
	defer func() {
		// Reset the state to prepare for building the next data object.
		p.earliestRecordTime = time.Time{}
		p.firstAppendTime = time.Time{}
		p.lastModified = time.Time{}
		p.lastRecord = nil
	}()
	p.flusher.FlushAsync(ctx, p.builder, p.earliestRecordTime, p.lastRecord.Offset, func(flushErr error) {
		err = flushErr
		close(done)
	})
	<-done
	return err
}