|
|
|
|
@ -14,6 +14,7 @@ import ( |
|
|
|
|
"github.com/grafana/loki/v3/pkg/logql" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/logql/log" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/logql/syntax" |
|
|
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel/stats" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
@ -71,6 +72,9 @@ func newEntryIterator(ctx context.Context, |
|
|
|
|
streamHash uint64 |
|
|
|
|
top = newTopK(int(req.Limit), req.Direction) |
|
|
|
|
) |
|
|
|
|
statistics := stats.FromContext(ctx) |
|
|
|
|
// For dataobjs, this maps to sections downloaded
|
|
|
|
|
statistics.AddChunksDownloaded(1) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
n, err := reader.Read(ctx, buf) |
|
|
|
|
@ -94,10 +98,13 @@ func newEntryIterator(ctx context.Context, |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
timestamp := record.Timestamp.UnixNano() |
|
|
|
|
statistics.AddDecompressedLines(1) |
|
|
|
|
line, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...) |
|
|
|
|
if !ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
statistics.AddPostFilterLines(1) |
|
|
|
|
|
|
|
|
|
var metadata []logproto.LabelAdapter |
|
|
|
|
if len(record.Metadata) > 0 { |
|
|
|
|
metadata = logproto.FromLabelsToLabelAdapters(record.Metadata) |
|
|
|
|
@ -293,6 +300,10 @@ func newSampleIterator(ctx context.Context, |
|
|
|
|
streamHash uint64 |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
statistics := stats.FromContext(ctx) |
|
|
|
|
// For dataobjs, this maps to sections downloaded
|
|
|
|
|
statistics.AddChunksDownloaded(1) |
|
|
|
|
|
|
|
|
|
for { |
|
|
|
|
n, err := reader.Read(ctx, buf) |
|
|
|
|
if err != nil && err != io.EOF { |
|
|
|
|
@ -328,10 +339,12 @@ func newSampleIterator(ctx context.Context, |
|
|
|
|
// TODO(twhitney): when iterating over multiple extractors, we need a way to pre-process as much of the line as possible
|
|
|
|
|
// In the case of multi-variant expressions, the only difference between the multiple extractors should be the final value, with all
|
|
|
|
|
// other filters and processing already done.
|
|
|
|
|
statistics.AddDecompressedLines(1) |
|
|
|
|
value, parsedLabels, ok := streamExtractor.Process(timestamp, record.Line, record.Metadata...) |
|
|
|
|
if !ok { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
statistics.AddPostFilterLines(1) |
|
|
|
|
|
|
|
|
|
// Get or create series for the parsed labels
|
|
|
|
|
labelString := parsedLabels.String() |
|
|
|
|
|