diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 63477495f1..dac605dacb 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -325,10 +325,11 @@ type Loki struct { bloomCompactorRingManager *lokiring.RingManager bloomGatewayRingManager *lokiring.RingManager - clientMetrics storage.ClientMetrics + ClientMetrics storage.ClientMetrics deleteClientMetrics *deletion.DeleteRequestClientMetrics Tee distributor.Tee + PushParserWrapper push.RequestParserWrapper HTTPAuthMiddleware middleware.Interface Codec Codec @@ -341,7 +342,7 @@ type Loki struct { func New(cfg Config) (*Loki, error) { loki := &Loki{ Cfg: cfg, - clientMetrics: storage.NewClientMetrics(), + ClientMetrics: storage.NewClientMetrics(), deleteClientMetrics: deletion.NewDeleteRequestClientMetrics(prometheus.DefaultRegisterer), Codec: queryrange.DefaultCodec, } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2470a1b355..51018ccb96 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -330,6 +330,10 @@ func (t *Loki) initDistributor() (services.Service, error) { return nil, err } + if t.PushParserWrapper != nil { + t.distributor.RequestParserWrapper = t.PushParserWrapper + } + // Register the distributor to receive Push requests over GRPC // EXCEPT when running with `-target=all` or `-target=` contains `ingester` if !t.Cfg.isModuleEnabled(All) && !t.Cfg.isModuleEnabled(Write) && !t.Cfg.isModuleEnabled(Ingester) { @@ -610,7 +614,7 @@ func (t *Loki) initTableManager() (services.Service, error) { reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer) - tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.clientMetrics, reg, util_log.Logger) + tableClient, err := storage.NewTableClient(lastConfig.IndexType, *lastConfig, t.Cfg.StorageConfig, t.ClientMetrics, reg, util_log.Logger) if err != nil { return nil, err } @@ -636,7 +640,7 @@ func (t *Loki) initStore() (services.Service, error) { } } - store, err := storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, prometheus.DefaultRegisterer, util_log.Logger, t.Cfg.MetricsNamespace) + store, err := storage.NewStore(t.Cfg.StorageConfig, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, prometheus.DefaultRegisterer, util_log.Logger, t.Cfg.MetricsNamespace) if err != nil { return nil, err } @@ -677,7 +681,7 @@ func (t *Loki) initBloomStore() (services.Service, error) { level.Warn(logger).Log("msg", "failed to preload blocks cache", "err", err) } - t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.clientMetrics, metasCache, blocksCache, reg, logger) + t.BloomStore, err = bloomshipper.NewBloomStore(t.Cfg.SchemaConfig.Configs, t.Cfg.StorageConfig, t.ClientMetrics, metasCache, blocksCache, reg, logger) if err != nil { return nil, fmt.Errorf("failed to create bloom store: %w", err) } @@ -1091,7 +1095,7 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) { } } - t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.clientMetrics, ruler.GroupLoader{}, util_log.Logger) + t.RulerStorage, err = base_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, t.Cfg.StorageConfig.Hedging, t.ClientMetrics, ruler.GroupLoader{}, util_log.Logger) return } @@ -1265,7 +1269,7 @@ func (t *Loki) initCompactor() (services.Service, error) { continue } - objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.clientMetrics) + objectClient, err := storage.NewObjectClient(periodConfig.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics) if err != nil { return nil, fmt.Errorf("failed to create object client: %w", err) } @@ -1276,7 +1280,7 @@ func (t *Loki) initCompactor() (services.Service, error) { var deleteRequestStoreClient client.ObjectClient if t.Cfg.CompactorConfig.RetentionEnabled { if deleteStore := t.Cfg.CompactorConfig.DeleteRequestStore; deleteStore != "" { - if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.clientMetrics); err != nil { + if deleteRequestStoreClient, err = storage.NewObjectClient(deleteStore, t.Cfg.StorageConfig, t.ClientMetrics); err != nil { return nil, fmt.Errorf("failed to create delete request store object client: %w", err) } } else { @@ -1366,7 +1370,7 @@ func (t *Loki) initIndexGateway() (services.Service, error) { } tableRange := period.GetIndexTableNumberRange(periodEndTime) - indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.clientMetrics, shardingStrategy, + indexClient, err := storage.NewIndexClient(period, tableRange, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.Overrides, t.ClientMetrics, shardingStrategy, prometheus.DefaultRegisterer, log.With(util_log.Logger, "index-store", fmt.Sprintf("%s-%s", period.IndexType, period.From.String())), t.Cfg.MetricsNamespace, ) if err != nil { @@ -1465,7 +1469,7 @@ func (t *Loki) initBloomCompactor() (services.Service, error) { t.Cfg.BloomCompactor, t.Cfg.SchemaConfig, t.Cfg.StorageConfig, - t.clientMetrics, + t.ClientMetrics, t.Store, shuffleSharding, t.Overrides, @@ -1571,7 +1575,7 @@ func (t *Loki) initAnalytics() (services.Service, error) { return nil, err } - objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.clientMetrics) + objectClient, err := storage.NewObjectClient(period.ObjectType, t.Cfg.StorageConfig, t.ClientMetrics) if err != nil { level.Info(util_log.Logger).Log("msg", "failed to initialize usage report", "err", err) return nil, nil