Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/consumer/flush.go

143 lines
4.1 KiB

package consumer
import (
"context"
"fmt"
"io"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/grafana/loki/v3/pkg/dataobj"
)
const (
flushReasonMaxAge = "max_age"
flushReasonBuilderFull = "builder_full"
flushReasonIdle = "idle"
)
// A sorter allows mocking of [logsobj.Sorter] in tests.
type sorter interface {
Sort(ctx context.Context, obj *dataobj.Object) (*dataobj.Object, io.Closer, error)
}
// An uploader allows mocking of [uploader.Uploader] in tests.
type uploader interface {
Upload(ctx context.Context, obj *dataobj.Object) (string, error)
}
// A flushJob contains all information needed to flush a data object builder.
type flushJob struct {
builder builder
// done is called when the job has finished.
done func(flushJobResult)
}
// A flushJobResult contains the result of a flush. The flush failed if err
// is non-nil.
type flushJobResult struct {
objectPath string
err error
}
// A flusherImpl is responsible for flushing data object builders to data objects.
type flusherImpl struct {
sorter sorter
uploader uploader
logger log.Logger
// Metrics.
flushes *prometheus.CounterVec
flushFailures prometheus.Counter
flushDuration prometheus.Histogram
// Used in tests.
flushFunc func(context.Context, flushJob) (string, error)
}
func newFlusher(sorter sorter, uploader uploader, logger log.Logger, r prometheus.Registerer) *flusherImpl {
f := &flusherImpl{
sorter: sorter,
uploader: uploader,
logger: logger,
flushes: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_flushes_total",
Help: "Total number of flushes.",
}, []string{"reason"}),
flushFailures: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_dataobj_consumer_flush_failures_total",
Help: "Total number of failed flushes.",
}),
flushDuration: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
Name: "loki_dataobj_consumer_flush_duration_seconds",
Help: "Time taken to flush a data object.",
Buckets: prometheus.DefBuckets,
NativeHistogramBucketFactor: 1.1,
NativeHistogramMaxBucketNumber: 100,
NativeHistogramMinResetDuration: 0,
}),
}
f.flushFunc = f.flush
return f
}
// Flush flushes the data object builder. It returns an error if the flush fails.
func (f *flusherImpl) Flush(ctx context.Context, builder builder, reason string) (string, error) {
var (
res flushJobResult
done = make(chan struct{})
)
f.FlushAsync(ctx, builder, reason, func(doneRes flushJobResult) {
res = doneRes
close(done)
})
select {
case <-ctx.Done():
return "", ctx.Err()
case <-done:
return res.objectPath, res.err
}
}
// FlushAsync asynchronously flushes the data object builder and calls done
// when finished.
func (f *flusherImpl) FlushAsync(ctx context.Context, builder builder, reason string, done func(flushJobResult)) {
f.flushes.WithLabelValues(reason).Inc()
go f.doJob(ctx, flushJob{builder: builder, done: done})
}
func (f *flusherImpl) doJob(ctx context.Context, job flushJob) {
timer := prometheus.NewTimer(f.flushDuration)
defer timer.ObserveDuration()
objectPath, err := f.flushFunc(ctx, job)
if err != nil {
f.flushFailures.Inc()
job.done(flushJobResult{err: err})
return
}
job.done(flushJobResult{objectPath: objectPath})
}
// flush builds a complete data object from the builder, uploads it, records
// it in the metastore, and emits an object written event to the events topic.
// It can be overidden in tests by replacing [jobFunc].
func (f *flusherImpl) flush(ctx context.Context, job flushJob) (string, error) {
obj, closer, err := job.builder.Flush()
if err != nil {
return "", fmt.Errorf("failed to flush data object builder: %w", err)
}
defer closer.Close()
obj, closer, err = f.sorter.Sort(ctx, obj)
if err != nil {
return "", fmt.Errorf("failed to sort data object: %w", err)
}
defer closer.Close()
objectPath, err := f.uploader.Upload(ctx, obj)
if err != nil {
return "", fmt.Errorf("failed to upload object: %w", err)
}
return objectPath, nil
}