loki/pkg/bloomgateway/bloomgateway.go
/*
Bloom Gateway package

The bloom gateway is a component that can be run as a standalone microservice
target and provides capabilities for filtering ChunkRefs based on a given list
of line filter expressions.

	     Querier   Query Frontend
	        |           |
	................................... service boundary
	        |           |
	        +----+------+
	             |
	     indexgateway.Gateway
	             |
	   bloomgateway.BloomQuerier
	             |
	   bloomgateway.GatewayClient
	             |
	  logproto.BloomGatewayClient
	             |
	................................... service boundary
	             |
	      bloomgateway.Gateway
	             |
	       queue.RequestQueue
	             |
	       bloomgateway.Worker
	             |
	     bloomgateway.Processor
	             |
	      bloomshipper.Store
	             |
	      bloomshipper.Client
	             |
	        ObjectClient
	             |
	................................... service boundary
	             |
	         object storage
*/
package bloomgateway

import (
	"context"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/go-kit/log"
	"github.com/go-kit/log/level"
	"github.com/grafana/dskit/services"
	"github.com/grafana/dskit/tenant"
	"github.com/oklog/ulid"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"

	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/queue"
	"github.com/grafana/loki/pkg/storage"
	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
	"github.com/grafana/loki/pkg/storage/chunk/cache"
	"github.com/grafana/loki/pkg/storage/config"
	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
	"github.com/grafana/loki/pkg/util"
	"github.com/grafana/loki/pkg/util/constants"
)
var errGatewayUnhealthy = errors.New("bloom-gateway is unhealthy in the ring")

const (
	pendingTasksInitialCap = 1024
	metricsSubsystem       = "bloom_gateway"
)

var (
	// responsesPool pools []v1.Output buffers in power-of-two capacity buckets [64, 128, 256, ..., 65536]
	responsesPool = queue.NewSlicePool[v1.Output](1<<6, 1<<16, 2)
)
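// A minimal usage sketch for the pool (assuming, as FilterChunkRefs below
// does, that Get returns an empty slice with capacity for at least n results
// and Put hands the backing array back to the pool for reuse):
//
//	buf := responsesPool.Get(n)
//	defer responsesPool.Put(buf)
//	buf = append(buf, results...)
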
type metrics struct {
	queueDuration       prometheus.Histogram
	inflightRequests    prometheus.Summary
	chunkRefsUnfiltered prometheus.Counter
	chunkRefsFiltered   prometheus.Counter
}

func newMetrics(registerer prometheus.Registerer, namespace, subsystem string) *metrics {
	return &metrics{
		queueDuration: promauto.With(registerer).NewHistogram(prometheus.HistogramOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "queue_duration_seconds",
			Help:      "Time spent by tasks in queue before getting picked up by a worker.",
			Buckets:   prometheus.DefBuckets,
		}),
		inflightRequests: promauto.With(registerer).NewSummary(prometheus.SummaryOpts{
			Namespace:  namespace,
			Subsystem:  subsystem,
			Name:       "inflight_tasks",
			Help:       "Number of inflight tasks (either queued or processing) sampled at a regular interval. Quantile buckets keep track of inflight tasks over the last 60s.",
			Objectives: map[float64]float64{0.5: 0.05, 0.75: 0.02, 0.8: 0.02, 0.9: 0.01, 0.95: 0.01, 0.99: 0.001},
			MaxAge:     time.Minute,
			AgeBuckets: 6,
		}),
		chunkRefsUnfiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "chunkrefs_pre_filtering",
			Help:      "Total amount of chunk refs pre filtering. Does not count chunk refs in failed requests.",
		}),
		chunkRefsFiltered: promauto.With(registerer).NewCounter(prometheus.CounterOpts{
			Namespace: namespace,
			Subsystem: subsystem,
			Name:      "chunkrefs_post_filtering",
			Help:      "Total amount of chunk refs post filtering.",
		}),
	}
}

func (m *metrics) addUnfilteredCount(n int) {
	m.chunkRefsUnfiltered.Add(float64(n))
}

func (m *metrics) addFilteredCount(n int) {
	m.chunkRefsFiltered.Add(float64(n))
}
// SyncMap is a map structure which can be synchronized using the RWMutex
type SyncMap[k comparable, v any] struct {
	sync.RWMutex
	Map map[k]v
}

type pendingTasks SyncMap[ulid.ULID, Task]

func (t *pendingTasks) Len() int {
	t.RLock()
	defer t.RUnlock()
	return len(t.Map)
}

func (t *pendingTasks) Add(k ulid.ULID, v Task) {
	t.Lock()
	t.Map[k] = v
	t.Unlock()
}

func (t *pendingTasks) Delete(k ulid.ULID) {
	t.Lock()
	delete(t.Map, k)
	t.Unlock()
}

// makePendingTasks creates a SyncMap that holds pending tasks
func makePendingTasks(n int) *pendingTasks {
	return &pendingTasks{
		RWMutex: sync.RWMutex{},
		Map:     make(map[ulid.ULID]Task, n),
	}
}
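// A minimal lifecycle sketch for pendingTasks (call sites are illustrative:
// tasks are added when enqueued in FilterChunkRefs below, Len() is sampled by
// the inflight-tasks ticker in running(), and deletion is assumed to happen in
// the workers once a task has been processed, which is not shown in this file):
//
//	pending := makePendingTasks(pendingTasksInitialCap)
//	pending.Add(task.ID, task)   // on enqueue
//	inflight := pending.Len()    // sampled periodically
//	pending.Delete(task.ID)      // when the worker has finished the task
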
type Gateway struct {
	services.Service

	cfg            Config
	logger         log.Logger
	metrics        *metrics
	workerMetrics  *workerMetrics
	queueMetrics   *queue.Metrics
	queue          *queue.RequestQueue
	activeUsers    *util.ActiveUsersCleanupService
	bloomStore     bloomshipper.Store
	pendingTasks   *pendingTasks
	serviceMngr    *services.Manager
	serviceWatcher *services.FailureWatcher
	workerConfig   workerConfig
}

type fixedQueueLimits struct {
	maxConsumers int
}

func (l *fixedQueueLimits) MaxConsumers(_ string, _ int) int {
	return l.maxConsumers
}
// New returns a new instance of the Bloom Gateway.
func New(cfg Config, schemaCfg config.SchemaConfig, storageCfg storage.Config, overrides Limits, cm storage.ClientMetrics, logger log.Logger, reg prometheus.Registerer) (*Gateway, error) {
	g := &Gateway{
		cfg:          cfg,
		logger:       logger,
		metrics:      newMetrics(reg, constants.Loki, metricsSubsystem),
		pendingTasks: makePendingTasks(pendingTasksInitialCap),
		workerConfig: workerConfig{
			maxItems: 100,
		},
		workerMetrics: newWorkerMetrics(reg, constants.Loki, metricsSubsystem),
		queueMetrics:  queue.NewMetrics(reg, constants.Loki, metricsSubsystem),
	}
	var err error

	g.queue = queue.NewRequestQueue(cfg.MaxOutstandingPerTenant, time.Minute, &fixedQueueLimits{0}, g.queueMetrics)
	g.activeUsers = util.NewActiveUsersCleanupWithDefaultValues(g.queueMetrics.Cleanup)

	var metasCache cache.Cache
	mcCfg := storageCfg.BloomShipperConfig.MetasCache
	if cache.IsCacheConfigured(mcCfg) {
		metasCache, err = cache.New(mcCfg, reg, logger, stats.BloomMetasCache, constants.Loki)
		if err != nil {
			return nil, err
		}
	}

	var blocksCache cache.TypedCache[string, bloomshipper.BlockDirectory]
	bcCfg := storageCfg.BloomShipperConfig.BlocksCache
	if bcCfg.IsEnabled() {
		blocksCache = bloomshipper.NewBlocksCache(bcCfg, reg, logger)
	}

	store, err := bloomshipper.NewBloomStore(schemaCfg.Configs, storageCfg, cm, metasCache, blocksCache, logger)
	if err != nil {
		return nil, err
	}

	// We need to keep a reference to be able to call Stop() on shutdown of the gateway.
	g.bloomStore = store

	if err := g.initServices(); err != nil {
		return nil, err
	}
	g.Service = services.NewBasicService(g.starting, g.running, g.stopping).WithName("bloom-gateway")

	return g, nil
}
func (g *Gateway) initServices() error {
	var err error
	svcs := []services.Service{g.queue, g.activeUsers}
	for i := 0; i < g.cfg.WorkerConcurrency; i++ {
		id := fmt.Sprintf("bloom-query-worker-%d", i)
		w := newWorker(id, g.workerConfig, g.queue, g.bloomStore, g.pendingTasks, g.logger, g.workerMetrics)
		svcs = append(svcs, w)
	}
	g.serviceMngr, err = services.NewManager(svcs...)
	if err != nil {
		return err
	}
	g.serviceWatcher = services.NewFailureWatcher()
	g.serviceWatcher.WatchManager(g.serviceMngr)
	return nil
}
func (g *Gateway) starting(ctx context.Context) error {
	var err error
	defer func() {
		if err == nil || g.serviceMngr == nil {
			return
		}
		if err := services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr); err != nil {
			level.Error(g.logger).Log("msg", "failed to gracefully stop bloom gateway dependencies", "err", err)
		}
	}()

	if err := services.StartManagerAndAwaitHealthy(ctx, g.serviceMngr); err != nil {
		return errors.Wrap(err, "unable to start bloom gateway subservices")
	}

	return nil
}
func (g *Gateway) running(ctx context.Context) error {
	// We observe inflight tasks frequently and at regular intervals, to have a good
	// approximation of max inflight tasks over percentiles of time. We also do it with
	// a ticker so that we keep tracking it even if we have no new requests but stuck
	// inflight tasks (e.g. workers are all exhausted).
	inflightTasksTicker := time.NewTicker(250 * time.Millisecond)
	defer inflightTasksTicker.Stop()

	for {
		select {
		case <-ctx.Done():
			return nil
		case err := <-g.serviceWatcher.Chan():
			return errors.Wrap(err, "bloom gateway subservice failed")
		case <-inflightTasksTicker.C:
			inflight := g.pendingTasks.Len()
			g.metrics.inflightRequests.Observe(float64(inflight))
		}
	}
}
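// Sampling arithmetic for running() above: with a 250ms ticker and the
// inflight_tasks summary configured with MaxAge=1m over 6 age buckets (see
// newMetrics), each reported quantile covers roughly the last
// 60s / 0.25s = 240 observations of g.pendingTasks.Len().
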
func (g *Gateway) stopping(_ error) error {
	g.bloomStore.Stop()
	return services.StopManagerAndAwaitStopped(context.Background(), g.serviceMngr)
}
// FilterChunkRefs implements BloomGatewayServer
func (g *Gateway) FilterChunkRefs(ctx context.Context, req *logproto.FilterChunkRefRequest) (*logproto.FilterChunkRefResponse, error) {
	tenantID, err := tenant.TenantID(ctx)
	if err != nil {
		return nil, err
	}

	logger := log.With(g.logger, "tenant", tenantID)

	// start time == end time --> empty response
	if req.From.Equal(req.Through) {
		return &logproto.FilterChunkRefResponse{
			ChunkRefs: []*logproto.GroupedChunkRefs{},
		}, nil
	}

	// start time > end time --> error response
	if req.Through.Before(req.From) {
		return nil, errors.New("from time must not be after through time")
	}

	numChunksUnfiltered := len(req.Refs)

	// Shortcut if request does not contain filters
	if len(req.Filters) == 0 {
		g.metrics.addUnfilteredCount(numChunksUnfiltered)
		g.metrics.addFilteredCount(len(req.Refs))
		return &logproto.FilterChunkRefResponse{
			ChunkRefs: req.Refs,
		}, nil
	}

	// Sort ChunkRefs by fingerprint in ascending order
	sort.Slice(req.Refs, func(i, j int) bool {
		return req.Refs[i].Fingerprint < req.Refs[j].Fingerprint
	})
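	// Note: the ascending fingerprint order established here is what allows
	// removeNotMatchingChunks (below) to locate each result's ref group with
	// sort.Search, i.e. a binary search over req.Refs.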
	var numSeries int
	seriesByDay := partitionRequest(req)

	// no tasks --> empty response
	if len(seriesByDay) == 0 {
		return &logproto.FilterChunkRefResponse{
			ChunkRefs: []*logproto.GroupedChunkRefs{},
		}, nil
	}

	tasks := make([]Task, 0, len(seriesByDay))
	for _, seriesWithBounds := range seriesByDay {
		task, err := NewTask(ctx, tenantID, seriesWithBounds, req.Filters)
		if err != nil {
			return nil, err
		}
		tasks = append(tasks, task)
		numSeries += len(seriesWithBounds.series)
	}

	g.activeUsers.UpdateUserTimestamp(tenantID, time.Now())

	// Ideally we could use an unbuffered channel here, but since we return from
	// the request handler on the first error, there can be cases where the request
	// context is not done yet and the consumeTask() function wants to send to
	// tasksCh, but nobody reads from it any more.
	tasksCh := make(chan Task, len(tasks))
	for _, task := range tasks {
		task := task
		level.Info(logger).Log("msg", "enqueue task", "task", task.ID, "table", task.table, "series", len(task.series))
		g.queue.Enqueue(tenantID, []string{}, task, func() {
			// When enqueuing, we also add the task to the pending tasks
			g.pendingTasks.Add(task.ID, task)
		})
		go consumeTask(ctx, task, tasksCh, logger)
	}

	responses := responsesPool.Get(numSeries)
	defer responsesPool.Put(responses)
	remaining := len(tasks)

outer:
	for {
		select {
		case <-ctx.Done():
			return nil, errors.Wrap(ctx.Err(), "request failed")
		case task := <-tasksCh:
			level.Info(logger).Log("msg", "task done", "task", task.ID, "err", task.Err())
			if task.Err() != nil {
				return nil, errors.Wrap(task.Err(), "request failed")
			}
			responses = append(responses, task.responses...)
			remaining--
			if remaining == 0 {
				break outer
			}
		}
	}

	for _, o := range responses {
		if o.Removals.Len() == 0 {
			continue
		}
		removeNotMatchingChunks(req, o, g.logger)
	}

	g.metrics.addUnfilteredCount(numChunksUnfiltered)
	g.metrics.addFilteredCount(len(req.Refs))

	level.Info(logger).Log("msg", "return filtered chunk refs", "unfiltered", numChunksUnfiltered, "filtered", len(req.Refs))

	return &logproto.FilterChunkRefResponse{ChunkRefs: req.Refs}, nil
}
// consumeTask receives v1.Output yielded from the block querier on the task's
// result channel and stores them on the task.
// In case the task's context is done, it drains the remaining items until the
// task is closed by the worker.
// Once the task is closed, it will send the task with the results from the
// block querier to the supplied task channel.
func consumeTask(ctx context.Context, task Task, tasksCh chan<- Task, logger log.Logger) {
	logger = log.With(logger, "task", task.ID)

	for res := range task.resCh {
		select {
		case <-ctx.Done():
			level.Debug(logger).Log("msg", "drop partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
		default:
			level.Debug(logger).Log("msg", "accept partial result", "fp_int", uint64(res.Fp), "fp_hex", res.Fp, "chunks_to_remove", res.Removals.Len())
			task.responses = append(task.responses, res)
		}
	}

	select {
	case <-ctx.Done():
		// do nothing
	case <-task.Done():
		// notify request handler about finished task
		tasksCh <- task
	}
}
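// A rough sketch of the producer side that consumeTask pairs with
// (hypothetical; the actual worker lives elsewhere and is not shown in this
// file). It assumes the worker sends each v1.Output on task.resCh, closes the
// channel when the block querier is exhausted, and then marks the task as done:
//
//	for _, output := range outputs {
//		task.resCh <- output // received by the for-range loop above
//	}
//	close(task.resCh)        // ends the for-range loop
//	task.CloseWithError(nil) // unblocks <-task.Done(); the task is then forwarded
//
// CloseWithError is a hypothetical name for whatever closes the task's Done
// channel and records task.Err().
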
func removeNotMatchingChunks(req *logproto.FilterChunkRefRequest, res v1.Output, logger log.Logger) {
	// binary search index of fingerprint
	idx := sort.Search(len(req.Refs), func(i int) bool {
		return req.Refs[i].Fingerprint >= uint64(res.Fp)
	})

	// fingerprint not found
	if idx >= len(req.Refs) {
		level.Error(logger).Log("msg", "index out of range", "idx", idx, "len", len(req.Refs), "fp", uint64(res.Fp))
		return
	}

	// if all chunks of a fingerprint are removed,
	// then remove the whole group from the response
	if len(req.Refs[idx].Refs) == res.Removals.Len() {
		req.Refs[idx] = nil // avoid leaking pointer
		req.Refs = append(req.Refs[:idx], req.Refs[idx+1:]...)
		return
	}

	for i := range res.Removals {
		toRemove := res.Removals[i]
		for j := 0; j < len(req.Refs[idx].Refs); j++ {
			if toRemove.Checksum == req.Refs[idx].Refs[j].Checksum {
				req.Refs[idx].Refs[j] = nil // avoid leaking pointer
				req.Refs[idx].Refs = append(req.Refs[idx].Refs[:j], req.Refs[idx].Refs[j+1:]...)
				j-- // since we removed the item at this index, we have to check the same index again
			}
		}
	}
}
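// A worked example of removeNotMatchingChunks (illustrative values only):
// given req.Refs sorted by fingerprint, with a group for fingerprint 0xAB
// holding chunks with checksums {1, 2, 3},
//
//   - res.Fp = 0xAB, res.Removals = {2}        -> the group keeps {1, 3}
//   - res.Fp = 0xAB, res.Removals = {1, 2, 3}  -> the whole group is dropped
//     from req.Refs, since all of its chunks were removed
//   - res.Fp greater than every fingerprint    -> sort.Search returns
//     len(req.Refs), an "index out of range" error is logged, and the
//     request is left untouched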