|
|
|
@ -24,9 +24,13 @@ func SplitByIntervalMiddleware(interval time.Duration, limits queryrange.Limits, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type lokiResult struct { |
|
|
|
|
req queryrange.Request |
|
|
|
|
resp chan queryrange.Response |
|
|
|
|
err chan error |
|
|
|
|
req queryrange.Request |
|
|
|
|
ch chan *packedResp |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type packedResp struct { |
|
|
|
|
resp queryrange.Response |
|
|
|
|
err error |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type splitByInterval struct { |
|
|
|
@ -79,14 +83,15 @@ func (h *splitByInterval) Process( |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
return nil, ctx.Err() |
|
|
|
|
case err := <-x.err: |
|
|
|
|
return nil, err |
|
|
|
|
case resp := <-x.resp: |
|
|
|
|
case data := <-x.ch: |
|
|
|
|
if data.err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
responses = append(responses, resp) |
|
|
|
|
responses = append(responses, data.resp) |
|
|
|
|
|
|
|
|
|
// see if we can exit early if a limit has been reached
|
|
|
|
|
threshold -= resp.(*LokiResponse).Count() |
|
|
|
|
threshold -= data.resp.(*LokiResponse).Count() |
|
|
|
|
if threshold <= 0 { |
|
|
|
|
return responses, nil |
|
|
|
|
} |
|
|
|
@ -105,12 +110,14 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult) { |
|
|
|
|
queryrange.LogToSpan(ctx, data.req) |
|
|
|
|
|
|
|
|
|
resp, err := h.next.Do(ctx, data.req) |
|
|
|
|
if err != nil { |
|
|
|
|
data.err <- err |
|
|
|
|
} else { |
|
|
|
|
data.resp <- resp |
|
|
|
|
|
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
sp.Finish() |
|
|
|
|
return |
|
|
|
|
case data.ch <- &packedResp{resp, err}: |
|
|
|
|
sp.Finish() |
|
|
|
|
} |
|
|
|
|
sp.Finish() |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
@ -138,9 +145,8 @@ func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryra |
|
|
|
|
input := make([]*lokiResult, 0, len(intervals)) |
|
|
|
|
for _, interval := range intervals { |
|
|
|
|
input = append(input, &lokiResult{ |
|
|
|
|
req: interval, |
|
|
|
|
resp: make(chan queryrange.Response, 1), |
|
|
|
|
err: make(chan error, 1), |
|
|
|
|
req: interval, |
|
|
|
|
ch: make(chan *packedResp), |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|