loki/pkg/dataobj/consumer/partition_processor.go

package consumer
import (
"context"
"errors"
"fmt"
"io"
"strconv"
"sync"
"time"
"github.com/coder/quartz"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/backoff"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj"
"github.com/grafana/loki/v3/pkg/dataobj/consumer/logsobj"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/dataobj/metastore/multitenancy"
"github.com/grafana/loki/v3/pkg/dataobj/uploader"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/scratch"
)
// builder allows mocking of [logsobj.Builder] in tests.
type builder interface {
Append(tenant string, stream logproto.Stream) error
GetEstimatedSize() int
Flush() (*dataobj.Object, io.Closer, error)
TimeRanges() []multitenancy.TimeRange
UnregisterMetrics(prometheus.Registerer)
CopyAndSort(obj *dataobj.Object) (*dataobj.Object, io.Closer, error)
}
// committer allows mocking of certain [kgo.Client] methods in tests.
type committer interface {
Commit(ctx context.Context, offset int64) error
}
type producer interface {
ProduceSync(ctx context.Context, records ...*kgo.Record) kgo.ProduceResults
}
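// Illustrative sketch, not part of the upstream file: minimal no-op test
// doubles that satisfy the builder, committer, and producer interfaces above.
// The names noopBuilder, noopCommitter, and noopProducer are hypothetical;
// real tests may use richer fakes that record calls or return canned data.
type (
	noopBuilder   struct{}
	noopCommitter struct{}
	noopProducer  struct{}
)

func (noopBuilder) Append(string, logproto.Stream) error       { return nil }
func (noopBuilder) GetEstimatedSize() int                      { return 0 }
func (noopBuilder) Flush() (*dataobj.Object, io.Closer, error) { return nil, io.NopCloser(nil), nil }
func (noopBuilder) TimeRanges() []multitenancy.TimeRange       { return nil }
func (noopBuilder) UnregisterMetrics(prometheus.Registerer)    {}
func (noopBuilder) CopyAndSort(obj *dataobj.Object) (*dataobj.Object, io.Closer, error) {
	return obj, io.NopCloser(nil), nil
}

func (noopCommitter) Commit(context.Context, int64) error { return nil }

func (noopProducer) ProduceSync(context.Context, ...*kgo.Record) kgo.ProduceResults { return nil }

var (
	_ builder   = noopBuilder{}
	_ committer = noopCommitter{}
	_ producer  = noopProducer{}
)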
type partitionProcessor struct {
// Kafka client and topic/partition info
committer committer
topic string
partition int32
// lastRecord contains the last record appended to the builder. It is used
// to commit the correct offset after a flush.
lastRecord *partition.Record
builder builder
decoder *kafka.Decoder
uploader *uploader.Uploader
// Builder initialization
builderOnce sync.Once
builderCfg logsobj.BuilderConfig
bucket objstore.Bucket
scratchStore scratch.Store
// Idle stream handling
idleFlushTimeout time.Duration
// The initial value is the zero time.
lastFlushed time.Time
// lastModified is used to know when the idle timeout has been exceeded.
// The initial value is zero and must be reset to zero after each flush.
lastModified time.Time
// earliestRecordTime tracks the earliest timestamp of all records appended to the builder for each object.
earliestRecordTime time.Time
// Metrics
metrics *partitionOffsetMetrics
// Control and coordination
wg sync.WaitGroup
reg prometheus.Registerer
logger log.Logger
eventsProducerClient producer
metastorePartitionRatio int32
// Used for tests.
clock quartz.Clock
}
func newPartitionProcessor(
committer committer,
builderCfg logsobj.BuilderConfig,
uploaderCfg uploader.Config,
metastoreCfg metastore.Config,
bucket objstore.Bucket,
scratchStore scratch.Store,
logger log.Logger,
reg prometheus.Registerer,
idleFlushTimeout time.Duration,
eventsProducerClient *kgo.Client,
topic string,
partition int32,
) *partitionProcessor {
decoder, err := kafka.NewDecoder()
if err != nil {
panic(err)
}
reg = prometheus.WrapRegistererWith(prometheus.Labels{
"topic": topic,
"partition": strconv.Itoa(int(partition)),
}, reg)
metrics := newPartitionOffsetMetrics()
if err := metrics.register(reg); err != nil {
level.Error(logger).Log("msg", "failed to register partition metrics", "err", err)
}
uploader := uploader.New(uploaderCfg, bucket, logger)
if err := uploader.RegisterMetrics(reg); err != nil {
level.Error(logger).Log("msg", "failed to register uploader metrics", "err", err)
}
return &partitionProcessor{
topic: topic,
partition: partition,
committer: committer,
logger: logger,
decoder: decoder,
reg: reg,
builderCfg: builderCfg,
bucket: bucket,
scratchStore: scratchStore,
metrics: metrics,
uploader: uploader,
idleFlushTimeout: idleFlushTimeout,
eventsProducerClient: eventsProducerClient,
clock: quartz.NewReal(),
metastorePartitionRatio: int32(metastoreCfg.PartitionRatio),
}
}
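// Start launches the processing loop in a goroutine. It consumes batches of
// records from recordsChan until the context is canceled or the channel is
// closed, and returns a function that blocks until the goroutine has exited.
//
// Usage sketch (illustrative):
//
//	wait := p.Start(ctx, recordsChan)
//	// ... cancel ctx or close recordsChan to stop ...
//	wait()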
func (p *partitionProcessor) Start(ctx context.Context, recordsChan <-chan []partition.Record) func() {
// This is a hack to avoid duplicate metrics registration panics. The
// problem occurs because [kafka.ReaderService] creates a consumer to
// process lag on startup, tears it down, and then creates another one
// once the lag threshold has been met. The new [kafkav2] package will
// solve this by de-coupling the consumer from processing consumer lag.
p.wg.Add(1)
go func() {
defer func() {
p.unregisterMetrics()
level.Info(p.logger).Log("msg", "stopped partition processor")
p.wg.Done()
}()
level.Info(p.logger).Log("msg", "started partition processor")
for {
select {
case <-ctx.Done():
level.Info(p.logger).Log("msg", "stopping partition processor, context canceled")
return
case records, ok := <-recordsChan:
if !ok {
level.Info(p.logger).Log("msg", "stopping partition processor, channel closed")
// Channel was closed. This means no more records will be
// received. We need to flush what we have to avoid data
// loss because of how the consumer is torn down between
// starting and running phases in [kafka.ReaderService].
if err := p.finalFlush(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed to flush", "err", err)
}
return
}
// Process the records received.
for _, record := range records {
p.processRecord(ctx, record)
}
// This partition is idle, flush it.
case <-time.After(p.idleFlushTimeout):
if _, err := p.idleFlush(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed to idle flush", "err", err)
}
}
}
}()
return p.wg.Wait
}
func (p *partitionProcessor) unregisterMetrics() {
if p.builder != nil {
p.builder.UnregisterMetrics(p.reg)
}
p.metrics.unregister(p.reg)
p.uploader.UnregisterMetrics(p.reg)
}
func (p *partitionProcessor) initBuilder() error {
var initErr error
p.builderOnce.Do(func() {
// Dataobj builder
builder, err := logsobj.NewBuilder(p.builderCfg, p.scratchStore)
if err != nil {
initErr = err
return
}
if err := builder.RegisterMetrics(p.reg); err != nil {
initErr = err
return
}
p.builder = builder
})
return initErr
}
func (p *partitionProcessor) emitObjectWrittenEvent(ctx context.Context, objectPath string) error {
event := &metastore.ObjectWrittenEvent{
ObjectPath: objectPath,
WriteTime: p.clock.Now().Format(time.RFC3339),
EarliestRecordTime: p.earliestRecordTime.Format(time.RFC3339),
}
eventBytes, err := event.Marshal()
if err != nil {
return err
}
// Apply the partition ratio to the incoming partition to find the metastore topic partition.
// This has the effect of concentrating the log partitions onto fewer metastore partitions for later processing.
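// For example (illustrative): with a partition ratio of 4, incoming partitions 0-3
// map to metastore partition 0 and partitions 4-7 map to metastore partition 1.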
partition := p.partition / p.metastorePartitionRatio
results := p.eventsProducerClient.ProduceSync(ctx, &kgo.Record{
Partition: partition,
Value: eventBytes,
})
return results.FirstErr()
}
func (p *partitionProcessor) processRecord(ctx context.Context, record partition.Record) {
p.metrics.processedRecords.Inc()
// Update offset metric at the end of processing
defer p.metrics.updateOffset(record.Offset)
if record.Timestamp.Before(p.earliestRecordTime) || p.earliestRecordTime.IsZero() {
p.earliestRecordTime = record.Timestamp
}
// Observe processing delay
p.metrics.observeProcessingDelay(record.Timestamp)
// Initialize builder if this is the first record
if err := p.initBuilder(); err != nil {
level.Error(p.logger).Log("msg", "failed to initialize builder", "err", err)
return
}
tenant := record.TenantID
stream, err := p.decoder.DecodeWithoutLabels(record.Content)
if err != nil {
level.Error(p.logger).Log("msg", "failed to decode record", "err", err)
return
}
p.metrics.incAppendsTotal()
if err := p.builder.Append(tenant, stream); err != nil {
if !errors.Is(err, logsobj.ErrBuilderFull) {
level.Error(p.logger).Log("msg", "failed to append stream", "err", err)
p.metrics.incAppendFailures()
return
}
if err := p.flushAndCommit(ctx); err != nil {
level.Error(p.logger).Log("msg", "failed to flush and commit", "err", err)
return
}
p.metrics.incAppendsTotal()
if err := p.builder.Append(tenant, stream); err != nil {
level.Error(p.logger).Log("msg", "failed to append stream after flushing", "err", err)
p.metrics.incAppendFailures()
}
}
p.lastRecord = &record
p.lastModified = p.clock.Now()
}
// flushAndCommit flushes the builder and, if successful, commits the offset
// of the last record processed. It expects that the last record processed
// was also the last record appended to the builder. If not, data loss can
// occur should the consumer restart or a partition rebalance occur.
func (p *partitionProcessor) flushAndCommit(ctx context.Context) error {
if err := p.flush(ctx); err != nil {
return fmt.Errorf("failed to flush: %w", err)
}
if err := p.commit(ctx); err != nil {
return fmt.Errorf("failed to commit offset: %w", err)
}
return nil
}
// flush builds a complete data object from the builder, uploads it, records
// it in the metastore, and emits an object written event to the events topic.
func (p *partitionProcessor) flush(ctx context.Context) error {
// The time range must be read before the flush as the builder is reset
// at the end of each flush, resetting the time range.
obj, closer, err := p.builder.Flush()
if err != nil {
level.Error(p.logger).Log("msg", "failed to flush builder", "err", err)
return err
}
obj, closer, err = p.sort(obj, closer)
if err != nil {
level.Error(p.logger).Log("msg", "failed to sort dataobj", "err", err)
return err
}
defer closer.Close()
objectPath, err := p.uploader.Upload(ctx, obj)
if err != nil {
level.Error(p.logger).Log("msg", "failed to upload object", "err", err)
return err
}
if err := p.emitObjectWrittenEvent(ctx, objectPath); err != nil {
level.Error(p.logger).Log("msg", "failed to emit metastore event", "err", err)
return err
}
p.lastModified = time.Time{}
p.lastFlushed = p.clock.Now()
p.earliestRecordTime = time.Time{}
return nil
}
func (p *partitionProcessor) sort(obj *dataobj.Object, closer io.Closer) (*dataobj.Object, io.Closer, error) {
defer closer.Close()
start := time.Now()
defer func() {
level.Debug(p.logger).Log("msg", "partition processor sorted logs object-wide", "duration", time.Since(start))
}()
return p.builder.CopyAndSort(obj)
}
// commit commits the offset of the last record processed. It should be called after
// each successful flush to avoid duplicate data in consecutive data objects.
func (p *partitionProcessor) commit(ctx context.Context) error {
if p.lastRecord == nil {
return errors.New("failed to commit offset, no records processed")
}
backoff := backoff.New(ctx, backoff.Config{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 10 * time.Second,
MaxRetries: 20,
})
var lastErr error
backoff.Reset()
for backoff.Ongoing() {
p.metrics.incCommitsTotal()
err := p.committer.Commit(ctx, p.lastRecord.Offset)
if err == nil {
return nil
}
level.Error(p.logger).Log("msg", "failed to commit records", "err", err)
p.metrics.incCommitFailures()
lastErr = err
backoff.Wait()
}
return lastErr
}
// idleFlush flushes the partition if it has exceeded the idle flush timeout.
// It returns true if the partition was flushed, false with a non-nil error
// if the partition could not be flushed, and false with a nil error if
// the partition has not exceeded the timeout.
func (p *partitionProcessor) idleFlush(ctx context.Context) (bool, error) {
if !p.needsIdleFlush() {
return false, nil
}
if err := p.flushAndCommit(ctx); err != nil {
return false, err
}
return true, nil
}
// needsIdleFlush returns true if the partition has exceeded the idle timeout
// and the builder has some data buffered.
func (p *partitionProcessor) needsIdleFlush() bool {
// This is a safety check to make sure we never flush empty data objects.
// It should never happen that lastModified is non-zero while the builder
// is either uninitialized or empty.
if p.builder == nil || p.builder.GetEstimatedSize() == 0 {
return false
}
if p.lastModified.IsZero() {
return false
}
return p.clock.Since(p.lastModified) > p.idleFlushTimeout
}
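// finalFlush flushes and commits any buffered data. It is called when the
// records channel is closed so that buffered records are not lost during shutdown.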
func (p *partitionProcessor) finalFlush(ctx context.Context) error {
if !p.needsFinalFlush() {
return nil
}
if err := p.flushAndCommit(ctx); err != nil {
return err
}
return nil
}
func (p *partitionProcessor) needsFinalFlush() bool {
if p.builder == nil || p.builder.GetEstimatedSize() == 0 {
return false
}
if p.lastModified.IsZero() {
return false
}
return true
}