feat(blooms): Add bloom planner and bloom builder to `backend` target (#13997)

Previously, the bloom compactor component was part of the `backend` target in the Simple Scalable Deployment (SSD) mode. However, the bloom compactor was removed (https://github.com/grafana/loki/pull/13969) in favour of the planner and builder components, and was therefore also removed from the `backend` target.

This PR adds the planner and builder components to the `backend` target so that it can continue to build blooms when bloom building is enabled.

The planner must run as a singleton: even if multiple replicas of the `backend` target are deployed, only one instance may create tasks for the builders.
This is achieved through leader election on the already existing index gateway ring of the `backend` target. The planner leader is determined by ownership of the leader key, and builders connect to the planner leader to pull tasks.
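
For illustration, the ownership check underneath this leader election can be reduced to a few lines against the dskit ring (a simplified sketch, not the exact code in this PR; the `RingWatcher` added here also checks the instance state and periodically re-resolves the leader, and `isLeader` is just an illustrative helper):

```go
package sketch

import (
	"github.com/grafana/dskit/ring"
)

// ringKeyOfLeader is a sentinel key; the instance that owns it acts as the planner leader.
const ringKeyOfLeader = 0xffff

// isLeader reports whether the given instance owns the token range that contains the leader key.
func isLeader(r *ring.Ring, instanceID string) (bool, error) {
	tr, err := r.GetTokenRangesForInstance(instanceID)
	if err != nil {
		return false, err
	}
	return tr.IncludesKey(ringKeyOfLeader), nil
}
```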

----

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
8 changed files (lines changed):

- docs/sources/operations/query-acceleration-blooms.md (4)
- pkg/bloombuild/builder/builder.go (42)
- pkg/bloombuild/builder/builder_test.go (2)
- pkg/bloombuild/common/ringwatcher.go (119)
- pkg/bloombuild/planner/planner.go (48)
- pkg/bloombuild/planner/planner_test.go (2)
- pkg/loki/loki.go (2)
- pkg/loki/modules.go (18)

@ -43,8 +43,8 @@ and querying the bloom filters that only pays off at large scale deployments.
{{< /admonition >}}
To start building and using blooms you need to:
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components and enable the component in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] Backend target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Deploy the [Bloom Planner and Builder](#bloom-planner-and-builder) components (as [microservices][microservices] or via the [SSD][ssd] `backend` target) and enable the components in the [Bloom Build config][bloom-build-cfg].
- Deploy the [Bloom Gateway](#bloom-gateway) component (as a [microservice][microservices] or via the [SSD][ssd] `backend` target) and enable the component in the [Bloom Gateway config][bloom-gateway-cfg].
- Enable blooms building and filtering for each tenant individually, or for all of them by default.
```yaml

@ -30,6 +30,7 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
type Builder struct {
@ -47,6 +48,10 @@ type Builder struct {
chunkLoader ChunkLoader
client protos.PlannerForBuilderClient
// used only in SSD mode where a single planner of the backend replicas needs to create tasks
// therefore it is nil when the planner runs in microservice mode (default)
ringWatcher *common.RingWatcher
}
func New(
@ -59,6 +64,7 @@ func New(
bloomStore bloomshipper.Store,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Builder, error) {
utillog.WarnExperimentalUse("Bloom Builder", logger)
@ -82,11 +88,20 @@ func New(
logger: logger,
}
if rm != nil {
b.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
}
b.Service = services.NewBasicService(b.starting, b.running, b.stopping)
return b, nil
}
func (b *Builder) starting(_ context.Context) error {
func (b *Builder) starting(ctx context.Context) error {
if b.ringWatcher != nil {
if err := services.StartAndAwaitRunning(ctx, b.ringWatcher); err != nil {
return fmt.Errorf("error starting builder subservices: %w", err)
}
}
b.metrics.running.Set(1)
return nil
}
@ -94,6 +109,12 @@ func (b *Builder) starting(_ context.Context) error {
func (b *Builder) stopping(_ error) error {
defer b.metrics.running.Set(0)
if b.ringWatcher != nil {
if err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher); err != nil {
return fmt.Errorf("error stopping builder subservices: %w", err)
}
}
if b.client != nil {
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
@ -137,16 +158,27 @@ func (b *Builder) running(ctx context.Context) error {
return nil
}
func (b *Builder) connectAndBuild(
ctx context.Context,
) error {
func (b *Builder) plannerAddress() string {
if b.ringWatcher == nil {
return b.cfg.PlannerAddress
}
addr, err := b.ringWatcher.GetLeaderAddress()
if err != nil {
return b.cfg.PlannerAddress
}
return addr
}
func (b *Builder) connectAndBuild(ctx context.Context) error {
opts, err := b.cfg.GrpcConfig.DialOption(nil, nil)
if err != nil {
return fmt.Errorf("failed to create grpc dial options: %w", err)
}
// nolint:staticcheck // grpc.DialContext() has been deprecated; we'll address it before upgrading to gRPC 2.
conn, err := grpc.DialContext(ctx, b.cfg.PlannerAddress, opts...)
conn, err := grpc.DialContext(ctx, b.plannerAddress(), opts...)
if err != nil {
return fmt.Errorf("failed to dial bloom planner: %w", err)
}

@ -88,7 +88,7 @@ func Test_BuilderLoop(t *testing.T) {
}
flagext.DefaultValues(&cfg.GrpcConfig)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer)
builder, err := New(cfg, limits, schemaCfg, storageCfg, storage.NewClientMetrics(), nil, fakeBloomStore{}, logger, prometheus.DefaultRegisterer, nil)
require.NoError(t, err)
t.Cleanup(func() {
err = services.StopAndAwaitTerminated(context.Background(), builder)

@ -0,0 +1,119 @@
package common
import (
"context"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
)
const (
RingKeyOfLeader = 0xffff
)
type RingWatcher struct {
services.Service
id string
ring *ring.Ring
leader *ring.InstanceDesc
lookupPeriod time.Duration
logger log.Logger
}
// NewRingWatcher creates a service.Service that watches a ring for a leader instance.
// The leader instance is the instance that owns the key `RingKeyOfLeader`.
// It provides functions to get the leader's address, and to check whether a given instance in the ring is leader.
// Bloom planner and bloom builder use this ring watcher to hook into the index gateway ring when they are run as
// part of the `backend` target of the Simple Scalable Deployment (SSD).
// It should not be used by any other components outside of the bloombuild package.
func NewRingWatcher(id string, ring *ring.Ring, lookupPeriod time.Duration, logger log.Logger) *RingWatcher {
w := &RingWatcher{
id: id,
ring: ring,
lookupPeriod: lookupPeriod,
logger: logger,
}
w.Service = services.NewBasicService(nil, w.updateLoop, nil)
return w
}
func (w *RingWatcher) waitForInitialLeader(ctx context.Context) error {
syncTicker := time.NewTicker(time.Second)
defer syncTicker.Stop()
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-syncTicker.C:
w.lookupAddresses()
if w.leader != nil {
return nil
}
}
}
}
func (w *RingWatcher) updateLoop(ctx context.Context) error {
_ = w.waitForInitialLeader(ctx)
syncTicker := time.NewTicker(w.lookupPeriod)
defer syncTicker.Stop()
for {
select {
case <-ctx.Done():
return nil
case <-syncTicker.C:
w.lookupAddresses()
}
}
}
func (w *RingWatcher) lookupAddresses() {
bufDescs, bufHosts, bufZones := ring.MakeBuffersForGet()
rs, err := w.ring.Get(RingKeyOfLeader, ring.WriteNoExtend, bufDescs, bufHosts, bufZones)
if err != nil {
level.Error(w.logger).Log("msg", "failed to get replicationset for key", "key", RingKeyOfLeader, "err", err)
w.leader = nil
return
}
for i := range rs.Instances {
inst := rs.Instances[i]
state, err := w.ring.GetInstanceState(inst.Id)
if err != nil || state != ring.ACTIVE {
return
}
tr, err := w.ring.GetTokenRangesForInstance(inst.Id)
if err != nil && (len(tr) == 0 || tr.IncludesKey(RingKeyOfLeader)) {
if w.leader == nil || w.leader.Id != inst.Id {
level.Info(w.logger).Log("msg", "updated leader", "new_leader", inst)
}
w.leader = &inst
return
}
}
w.leader = nil
}
func (w *RingWatcher) IsLeader() bool {
return w.IsInstanceLeader(w.id)
}
func (w *RingWatcher) IsInstanceLeader(instanceID string) bool {
res := w.leader != nil && w.leader.Id == instanceID
level.Debug(w.logger).Log("msg", "check if instance is leader", "inst", instanceID, "curr_leader", w.leader, "is_leader", res)
return res
}
func (w *RingWatcher) GetLeaderAddress() (string, error) {
if w.leader == nil {
return "", ring.ErrEmptyRing
}
return w.leader.Addr, nil
}
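
For clarity, here is how the planner and builder consume this watcher, condensed from the builder changes above and the planner changes below (illustrative free-standing helpers; in the PR these live as methods on `Planner` and `Builder`, with service wiring and error handling omitted):

```go
package sketch

import (
	"github.com/grafana/loki/v3/pkg/bloombuild/common"
)

// shouldPlan condenses Planner.isLeader: without a ring watcher (microservice mode)
// the planner is a singleton by deployment and always plans; in SSD mode only the
// replica owning the leader key plans.
func shouldPlan(w *common.RingWatcher) bool {
	if w == nil {
		return true
	}
	return w.IsLeader()
}

// resolvePlannerAddress condenses Builder.plannerAddress: prefer the current leader's
// address from the ring watcher and fall back to the statically configured address.
func resolvePlannerAddress(w *common.RingWatcher, configuredAddr string) string {
	if w == nil {
		return configuredAddr
	}
	addr, err := w.GetLeaderAddress()
	if err != nil {
		return configuredAddr
	}
	return addr
}
```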

@ -27,9 +27,13 @@ import (
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/indexshipper/tsdb"
"github.com/grafana/loki/v3/pkg/util"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
var errPlannerIsNotRunning = errors.New("planner is not running")
var (
errPlannerIsNotRunning = errors.New("planner is not running")
errPlannerIsNotLeader = errors.New("planner is not leader")
)
type Planner struct {
services.Service
@ -52,6 +56,10 @@ type Planner struct {
metrics *Metrics
logger log.Logger
// used only in SSD mode where a single planner of the backend replicas needs to create tasks
// therefore it is nil when the planner runs in microservice mode (default)
ringWatcher *common.RingWatcher
}
func New(
@ -63,6 +71,7 @@ func New(
bloomStore bloomshipper.StoreBase,
logger log.Logger,
r prometheus.Registerer,
rm *ring.RingManager,
) (*Planner, error) {
utillog.WarnExperimentalUse("Bloom Planner", logger)
@ -101,6 +110,12 @@ func New(
)
svcs := []services.Service{p.tasksQueue, p.activeUsers}
if rm != nil {
p.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, logger)
svcs = append(svcs, p.ringWatcher)
}
p.subservices, err = services.NewManager(svcs...)
if err != nil {
return nil, fmt.Errorf("error creating subservices manager: %w", err)
@ -112,6 +127,15 @@ func New(
return p, nil
}
func (p *Planner) isLeader() bool {
if p.ringWatcher == nil {
// when the planner runs as a standalone service in microservice mode, there is no ringWatcher
// therefore we can safely assume that the planner is a singleton
return true
}
return p.ringWatcher.IsLeader()
}
func (p *Planner) starting(ctx context.Context) (err error) {
if err := services.StartManagerAndAwaitHealthy(ctx, p.subservices); err != nil {
return fmt.Errorf("error starting planner subservices: %w", err)
@ -135,10 +159,9 @@ func (p *Planner) stopping(_ error) error {
func (p *Planner) running(ctx context.Context) error {
go p.trackInflightRequests(ctx)
// run once at beginning
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "bloom build iteration failed for the first time", "err", err)
}
// run once at beginning, but delay by 1m to allow ring consolidation when running in SSD mode
initialPlanningTimer := time.NewTimer(time.Minute)
defer initialPlanningTimer.Stop()
planningTicker := time.NewTicker(p.cfg.PlanningInterval)
defer planningTicker.Stop()
@ -154,6 +177,12 @@ func (p *Planner) running(ctx context.Context) error {
level.Debug(p.logger).Log("msg", "planner context done")
return nil
case <-initialPlanningTimer.C:
level.Info(p.logger).Log("msg", "starting initial bloom build iteration")
if err := p.runOne(ctx); err != nil {
level.Error(p.logger).Log("msg", "initial bloom build iteration failed", "err", err)
}
case <-planningTicker.C:
level.Info(p.logger).Log("msg", "starting bloom build iteration")
if err := p.runOne(ctx); err != nil {
@ -192,6 +221,10 @@ type tenantTable struct {
}
func (p *Planner) runOne(ctx context.Context) error {
if !p.isLeader() {
return errPlannerIsNotLeader
}
var (
wg sync.WaitGroup
start = time.Now()
@ -901,6 +934,11 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
builderID := resp.GetBuilderID()
logger := log.With(p.logger, "builder", builderID)
if !p.isLeader() {
return errPlannerIsNotLeader
}
level.Debug(logger).Log("msg", "builder connected")
p.tasksQueue.RegisterConsumerConnection(builderID)

@ -532,7 +532,7 @@ func createPlanner(
bloomStore, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
require.NoError(t, err)
planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg)
planner, err := New(cfg, limits, schemaCfg, storageCfg, storage.ClientMetrics{}, bloomStore, logger, reg, nil)
require.NoError(t, err)
return planner

@ -760,7 +760,7 @@ func (t *Loki) setupModuleManager() error {
Read: {QueryFrontend, Querier},
Write: {Ingester, IngesterRF1, Distributor, PatternIngester, IngesterKafka},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomGateway},
Backend: {QueryScheduler, Ruler, Compactor, IndexGateway, BloomPlanner, BloomBuilder, BloomGateway},
All: {QueryScheduler, QueryFrontend, Querier, Ingester, IngesterRF1, PatternIngester, Distributor, Ruler, Compactor, Metastore, IngesterKafka},
}

@ -1708,6 +1708,10 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-planner")
if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
level.Info(logger).Log("msg", "initializing bloom planner in ring mode as part of backend target")
}
p, err := planner.New(
t.Cfg.BloomBuild.Planner,
t.Overrides,
@ -1717,6 +1721,11 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
t.BloomStore,
logger,
prometheus.DefaultRegisterer,
// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
// part of the backend target. The planner creates a watcher service that regularly checks which replica is
// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
t.indexGatewayRingManager,
)
if err != nil {
return nil, err
@ -1733,6 +1742,10 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-builder")
if t.Cfg.isTarget(Backend) && t.indexGatewayRingManager != nil {
level.Info(logger).Log("msg", "initializing bloom builder in ring mode as part of backend target")
}
return builder.New(
t.Cfg.BloomBuild.Builder,
t.Overrides,
@ -1743,6 +1756,11 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
t.BloomStore,
logger,
prometheus.DefaultRegisterer,
// Bloom planner and builder are part of the backend target in Simple Scalable Deployment mode.
// To avoid creating a new ring just for this special case, we can use the index gateway ring, which is already
// part of the backend target. The planner creates a watcher service that regularly checks which replica is
// the leader. Only the leader plans the tasks. Builders connect to the leader instance to pull tasks.
t.indexGatewayRingManager,
)
}
