@@ -10,11 +10,8 @@ import (
 	"github.com/pkg/errors"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/prometheus/client_golang/prometheus/promauto"
-	"github.com/prometheus/common/model"
-	"golang.org/x/exp/slices"
 
 	"github.com/grafana/loki/pkg/queue"
-	v1 "github.com/grafana/loki/pkg/storage/bloom/v1"
 	"github.com/grafana/loki/pkg/storage/stores/shipper/bloomshipper"
 )
 
@@ -23,11 +20,9 @@ type workerConfig struct {
 }
 
 type workerMetrics struct {
-	dequeuedTasks      *prometheus.CounterVec
-	dequeueErrors      *prometheus.CounterVec
-	dequeueWaitTime    *prometheus.SummaryVec
-	storeAccessLatency *prometheus.HistogramVec
-	bloomQueryLatency  *prometheus.HistogramVec
+	dequeuedTasks   *prometheus.CounterVec
+	dequeueErrors   *prometheus.CounterVec
+	dequeueWaitTime *prometheus.SummaryVec
 }
 
 func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem string) *workerMetrics {
@@ -51,19 +46,6 @@ func newWorkerMetrics(registerer prometheus.Registerer, namespace, subsystem str
 			Name:      "dequeue_wait_time",
 			Help:      "Time spent waiting for dequeuing tasks from queue",
 		}, labels),
-		bloomQueryLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "bloom_query_latency",
-			Help:      "Latency in seconds of processing bloom blocks",
-		}, append(labels, "status")),
-		// TODO(chaudum): Move this metric into the bloomshipper
-		storeAccessLatency: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
-			Namespace: namespace,
-			Subsystem: subsystem,
-			Name:      "store_latency",
-			Help:      "Latency in seconds of accessing the bloom store component",
-		}, append(labels, "operation")),
 	}
 }
 
@@ -78,18 +60,18 @@ type worker struct {
 	id      string
 	cfg     workerConfig
 	queue   *queue.RequestQueue
-	shipper bloomshipper.Interface
+	store   bloomshipper.Store
 	pending *pendingTasks
 	logger  log.Logger
 	metrics *workerMetrics
 }
 
-func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, shipper bloomshipper.Interface, pending *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
+func newWorker(id string, cfg workerConfig, queue *queue.RequestQueue, store bloomshipper.Store, pending *pendingTasks, logger log.Logger, metrics *workerMetrics) *worker {
 	w := &worker{
 		id:      id,
 		cfg:     cfg,
 		queue:   queue,
-		shipper: shipper,
+		store:   store,
 		pending: pending,
 		logger:  log.With(logger, "worker", id),
 		metrics: metrics,
@@ -107,6 +89,8 @@ func (w *worker) starting(_ context.Context) error {
 func (w *worker) running(_ context.Context) error {
 	idx := queue.StartIndexWithLocalQueue
 
+	p := processor{store: w.store, logger: w.logger}
+
 	for st := w.State(); st == services.Running || st == services.Stopping; {
 		taskCtx := context.Background()
 		dequeueStart := time.Now()
@@ -128,8 +112,7 @@ func (w *worker) running(_ context.Context) error {
 		}
 		w.metrics.dequeuedTasks.WithLabelValues(w.id).Add(float64(len(items)))
 
-		tasksPerDay := make(map[model.Time][]Task)
-
+		tasks := make([]Task, 0, len(items))
 		for _, item := range items {
 			task, ok := item.(Task)
 			if !ok {
@@ -139,91 +122,12 @@ func (w *worker) running(_ context.Context) error {
 			}
 			level.Debug(w.logger).Log("msg", "dequeued task", "task", task.ID)
 			w.pending.Delete(task.ID)
-			tasksPerDay[task.day] = append(tasksPerDay[task.day], task)
+			tasks = append(tasks, task)
 		}
 
-		for day, tasks := range tasksPerDay {
-			// Remove tasks that are already cancelled
-			tasks = slices.DeleteFunc(tasks, func(t Task) bool {
-				if res := t.ctx.Err(); res != nil {
-					t.CloseWithError(res)
-					return true
-				}
-				return false
-			})
-			// no tasks to process
-			// continue with tasks of next day
-			if len(tasks) == 0 {
-				continue
-			}
-
-			// interval is [Start, End)
-			interval := bloomshipper.NewInterval(day, day.Add(Day))
-			logger := log.With(w.logger, "day", day.Time(), "tenant", tasks[0].Tenant)
-			level.Debug(logger).Log("msg", "process tasks", "tasks", len(tasks))
-
-			storeFetchStart := time.Now()
-			blockRefs, err := w.shipper.GetBlockRefs(taskCtx, tasks[0].Tenant, interval)
-			w.metrics.storeAccessLatency.WithLabelValues(w.id, "GetBlockRefs").Observe(time.Since(storeFetchStart).Seconds())
-			if err != nil {
-				for _, t := range tasks {
-					t.CloseWithError(err)
-				}
-				// continue with tasks of next day
-				continue
-			}
-			if len(tasks) == 0 {
-				continue
-			}
-
-			// No blocks found.
-			// Since there are no blocks for the given tasks, we need to return the
-			// unfiltered list of chunk refs.
-			if len(blockRefs) == 0 {
-				level.Warn(logger).Log("msg", "no blocks found")
-				for _, t := range tasks {
-					t.Close()
-				}
-				// continue with tasks of next day
-				continue
-			}
-
-			// Remove tasks that are already cancelled
-			tasks = slices.DeleteFunc(tasks, func(t Task) bool {
-				if res := t.ctx.Err(); res != nil {
-					t.CloseWithError(res)
-					return true
-				}
-				return false
-			})
-			// no tasks to process
-			// continue with tasks of next day
-			if len(tasks) == 0 {
-				continue
-			}
-
-			tasksForBlocks := partitionFingerprintRange(tasks, blockRefs)
-			blockRefs = blockRefs[:0]
-			for _, b := range tasksForBlocks {
-				blockRefs = append(blockRefs, b.blockRef)
-			}
-
-			err = w.processBlocksWithCallback(taskCtx, tasks[0].Tenant, blockRefs, tasksForBlocks)
-			if err != nil {
-				for _, t := range tasks {
-					t.CloseWithError(err)
-				}
-				// continue with tasks of next day
-				continue
-			}
-
-			// all tasks for this day are done.
-			// close them to notify the request handler
-			for _, task := range tasks {
-				task.Close()
-			}
-		}
+		err = p.run(taskCtx, tasks)
+		if err != nil {
+			level.Error(w.logger).Log("msg", "failed to process tasks", "err", err)
+		}
 
 		// return dequeued items back to the pool
@@ -238,41 +142,3 @@ func (w *worker) stopping(err error) error {
 	w.queue.UnregisterConsumerConnection(w.id)
 	return nil
 }
-
-func (w *worker) processBlocksWithCallback(taskCtx context.Context, tenant string, blockRefs []bloomshipper.BlockRef, boundedRefs []boundedTasks) error {
-	return w.shipper.ForEach(taskCtx, tenant, blockRefs, func(bq *v1.BlockQuerier, bounds v1.FingerprintBounds) error {
-		for _, b := range boundedRefs {
-			if b.blockRef.Bounds.Equal(bounds) {
-				return w.processBlock(bq, b.tasks)
-			}
-		}
-		return nil
-	})
-}
-
-func (w *worker) processBlock(blockQuerier *v1.BlockQuerier, tasks []Task) error {
-	schema, err := blockQuerier.Schema()
-	if err != nil {
-		return err
-	}
-
-	tokenizer := v1.NewNGramTokenizer(schema.NGramLen(), 0)
-	iters := make([]v1.PeekingIterator[v1.Request], 0, len(tasks))
-	for _, task := range tasks {
-		it := v1.NewPeekingIter(task.RequestIter(tokenizer))
-		iters = append(iters, it)
-	}
-
-	fq := blockQuerier.Fuse(iters)
-	start := time.Now()
-	err = fq.Run()
-	duration := time.Since(start).Seconds()
-
-	if err != nil {
-		w.metrics.bloomQueryLatency.WithLabelValues(w.id, "failure").Observe(duration)
-		return err
-	}
-
-	w.metrics.bloomQueryLatency.WithLabelValues(w.id, "success").Observe(duration)
-	return nil
-}
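Note: the `processor` type the worker now delegates to is not defined in this diff. Its shape can only be inferred from the two call sites above (`processor{store: w.store, logger: w.logger}` and `p.run(taskCtx, tasks)`); a minimal sketch under that assumption, not the actual implementation, would be:

// Sketch only: fields and signature are inferred from the call sites in
// this diff; the concrete implementation lives elsewhere in the package.
type processor struct {
	store  bloomshipper.Store
	logger log.Logger
}

// run processes one batch of dequeued tasks. The real version is expected
// to take over what was removed from running(): dropping cancelled tasks,
// grouping by day, resolving bloom blocks via p.store, and closing each
// task to notify the request handler.
func (p processor) run(ctx context.Context, tasks []Task) error {
	// ... extracted processing logic goes here ...
	return nil
}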