mirror of https://github.com/grafana/loki
You cannot select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
280 lines
7.3 KiB
280 lines
7.3 KiB
package queryrange
|
|
|
|
import (
	"context"
	"net/http"
	"slices"
	"time"

	"github.com/grafana/dskit/httpgrpc"
	"github.com/grafana/dskit/tenant"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/common/model"
	attribute "go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"

	"github.com/grafana/loki/v3/pkg/logproto"
	"github.com/grafana/loki/v3/pkg/logql/syntax"
	"github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
	"github.com/grafana/loki/v3/pkg/storage/config"
	"github.com/grafana/loki/v3/pkg/util/constants"
	"github.com/grafana/loki/v3/pkg/util/validation"
)
|
|
|
|
type lokiResult struct {
|
|
req queryrangebase.Request
|
|
ch chan *packedResp
|
|
}
|
|
|
|
type packedResp struct {
|
|
resp queryrangebase.Response
|
|
err error
|
|
}
|
|
|
|
type SplitByMetrics struct {
|
|
splits prometheus.Histogram
|
|
}
|
|
|
|
func NewSplitByMetrics(r prometheus.Registerer) *SplitByMetrics {
|
|
return &SplitByMetrics{
|
|
splits: promauto.With(r).NewHistogram(prometheus.HistogramOpts{
|
|
Namespace: constants.Loki,
|
|
Name: "query_frontend_partitions",
|
|
Help: "Number of time-based partitions (sub-requests) per request",
|
|
Buckets: prometheus.ExponentialBuckets(1, 4, 5), // 1 -> 1024
|
|
}),
|
|
}
|
|
}
|
|
|
|
type splitByInterval struct {
|
|
configs []config.PeriodConfig
|
|
next queryrangebase.Handler
|
|
limits Limits
|
|
merger queryrangebase.Merger
|
|
metrics *SplitByMetrics
|
|
splitter splitter
|
|
}
|
|
|
|
// SplitByIntervalMiddleware creates a new Middleware that splits log requests by a given interval.
|
|
func SplitByIntervalMiddleware(configs []config.PeriodConfig, limits Limits, merger queryrangebase.Merger, splitter splitter, metrics *SplitByMetrics) queryrangebase.Middleware {
|
|
if metrics == nil {
|
|
metrics = NewSplitByMetrics(nil)
|
|
}
|
|
|
|
return queryrangebase.MiddlewareFunc(func(next queryrangebase.Handler) queryrangebase.Handler {
|
|
return &splitByInterval{
|
|
configs: configs,
|
|
next: next,
|
|
limits: limits,
|
|
merger: merger,
|
|
metrics: metrics,
|
|
splitter: splitter,
|
|
}
|
|
})
|
|
}
|
|
|
|
func (h *splitByInterval) Feed(ctx context.Context, input []*lokiResult) chan *lokiResult {
|
|
ch := make(chan *lokiResult)
|
|
|
|
go func() {
|
|
defer close(ch)
|
|
for _, d := range input {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case ch <- d:
|
|
continue
|
|
}
|
|
}
|
|
}()
|
|
|
|
return ch
|
|
}
|
|
|
|
func (h *splitByInterval) Process(
|
|
ctx context.Context,
|
|
parallelism int,
|
|
threshold int64,
|
|
input []*lokiResult,
|
|
maxSeries int,
|
|
) ([]queryrangebase.Response, error) {
|
|
var responses []queryrangebase.Response
|
|
ctx, cancel := context.WithCancelCause(ctx)
|
|
defer cancel(errors.New("split by interval process canceled"))
|
|
|
|
ch := h.Feed(ctx, input)
|
|
|
|
// queries with 0 limits should not be exited early
|
|
var unlimited bool
|
|
if threshold == 0 {
|
|
unlimited = true
|
|
}
|
|
|
|
// Parallelism will be at least 1
|
|
p := max(parallelism, 1)
|
|
// don't spawn unnecessary goroutines
|
|
if len(input) < parallelism {
|
|
p = len(input)
|
|
}
|
|
|
|
// per request wrapped handler for limiting the amount of series.
|
|
next := newSeriesLimiter(maxSeries).Wrap(h.next)
|
|
for i := 0; i < p; i++ {
|
|
go h.loop(ctx, ch, next)
|
|
}
|
|
|
|
for _, x := range input {
|
|
select {
|
|
case <-ctx.Done():
|
|
return nil, ctx.Err()
|
|
case data := <-x.ch:
|
|
if data.err != nil {
|
|
return nil, data.err
|
|
}
|
|
|
|
responses = append(responses, data.resp)
|
|
|
|
// see if we can exit early if a limit has been reached
|
|
if casted, ok := data.resp.(*LokiResponse); !unlimited && ok {
|
|
threshold -= casted.Count()
|
|
|
|
if threshold <= 0 {
|
|
return responses, nil
|
|
}
|
|
|
|
}
|
|
|
|
}
|
|
}
|
|
|
|
return responses, nil
|
|
}
|
|
|
|
func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next queryrangebase.Handler) {
|
|
for data := range ch {
|
|
|
|
ctx, sp := tracer.Start(ctx, "interval")
|
|
if sp.SpanContext().IsSampled() {
|
|
data.req.LogToSpan(sp)
|
|
}
|
|
|
|
resp, err := next.Do(ctx, data.req)
|
|
sp.End()
|
|
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case data.ch <- &packedResp{resp, err}:
|
|
// The parent Process method will return on the first error. So stop
|
|
// processng.
|
|
if err != nil {
|
|
return
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
func (h *splitByInterval) Do(ctx context.Context, r queryrangebase.Request) (queryrangebase.Response, error) {
|
|
tenantIDs, err := tenant.TenantIDs(ctx)
|
|
if err != nil {
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
|
|
}
|
|
|
|
var interval time.Duration
|
|
switch r.(type) {
|
|
case *LokiSeriesRequest, *LabelRequest:
|
|
interval = validation.MaxDurationOrZeroPerTenant(tenantIDs, h.limits.MetadataQuerySplitDuration)
|
|
default:
|
|
interval = validation.SmallestPositiveNonZeroDurationPerTenant(tenantIDs, h.limits.QuerySplitDuration)
|
|
}
|
|
|
|
// skip split by if unset
|
|
if interval == 0 {
|
|
return h.next.Do(ctx, r)
|
|
}
|
|
|
|
intervals := h.splitter.split(time.Now().UTC(), tenantIDs, r, interval)
|
|
|
|
h.metrics.splits.Observe(float64(len(intervals)))
|
|
|
|
// no interval should not be processed by the frontend.
|
|
if len(intervals) == 0 {
|
|
return h.next.Do(ctx, r)
|
|
}
|
|
|
|
trace.SpanFromContext(ctx).SetAttributes(attribute.Int("n_intervals", len(intervals)))
|
|
|
|
if len(intervals) == 1 {
|
|
return h.next.Do(ctx, intervals[0])
|
|
}
|
|
|
|
var limit int64
|
|
switch req := r.(type) {
|
|
case *LokiRequest:
|
|
limit = int64(req.Limit)
|
|
if req.Direction == logproto.BACKWARD {
|
|
for i, j := 0, len(intervals)-1; i < j; i, j = i+1, j-1 {
|
|
intervals[i], intervals[j] = intervals[j], intervals[i]
|
|
}
|
|
}
|
|
case *DetectedFieldsRequest:
|
|
limit = int64(req.LineLimit)
|
|
for i, j := 0, len(intervals)-1; i < j; i, j = i+1, j-1 {
|
|
intervals[i], intervals[j] = intervals[j], intervals[i]
|
|
}
|
|
case *LokiSeriesRequest, *LabelRequest, *logproto.IndexStatsRequest, *logproto.VolumeRequest, *logproto.ShardsRequest, *DetectedLabelsRequest:
|
|
// Set this to 0 since this is not used in Series/Labels/Index Request.
|
|
limit = 0
|
|
default:
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, "unknown request type")
|
|
}
|
|
|
|
input := make([]*lokiResult, 0, len(intervals))
|
|
for _, interval := range intervals {
|
|
input = append(input, &lokiResult{
|
|
req: interval,
|
|
ch: make(chan *packedResp),
|
|
})
|
|
}
|
|
|
|
maxSeriesCapture := func(id string) int { return h.limits.MaxQuerySeries(ctx, id) }
|
|
maxSeries := validation.SmallestPositiveIntPerTenant(tenantIDs, maxSeriesCapture)
|
|
maxParallelism := MinWeightedParallelism(ctx, tenantIDs, h.configs, h.limits, model.Time(r.GetStart().UnixMilli()), model.Time(r.GetEnd().UnixMilli()))
|
|
resps, err := h.Process(ctx, maxParallelism, limit, input, maxSeries)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
return h.merger.MergeResponse(resps...)
|
|
}
|
|
|
|
// maxRangeVectorAndOffsetDurationFromQueryString
|
|
func maxRangeVectorAndOffsetDurationFromQueryString(q string) (time.Duration, time.Duration, error) {
|
|
parsed, err := syntax.ParseExpr(q)
|
|
if err != nil {
|
|
return 0, 0, err
|
|
}
|
|
dur, offset := maxRangeVectorAndOffsetDuration(parsed)
|
|
return dur, offset, nil
|
|
}
|
|
|
|
// maxRangeVectorAndOffsetDuration returns the maximum range vector and offset duration within a LogQL query.
|
|
func maxRangeVectorAndOffsetDuration(expr syntax.Expr) (time.Duration, time.Duration) {
|
|
if _, ok := expr.(syntax.SampleExpr); !ok {
|
|
return 0, 0
|
|
}
|
|
|
|
var maxRVDuration, maxOffset time.Duration
|
|
expr.Walk(func(e syntax.Expr) bool {
|
|
if r, ok := e.(*syntax.LogRangeExpr); ok {
|
|
if r.Interval > maxRVDuration {
|
|
maxRVDuration = r.Interval
|
|
}
|
|
if r.Offset > maxOffset {
|
|
maxOffset = r.Offset
|
|
}
|
|
}
|
|
return true
|
|
})
|
|
return maxRVDuration, maxOffset
|
|
}
|
|
|