mirror of https://github.com/grafana/loki
You can not select more than 25 topics
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
481 lines
12 KiB
481 lines
12 KiB
package queryrange
|
|
|
|
import (
|
|
"context"
|
|
"net/http"
|
|
"slices"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/axiomhq/hyperloglog"
|
|
"github.com/dustin/go-humanize"
|
|
"github.com/grafana/dskit/httpgrpc"
|
|
"github.com/prometheus/prometheus/model/labels"
|
|
|
|
"github.com/grafana/loki/v3/pkg/logproto"
|
|
logql_log "github.com/grafana/loki/v3/pkg/logql/log"
|
|
"github.com/grafana/loki/v3/pkg/logql/syntax"
|
|
"github.com/grafana/loki/v3/pkg/logqlmodel"
|
|
"github.com/grafana/loki/v3/pkg/querier/plan"
|
|
base "github.com/grafana/loki/v3/pkg/querier/queryrange/queryrangebase"
|
|
"github.com/grafana/loki/v3/pkg/util/httpreq"
|
|
|
|
"github.com/grafana/loki/pkg/push"
|
|
)
|
|
|
|
func NewDetectedFieldsHandler(
|
|
limitedHandler base.Handler,
|
|
logHandler base.Handler,
|
|
limits Limits,
|
|
) base.Handler {
|
|
return base.HandlerFunc(
|
|
func(ctx context.Context, req base.Request) (base.Response, error) {
|
|
r, ok := req.(*DetectedFieldsRequest)
|
|
if !ok {
|
|
return nil, httpgrpc.Errorf(
|
|
http.StatusBadRequest,
|
|
"invalid request type, expected *DetectedFieldsRequest",
|
|
)
|
|
}
|
|
|
|
resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
re, ok := resp.(*LokiResponse)
|
|
if !ok || re.Status != "success" {
|
|
return resp, nil
|
|
}
|
|
|
|
var fields []*logproto.DetectedField
|
|
var values []string
|
|
|
|
if r.Values && r.Name != "" {
|
|
values = parseDetectedFieldValues(r.Limit, re.Data.Result, r.Name)
|
|
} else {
|
|
detectedFields := parseDetectedFields(r.Limit, re.Data.Result)
|
|
fields = make([]*logproto.DetectedField, len(detectedFields))
|
|
fieldCount := 0
|
|
for k, v := range detectedFields {
|
|
p := v.parsers
|
|
if len(p) == 0 {
|
|
p = nil
|
|
}
|
|
fields[fieldCount] = &logproto.DetectedField{
|
|
Label: k,
|
|
Type: v.fieldType,
|
|
Cardinality: v.Estimate(),
|
|
Parsers: p,
|
|
JsonPath: v.jsonPath,
|
|
}
|
|
|
|
fieldCount++
|
|
}
|
|
}
|
|
|
|
dfResp := DetectedFieldsResponse{
|
|
Response: &logproto.DetectedFieldsResponse{
|
|
Fields: fields,
|
|
Values: values,
|
|
},
|
|
Headers: re.Headers,
|
|
}
|
|
|
|
// Otherwise all they get is the field limit, which is a bit confusing
|
|
if len(fields) > 0 || len(values) > 0 {
|
|
dfResp.Response.Limit = r.GetLimit()
|
|
}
|
|
|
|
return &dfResp, nil
|
|
})
|
|
}
|
|
|
|
// bytesUnit is a list of unit suffixes used to recognise human-readable byte
// sizes.
type bytesUnit []string

// Contains reports whether s ends with any of the suffixes in b. Despite its
// name it matches by suffix, not by membership, and the comparison is
// case-sensitive, so callers lower-case s before calling it.
func (b bytesUnit) Contains(s string) bool {
	return slices.ContainsFunc(b, func(suffix string) bool {
		return strings.HasSuffix(s, suffix)
	})
}

