feat: Boilerplate for new bloom build planner and worker components. (#12989)

pull/12982/head^2
Salva Corts 2 years ago committed by GitHub
parent 88e545fc95
commit 8978ecf0c8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 10
      docs/sources/shared/configuration.md
  2. 50
      pkg/bloombuild/builder/builder.go
  3. 21
      pkg/bloombuild/builder/config.go
  4. 26
      pkg/bloombuild/builder/metrics.go
  5. 40
      pkg/bloombuild/config.go
  6. 21
      pkg/bloombuild/planner/config.go
  7. 26
      pkg/bloombuild/planner/metrics.go
  8. 50
      pkg/bloombuild/planner/planner.go
  9. 7
      pkg/loki/loki.go
  10. 34
      pkg/loki/modules.go

@ -326,6 +326,16 @@ pattern_ingester:
# merging them as bloom blocks.
[bloom_compactor: <bloom_compactor>]
bloom_build:
# Flag to enable or disable the usage of the bloom-planner and bloom-builder
# components.
# CLI flag: -bloom-build.enabled
[enabled: <boolean> | default = false]
planner:
builder:
# Experimental: The bloom_gateway block configures the Loki bloom gateway
# server, responsible for serving queries for filtering chunks based on filter
# expressions.

@ -0,0 +1,50 @@
package builder
import (
"context"
"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
utillog "github.com/grafana/loki/v3/pkg/util/log"
)
type Worker struct {
services.Service
cfg Config
metrics *Metrics
logger log.Logger
}
func New(
cfg Config,
logger log.Logger,
r prometheus.Registerer,
) (*Worker, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)
w := &Worker{
cfg: cfg,
metrics: NewMetrics(r),
logger: logger,
}
w.Service = services.NewBasicService(w.starting, w.running, w.stopping)
return w, nil
}
func (w *Worker) starting(_ context.Context) (err error) {
w.metrics.running.Set(1)
return err
}
func (w *Worker) stopping(_ error) error {
w.metrics.running.Set(0)
return nil
}
func (w *Worker) running(_ context.Context) error {
return nil
}

@ -0,0 +1,21 @@
package builder
import "flag"
// Config configures the bloom-builder component.
type Config struct {
// TODO: Add config
}
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
// TODO: Register flags with flagsPrefix
}
func (cfg *Config) Validate() error {
return nil
}
type Limits interface {
// TODO: Add limits
}

@ -0,0 +1,26 @@
package builder
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
metricsNamespace = "loki"
metricsSubsystem = "bloombuilder"
)
type Metrics struct {
running prometheus.Gauge
}
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if the bloom builder is currently running on this instance",
}),
}
}

@ -0,0 +1,40 @@
package bloombuild
import (
"flag"
"fmt"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
)
// Config configures the bloom-planner component.
type Config struct {
Enabled bool `yaml:"enabled"`
Planner planner.Config `yaml:"planner"`
Builder builder.Config `yaml:"builder"`
}
// RegisterFlags registers flags for the bloom building configuration.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.Enabled, "bloom-build.enabled", false, "Flag to enable or disable the usage of the bloom-planner and bloom-builder components.")
cfg.Planner.RegisterFlagsWithPrefix("bloom-build.planner", f)
cfg.Builder.RegisterFlagsWithPrefix("bloom-build.builder", f)
}
func (cfg *Config) Validate() error {
if !cfg.Enabled {
return nil
}
if err := cfg.Planner.Validate(); err != nil {
return fmt.Errorf("invalid bloom planner configuration: %w", err)
}
if err := cfg.Builder.Validate(); err != nil {
return fmt.Errorf("invalid bloom builder configuration: %w", err)
}
return nil
}

@ -0,0 +1,21 @@
package planner
import "flag"
// Config configures the bloom-planner component.
type Config struct {
// TODO: Add config
}
// RegisterFlagsWithPrefix registers flags for the bloom-planner configuration.
func (cfg *Config) RegisterFlagsWithPrefix(_ string, _ *flag.FlagSet) {
// TODO: Register flags with flagsPrefix
}
func (cfg *Config) Validate() error {
return nil
}
type Limits interface {
// TODO: Add limits
}

@ -0,0 +1,26 @@
package planner
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
const (
metricsNamespace = "loki"
metricsSubsystem = "bloomplanner"
)
type Metrics struct {
running prometheus.Gauge
}
func NewMetrics(r prometheus.Registerer) *Metrics {
return &Metrics{
running: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "running",
Help: "Value will be 1 if bloom planner is currently running on this instance",
}),
}
}

@ -0,0 +1,50 @@
package planner
import (
"context"
"github.com/go-kit/log"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
utillog "github.com/grafana/loki/v3/pkg/util/log"
)
type Planner struct {
services.Service
cfg Config
metrics *Metrics
logger log.Logger
}
func New(
cfg Config,
logger log.Logger,
r prometheus.Registerer,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)
p := &Planner{
cfg: cfg,
metrics: NewMetrics(r),
logger: logger,
}
p.Service = services.NewBasicService(p.starting, p.running, p.stopping)
return p, nil
}
func (p *Planner) starting(_ context.Context) (err error) {
p.metrics.running.Set(1)
return err
}
func (p *Planner) stopping(_ error) error {
p.metrics.running.Set(0)
return nil
}
func (p *Planner) running(_ context.Context) error {
return nil
}

