Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/dataobj/consumer/partition_processor_factory.go

70 lines
1.8 KiB

package consumer
import (
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/dataobj/metastore"
"github.com/grafana/loki/v3/pkg/kafka/partition"
"github.com/grafana/loki/v3/pkg/scratch"
)
// partitionProcessorFactory is a factory for partition processors.
type partitionProcessorFactory struct {
cfg Config
// TODO(grobinson): We should see if we can move metastore.Config inside
// Config instead of having a separate field just for the metastore.
metastoreCfg metastore.Config
metastoreEvents *kgo.Client
bucket objstore.Bucket
scratchStore scratch.Store
logger log.Logger
reg prometheus.Registerer
topic string
partition int32
}
// newPartitionProcessorFactory returns a new partitionProcessorFactory.
func newPartitionProcessorFactory(
cfg Config,
metastoreCfg metastore.Config,
metastoreEvents *kgo.Client,
bucket objstore.Bucket,
scratchStore scratch.Store,
logger log.Logger,
reg prometheus.Registerer,
topic string,
partition int32,
) *partitionProcessorFactory {
return &partitionProcessorFactory{
cfg: cfg,
metastoreCfg: metastoreCfg,
metastoreEvents: metastoreEvents,
bucket: bucket,
scratchStore: scratchStore,
logger: logger,
reg: reg,
topic: topic,
partition: partition,
}
}
// New returns a new processor for the partition.
func (f *partitionProcessorFactory) New(committer partition.Committer, logger log.Logger) (partition.Consumer, error) {
return newPartitionProcessor(
committer,
f.cfg.BuilderConfig,
f.cfg.UploaderConfig,
f.metastoreCfg,
f.bucket,
f.scratchStore,
f.logger,
f.reg,
f.cfg.IdleFlushTimeout,
f.metastoreEvents,
f.topic,
f.partition,
), nil
}