refactor(blooms): Bloom building integration test (#13296)

pull/13289/head^2
Salva Corts authored 11 months ago; committed by GitHub
parent debb5f202e
commit 0ddf1e6113
  1. integration/bloom_building_test.go (252 lines changed)
  2. pkg/bloombuild/builder/builder.go (25 lines changed)
  3. pkg/bloombuild/planner/metrics.go (13 lines changed)
  4. pkg/bloombuild/planner/planner.go (28 lines changed)
  5. pkg/loki/modules.go (17 lines changed)

@ -0,0 +1,252 @@
//go:build integration

package integration

import (
	"context"
	"fmt"
	"testing"
	"time"

	"github.com/go-kit/log"
	"github.com/grafana/dskit/flagext"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/common/model"
	"github.com/prometheus/prometheus/model/labels"
	"github.com/stretchr/testify/require"

	"github.com/grafana/loki/v3/integration/client"
	"github.com/grafana/loki/v3/integration/cluster"
	"github.com/grafana/loki/v3/pkg/storage"
	v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
	"github.com/grafana/loki/v3/pkg/storage/chunk/cache"
	"github.com/grafana/loki/v3/pkg/storage/chunk/client/local"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
	bloomshipperconfig "github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper/config"
	"github.com/grafana/loki/v3/pkg/storage/types"
	"github.com/grafana/loki/v3/pkg/util/mempool"
)

func TestBloomBuilding(t *testing.T) {
	const (
		nSeries        = 10 //1000
		nLogsPerSeries = 50
		nBuilders      = 5
	)

	clu := cluster.New(nil, cluster.SchemaWithTSDB, func(c *cluster.Cluster) {
		c.SetSchemaVer("v13")
	})
	defer func() {
		require.NoError(t, clu.Cleanup())
	}()

	// First run distributor and ingester and write some data across many series.
	tDistributor := clu.AddComponent(
		"distributor",
		"-target=distributor",
	)
	tIngester := clu.AddComponent(
		"ingester",
		"-target=ingester",
		"-ingester.flush-on-shutdown=true",
	)
	require.NoError(t, clu.Run())

	tenantID := "fake"
	now := time.Now()
	cliDistributor := client.New(tenantID, "", tDistributor.HTTPURL())
	cliIngester := client.New(tenantID, "", tIngester.HTTPURL())
	cliIngester.Now = now

	// We now ingest some logs across many series.
	series := make([]labels.Labels, 0, nSeries)
	for i := 0; i < nSeries; i++ {
		lbs := labels.FromStrings("job", fmt.Sprintf("job-%d", i))
		series = append(series, lbs)

		for j := 0; j < nLogsPerSeries; j++ {
			require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
		}
	}

	// restart ingester which should flush the chunks and index
	require.NoError(t, tIngester.Restart())

	// Start compactor and wait for compaction to finish.
	tCompactor := clu.AddComponent(
		"compactor",
		"-target=compactor",
		"-compactor.compaction-interval=10m",
		"-compactor.run-once=true",
	)
	require.NoError(t, clu.Run())

	// Wait for compaction to finish.
	time.Sleep(5 * time.Second)

	// Now create the bloom planner and builders
	tBloomPlanner := clu.AddComponent(
		"bloom-planner",
		"-target=bloom-planner",
		"-bloom-build.enabled=true",
		"-bloom-build.enable=true",
		"-bloom-build.planner.interval=10m",
		"-bloom-build.planner.min-table-offset=0",
	)
	require.NoError(t, clu.Run())

	// Add several builders
	builders := make([]*cluster.Component, 0, nBuilders)
	for i := 0; i < nBuilders; i++ {
		builder := clu.AddComponent(
			"bloom-builder",
			"-target=bloom-builder",
			"-bloom-build.enabled=true",
			"-bloom-build.enable=true",
			"-bloom-build.builder.planner-address="+tBloomPlanner.GRPCURL(),
		)
		builders = append(builders, builder)
	}
	require.NoError(t, clu.Run())

	// Wait for bloom build to finish
	time.Sleep(5 * time.Second)

	// Create bloom client to fetch metas and blocks.
	bloomStore := createBloomStore(t, tBloomPlanner.ClusterSharedPath())

	// Check that all series pushed are present in the metas and blocks.
	checkSeriesInBlooms(t, now, tenantID, bloomStore, series)

	// Push some more logs so TSDBs need to be updated.
	for i := 0; i < nSeries; i++ {
		lbs := labels.FromStrings("job", fmt.Sprintf("job-new-%d", i))
		series = append(series, lbs)

		for j := 0; j < nLogsPerSeries; j++ {
			require.NoError(t, cliDistributor.PushLogLine(fmt.Sprintf("log line %d", j), now, nil, lbs.Map()))
		}
	}

	// restart ingester which should flush the chunks and index
	require.NoError(t, tIngester.Restart())

	// Restart compactor and wait for compaction to finish so TSDBs are updated.
	require.NoError(t, tCompactor.Restart())
	time.Sleep(5 * time.Second)

	// Restart bloom planner to trigger bloom build
	require.NoError(t, tBloomPlanner.Restart())

	// TODO(salvacorts): Implement retry on builder so we don't need to restart them.
	for _, tBloomBuilder := range builders {
		tBloomBuilder.AddFlags("-bloom-build.builder.planner-address=" + tBloomPlanner.GRPCURL())
		require.NoError(t, tBloomBuilder.Restart())
	}

	// Wait for bloom build to finish
	time.Sleep(5 * time.Second)

	// Check that all series (both previous and new ones) pushed are present in the metas and blocks.
	// This check ensures up to 1 meta per series, which tests deletion of old metas.
	checkSeriesInBlooms(t, now, tenantID, bloomStore, series)
}

func createBloomStore(t *testing.T, sharedPath string) *bloomshipper.BloomStore {
	logger := log.NewNopLogger()
	//logger := log.NewLogfmtLogger(os.Stdout)

	schemaCfg := config.SchemaConfig{
		Configs: []config.PeriodConfig{
			{
				From: parseDayTime("2023-09-01"),
				IndexTables: config.IndexPeriodicTableConfig{
					PeriodicTableConfig: config.PeriodicTableConfig{
						Prefix: "index_tsdb_",
						Period: 24 * time.Hour,
					},
				},
				IndexType:  types.TSDBType,
				ObjectType: types.StorageTypeFileSystem,
				Schema:     "v13",
				RowShards:  16,
			},
		},
	}
	storageCfg := storage.Config{
		BloomShipperConfig: bloomshipperconfig.Config{
			WorkingDirectory:    []string{sharedPath + "/bloom-store-test"},
			DownloadParallelism: 1,
			BlocksCache: bloomshipperconfig.BlocksCacheConfig{
				SoftLimit: flagext.Bytes(10 << 20),
				HardLimit: flagext.Bytes(20 << 20),
				TTL:       time.Hour,
			},
		},
		FSConfig: local.FSConfig{
			Directory: sharedPath + "/fs-store-1",
		},
	}

	reg := prometheus.NewPedanticRegistry()
	metasCache := cache.NewNoopCache()
	blocksCache := bloomshipper.NewFsBlocksCache(storageCfg.BloomShipperConfig.BlocksCache, reg, logger)

	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, storage.ClientMetrics{}, metasCache, blocksCache, &mempool.SimpleHeapAllocator{}, reg, logger)
	require.NoError(t, err)

	return store
}

func checkSeriesInBlooms(
	t *testing.T,
	now time.Time,
	tenantID string,
	bloomStore *bloomshipper.BloomStore,
	series []labels.Labels,
) {
	for _, lbs := range series {
		seriesFP := model.Fingerprint(lbs.Hash())

		metas, err := bloomStore.FetchMetas(context.Background(), bloomshipper.MetaSearchParams{
			TenantID: tenantID,
			Interval: bloomshipper.NewInterval(model.TimeFromUnix(now.Add(-24*time.Hour).Unix()), model.TimeFromUnix(now.Unix())),
			Keyspace: v1.NewBounds(seriesFP, seriesFP),
		})
		require.NoError(t, err)

		// Only one meta should be present.
		require.Len(t, metas, 1)

		var relevantBlocks []bloomshipper.BlockRef
		for _, block := range metas[0].Blocks {
			if block.Cmp(uint64(seriesFP)) != v1.Overlap {
				continue
			}
			relevantBlocks = append(relevantBlocks, block)
		}

		// Only one block should be relevant.
		require.Len(t, relevantBlocks, 1)

		queriers, err := bloomStore.FetchBlocks(context.Background(), relevantBlocks)
		require.NoError(t, err)
		require.Len(t, queriers, 1)
		querier := queriers[0]

		require.NoError(t, querier.Seek(seriesFP))
		require.Equal(t, seriesFP, querier.At().Series.Fingerprint)
	}
}

func parseDayTime(s string) config.DayTime {
	t, err := time.Parse("2006-01-02", s)
	if err != nil {
		panic(err)
	}
	return config.DayTime{
		Time: model.TimeFromUnix(t.Unix()),
	}
}
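Because the new file carries the //go:build integration constraint, a plain go test run skips it. Assuming the repository's usual convention for its integration suite, a single run of just this test would look roughly like the following (the timeout value is only a suggestion):

	go test -tags=integration -run TestBloomBuilding -timeout=15m ./integration/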

@ -11,9 +11,12 @@ import (
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
@ -84,16 +87,25 @@ func (b *Builder) starting(_ context.Context) error {
}
func (b *Builder) stopping(_ error) error {
defer b.metrics.running.Set(0)
if b.client != nil {
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), "fake"))
if err != nil {
level.Error(b.logger).Log("msg", "failed to inject orgID into context", "err", err)
return nil
}
req := &protos.NotifyBuilderShutdownRequest{
BuilderID: b.ID,
}
if _, err := b.client.NotifyBuilderShutdown(context.Background(), req); err != nil {
if _, err := b.client.NotifyBuilderShutdown(ctx, req); err != nil {
level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err)
}
}
b.metrics.running.Set(0)
return nil
}
@ -111,6 +123,13 @@ func (b *Builder) running(ctx context.Context) error {
b.client = protos.NewPlannerForBuilderClient(conn)
// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
ctx, err = user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, "fake"))
if err != nil {
return fmt.Errorf("failed to inject orgID into context: %w", err)
}
c, err := b.client.BuilderLoop(ctx)
if err != nil {
return fmt.Errorf("failed to start builder loop: %w", err)
@ -135,7 +154,7 @@ func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) erro
// will be canceled and the loop will exit.
protoTask, err := c.Recv()
if err != nil {
if errors.Is(c.Context().Err(), context.Canceled) {
if status.Code(err) == codes.Canceled {
level.Debug(b.logger).Log("msg", "builder loop context canceled")
return nil
}

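The last hunk above replaces the check on the stream's context error with a check on the gRPC status code of the error returned by Recv. A compact sketch of that classification, using the google.golang.org/grpc/status and google.golang.org/grpc/codes packages already imported in this file (the helper name is made up for illustration):

// isCleanShutdown reports whether a stream receive error only means the
// RPC was canceled (for example during shutdown) rather than a real failure.
// Cancellations surfaced by a client stream's Recv carry a gRPC status with
// codes.Canceled, which status.Code extracts; any other error (including
// io.EOF on normal stream end) is not treated as a clean shutdown here.
func isCleanShutdown(err error) bool {
	return status.Code(err) == codes.Canceled
}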
@ -28,9 +28,10 @@ type Metrics struct {
taskLost prometheus.Counter
tasksFailed prometheus.Counter
buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
buildTime *prometheus.HistogramVec
buildStarted prometheus.Counter
buildCompleted *prometheus.CounterVec
buildTime *prometheus.HistogramVec
buildLastSuccess prometheus.Gauge
blocksDeleted prometheus.Counter
metasDeleted prometheus.Counter
@ -111,6 +112,12 @@ func NewMetrics(
Help: "Time spent during a builds cycle.",
Buckets: prometheus.DefBuckets,
}, []string{"status"}),
buildLastSuccess: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Namespace: metricsNamespace,
Subsystem: metricsSubsystem,
Name: "build_last_successful_run_timestamp_seconds",
Help: "Unix timestamp of the last successful build cycle.",
}),
blocksDeleted: promauto.With(r).NewCounter(prometheus.CounterOpts{
Namespace: metricsNamespace,

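The gauge registered above follows the common last-successful-run-timestamp pattern: its value is only useful for staleness checks, since subtracting it from the current time gives the seconds elapsed since the last successful build cycle. The exposed name depends on metricsNamespace and metricsSubsystem, so the expression below is only a sketch of such a check, with a placeholder prefix and an arbitrary six-hour threshold:

	time() - <prefix>_build_last_successful_run_timestamp_seconds > 6 * 3600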
@ -188,6 +188,10 @@ func (p *Planner) runOne(ctx context.Context) error {
defer func() {
p.metrics.buildCompleted.WithLabelValues(status).Inc()
p.metrics.buildTime.WithLabelValues(status).Observe(time.Since(start).Seconds())
if status == statusSuccess {
p.metrics.buildLastSuccess.SetToCurrentTime()
}
}()
p.metrics.buildStarted.Inc()
@ -219,6 +223,7 @@ func (p *Planner) runOne(ctx context.Context) error {
level.Error(logger).Log("msg", "error computing tasks", "err", err)
continue
}
level.Debug(logger).Log("msg", "computed tasks", "tasks", len(tasks), "existingMetas", len(existingMetas))
var tenantTableEnqueuedTasks int
resultsCh := make(chan *protos.TaskResult, len(tasks))
@ -253,6 +258,11 @@ func (p *Planner) runOne(ctx context.Context) error {
// Create a pool of workers to process table-tenant tuples.
var wg sync.WaitGroup
for tt, results := range tasksResultForTenantTable {
if results.tasksToWait == 0 {
// No tasks enqueued for this tenant-table tuple, skip processing
continue
}
wg.Add(1)
go func(table config.DayTable, tenant string, results tenantTableTaskResults) {
defer wg.Done()
@ -306,7 +316,6 @@ func (p *Planner) computeTasks(
// Filter only the metas that overlap in the ownership range
metasInBounds := bloomshipper.FilterMetasOverlappingBounds(metas, ownershipRange)
level.Debug(logger).Log("msg", "found relevant metas", "metas", len(metasInBounds))
// Find gaps in the TSDBs for this tenant/table
gaps, err := p.findOutdatedGaps(ctx, tenant, table, ownershipRange, metasInBounds, logger)
@ -314,6 +323,10 @@ func (p *Planner) computeTasks(
level.Error(logger).Log("msg", "failed to find outdated gaps", "err", err)
continue
}
if len(gaps) == 0 {
level.Debug(logger).Log("msg", "no gaps found")
continue
}
for _, gap := range gaps {
tasks = append(tasks, protos.NewTask(table, tenant, ownershipRange, gap.tsdb, gap.gaps))
@ -331,7 +344,7 @@ func (p *Planner) processTenantTaskResults(
totalTasks int,
resultsCh <-chan *protos.TaskResult,
) error {
logger := log.With(p.logger, table, table.Addr(), "tenant", tenant)
logger := log.With(p.logger, "table", table.Addr(), "tenant", tenant)
level.Debug(logger).Log("msg", "waiting for all tasks to be completed", "tasks", totalTasks)
newMetas := make([]bloomshipper.Meta, 0, totalTasks)
@ -379,8 +392,12 @@ func (p *Planner) processTenantTaskResults(
combined := append(originalMetas, newMetas...)
outdated := outdatedMetas(combined)
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if len(outdated) == 0 {
level.Debug(logger).Log("msg", "no outdated metas found")
return nil
}
level.Debug(logger).Log("msg", "found outdated metas", "outdated", len(outdated))
if err := p.deleteOutdatedMetasAndBlocks(ctx, table, tenant, outdated); err != nil {
return fmt.Errorf("failed to delete outdated metas: %w", err)
}
@ -494,6 +511,7 @@ func (p *Planner) loadTenantWork(
tenant := tenants.At()
if !p.limits.BloomCreationEnabled(tenant) {
level.Debug(p.logger).Log("msg", "bloom creation disabled for tenant", "tenant", tenant)
continue
}
@ -730,7 +748,7 @@ func (p *Planner) NotifyBuilderShutdown(
req *protos.NotifyBuilderShutdownRequest,
) (*protos.NotifyBuilderShutdownResponse, error) {
level.Debug(p.logger).Log("msg", "builder shutdown", "builder", req.BuilderID)
p.tasksQueue.UnregisterConsumerConnection(req.GetBuilderID())
p.tasksQueue.NotifyConsumerShutdown(req.GetBuilderID())
return &protos.NotifyBuilderShutdownResponse{}, nil
}
@ -820,7 +838,7 @@ func (p *Planner) BuilderLoop(builder protos.PlannerForBuilder_BuilderLoopServer
level.Debug(logger).Log(
"msg", "task completed",
"duration", time.Since(task.queueTime).Seconds(),
"retries", task.timesEnqueued.Load(),
"retries", task.timesEnqueued.Load()-1, // -1 because the first enqueue is not a retry
)
p.removePendingTask(task)
p.metrics.tenantTasksCompleted.WithLabelValues(task.Tenant).Inc()

@ -40,6 +40,7 @@ import (
"github.com/grafana/loki/v3/pkg/analytics"
"github.com/grafana/loki/v3/pkg/bloombuild/builder"
"github.com/grafana/loki/v3/pkg/bloombuild/planner"
bloomprotos "github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compactor"
compactorclient "github.com/grafana/loki/v3/pkg/compactor/client"
@ -714,9 +715,9 @@ func (t *Loki) initStore() (services.Service, error) {
}
func (t *Loki) initBloomStore() (services.Service, error) {
// BloomStore is a dependency of IndexGateway, even when the BloomGateway is not enabled.
// Do not instantiate store and do not create a service.
if !t.Cfg.BloomGateway.Enabled {
// BloomStore is a dependency of IndexGateway and Bloom Planner & Builder.
// Do not instantiate store and do not create a service if neither is enabled.
if !t.Cfg.BloomGateway.Enabled && !t.Cfg.BloomBuild.Enabled {
return nil, nil
}
@ -1584,7 +1585,7 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
logger := log.With(util_log.Logger, "component", "bloom-planner")
return planner.New(
p, err := planner.New(
t.Cfg.BloomBuild.Planner,
t.Overrides,
t.Cfg.SchemaConfig,
@ -1594,6 +1595,12 @@ func (t *Loki) initBloomPlanner() (services.Service, error) {
logger,
prometheus.DefaultRegisterer,
)
if err != nil {
return nil, err
}
bloomprotos.RegisterPlannerForBuilderServer(t.Server.GRPC, p)
return p, nil
}
func (t *Loki) initBloomBuilder() (services.Service, error) {
@ -1601,7 +1608,7 @@ func (t *Loki) initBloomBuilder() (services.Service, error) {
return nil, nil
}
logger := log.With(util_log.Logger, "component", "bloom-worker")
logger := log.With(util_log.Logger, "component", "bloom-builder")
return builder.New(
t.Cfg.BloomBuild.Builder,