// allowedBytesUnits are the suffixes a parsed value must end with (after
// lower-casing) for parseDetectedFieldValues to treat it as a byte size.
// IEC ("kib") and SI ("kb") forms are listed, followed by the bare prefixes
// ("ki", "k"). Because matching is by suffix, "b" alone already matches every
// entry that ends in "b"; the longer forms are redundant for matching.
var allowedBytesUnits = bytesUnit{
	"b",
	"kib", "kb",
	"mib", "mb",
	"gib", "gb",
	"tib", "tb",
	"pib", "pb",
	"eib", "eb",
	"ki", "k",
	"mi", "m",
	"gi", "g",
	"ti", "t",
	"pi", "p",
	"ei", "e",
}
|
|
|
|
func parseDetectedFieldValues(limit uint32, streams []push.Stream, name string) []string {
|
|
values := map[string]struct{}{}
|
|
for _, stream := range streams {
|
|
streamLbls, err := syntax.ParseLabels(stream.Labels)
|
|
if err != nil {
|
|
streamLbls = labels.EmptyLabels()
|
|
}
|
|
|
|
for _, entry := range stream.Entries {
|
|
if len(values) >= int(limit) {
|
|
break
|
|
}
|
|
|
|
structuredMetadata := getStructuredMetadata(entry)
|
|
if vals, ok := structuredMetadata[name]; ok {
|
|
for _, v := range vals {
|
|
values[v] = struct{}{}
|
|
}
|
|
}
|
|
|
|
entryLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, labels.StableHash(streamLbls))
|
|
parsedLabels, _ := parseEntry(entry, entryLbls)
|
|
if vals, ok := parsedLabels[name]; ok {
|
|
for _, v := range vals {
|
|
// special case bytes values, so they can be directly inserted into a query
|
|
if bs, err := humanize.ParseBytes(v); err == nil && allowedBytesUnits.Contains(strings.ToLower(v)) {
|
|
bsString := strings.Replace(humanize.Bytes(bs), " ", "", 1)
|
|
values[bsString] = struct{}{}
|
|
} else {
|
|
values[v] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
response := make([]string, 0, len(values))
|
|
for v := range values {
|
|
response = append(response, v)
|
|
}
|
|
|
|
return response
|
|
}
|
|
|
|
func makeDownstreamRequest(
|
|
ctx context.Context,
|
|
limits Limits,
|
|
limitedHandler, logHandler base.Handler,
|
|
req *DetectedFieldsRequest,
|
|
) (base.Response, error) {
|
|
expr, err := syntax.ParseLogSelector(req.Query, true)
|
|
if err != nil {
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
|
|
}
|
|
|
|
if err := validateMaxEntriesLimits(ctx, req.LineLimit, limits); err != nil {
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
|
|
}
|
|
|
|
if err := validateMatchers(ctx, limits, expr.Matchers()); err != nil {
|
|
return nil, httpgrpc.Errorf(http.StatusBadRequest, "%s", err.Error())
|
|
}
|
|
|
|
lokiReq := &LokiRequest{
|
|
Query: req.GetQuery(),
|
|
Step: req.GetStep(),
|
|
StartTs: req.GetStartTs(),
|
|
EndTs: req.GetEndTs(),
|
|
Direction: logproto.BACKWARD,
|
|
Limit: req.GetLineLimit(),
|
|
Path: "/loki/api/v1/query_range",
|
|
}
|
|
|
|
lokiReq.Plan = &plan.QueryPlan{
|
|
AST: expr,
|
|
}
|
|
|
|
// Note(twhitney): The logic for parsing detected fields relies on the Entry.Parsed field being populated.
|
|
// The behavior of populating Entry.Parsed is different in ingesters and stores.
|
|
// We need to set this header to make sure Entry.Parsed is populated when getting logs from the store.
|
|
// Entries from the head block in the ingester always have the Parsed field populated.
|
|
ctx = httpreq.InjectHeader(
|
|
ctx,
|
|
httpreq.LokiEncodingFlagsHeader,
|
|
(string)(httpreq.FlagCategorizeLabels),
|
|
)
|
|
if expr.HasFilter() {
|
|
return logHandler.Do(ctx, lokiReq)
|
|
}
|
|
return limitedHandler.Do(ctx, lokiReq)
|
|
}
|
|
|
|
type parsedFields struct {
|
|
sketch *hyperloglog.Sketch
|
|
fieldType logproto.DetectedFieldType
|
|
parsers []string
|
|
jsonPath []string // Original JSON path as an array of components (e.g., ["user", "id"] for field "user_id")
|
|
}
|
|
|
|
func newParsedFields(parsers []string) *parsedFields {
|
|
return &parsedFields{
|
|
sketch: hyperloglog.New(),
|
|
fieldType: logproto.DetectedFieldString,
|
|
parsers: parsers,
|
|
jsonPath: nil,
|
|
}
|
|
}
|
|
|
|
func (p *parsedFields) Insert(value string) {
|
|
p.sketch.Insert([]byte(value))
|
|
}
|
|
|
|
func (p *parsedFields) Estimate() uint64 {
|
|
return p.sketch.Estimate()
|
|
}
|
|
|
|
func (p *parsedFields) Marshal() ([]byte, error) {
|
|
return p.sketch.MarshalBinary()
|
|
}
|
|
|
|
func (p *parsedFields) DetermineType(value string) {
|
|
p.fieldType = determineType(value)
|
|
}
|
|
|
|
func determineType(value string) logproto.DetectedFieldType {
|
|
if _, err := strconv.ParseInt(value, 10, 64); err == nil {
|
|
return logproto.DetectedFieldInt
|
|
}
|
|
|
|
if _, err := strconv.ParseFloat(value, 64); err == nil {
|
|
return logproto.DetectedFieldFloat
|
|
}
|
|
|
|
if _, err := strconv.ParseBool(value); err == nil {
|
|
return logproto.DetectedFieldBoolean
|
|
}
|
|
|
|
if _, err := time.ParseDuration(value); err == nil {
|
|
return logproto.DetectedFieldDuration
|
|
}
|
|
|
|
if _, err := humanize.ParseBytes(value); err == nil {
|
|
return logproto.DetectedFieldBytes
|
|
}
|
|
|
|
return logproto.DetectedFieldString
|
|
}
|
|
|
|
func parseDetectedFields(limit uint32, streams logqlmodel.Streams) map[string]*parsedFields {
|
|
detectedFields := make(map[string]*parsedFields, limit)
|
|
fieldCount := uint32(0)
|
|
emtpyparsers := []string{}
|
|
|
|
for _, stream := range streams {
|
|
streamLbls, err := syntax.ParseLabels(stream.Labels)
|
|
if err != nil {
|
|
streamLbls = labels.EmptyLabels()
|
|
}
|
|
|
|
for _, entry := range stream.Entries {
|
|
structuredMetadata := getStructuredMetadata(entry)
|
|
for k, vals := range structuredMetadata {
|
|
df, ok := detectedFields[k]
|
|
if !ok && fieldCount < limit {
|
|
df = newParsedFields(emtpyparsers)
|
|
detectedFields[k] = df
|
|
fieldCount++
|
|
}
|
|
|
|
if df == nil {
|
|
continue
|
|
}
|
|
|
|
detectType := true
|
|
for _, v := range vals {
|
|
parsedFields := detectedFields[k]
|
|
if detectType {
|
|
// we don't want to determine the type for every line, so we assume the type in each stream will be the same, and re-detect the type for the next stream
|
|
parsedFields.DetermineType(v)
|
|
detectType = false
|
|
}
|
|
|
|
parsedFields.Insert(v)
|
|
}
|
|
}
|
|
|
|
entryLbls := logql_log.NewBaseLabelsBuilder().ForLabels(streamLbls, labels.StableHash(streamLbls))
|
|
parsedLabels, parsers := parseEntry(entry, entryLbls)
|
|
for k, vals := range parsedLabels {
|
|
df, ok := detectedFields[k]
|
|
if !ok && fieldCount < limit {
|
|
df = newParsedFields(parsers)
|
|
detectedFields[k] = df
|
|
fieldCount++
|
|
}
|
|
|
|
if df == nil {
|
|
continue
|
|
}
|
|
|
|
for _, parser := range parsers {
|
|
if !slices.Contains(df.parsers, parser) {
|
|
df.parsers = append(df.parsers, parser)
|
|
}
|
|
}
|
|
|
|
// If we parsed with JSON, check for a JSON path
|
|
if slices.Contains(parsers, "json") {
|
|
// Get the JSON path if it exists
|
|
df.jsonPath = entryLbls.GetJSONPath(k)
|
|
}
|
|
|
|
detectType := true
|
|
for _, v := range vals {
|
|
parsedFields := detectedFields[k]
|
|
if detectType {
|
|
// we don't want to determine the type for every line, so we assume the type in each stream will be the same, and re-detect the type for the next stream
|
|
parsedFields.DetermineType(v)
|
|
detectType = false
|
|
}
|
|
|
|
parsedFields.Insert(v)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return detectedFields
|
|
}
|
|
|
|
func getStructuredMetadata(entry push.Entry) map[string][]string {
|
|
labels := map[string]map[string]struct{}{}
|
|
for _, lbl := range entry.StructuredMetadata {
|
|
if values, ok := labels[lbl.Name]; ok {
|
|
values[lbl.Value] = struct{}{}
|
|
} else {
|
|
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
|
|
}
|
|
}
|
|
|
|
result := make(map[string][]string, len(labels))
|
|
for lbl, values := range labels {
|
|
vals := make([]string, 0, len(values))
|
|
for v := range values {
|
|
vals = append(vals, v)
|
|
}
|
|
result[lbl] = vals
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
func parseEntry(entry push.Entry, lbls *logql_log.LabelsBuilder) (map[string][]string, []string) {
|
|
origParsed := getParsedLabels(entry)
|
|
|
|
// if the original query has any parser expressions, then we need to differentiate the
|
|
// original stream labels from any parsed labels
|
|
for name := range origParsed {
|
|
lbls.Del(name)
|
|
}
|
|
streamLbls := lbls.LabelsResult().Stream()
|
|
lblBuilder := lbls.ForLabels(streamLbls, labels.StableHash(streamLbls))
|
|
|
|
parsed := make(map[string][]string, len(origParsed))
|
|
for lbl, values := range origParsed {
|
|
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
|
|
lbl == logqlmodel.PreserveErrorLabel {
|
|
continue
|
|
}
|
|
|
|
parsed[lbl] = values
|
|
}
|
|
|
|
line := entry.Line
|
|
parser := "json"
|
|
jsonParser := logql_log.NewJSONParser(true)
|
|
_, jsonSuccess := jsonParser.Process(0, []byte(line), lblBuilder)
|
|
if !jsonSuccess || lblBuilder.HasErr() {
|
|
lblBuilder.Reset()
|
|
|
|
logFmtParser := logql_log.NewLogfmtParser(false, false)
|
|
parser = "logfmt"
|
|
_, logfmtSuccess := logFmtParser.Process(0, []byte(line), lblBuilder)
|
|
if !logfmtSuccess || lblBuilder.HasErr() {
|
|
return parsed, nil
|
|
}
|
|
}
|
|
|
|
parsedLabels := map[string]map[string]struct{}{}
|
|
for lbl, values := range parsed {
|
|
if vals, ok := parsedLabels[lbl]; ok {
|
|
for _, value := range values {
|
|
vals[value] = struct{}{}
|
|
}
|
|
} else {
|
|
parsedLabels[lbl] = map[string]struct{}{}
|
|
for _, value := range values {
|
|
parsedLabels[lbl][value] = struct{}{}
|
|
}
|
|
}
|
|
}
|
|
|
|
lblsResult := lblBuilder.LabelsResult().Parsed()
|
|
lblsResult.Range(func(lbl labels.Label) {
|
|
if values, ok := parsedLabels[lbl.Name]; ok {
|
|
values[lbl.Value] = struct{}{}
|
|
} else {
|
|
parsedLabels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
|
|
}
|
|
})
|
|
|
|
result := make(map[string][]string, len(parsedLabels))
|
|
for lbl, values := range parsedLabels {
|
|
if lbl == logqlmodel.ErrorLabel || lbl == logqlmodel.ErrorDetailsLabel ||
|
|
lbl == logqlmodel.PreserveErrorLabel {
|
|
continue
|
|
}
|
|
vals := make([]string, 0, len(values))
|
|
for v := range values {
|
|
vals = append(vals, v)
|
|
}
|
|
result[lbl] = vals
|
|
}
|
|
|
|
return result, []string{parser}
|
|
}
|
|
|
|
func getParsedLabels(entry push.Entry) map[string][]string {
|
|
labels := map[string]map[string]struct{}{}
|
|
for _, lbl := range entry.Parsed {
|
|
if values, ok := labels[lbl.Name]; ok {
|
|
values[lbl.Value] = struct{}{}
|
|
} else {
|
|
labels[lbl.Name] = map[string]struct{}{lbl.Value: {}}
|
|
}
|
|
}
|
|
|
|
result := make(map[string][]string, len(labels))
|
|
for lbl, values := range labels {
|
|
vals := make([]string, 0, len(values))
|
|
for v := range values {
|
|
vals = append(vals, v)
|
|
}
|
|
result[lbl] = vals
|
|
}
|
|
|
|
return result
|
|
}
|
|
|