|
|
|
@ -418,7 +418,7 @@ func (g *Gateway) boundedShards( |
|
|
|
|
// sending multiple requests to the entire keyspace).
|
|
|
|
|
|
|
|
|
|
logger := util_log.WithContext(ctx, g.log) |
|
|
|
|
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.getShardsWithBlooms") |
|
|
|
|
sp, ctx := opentracing.StartSpanFromContext(ctx, "indexgateway.boundedShards") |
|
|
|
|
defer sp.Finish() |
|
|
|
|
|
|
|
|
|
// 1) for all bounds, get chunk refs
|
|
|
|
@ -431,6 +431,11 @@ func (g *Gateway) boundedShards( |
|
|
|
|
for _, g := range grps { |
|
|
|
|
ct += len(g) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
sp.LogKV( |
|
|
|
|
"stage", "queried local index", |
|
|
|
|
"index_chunks_resolved", ct, |
|
|
|
|
) |
|
|
|
|
// TODO(owen-d): pool
|
|
|
|
|
refs := make([]*logproto.ChunkRef, 0, ct) |
|
|
|
|
|
|
|
|
@ -443,11 +448,15 @@ func (g *Gateway) boundedShards( |
|
|
|
|
filtered := refs |
|
|
|
|
|
|
|
|
|
// 2) filter via blooms if enabled
|
|
|
|
|
if g.bloomQuerier != nil && len(syntax.ExtractLineFilters(p.Plan().AST)) > 0 { |
|
|
|
|
filters := syntax.ExtractLineFilters(p.Plan().AST) |
|
|
|
|
if g.bloomQuerier != nil && len(filters) > 0 { |
|
|
|
|
filtered, err = g.bloomQuerier.FilterChunkRefs(ctx, instanceID, req.From, req.Through, refs, p.Plan()) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
sp.LogKV( |
|
|
|
|
"stage", "queried bloom gateway", |
|
|
|
|
) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
g.metrics.preFilterChunks.WithLabelValues(routeShards).Observe(float64(ct)) |
|
|
|
@ -508,6 +517,7 @@ func (g *Gateway) boundedShards( |
|
|
|
|
"through", req.Through.Time().String(), |
|
|
|
|
"length", req.Through.Time().Sub(req.From.Time()).String(), |
|
|
|
|
"end_delta", time.Since(req.Through.Time()).String(), |
|
|
|
|
"filters", len(filters), |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
// 3) build shards
|
|
|
|
|