// Loki — like Prometheus, but for logs.
//
// File: pkg/bloombuild/builder/builder.go
package builder
import (
"context"
"fmt"
"io"
"os"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/google/uuid"
"github.com/grafana/dskit/backoff"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"github.com/grafana/loki/v3/pkg/bloombuild/common"
"github.com/grafana/loki/v3/pkg/bloombuild/protos"
"github.com/grafana/loki/v3/pkg/bloomgateway"
"github.com/grafana/loki/v3/pkg/compression"
iter "github.com/grafana/loki/v3/pkg/iter/v2"
"github.com/grafana/loki/v3/pkg/storage"
v1 "github.com/grafana/loki/v3/pkg/storage/bloom/v1"
"github.com/grafana/loki/v3/pkg/storage/config"
"github.com/grafana/loki/v3/pkg/storage/stores"
"github.com/grafana/loki/v3/pkg/storage/stores/shipper/bloomshipper"
utillog "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/util/ring"
)
// defaultBlockCompressionCodec is the compression codec applied to newly
// built bloom blocks before upload.
// TODO(chaudum): Make configurable via (per-tenant?) setting.
var defaultBlockCompressionCodec = compression.None
// Builder is the bloom-build worker service. It registers with a planner over
// a bidirectional gRPC stream, receives build tasks, generates bloom blocks
// and metas for each task, and uploads the results to object storage.
type Builder struct {
	services.Service

	// ID is a random UUID identifying this builder instance towards the planner.
	ID string

	cfg     Config
	limits  Limits
	metrics *Metrics
	logger  log.Logger

	bloomStore   bloomshipper.Store
	chunkLoader  ChunkLoader
	bloomGateway bloomgateway.Client

	// client is the connection to the planner, set up in connectAndBuild and
	// reused by stopping() to notify the planner of shutdown.
	client protos.PlannerForBuilderClient

	// ringWatcher is used only in SSD mode, where the planner leader is
	// discovered among the backend replicas via the ring;
	// it is nil when the planner runs in microservice mode (default).
	ringWatcher *common.RingWatcher
}
// New creates an unstarted bloom Builder with a randomly generated ID.
// In SSD mode (rm non-nil) the builder discovers the planner leader via a
// ring watcher; otherwise it uses the statically configured planner address.
func New(
	cfg Config,
	limits Limits,
	_ config.SchemaConfig,
	_ storage.Config,
	_ storage.ClientMetrics,
	fetcherProvider stores.ChunkFetcherProvider,
	bloomStore bloomshipper.Store,
	bloomGateway bloomgateway.Client,
	logger log.Logger,
	r prometheus.Registerer,
	rm *ring.RingManager,
) (*Builder, error) {
	utillog.WarnExperimentalUse("Bloom Builder", logger)

	id := uuid.NewString()
	scopedLogger := log.With(logger, "builder_id", id)
	builderMetrics := NewMetrics(r)

	builder := &Builder{
		ID:           id,
		cfg:          cfg,
		limits:       limits,
		metrics:      builderMetrics,
		bloomStore:   bloomStore,
		chunkLoader:  NewStoreChunkLoader(fetcherProvider, builderMetrics),
		bloomGateway: bloomGateway,
		logger:       scopedLogger,
	}

	if rm != nil {
		builder.ringWatcher = common.NewRingWatcher(rm.RingLifecycler.GetInstanceID(), rm.Ring, time.Minute, scopedLogger)
	}

	builder.Service = services.NewBasicService(builder.starting, builder.running, builder.stopping)
	return builder, nil
}
// starting implements the services.Service start hook: it starts the ring
// watcher subservice (if configured) and marks the builder as running.
func (b *Builder) starting(ctx context.Context) error {
	if w := b.ringWatcher; w != nil {
		if err := services.StartAndAwaitRunning(ctx, w); err != nil {
			return fmt.Errorf("error starting builder subservices: %w", err)
		}
	}
	b.metrics.running.Set(1)
	return nil
}
// stopping implements the services.Service stop hook: it stops the ring
// watcher subservice (if any), notifies the planner about the shutdown on a
// best-effort basis, and clears the running gauge.
func (b *Builder) stopping(_ error) error {
	defer b.metrics.running.Set(0)

	if b.ringWatcher != nil {
		err := services.StopAndAwaitTerminated(context.Background(), b.ringWatcher)
		if err != nil {
			return fmt.Errorf("error stopping builder subservices: %w", err)
		}
	}

	if b.client == nil {
		return nil
	}

	// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
	// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
	ctx, err := user.InjectIntoGRPCRequest(user.InjectOrgID(context.Background(), "fake"))
	if err != nil {
		level.Error(b.logger).Log("msg", "failed to inject orgID into context", "err", err)
		return nil
	}

	// Failure to notify is logged but not returned: shutdown proceeds regardless.
	shutdownReq := &protos.NotifyBuilderShutdownRequest{BuilderID: b.ID}
	if _, err := b.client.NotifyBuilderShutdown(ctx, shutdownReq); err != nil {
		level.Error(b.logger).Log("msg", "failed to notify planner about builder shutdown", "err", err)
	}

	return nil
}
// running implements the services.Service main loop: it repeatedly connects
// to the planner and processes build tasks until the builder shuts down or
// the retry budget is exhausted.
func (b *Builder) running(ctx context.Context) error {
	// Retry if the connection to the planner is lost.
	retries := backoff.New(ctx, b.cfg.BackoffConfig)
	for retries.Ongoing() {
		err := b.connectAndBuild(ctx)
		if err != nil {
			err := standardizeRPCError(err)

			// When the builder is shutting down, we will get a context canceled error.
			// The state check distinguishes a deliberate shutdown from an unexpected cancellation.
			if errors.Is(err, context.Canceled) && b.State() != services.Running {
				level.Debug(b.logger).Log("msg", "builder is shutting down")
				break
			}

			// If the planner disconnects while we are sending/receive a message we get an EOF error.
			// In this case we should reset the backoff and retry
			if errors.Is(err, io.EOF) {
				level.Error(b.logger).Log("msg", "planner disconnected. Resetting backoff and retrying", "err", err)
				retries.Reset()
				continue
			}

			// Otherwise (e.g. failed to connect to the builder), we should retry
			code := status.Code(err)
			level.Error(b.logger).Log("msg", "failed to connect and build. Retrying", "retry", retries.NumRetries(), "maxRetries", b.cfg.BackoffConfig.MaxRetries, "code", code.String(), "err", err)
			retries.Wait()
			continue
		}

		// We shouldn't get here. If we do, we better restart the builder.
		// Adding a log line for visibility
		level.Error(b.logger).Log("msg", "unexpected end of connectAndBuild. Restarting builder")
		break
	}

	if err := retries.Err(); err != nil {
		if errors.Is(err, context.Canceled) {
			// Edge case when the builder is shutting down while we check for retries
			return nil
		}
		return fmt.Errorf("failed to connect and build: %w", err)
	}

	return nil
}
// standardizeRPCError converts some gRPC errors we want to handle differently to standard errors.
// 1. codes.Canceled -> context.Canceled
// 2. codes.Unavailable with EOF -> io.EOF
// 3. All other errors are returned as is.
func standardizeRPCError(err error) error {
	if err == nil {
		return nil
	}

	if st, ok := status.FromError(err); ok {
		switch st.Code() {
		case codes.Canceled:
			// Happens when the builder is shutting down, and we are sending/receiving a message
			return context.Canceled
		case codes.Unavailable:
			// We want to handle this case as a retryable error that resets the backoff.
			// Only the presence of "EOF" in the status message matters, not its
			// position, so strings.Contains is the idiomatic check here.
			if strings.Contains(st.Message(), "EOF") {
				return io.EOF
			}
		}
	}

	return err
}
// plannerAddress returns the address of the planner to connect to.
// In SSD mode it resolves the current ring leader; in microservice mode (or
// when leader resolution fails) it falls back to the configured address.
func (b *Builder) plannerAddress() string {
	if b.ringWatcher != nil {
		if leaderAddr, err := b.ringWatcher.GetLeaderAddress(); err == nil {
			return leaderAddr
		}
	}
	return b.cfg.PlannerAddress
}
// connectAndBuild dials the planner, opens the bidirectional BuilderLoop
// stream, and runs the builder loop on it until the stream ends.
//
// NOTE(review): the grpc connection is stored on b.client but never closed
// here; stopping() reuses b.client to send the shutdown notification, so the
// connection intentionally outlives this call — confirm intended lifetime
// across reconnect attempts.
func (b *Builder) connectAndBuild(ctx context.Context) error {
	opts, err := b.cfg.GrpcConfig.DialOption(nil, nil, middleware.NoOpInvalidClusterValidationReporter)
	if err != nil {
		return fmt.Errorf("failed to create grpc dial options: %w", err)
	}

	conn, err := grpc.NewClient(b.plannerAddress(), opts...)
	if err != nil {
		return fmt.Errorf("failed to dial bloom planner: %w", err)
	}

	b.client = protos.NewPlannerForBuilderClient(conn)

	// The gRPC server we use from dskit expects the orgID to be injected into the context when auth is enabled
	// We won't actually use the orgID anywhere in this service, but we need to inject it to satisfy the server.
	ctx, err = user.InjectIntoGRPCRequest(user.InjectOrgID(ctx, "fake"))
	if err != nil {
		return fmt.Errorf("failed to inject orgID into context: %w", err)
	}

	c, err := b.client.BuilderLoop(ctx)
	if err != nil {
		return fmt.Errorf("failed to start builder loop: %w", err)
	}

	// Start processing tasks from planner
	if err := b.builderLoop(c); err != nil {
		return fmt.Errorf("builder loop failed: %w", err)
	}

	return nil
}
// builderLoop announces this builder to the planner, then processes tasks
// received over the stream until the stream context is canceled or a
// send/receive error occurs. It returns the stream context's error (nil only
// if the loop somehow exits with a live context).
//
// The processingTask gauge is 1 while a task is being handled and 0 while the
// loop is idle waiting in Recv.
func (b *Builder) builderLoop(c protos.PlannerForBuilder_BuilderLoopClient) error {
	ctx := c.Context()

	// Send ready message to planner
	if err := c.Send(&protos.BuilderToPlanner{BuilderID: b.ID}); err != nil {
		return fmt.Errorf("failed to send ready message to planner: %w", err)
	}

	// Will break when planner<->builder connection is closed or when the builder is shutting down.
	for ctx.Err() == nil {
		protoTask, err := c.Recv()
		if err != nil {
			return fmt.Errorf("failed to receive task from planner: %w", err)
		}

		b.metrics.processingTask.Set(1)
		b.metrics.taskStarted.Inc()
		start := time.Now()

		task, err := protos.FromProtoTask(protoTask.Task)
		if err != nil {
			task = &protos.Task{ID: protoTask.Task.Id}
			err = fmt.Errorf("failed to convert proto task to task: %w", err)
			b.logTaskCompleted(task, nil, err, start)
			if err = b.notifyTaskCompletedToPlanner(c, task, nil, err); err != nil {
				b.metrics.processingTask.Set(0)
				return fmt.Errorf("failed to notify task completion to planner: %w", err)
			}
			// Bug fix: reset the gauge before continuing, otherwise it stays
			// at 1 while the loop idles in Recv waiting for the next task.
			b.metrics.processingTask.Set(0)
			continue
		}

		newMetas, err := b.processTask(ctx, task)
		if err != nil {
			err = fmt.Errorf("failed to process task: %w", err)
		}

		b.logTaskCompleted(task, newMetas, err, start)
		if err = b.notifyTaskCompletedToPlanner(c, task, newMetas, err); err != nil {
			b.metrics.processingTask.Set(0)
			return fmt.Errorf("failed to notify task completion to planner: %w", err)
		}
		b.metrics.processingTask.Set(0)
	}

	level.Debug(b.logger).Log("msg", "builder loop stopped", "ctx_err", ctx.Err())
	return ctx.Err()
}
// logTaskCompleted records completion metrics (success/failure counter and
// duration histogram) and emits a debug log line for the finished task.
func (b *Builder) logTaskCompleted(
	task *protos.Task,
	metas []bloomshipper.Meta,
	err error,
	start time.Time,
) {
	taskLogger := task.GetLogger(b.logger)

	outcome := statusSuccess
	if err != nil {
		outcome = statusFailure
	}
	b.metrics.taskCompleted.WithLabelValues(outcome).Inc()
	b.metrics.taskDuration.WithLabelValues(outcome).Observe(time.Since(start).Seconds())

	if err != nil {
		level.Debug(taskLogger).Log(
			"msg", "task failed",
			"duration", time.Since(start).String(),
			"err", err,
		)
		return
	}

	level.Debug(taskLogger).Log(
		"msg", "task completed",
		"duration", time.Since(start).String(),
		"metas", len(metas),
	)
}
// notifyTaskCompletedToPlanner sends the task result (created metas and/or the
// processing error) back to the planner over the builder loop stream.
func (b *Builder) notifyTaskCompletedToPlanner(
	c protos.PlannerForBuilder_BuilderLoopClient,
	task *protos.Task,
	metas []bloomshipper.Meta,
	err error,
) error {
	taskLogger := task.GetLogger(b.logger)

	taskResult := protos.TaskResult{
		TaskID:       task.ID,
		Error:        err,
		CreatedMetas: metas,
	}
	msg := &protos.BuilderToPlanner{
		BuilderID: b.ID,
		Result:    *taskResult.ToProtoTaskResult(),
	}
	if sendErr := c.Send(msg); sendErr != nil {
		return fmt.Errorf("failed to acknowledge task completion to planner: %w", sendErr)
	}

	level.Debug(taskLogger).Log("msg", "acknowledged task completion to planner")
	return nil
}
// processTask generates the blooms blocks and metas and uploads them to the object storage.
// Now that we have the gaps, we will generate a bloom block for each gap.
// We can accelerate this by using existing blocks which may already contain
// needed chunks in their blooms, for instance after a new TSDB version is generated
// but contains many of the same chunk references from the previous version.
// To do this, we'll need to take the metas we've already resolved and find blocks
// overlapping the ownership ranges we've identified as needing updates.
// With these in hand, we can download the old blocks and use them to
// accelerate bloom generation for the new blocks.
func (b *Builder) processTask(
ctx context.Context,
task *protos.Task,
) ([]bloomshipper.Meta, error) {
tenant := task.Tenant
logger := task.GetLogger(b.logger)
level.Debug(logger).Log("msg", "task started")
if timeout := b.limits.BuilderResponseTimeout(task.Tenant); timeout > 0 {
var cancel context.CancelFunc
ctx, cancel = context.WithDeadline(ctx, time.Now().Add(timeout))
defer cancel()
}
client, err := b.bloomStore.Client(task.Table.ModelTime())
if err != nil {
level.Error(logger).Log("msg", "failed to get client", "err", err)
return nil, fmt.Errorf("failed to get client: %w", err)
}
blockEnc, err := compression.ParseCodec(b.limits.BloomBlockEncoding(task.Tenant))
if err != nil {
return nil, fmt.Errorf("failed to parse block encoding: %w", err)
}
var (
blockCt int
maxBlockSize = uint64(b.limits.BloomMaxBlockSize(tenant))
maxBloomSize = uint64(b.limits.BloomMaxBloomSize(tenant))
blockOpts = v1.NewBlockOptions(blockEnc, maxBlockSize, maxBloomSize)
created []bloomshipper.Meta
totalSeries int
bytesAdded int
)
for i := range task.Gaps {
if ctx.Err() != nil {
return nil, ctx.Err()
}
gap := task.Gaps[i]
logger := log.With(logger, "gap", gap.Bounds.String())
meta := bloomshipper.Meta{
MetaRef: bloomshipper.MetaRef{
Ref: bloomshipper.Ref{
TenantID: tenant,
TableName: task.Table.Addr(),
Bounds: gap.Bounds,
},
},
}
// Fetch blocks that aren't up to date but are in the desired fingerprint range
// to try and accelerate bloom creation.
level.Debug(logger).Log("msg", "loading series and blocks for gap", "blocks", len(gap.Blocks))
seriesItr, blocksIter, err := b.loadWorkForGap(ctx, task.Table, gap)
if err != nil {
level.Error(logger).Log("msg", "failed to get series and blocks", "err", err)
return nil, fmt.Errorf("failed to get series and blocks: %w", err)
}
// TODO(owen-d): more elegant error handling than sync.OnceFunc
closeBlocksIter := sync.OnceFunc(func() {
if err := blocksIter.Close(); err != nil {
level.Error(logger).Log("msg", "failed to close blocks iterator", "err", err)
}
})
defer closeBlocksIter()
// Blocks are built consuming the series iterator. For observability, we wrap the series iterator
// with a counter iterator to count the number of times Next() is called on it.
// This is used to observe the number of series that are being processed.
seriesItrWithCounter := iter.NewCounterIter(seriesItr)
gen := NewSimpleBloomGenerator(
tenant,
blockOpts,
seriesItrWithCounter,
b.chunkLoader,
blocksIter,
b.writerReaderFunc,
nil, // TODO(salvacorts): Pass reporter or remove when we address tracking
b.bloomStore.BloomMetrics(),
logger,
)
level.Debug(logger).Log("msg", "generating blocks", "overlapping_blocks", len(gap.Blocks))
newBlocks := gen.Generate(ctx)
for newBlocks.Next() && newBlocks.Err() == nil {
blockCt++
blk := newBlocks.At()
built, err := bloomshipper.BlockFrom(defaultBlockCompressionCodec, tenant, task.Table.Addr(), blk)
if err != nil {
level.Error(logger).Log("msg", "failed to build block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}
return nil, fmt.Errorf("failed to build block: %w", err)
}
logger := log.With(logger, "block", built.String())
if err := client.PutBlock(
ctx,
built,
); err != nil {
level.Error(logger).Log("msg", "failed to write block", "err", err)
if err = blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}
return nil, fmt.Errorf("failed to write block: %w", err)
}
b.metrics.blocksCreated.Inc()
if err := blk.Reader().Cleanup(); err != nil {
level.Error(logger).Log("msg", "failed to cleanup block directory", "err", err)
}
totalGapKeyspace := gap.Bounds.Max - gap.Bounds.Min
progress := built.Bounds.Max - gap.Bounds.Min
pct := float64(progress) / float64(totalGapKeyspace) * 100
level.Debug(logger).Log("msg", "uploaded block", "progress_pct", fmt.Sprintf("%.2f", pct))
meta.Blocks = append(meta.Blocks, built.BlockRef)
fix(blooms): Match series to newest block only (#15481) **What this PR does / why we need it**: While running bloom filters in production we noticed some Loki clusters that showed a very high percentage of missing chunks when querying blooms, thus resulting in lower filter rate. The reason is that old, superseded blocks are still considered up-to-date, because they cover a keyspace that is not covered by newer blocks with smaller keyspaces (due to larger individual series). ``` | series fingerprint keyspace ------------+---------------------------------------------------------------- | o o o o o o o o o o o o o ------------+---------------------------------------------------------------- iteration 1 | 111111111111111111111111111111111111111111111111111111111 iteration 2 | 22222222222222 3333333333333333 444444 iteration 3 | 5555555 6666666 77777777 888888888 9999999999 ... up-to-date | 555555522266666661111177777777333388888888811119999999999 ------------+---------------------------------------------------------------- | x ``` The chart shows the different blocks marked with the numbers 1 to 9 for a subset of the full series fingerprint keyspace. The blocks are generated in multiple successive bloom building iterations. The first block covers a larger keyspace (more series), because the individual blooms in the blocks are smaller in the beginning of the day. Later, the blooms get larger and therefore the block fingerprint ranges gets smaller. However, since we are dealing with fingerprint ranges, not individual fingerprints, the newer blocks cause "gaps" in the range of the previously larger keyspace. In the case above, every block except block 4, are considered up-to-date, since each of them covers a keyspace that is otherwise not covered. When resolving blocks for a series at query time, we consider looking at all up-to-date blocks, which are referenced by the meta files. 
The series `x` in the chart shows, that it is within the range of 3 up-to-date blocks: 1, 2, 5. However, only the newest block (5) may contain the requested series. This PR changes the block resolver on the index-gateway to only match the newest block to a series, based on the timestamp of the TSDB from with the blocks were generated. --- Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
1 year ago
meta.Sources = append(meta.Sources, task.TSDB)
}
if err := newBlocks.Err(); err != nil {
level.Error(logger).Log("msg", "failed to generate bloom", "err", err)
return nil, fmt.Errorf("failed to generate bloom: %w", err)
}
closeBlocksIter()
bytesAdded += newBlocks.Bytes()
totalSeries += seriesItrWithCounter.Count()
b.metrics.blocksReused.Add(float64(len(gap.Blocks)))
// Write the new meta
// TODO(owen-d): put total size in log, total time in metrics+log
ref, err := bloomshipper.MetaRefFrom(tenant, task.Table.Addr(), gap.Bounds, meta.Sources, meta.Blocks)
if err != nil {
level.Error(logger).Log("msg", "failed to checksum meta", "err", err)
return nil, fmt.Errorf("failed to checksum meta: %w", err)
}
meta.MetaRef = ref
logger = log.With(logger, "meta", meta.String())
if err := client.PutMeta(ctx, meta); err != nil {
level.Error(logger).Log("msg", "failed to write meta", "err", err)
return nil, fmt.Errorf("failed to write meta: %w", err)
}
b.metrics.metasCreated.Inc()
level.Debug(logger).Log("msg", "uploaded meta")
created = append(created, meta)
// Now that the meta is written thus blocks can be queried, we prefetch them to the gateway
if b.bloomGateway != nil && b.limits.PrefetchBloomBlocks(tenant) {
if err := b.bloomGateway.PrefetchBloomBlocks(ctx, meta.Blocks); err != nil {
level.Error(logger).Log("msg", "failed to prefetch block on gateway", "err", err)
}
}
}
b.metrics.seriesPerTask.Observe(float64(totalSeries))
b.metrics.bytesPerTask.Observe(float64(bytesAdded))
level.Debug(logger).Log(
"msg", "finished bloom generation",
"blocks", blockCt,
"series", totalSeries,
"bytes_added", bytesAdded,
)
return created, nil
}
// loadWorkForGap builds the inputs needed to generate blooms for one gap:
// an iterator over the gap's series and a lazily-loading iterator over
// existing blocks overlapping the gap (used to accelerate bloom creation).
func (b *Builder) loadWorkForGap(
	ctx context.Context,
	table config.DayTable,
	gap protos.Gap,
) (iter.Iterator[*v1.Series], iter.CloseResetIterator[*v1.SeriesWithBlooms], error) {
	seriesItr := iter.NewCancelableIter(ctx, iter.NewSliceIter(gap.Series))

	// load a blocks iterator for the gap
	fetcher, err := b.bloomStore.Fetcher(table.ModelTime())
	if err != nil {
		return nil, nil, errors.Wrap(err, "failed to get fetcher")
	}

	// NB(owen-d): we filter out nil blocks here to avoid panics in the bloom generator since the fetcher
	// input->output length and indexing in its contract
	// NB(chaudum): Do we want to fetch in strict mode and fail instead?
	fetchNonNil := func(ctx context.Context, refs []bloomshipper.BlockRef) ([]*bloomshipper.CloseableBlockQuerier, error) {
		queriers, fetchErr := fetcher.FetchBlocks(ctx, refs, bloomshipper.WithFetchAsync(false), bloomshipper.WithIgnoreNotFound(true))
		if fetchErr != nil {
			return nil, fetchErr
		}
		present := make([]*bloomshipper.CloseableBlockQuerier, 0, len(queriers))
		for _, q := range queriers {
			if q == nil {
				continue
			}
			present = append(present, q)
		}
		return present, nil
	}

	blocksIter := newBlockLoadingIter(ctx, gap.Blocks, FetchFunc[bloomshipper.BlockRef, *bloomshipper.CloseableBlockQuerier](fetchNonNil), 10)

	return seriesItr, blocksIter, nil
}
// writerReaderFunc creates a fresh temporary directory under the configured
// working dir and returns a directory-backed block writer/reader pair for it.
// It panics if the temp directory cannot be created (the interface has no
// error return).
func (b *Builder) writerReaderFunc() (v1.BlockWriter, v1.BlockReader) {
	tmpDir, err := os.MkdirTemp(b.cfg.WorkingDir, "bloom-block-")
	if err != nil {
		panic(err)
	}
	return v1.NewDirectoryBlockWriter(tmpDir), v1.NewDirectoryBlockReader(tmpDir)
}