|
|
|
@ -202,16 +202,34 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
chunks := h.getChunkSeriesSet(ctx, query, filteredMatchers) |
|
|
|
|
if err := chunks.Err(); err != nil { |
|
|
|
|
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
defer func() { |
|
|
|
|
if err := querier.Close(); err != nil { |
|
|
|
|
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var hints *storage.SelectHints |
|
|
|
|
if query.Hints != nil { |
|
|
|
|
hints = &storage.SelectHints{ |
|
|
|
|
Start: query.Hints.StartMs, |
|
|
|
|
End: query.Hints.EndMs, |
|
|
|
|
Step: query.Hints.StepMs, |
|
|
|
|
Func: query.Hints.Func, |
|
|
|
|
Grouping: query.Hints.Grouping, |
|
|
|
|
Range: query.Hints.RangeMs, |
|
|
|
|
By: query.Hints.By, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
ws, err := StreamChunkedReadResponses( |
|
|
|
|
NewChunkedWriter(w, f), |
|
|
|
|
int64(i), |
|
|
|
|
// The streaming API has to provide the series sorted.
|
|
|
|
|
chunks, |
|
|
|
|
querier.Select(ctx, true, hints, filteredMatchers...), |
|
|
|
|
sortedExternalLabels, |
|
|
|
|
h.remoteReadMaxBytesInFrame, |
|
|
|
|
h.marshalPool, |
|
|
|
@ -236,35 +254,6 @@ func (h *readHandler) remoteReadStreamedXORChunks(ctx context.Context, w http.Re |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// getChunkSeriesSet executes a query to retrieve a ChunkSeriesSet,
|
|
|
|
|
// encapsulating the operation in its own function to ensure timely release of
|
|
|
|
|
// the querier resources.
|
|
|
|
|
func (h *readHandler) getChunkSeriesSet(ctx context.Context, query *prompb.Query, filteredMatchers []*labels.Matcher) storage.ChunkSeriesSet { |
|
|
|
|
querier, err := h.queryable.ChunkQuerier(query.StartTimestampMs, query.EndTimestampMs) |
|
|
|
|
if err != nil { |
|
|
|
|
return storage.ErrChunkSeriesSet(err) |
|
|
|
|
} |
|
|
|
|
defer func() { |
|
|
|
|
if err := querier.Close(); err != nil { |
|
|
|
|
level.Warn(h.logger).Log("msg", "Error on chunk querier close", "err", err.Error()) |
|
|
|
|
} |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
var hints *storage.SelectHints |
|
|
|
|
if query.Hints != nil { |
|
|
|
|
hints = &storage.SelectHints{ |
|
|
|
|
Start: query.Hints.StartMs, |
|
|
|
|
End: query.Hints.EndMs, |
|
|
|
|
Step: query.Hints.StepMs, |
|
|
|
|
Func: query.Hints.Func, |
|
|
|
|
Grouping: query.Hints.Grouping, |
|
|
|
|
Range: query.Hints.RangeMs, |
|
|
|
|
By: query.Hints.By, |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
return querier.Select(ctx, true, hints, filteredMatchers...) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// filterExtLabelsFromMatchers change equality matchers which match external labels
|
|
|
|
|
// to a matcher that looks for an empty label,
|
|
|
|
|
// as that label should not be present in the storage.
|
|
|
|
|