@ -31,6 +31,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/bloombuild"
"github.com/grafana/loki/v3/pkg/bloomcompactor"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
@ -90,6 +91,7 @@ type Config struct {
Pattern pattern.Config `yaml:"pattern_ingester,omitempty"`
IndexGateway indexgateway.Config `yaml:"index_gateway"`
BloomCompactor bloomcompactor.Config `yaml:"bloom_compactor,omitempty" category:"experimental"`
BloomBuild bloombuild.Config `yaml:"bloom_build,omitempty" category:"experimental"`
BloomGateway bloomgateway.Config `yaml:"bloom_gateway,omitempty" category:"experimental"`
StorageConfig storage.Config `yaml:"storage_config,omitempty"`
ChunkStoreConfig config.ChunkStoreConfig `yaml:"chunk_store_config,omitempty"`
@ -173,6 +175,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Tracing.RegisterFlags(f)
c.CompactorConfig.RegisterFlags(f)
c.BloomCompactor.RegisterFlags(f)
c.BloomBuild.RegisterFlags(f)
c.QueryScheduler.RegisterFlags(f)
c.Analytics.RegisterFlags(f)
c.OperationalConfig.RegisterFlags(f)
@ -649,6 +652,8 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(BloomStore, t.initBloomStore)
mm.RegisterModule(BloomCompactor, t.initBloomCompactor)
mm.RegisterModule(BloomCompactorRing, t.initBloomCompactorRing, modules.UserInvisibleModule)
mm.RegisterModule(BloomPlanner, t.initBloomPlanner)
mm.RegisterModule(BloomBuilder, t.initBloomBuilder)
mm.RegisterModule(IndexGateway, t.initIndexGateway)
mm.RegisterModule(IndexGatewayRing, t.initIndexGatewayRing, modules.UserInvisibleModule)
mm.RegisterModule(IndexGatewayInterceptors, t.initIndexGatewayInterceptors, modules.UserInvisibleModule)
@ -686,6 +691,8 @@ func (t *Loki) setupModuleManager() error {
IndexGateway: {Server, Store, BloomStore, IndexGatewayRing, IndexGatewayInterceptors, Analytics},
BloomGateway: {Server, BloomStore, Analytics},
BloomCompactor: {Server, BloomStore, BloomCompactorRing, Analytics, Store},
BloomPlanner: {Server, BloomStore, Analytics, Store},
BloomBuilder: {Server, BloomStore, Analytics, Store},
PatternIngester: {Server, MemberlistKV, Analytics},
PatternRingClient: {Server, MemberlistKV, Analytics},
IngesterQuerier: {Ring},

@ -38,6 +38,8 @@ import (
"github.com/grafana/loki/v3/pkg/storage/types"
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
compactorclient "github.com/grafana/loki/v3/pkg/compactor/client"
@ -122,6 +124,8 @@ const (
QuerySchedulerRing string = "query-scheduler-ring"
BloomCompactor string = "bloom-compactor"
BloomCompactorRing string = "bloom-compactor-ring"
BloomPlanner string = "bloom-planner"
BloomBuilder string = "bloom-builder"
BloomStore string = "bloom-store"
All string = "all"
Read string = "read"
@ -803,7 +807,7 @@ func (t *Loki) updateConfigForShipperStore() {
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeWriteOnly
t.Cfg.StorageConfig.TSDBShipperConfig.IngesterDBRetainPeriod = shipperQuerierIndexUpdateDelay(t.Cfg.StorageConfig.IndexCacheValidity, t.Cfg.StorageConfig.TSDBShipperConfig.ResyncInterval)
case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor):
case t.Cfg.isTarget(Querier), t.Cfg.isTarget(Ruler), t.Cfg.isTarget(Read), t.Cfg.isTarget(Backend), t.isModuleActive(IndexGateway), t.Cfg.isTarget(BloomCompactor), t.Cfg.isTarget(BloomPlanner), t.Cfg.isTarget(BloomBuilder):
// We do not want query to do any updates to index
t.Cfg.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
t.Cfg.StorageConfig.TSDBShipperConfig.Mode = indexshipper.ModeReadOnly
@ -1553,6 +1557,34 @@ func (t *Loki) initBloomCompactorRing() (services.Service, error) {
return t.bloomCompactorRingManager, nil
}
func (t *Loki) initBloomPlanner() (services.Service, error) {
if !t.Cfg.BloomBuild.Enabled {
return nil, nil
}
logger := log.With(util_log.Logger, "component", "bloom-planner")
return planner.New(
t.Cfg.BloomBuild.Planner,
logger,
prometheus.DefaultRegisterer,
)
}
func (t *Loki) initBloomBuilder() (services.Service, error) {
if !t.Cfg.BloomBuild.Enabled {
return nil, nil
}
logger := log.With(util_log.Logger, "component", "bloom-worker")
return builder.New(
t.Cfg.BloomBuild.Builder,
logger,
prometheus.DefaultRegisterer,
)
}
func (t *Loki) initQueryScheduler() (services.Service, error) {
s, err := scheduler.NewScheduler(t.Cfg.QueryScheduler, t.Overrides, util_log.Logger, t.querySchedulerRingManager, prometheus.DefaultRegisterer, t.Cfg.MetricsNamespace)
if err != nil {

Loading…
Cancel
Save