Add interceptor override and make ingester and cfg public (#3618)

* Add interceptor override and make ingester and cfg public

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove extraneous comment

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
Michel Hollands authored; committed by GitHub
parent 31bb757c3b
commit 515f82ee1e
1. pkg/ingester/client/client.go (37 changes)
2. pkg/loki/loki.go (34 changes)
3. pkg/loki/modules.go (212 changes)

@@ -42,9 +42,11 @@ type ClosableHealthAndIngesterClient struct {
// Config for an ingester client.
type Config struct {
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`
GRCPStreamClientInterceptors []grpc.StreamClientInterceptor `yaml:"-"`
}
// RegisterFlags registers flags.
@@ -63,7 +65,7 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
grpc.WithDefaultCallOptions(cfg.GRPCClientConfig.CallOptions()...),
}
dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation())
dialOpts, err := cfg.GRPCClientConfig.DialOption(instrumentation(&cfg))
if err != nil {
return nil, err
}
@@ -82,14 +84,21 @@ func New(cfg Config, addr string) (HealthAndIngesterClient, error) {
}, nil
}
func instrumentation() ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
return []grpc.UnaryClientInterceptor{
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
}, []grpc.StreamClientInterceptor{
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
}
func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamClientInterceptor) {
var unaryInterceptors []grpc.UnaryClientInterceptor
unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...)
unaryInterceptors = append(unaryInterceptors,
otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer()),
middleware.ClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCUnaryInstrumentation(ingesterClientRequestDuration),
)
var streamInterceptors []grpc.StreamClientInterceptor
streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...)
streamInterceptors = append(streamInterceptors,
otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer()),
middleware.StreamClientUserHeaderInterceptor,
cortex_middleware.PrometheusGRPCStreamInstrumentation(ingesterClientRequestDuration),
)
return unaryInterceptors, streamInterceptors
}
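
Both new fields are tagged `yaml:"-"`, so they cannot be set from a config file; they are meant for binaries that embed Loki and build the ingester client Config in code. A minimal sketch of how such a binary might plug in a custom unary interceptor (the logInvocations interceptor, the flag-set name, and the address are illustrative assumptions, not part of this change):

package main

import (
	"context"
	"flag"
	"log"

	"google.golang.org/grpc"

	ingester_client "github.com/grafana/loki/pkg/ingester/client"
)

// logInvocations is a hypothetical interceptor that logs every unary call
// made to an ingester before handing it to the real invoker.
func logInvocations(ctx context.Context, method string, req, reply interface{},
	cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error {
	log.Printf("ingester client call: %s", method)
	return invoker(ctx, method, req, reply, cc, opts...)
}

func main() {
	var cfg ingester_client.Config
	// Assumes the usual RegisterFlags(*flag.FlagSet) convention to pick up defaults.
	cfg.RegisterFlags(flag.NewFlagSet("ingester-client", flag.PanicOnError))

	// The override slice is appended ahead of the built-in tracing, user-header
	// and metrics interceptors, as instrumentation() above shows.
	cfg.GRPCUnaryClientInterceptors = []grpc.UnaryClientInterceptor{logInvocations}

	c, err := ingester_client.New(cfg, "ingester-1:9095")
	if err != nil {
		log.Fatalf("creating ingester client: %v", err)
	}
	_ = c // hand the client to whatever component needs it
}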

@@ -143,7 +143,7 @@ func (c *Config) Validate() error {
// Loki is the root datastructure for Loki.
type Loki struct {
cfg Config
Cfg Config
// set during initialization
ModuleManager *modules.Manager
@@ -154,7 +154,7 @@ type Loki struct {
overrides *validation.Overrides
tenantConfigs *runtime.TenantConfigs
distributor *distributor.Distributor
ingester *ingester.Ingester
Ingester *ingester.Ingester
Querier *querier.Querier
ingesterQuerier *querier.IngesterQuerier
Store storage.Store
@@ -175,28 +175,28 @@ type Loki struct {
// New makes a new Loki.
func New(cfg Config) (*Loki, error) {
loki := &Loki{
cfg: cfg,
Cfg: cfg,
}
loki.setupAuthMiddleware()
if err := loki.setupModuleManager(); err != nil {
return nil, err
}
storage.RegisterCustomIndexClients(&loki.cfg.StorageConfig, prometheus.DefaultRegisterer)
storage.RegisterCustomIndexClients(&loki.Cfg.StorageConfig, prometheus.DefaultRegisterer)
return loki, nil
}
func (t *Loki) setupAuthMiddleware() {
t.cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor}
t.cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor}
if t.cfg.AuthEnabled {
t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor)
t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor)
t.Cfg.Server.GRPCMiddleware = []grpc.UnaryServerInterceptor{serverutil.RecoveryGRPCUnaryInterceptor}
t.Cfg.Server.GRPCStreamMiddleware = []grpc.StreamServerInterceptor{serverutil.RecoveryGRPCStreamInterceptor}
if t.Cfg.AuthEnabled {
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, middleware.ServerUserHeaderInterceptor)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, GRPCStreamAuthInterceptor)
t.HTTPAuthMiddleware = middleware.AuthenticateUser
} else {
t.cfg.Server.GRPCMiddleware = append(t.cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware)
t.cfg.Server.GRPCStreamMiddleware = append(t.cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware)
t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, fakeGRPCAuthUnaryMiddleware)
t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, fakeGRPCAuthStreamMiddleware)
t.HTTPAuthMiddleware = fakeHTTPAuthMiddleware
}
}
@@ -224,7 +224,7 @@ func newDefaultConfig() *Config {
// Run starts Loki running, and blocks until a Loki stops.
func (t *Loki) Run() error {
serviceMap, err := t.ModuleManager.InitModuleServices(t.cfg.Target)
serviceMap, err := t.ModuleManager.InitModuleServices(t.Cfg.Target)
if err != nil {
return err
}
@@ -247,7 +247,7 @@ func (t *Loki) Run() error {
t.Server.HTTP.Path("/ready").Handler(t.readyHandler(sm))
// This adds a way to see the config and the changes compared to the defaults
t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.cfg, newDefaultConfig()))
t.Server.HTTP.Path("/config").HandlerFunc(configHandler(t.Cfg, newDefaultConfig()))
t.Server.HTTP.Path("/debug/fgprof").Handler(fgprof.Handler())
@@ -325,8 +325,8 @@ func (t *Loki) readyHandler(sm *services.Manager) http.HandlerFunc {
// Ingester has a special check that makes sure that it was able to register into the ring,
// and that all other ring entries are OK too.
if t.ingester != nil {
if err := t.ingester.CheckReady(r.Context()); err != nil {
if t.Ingester != nil {
if err := t.Ingester.CheckReady(r.Context()); err != nil {
http.Error(w, "Ingester not ready: "+err.Error(), http.StatusServiceUnavailable)
return
}
@@ -384,13 +384,13 @@ func (t *Loki) setupModuleManager() error {
}
// Add IngesterQuerier as a dependency for store when target is either ingester or querier.
if t.cfg.Target == Querier || t.cfg.Target == Ruler {
if t.Cfg.Target == Querier || t.Cfg.Target == Ruler {
deps[Store] = append(deps[Store], IngesterQuerier)
}
// If we are running Loki with boltdb-shipper as a single binary, without clustered mode(which should always be the case when using inmemory ring),
// we should start compactor as well for better user experience.
if storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) && t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
if storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Store == "inmemory" {
deps[All] = append(deps[All], Compactor)
}
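
With Cfg and Ingester exported, a program outside package loki can adjust the configuration before start-up and reach the ingester afterwards. A rough sketch, assuming the embedding binary supplies a fully populated loki.Config (buildConfig is a hypothetical stand-in for the YAML/flag loading that cmd/loki normally does; a zero-value Config will not yield a runnable instance):

package main

import (
	"log"

	"google.golang.org/grpc"

	"github.com/grafana/loki/pkg/loki"
)

// buildConfig stands in for the normal defaults/YAML/flag loading; only the
// parts relevant to this change are sketched here.
func buildConfig() loki.Config {
	var cfg loki.Config
	// ... load defaults, YAML and flags ...

	// The exported Cfg lets callers inject the interceptor override from
	// client.go above before the modules are wired up.
	cfg.IngesterClient.GRPCUnaryClientInterceptors = []grpc.UnaryClientInterceptor{
		// custom interceptors go here
	}
	return cfg
}

func main() {
	app, err := loki.New(buildConfig())
	if err != nil {
		log.Fatalf("initializing Loki: %v", err)
	}
	log.Printf("starting Loki, target=%v", app.Cfg.Target)

	// app.Ingester is only populated once the ingester module initializes, so
	// calls such as app.Ingester.CheckReady(ctx) are valid only after Run()
	// has brought the services up (e.g. from another goroutine).
	if err := app.Run(); err != nil {
		log.Fatalf("running Loki: %v", err)
	}
}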

@@ -80,8 +80,8 @@ const (
func (t *Loki) initServer() (services.Service, error) {
// Loki handles signals on its own.
cortex.DisableSignalHandling(&t.cfg.Server)
serv, err := server.New(t.cfg.Server)
cortex.DisableSignalHandling(&t.Cfg.Server)
serv, err := server.New(t.Cfg.Server)
if err != nil {
return nil, err
}
@@ -105,9 +105,9 @@ func (t *Loki) initServer() (services.Service, error) {
}
func (t *Loki) initRing() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.ring, err = ring.New(t.cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.ring, err = ring.New(t.Cfg.Ingester.LifecyclerConfig.RingConfig, "ingester", ring.IngesterRingKey, prometheus.DefaultRegisterer)
if err != nil {
return
}
@@ -117,28 +117,28 @@ func (t *Loki) initRing() (_ services.Service, err error) {
}
func (t *Loki) initRuntimeConfig() (services.Service, error) {
if t.cfg.RuntimeConfig.LoadPath == "" {
t.cfg.RuntimeConfig.LoadPath = t.cfg.LimitsConfig.PerTenantOverrideConfig
t.cfg.RuntimeConfig.ReloadPeriod = t.cfg.LimitsConfig.PerTenantOverridePeriod
if t.Cfg.RuntimeConfig.LoadPath == "" {
t.Cfg.RuntimeConfig.LoadPath = t.Cfg.LimitsConfig.PerTenantOverrideConfig
t.Cfg.RuntimeConfig.ReloadPeriod = t.Cfg.LimitsConfig.PerTenantOverridePeriod
}
if t.cfg.RuntimeConfig.LoadPath == "" {
if t.Cfg.RuntimeConfig.LoadPath == "" {
// no need to initialize module if load path is empty
return nil, nil
}
t.cfg.RuntimeConfig.Loader = loadRuntimeConfig
t.Cfg.RuntimeConfig.Loader = loadRuntimeConfig
// make sure to set default limits before we start loading configuration into memory
validation.SetDefaultLimitsForYAMLUnmarshalling(t.cfg.LimitsConfig)
validation.SetDefaultLimitsForYAMLUnmarshalling(t.Cfg.LimitsConfig)
var err error
t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.cfg.RuntimeConfig, prometheus.DefaultRegisterer)
t.runtimeConfig, err = runtimeconfig.NewRuntimeConfigManager(t.Cfg.RuntimeConfig, prometheus.DefaultRegisterer)
return t.runtimeConfig, err
}
func (t *Loki) initOverrides() (_ services.Service, err error) {
t.overrides, err = validation.NewOverrides(t.cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig))
t.overrides, err = validation.NewOverrides(t.Cfg.LimitsConfig, tenantLimitsFromRuntimeConfig(t.runtimeConfig))
// overrides are not a service, since they don't have any operational state.
return nil, err
}
@@ -150,15 +150,15 @@ func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
}
func (t *Loki) initDistributor() (services.Service, error) {
t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.Cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
var err error
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer)
t.distributor, err = distributor.New(t.Cfg.Distributor, t.Cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if t.cfg.Target != All {
if t.Cfg.Target != All {
logproto.RegisterPusherServer(t.Server.GRPC, t.distributor)
}
@@ -179,21 +179,21 @@ func (t *Loki) initQuerier() (services.Service, error) {
)
// NewQuerierWorker now expects Frontend (or Scheduler) address to be set. Loki only supports Frontend for now.
if t.cfg.Worker.FrontendAddress != "" {
if t.Cfg.Worker.FrontendAddress != "" {
// In case someone set scheduler address, we ignore it.
t.cfg.Worker.SchedulerAddress = ""
t.cfg.Worker.MaxConcurrentRequests = t.cfg.Querier.MaxConcurrent
level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer)
t.Cfg.Worker.SchedulerAddress = ""
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
level.Debug(util_log.Logger).Log("msg", "initializing querier worker", "config", fmt.Sprintf("%+v", t.Cfg.Worker))
worker, err = cortex_querier_worker.NewQuerierWorker(t.Cfg.Worker, httpgrpc_server.NewServer(t.Server.HTTPServer.Handler), util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
}
if t.cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.cfg.Querier.IngesterQueryStoreMaxLookback = t.cfg.Ingester.QueryStoreMaxLookBackPeriod
if t.Cfg.Ingester.QueryStoreMaxLookBackPeriod != 0 {
t.Cfg.Querier.IngesterQueryStoreMaxLookback = t.Cfg.Ingester.QueryStoreMaxLookBackPeriod
}
t.Querier, err = querier.New(t.cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
t.Querier, err = querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
if err != nil {
return nil, err
}
@@ -223,57 +223,57 @@ func (t *Loki) initQuerier() (services.Service, error) {
}
func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.Cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.Cfg.Ingester.LifecyclerConfig.ListenPort = t.Cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
t.Ingester, err = ingester.New(t.Cfg.Ingester, t.Cfg.IngesterClient, t.Store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
if err != nil {
return
}
logproto.RegisterPusherServer(t.Server.GRPC, t.ingester)
logproto.RegisterQuerierServer(t.Server.GRPC, t.ingester)
logproto.RegisterIngesterServer(t.Server.GRPC, t.ingester)
grpc_health_v1.RegisterHealthServer(t.Server.GRPC, t.ingester)
t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
logproto.RegisterPusherServer(t.Server.GRPC, t.Ingester)
logproto.RegisterQuerierServer(t.Server.GRPC, t.Ingester)
logproto.RegisterIngesterServer(t.Server.GRPC, t.Ingester)
grpc_health_v1.RegisterHealthServer(t.Server.GRPC, t.Ingester)
t.Server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.Ingester.FlushHandler))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(http.HandlerFunc(t.Ingester.ShutdownHandler))
return t.Ingester, nil
}
func (t *Loki) initTableManager() (services.Service, error) {
err := t.cfg.SchemaConfig.Load()
err := t.Cfg.SchemaConfig.Load()
if err != nil {
return nil, err
}
// Assume the newest config is the one to use
lastConfig := &t.cfg.SchemaConfig.Configs[len(t.cfg.SchemaConfig.Configs)-1]
if (t.cfg.TableManager.ChunkTables.WriteScale.Enabled ||
t.cfg.TableManager.IndexTables.WriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
t.cfg.TableManager.ChunkTables.ReadScale.Enabled ||
t.cfg.TableManager.IndexTables.ReadScale.Enabled ||
t.cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
t.cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
t.cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" {
lastConfig := &t.Cfg.SchemaConfig.Configs[len(t.Cfg.SchemaConfig.Configs)-1]
if (t.Cfg.TableManager.ChunkTables.WriteScale.Enabled ||
t.Cfg.TableManager.IndexTables.WriteScale.Enabled ||
t.Cfg.TableManager.ChunkTables.InactiveWriteScale.Enabled ||
t.Cfg.TableManager.IndexTables.InactiveWriteScale.Enabled ||
t.Cfg.TableManager.ChunkTables.ReadScale.Enabled ||
t.Cfg.TableManager.IndexTables.ReadScale.Enabled ||
t.Cfg.TableManager.ChunkTables.InactiveReadScale.Enabled ||
t.Cfg.TableManager.IndexTables.InactiveReadScale.Enabled) &&
t.Cfg.StorageConfig.AWSStorageConfig.Metrics.URL == "" {
level.Error(util_log.Logger).Log("msg", "WriteScale is enabled but no Metrics URL has been provided")
os.Exit(1)
}
reg := prometheus.WrapRegistererWith(prometheus.Labels{"component": "table-manager-store"}, prometheus.DefaultRegisterer)
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.cfg.StorageConfig.Config, reg)
tableClient, err := storage.NewTableClient(lastConfig.IndexType, t.Cfg.StorageConfig.Config, reg)
if err != nil {
return nil, err
}
bucketClient, err := storage.NewBucketClient(t.cfg.StorageConfig.Config)
bucketClient, err := storage.NewBucketClient(t.Cfg.StorageConfig.Config)
util_log.CheckFatal("initializing bucket client", err)
t.tableManager, err = chunk.NewTableManager(t.cfg.TableManager, t.cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
t.tableManager, err = chunk.NewTableManager(t.Cfg.TableManager, t.Cfg.SchemaConfig.SchemaConfig, maxChunkAgeForTableManager, tableClient, bucketClient, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -284,19 +284,19 @@ func (t *Loki) initTableManager() (services.Service, error) {
func (t *Loki) initStore() (_ services.Service, err error) {
// If RF > 1 and current or upcoming index type is boltdb-shipper then disable index dedupe and write dedupe cache.
// This is to ensure that index entries are replicated to all the boltdb files in ingesters flushing replicated data.
if t.cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
t.cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
if t.Cfg.Ingester.LifecyclerConfig.RingConfig.ReplicationFactor > 1 && loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.ChunkStoreConfig.DisableIndexDeduplication = true
t.Cfg.ChunkStoreConfig.WriteDedupeCacheConfig = cache.Config{}
}
if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.cfg.Ingester.LifecyclerConfig.ID
switch t.cfg.Target {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterName = t.Cfg.Ingester.LifecyclerConfig.ID
switch t.Cfg.Target {
case Ingester:
// We do not want ingester to unnecessarily keep downloading files
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeWriteOnly
// Use fifo cache for caching index in memory.
t.cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
t.Cfg.StorageConfig.IndexQueriesCacheConfig = cache.Config{
EnableFifoCache: true,
Fifocache: cache.FifoCacheConfig{
MaxSizeBytes: "200 MB",
@@ -304,52 +304,52 @@ func (t *Loki) initStore() (_ services.Service, err error) {
// This is usually set in StorageConfig.IndexCacheValidity but since this is exclusively used for caching the index entries,
// I(Sandeep) am setting it here which also helps reduce some CPU cycles and allocations required for
// unmarshalling the cached data to check the expiry.
Validity: t.cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
Validity: t.Cfg.StorageConfig.IndexCacheValidity - 1*time.Minute,
},
}
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
case Querier, Ruler:
// We do not want query to do any updates to index
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
default:
t.cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.cfg) + 2*time.Minute
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadWrite
t.Cfg.StorageConfig.BoltDBShipperConfig.IngesterDBRetainPeriod = boltdbShipperQuerierIndexUpdateDelay(t.Cfg) + 2*time.Minute
}
}
chunkStore, err := cortex_storage.NewStore(t.cfg.StorageConfig.Config, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
chunkStore, err := cortex_storage.NewStore(t.Cfg.StorageConfig.Config, t.Cfg.ChunkStoreConfig, t.Cfg.SchemaConfig.SchemaConfig, t.overrides, prometheus.DefaultRegisterer, nil, util_log.Logger)
if err != nil {
return
}
if loki_storage.UsingBoltdbShipper(t.cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.cfg)
switch t.cfg.Target {
if loki_storage.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) {
boltdbShipperMinIngesterQueryStoreDuration := boltdbShipperMinIngesterQueryStoreDuration(t.Cfg)
switch t.Cfg.Target {
case Querier, Ruler:
// Use AsyncStore to query both ingesters local store and chunk store for store queries.
// Only queriers should use the AsyncStore, it should never be used in ingesters.
chunkStore = loki_storage.NewAsyncStore(chunkStore, t.ingesterQuerier,
calculateAsyncStoreQueryIngestersWithin(t.cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
calculateAsyncStoreQueryIngestersWithin(t.Cfg.Querier.QueryIngestersWithin, boltdbShipperMinIngesterQueryStoreDuration),
)
case All:
// We want ingester to also query the store when using boltdb-shipper but only when running with target All.
// We do not want to use AsyncStore otherwise it would start spiraling around doing queries over and over again to the ingesters and store.
// ToDo: See if we can avoid doing this when not running loki in clustered mode.
t.cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.cfg.SchemaConfig.Configs)
if t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType {
t.Cfg.Ingester.QueryStore = true
boltdbShipperConfigIdx := loki_storage.ActivePeriodConfig(t.Cfg.SchemaConfig.Configs)
if t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx].IndexType != shipper.BoltDBShipperType {
boltdbShipperConfigIdx++
}
mlb, err := calculateMaxLookBack(t.cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.cfg.Ingester.QueryStoreMaxLookBackPeriod,
mlb, err := calculateMaxLookBack(t.Cfg.SchemaConfig.Configs[boltdbShipperConfigIdx], t.Cfg.Ingester.QueryStoreMaxLookBackPeriod,
boltdbShipperMinIngesterQueryStoreDuration)
if err != nil {
return nil, err
}
t.cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
t.Cfg.Ingester.QueryStoreMaxLookBackPeriod = mlb
}
}
t.Store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
t.Store, err = loki_storage.NewStore(t.Cfg.StorageConfig, t.Cfg.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
if err != nil {
return
}
@@ -361,7 +361,7 @@ func (t *Loki) initStore() (_ services.Service, err error) {
}
func (t *Loki) initIngesterQuerier() (_ services.Service, err error) {
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.cfg.IngesterClient, t.ring, t.cfg.Querier.ExtraQueryDelay)
t.ingesterQuerier, err = querier.NewIngesterQuerier(t.Cfg.IngesterClient, t.ring, t.Cfg.Querier.ExtraQueryDelay)
if err != nil {
return nil, err
}
@@ -375,15 +375,15 @@ type disabledShuffleShardingLimits struct{}
func (disabledShuffleShardingLimits) MaxQueriersPerUser(userID string) int { return 0 }
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.cfg.Frontend))
level.Debug(util_log.Logger).Log("msg", "initializing query frontend", "config", fmt.Sprintf("%+v", t.Cfg.Frontend))
roundTripper, frontendV1, _, err := frontend.InitFrontend(frontend.CombinedFrontendConfig{
// Don't set FrontendV2 field to make sure that only frontendV1 can be initialized.
Handler: t.cfg.Frontend.Handler,
FrontendV1: t.cfg.Frontend.FrontendV1,
CompressResponses: t.cfg.Frontend.CompressResponses,
DownstreamURL: t.cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
Handler: t.Cfg.Frontend.Handler,
FrontendV1: t.Cfg.Frontend.FrontendV1,
CompressResponses: t.Cfg.Frontend.CompressResponses,
DownstreamURL: t.Cfg.Frontend.DownstreamURL,
}, disabledShuffleShardingLimits{}, t.Cfg.Server.GRPCListenPort, util_log.Logger, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -393,15 +393,15 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
}
level.Debug(util_log.Logger).Log("msg", "initializing query range tripperware",
"config", fmt.Sprintf("%+v", t.cfg.QueryRange),
"limits", fmt.Sprintf("%+v", t.cfg.LimitsConfig),
"config", fmt.Sprintf("%+v", t.Cfg.QueryRange),
"limits", fmt.Sprintf("%+v", t.Cfg.LimitsConfig),
)
tripperware, stopper, err := queryrange.NewTripperware(
t.cfg.QueryRange,
t.Cfg.QueryRange,
util_log.Logger,
t.overrides,
t.cfg.SchemaConfig.SchemaConfig,
t.cfg.Querier.QueryIngestersWithin,
t.Cfg.SchemaConfig.SchemaConfig,
t.Cfg.Querier.QueryIngestersWithin,
prometheus.DefaultRegisterer,
)
if err != nil {
@@ -414,8 +414,8 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
roundTripper = t.QueryFrontEndTripperware(roundTripper)
}
frontendHandler := transport.NewHandler(t.cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
if t.cfg.Frontend.CompressResponses {
frontendHandler := transport.NewHandler(t.Cfg.Frontend.Handler, roundTripper, util_log.Logger, prometheus.DefaultRegisterer)
if t.Cfg.Frontend.CompressResponses {
frontendHandler = gziphandler.GzipHandler(frontendHandler)
}
@@ -428,12 +428,12 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
).Wrap(frontendHandler)
var defaultHandler http.Handler
if t.cfg.Frontend.TailProxyURL != "" {
if t.Cfg.Frontend.TailProxyURL != "" {
httpMiddleware := middleware.Merge(
t.HTTPAuthMiddleware,
queryrange.StatsHTTPMiddleware,
)
tailURL, err := url.Parse(t.cfg.Frontend.TailProxyURL)
tailURL, err := url.Parse(t.Cfg.Frontend.TailProxyURL)
if err != nil {
return nil, err
}
@@ -485,26 +485,26 @@ func (t *Loki) initRulerStorage() (_ services.Service, err error) {
// unfortunately there is no way to generate a "default" config and compare default against actual
// to determine if it's unconfigured. the following check, however, correctly tests this.
// Single binary integration tests will break if this ever drifts
if t.cfg.Target == All && t.cfg.Ruler.StoreConfig.IsDefaults() {
if t.Cfg.Target == All && t.Cfg.Ruler.StoreConfig.IsDefaults() {
level.Info(util_log.Logger).Log("msg", "RulerStorage is not configured in single binary mode and will not be started.")
return
}
// Loki doesn't support the configdb backend, but without excessive mangling/refactoring
// it's hard to enforce this at validation time. Therefore detect this and fail early.
if t.cfg.Ruler.StoreConfig.Type == "configdb" {
if t.Cfg.Ruler.StoreConfig.Type == "configdb" {
return nil, errors.New("configdb is not supported as a Loki rules backend type")
}
// Make sure storage directory exists if using filesystem store
if t.cfg.Ruler.StoreConfig.Type == "local" && t.cfg.Ruler.StoreConfig.Local.Directory != "" {
err := chunk_util.EnsureDirectory(t.cfg.Ruler.StoreConfig.Local.Directory)
if t.Cfg.Ruler.StoreConfig.Type == "local" && t.Cfg.Ruler.StoreConfig.Local.Directory != "" {
err := chunk_util.EnsureDirectory(t.Cfg.Ruler.StoreConfig.Local.Directory)
if err != nil {
return nil, err
}
}
t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.cfg.Ruler.StoreConfig, manager.GroupLoader{}, util_log.Logger)
t.RulerStorage, err = cortex_ruler.NewLegacyRuleStore(t.Cfg.Ruler.StoreConfig, manager.GroupLoader{}, util_log.Logger)
return
}
@@ -515,17 +515,17 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
return nil, nil
}
t.cfg.Ruler.Ring.ListenPort = t.cfg.Server.GRPCListenPort
t.cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
q, err := querier.New(t.cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
t.Cfg.Ruler.Ring.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
q, err := querier.New(t.Cfg.Querier, t.Store, t.ingesterQuerier, t.overrides)
if err != nil {
return nil, err
}
engine := logql.NewEngine(t.cfg.Querier.Engine, q, t.overrides)
engine := logql.NewEngine(t.Cfg.Querier.Engine, q, t.overrides)
t.ruler, err = ruler.NewRuler(
t.cfg.Ruler,
t.Cfg.Ruler,
engine,
prometheus.DefaultRegisterer,
util_log.Logger,
@@ -540,7 +540,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
t.rulerAPI = cortex_ruler.NewAPI(t.ruler, t.RulerStorage, util_log.Logger)
// Expose HTTP endpoints.
if t.cfg.Ruler.EnableAPI {
if t.Cfg.Ruler.EnableAPI {
t.Server.HTTP.Handle("/ruler/ring", t.ruler)
cortex_ruler.RegisterRulerServer(t.Server.GRPC, t.ruler)
@@ -570,18 +570,18 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
}
func (t *Loki) initMemberlistKV() (services.Service, error) {
t.cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
t.cfg.MemberlistKV.Codecs = []codec.Codec{
t.Cfg.MemberlistKV.MetricsRegisterer = prometheus.DefaultRegisterer
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
}
t.memberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV, util_log.Logger)
t.memberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV, util_log.Logger)
return t.memberlistKV, nil
}
func (t *Loki) initCompactor() (services.Service, error) {
var err error
t.compactor, err = compactor.NewCompactor(t.cfg.CompactorConfig, t.cfg.StorageConfig.Config, prometheus.DefaultRegisterer)
t.compactor, err = compactor.NewCompactor(t.Cfg.CompactorConfig, t.Cfg.StorageConfig.Config, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
