diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index c81d6f5dfb..0d0752b1af 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -3336,6 +3336,11 @@ The `limits_config` block configures global and per-tenant limits in Loki. The v # CLI flag: -validation.discover-log-levels [discover_log_levels: | default = true] +# Field name to use for log levels. If not set, log level would be detected +# based on pre-defined labels as mentioned above. +# CLI flag: -validation.log-level-fields +[log_level_fields: | default = [level LEVEL Level Severity severity SEVERITY lvl LVL Lvl]] + # When true an ingester takes into account only the streams that it owns # according to the ring while applying the stream limit. # CLI flag: -ingester.use-owned-stream-count diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 6fb7bc3fc7..2da6786a47 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1,7 +1,6 @@ package distributor import ( - "bytes" "context" "flag" "fmt" @@ -12,16 +11,12 @@ import ( "strings" "sync" "time" - "unicode" - "unsafe" - "github.com/buger/jsonparser" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/gogo/status" "github.com/prometheus/prometheus/model/labels" "github.com/twmb/franz-go/pkg/kgo" - "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc/codes" "github.com/grafana/dskit/httpgrpc" @@ -50,7 +45,6 @@ import ( kafka_client "github.com/grafana/loki/v3/pkg/kafka/client" "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" - "github.com/grafana/loki/v3/pkg/logql/log/logfmt" "github.com/grafana/loki/v3/pkg/logql/syntax" "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/util" @@ -71,12 +65,6 @@ var ( rfStats = analytics.NewInt("distributor_replication_factor") ) -var allowedLabelsForLevel = map[string]struct{}{ - "level": {}, "LEVEL": {}, "Level": {}, - "severity": {}, "SEVERITY": {}, "Severity": {}, - "lvl": {}, "LVL": {}, "Lvl": {}, -} - // Config for a Distributor. type Config struct { // Distributors ring @@ -447,6 +435,8 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log var validationErrors util.GroupedErrors validationContext := d.validator.getValidationContextForTime(time.Now(), tenantID) + levelDetector := newLevelDetector(validationContext) + shouldDiscoverLevels := levelDetector.shouldDiscoverLogLevels() func() { sp := opentracing.SpanFromContext(ctx) @@ -480,8 +470,6 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log pushSize := 0 prevTs := stream.Entries[0].Timestamp - shouldDiscoverLevels := validationContext.allowStructuredMetadata && validationContext.discoverLogLevels - levelFromLabel, hasLevelLabel := hasAnyLevelLabels(lbs) for _, entry := range stream.Entries { if err := d.validator.ValidateEntry(ctx, validationContext, lbs, entry); err != nil { d.writeFailuresManager.Log(tenantID, err) @@ -491,19 +479,9 @@ func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*log structuredMetadata := logproto.FromLabelAdaptersToLabels(entry.StructuredMetadata) if shouldDiscoverLevels { - var logLevel string - if hasLevelLabel { - logLevel = levelFromLabel - } else if levelFromMetadata, ok := hasAnyLevelLabels(structuredMetadata); ok { - logLevel = levelFromMetadata - } else { - logLevel = detectLogLevelFromLogEntry(entry, structuredMetadata) - } - if logLevel != "" { - entry.StructuredMetadata = append(entry.StructuredMetadata, logproto.LabelAdapter{ - Name: constants.LevelLabel, - Value: logLevel, - }) + logLevel, ok := levelDetector.extractLogLevel(lbs, structuredMetadata, entry) + if ok { + entry.StructuredMetadata = append(entry.StructuredMetadata, logLevel) } } stream.Entries[n] = entry @@ -712,15 +690,6 @@ func (d *Distributor) trackDiscardedData( } } -func hasAnyLevelLabels(l labels.Labels) (string, bool) { - for lbl := range allowedLabelsForLevel { - if l.Has(lbl) { - return l.Get(lbl), true - } - } - return "", false -} - // shardStream shards (divides) the given stream into N smaller streams, where // N is the sharding size for the given stream. shardSteam returns the smaller // streams and their associated keys for hashing to ingesters. @@ -1106,129 +1075,3 @@ func newRingAndLifecycler(cfg RingConfig, instanceCount *atomic.Uint32, logger l func (d *Distributor) HealthyInstancesCount() int { return int(d.healthyInstancesCount.Load()) } - -func detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string { - // otlp logs have a severity number, using which we are defining the log levels. - // Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber - if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" { - otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt) - if err != nil { - return constants.LogLevelInfo - } - if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) { - return constants.LogLevelUnknown - } else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) { - return constants.LogLevelTrace - } else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) { - return constants.LogLevelDebug - } else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) { - return constants.LogLevelInfo - } else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) { - return constants.LogLevelWarn - } else if otlpSeverityNumber <= int(plog.SeverityNumberError4) { - return constants.LogLevelError - } else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) { - return constants.LogLevelFatal - } - return constants.LogLevelUnknown - } - - return extractLogLevelFromLogLine(entry.Line) -} - -func extractLogLevelFromLogLine(log string) string { - logSlice := unsafe.Slice(unsafe.StringData(log), len(log)) - var v []byte - if isJSON(log) { - v = getValueUsingJSONParser(logSlice) - } else { - v = getValueUsingLogfmtParser(logSlice) - } - - switch { - case bytes.EqualFold(v, []byte("trace")), bytes.EqualFold(v, []byte("trc")): - return constants.LogLevelTrace - case bytes.EqualFold(v, []byte("debug")), bytes.EqualFold(v, []byte("dbg")): - return constants.LogLevelDebug - case bytes.EqualFold(v, []byte("info")), bytes.EqualFold(v, []byte("inf")): - return constants.LogLevelInfo - case bytes.EqualFold(v, []byte("warn")), bytes.EqualFold(v, []byte("wrn")), bytes.EqualFold(v, []byte("warning")): - return constants.LogLevelWarn - case bytes.EqualFold(v, []byte("error")), bytes.EqualFold(v, []byte("err")): - return constants.LogLevelError - case bytes.EqualFold(v, []byte("critical")): - return constants.LogLevelCritical - case bytes.EqualFold(v, []byte("fatal")): - return constants.LogLevelFatal - default: - return detectLevelFromLogLine(log) - } -} - -func getValueUsingLogfmtParser(line []byte) []byte { - equalIndex := bytes.Index(line, []byte("=")) - if len(line) == 0 || equalIndex == -1 { - return nil - } - - d := logfmt.NewDecoder(line) - for !d.EOL() && d.ScanKeyval() { - if _, ok := allowedLabelsForLevel[string(d.Key())]; ok { - return (d.Value()) - } - } - return nil -} - -func getValueUsingJSONParser(log []byte) []byte { - for allowedLabel := range allowedLabelsForLevel { - l, _, _, err := jsonparser.Get(log, allowedLabel) - if err == nil { - return l - } - } - return nil -} - -func isJSON(line string) bool { - var firstNonSpaceChar rune - for _, char := range line { - if !unicode.IsSpace(char) { - firstNonSpaceChar = char - break - } - } - - var lastNonSpaceChar rune - for i := len(line) - 1; i >= 0; i-- { - char := rune(line[i]) - if !unicode.IsSpace(char) { - lastNonSpaceChar = char - break - } - } - - return firstNonSpaceChar == '{' && lastNonSpaceChar == '}' -} - -func detectLevelFromLogLine(log string) string { - if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") || - strings.Contains(log, "info") || strings.Contains(log, "INFO") { - return constants.LogLevelInfo - } - if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") || - strings.Contains(log, "error") || strings.Contains(log, "ERROR") { - return constants.LogLevelError - } - if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") || - strings.Contains(log, "warning") || strings.Contains(log, "WARNING") { - return constants.LogLevelWarn - } - if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") { - return constants.LogLevelCritical - } - if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") { - return constants.LogLevelDebug - } - return constants.LogLevelUnknown -} diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index f724729025..c3bf70d253 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -28,12 +28,9 @@ import ( "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "github.com/twmb/franz-go/pkg/kgo" - "go.opentelemetry.io/collector/pdata/plog" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/grafana/loki/pkg/push" - "github.com/grafana/loki/v3/pkg/ingester" "github.com/grafana/loki/v3/pkg/ingester/client" loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" @@ -1611,300 +1608,3 @@ func TestDistributorTee(t *testing.T) { require.Equal(t, "test", tee.tenant) } } - -func Test_DetectLogLevels(t *testing.T) { - setup := func(discoverLogLevels bool) (*validation.Limits, *mockIngester) { - limits := &validation.Limits{} - flagext.DefaultValues(limits) - - limits.DiscoverLogLevels = discoverLogLevels - limits.AllowStructuredMetadata = true - return limits, &mockIngester{} - } - - t.Run("log level detection disabled", func(t *testing.T) { - limits, ingester := setup(false) - distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) - _, err := distributors[0].Push(ctx, writeReq) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) - require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0) - }) - - t.Run("log level detection enabled but level cannot be detected", func(t *testing.T) { - limits, ingester := setup(true) - distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) - _, err := distributors[0].Push(ctx, writeReq) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) - require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 1) - }) - - t.Run("log level detection enabled and warn logs", func(t *testing.T) { - for _, level := range []string{"warn", "Wrn", "WARNING"} { - limits, ingester := setup(true) - distributors, _ := prepare( - t, - 1, - 5, - limits, - func(_ string) (ring_client.PoolClient, error) { return ingester, nil }, - ) - - writeReq := makeWriteRequestWithLabelsWithLevel(1, 10, []string{`{foo="bar"}`}, level) - _, err := distributors[0].Push(ctx, writeReq) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) - require.Equal(t, push.LabelsAdapter{ - { - Name: constants.LevelLabel, - Value: constants.LogLevelWarn, - }, - }, topVal.Streams[0].Entries[0].StructuredMetadata, fmt.Sprintf("level: %s", level)) - } - }) - - t.Run("log level detection enabled but log level already present in stream", func(t *testing.T) { - limits, ingester := setup(true) - distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}) - _, err := distributors[0].Push(ctx, writeReq) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels) - sm := topVal.Streams[0].Entries[0].StructuredMetadata - require.Len(t, sm, 1) - require.Equal(t, sm[0].Name, constants.LevelLabel) - require.Equal(t, sm[0].Value, constants.LogLevelDebug) - }) - - t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) { - limits, ingester := setup(true) - distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) - - writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) - writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{ - { - Name: "severity", - Value: constants.LogLevelWarn, - }, - } - _, err := distributors[0].Push(ctx, writeReq) - require.NoError(t, err) - topVal := ingester.Peek() - require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) - sm := topVal.Streams[0].Entries[0].StructuredMetadata - require.Equal(t, push.LabelsAdapter{ - { - Name: "severity", - Value: constants.LogLevelWarn, - }, { - Name: constants.LevelLabel, - Value: constants.LogLevelWarn, - }, - }, sm) - }) -} - -func Test_detectLogLevelFromLogEntry(t *testing.T) { - for _, tc := range []struct { - name string - entry logproto.Entry - expectedLogLevel string - }{ - { - name: "use severity number from otlp logs", - entry: logproto.Entry{ - Line: "error", - StructuredMetadata: push.LabelsAdapter{ - { - Name: loghttp_push.OTLPSeverityNumber, - Value: fmt.Sprintf("%d", plog.SeverityNumberDebug3), - }, - }, - }, - expectedLogLevel: constants.LogLevelDebug, - }, - { - name: "invalid severity number should not cause any issues", - entry: logproto.Entry{ - StructuredMetadata: push.LabelsAdapter{ - { - Name: loghttp_push.OTLPSeverityNumber, - Value: "foo", - }, - }, - }, - expectedLogLevel: constants.LogLevelInfo, - }, - { - name: "non otlp without any of the log level keywords in log line", - entry: logproto.Entry{ - Line: "foo", - }, - expectedLogLevel: constants.LogLevelUnknown, - }, - { - name: "non otlp with log level keywords in log line", - entry: logproto.Entry{ - Line: "this is a warning log", - }, - expectedLogLevel: constants.LogLevelWarn, - }, - { - name: "json log line with an error", - entry: logproto.Entry{ - Line: `{"foo":"bar","msg":"message with keyword error but it should not get picked up","level":"critical"}`, - }, - expectedLogLevel: constants.LogLevelCritical, - }, - { - name: "json log line with an error", - entry: logproto.Entry{ - Line: `{"FOO":"bar","MSG":"message with keyword error but it should not get picked up","LEVEL":"Critical"}`, - }, - expectedLogLevel: constants.LogLevelCritical, - }, - { - name: "json log line with an warning", - entry: logproto.Entry{ - Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"warn"}`, - }, - expectedLogLevel: constants.LogLevelWarn, - }, - { - name: "json log line with an warning", - entry: logproto.Entry{ - Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","SEVERITY":"FATAL"}`, - }, - expectedLogLevel: constants.LogLevelFatal, - }, - { - name: "json log line with an error in block case", - entry: logproto.Entry{ - Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"ERR"}`, - }, - expectedLogLevel: constants.LogLevelError, - }, - { - name: "json log line with an INFO in block case", - entry: logproto.Entry{ - Line: `{"foo":"bar","msg":"message with keyword INFO get picked up"}`, - }, - expectedLogLevel: constants.LogLevelInfo, - }, - { - name: "logfmt log line with an INFO and not level returns info log level", - entry: logproto.Entry{ - Line: `foo=bar msg="message with info and not level should get picked up"`, - }, - expectedLogLevel: constants.LogLevelInfo, - }, - { - name: "logfmt log line with a warn", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword error but it should not get picked up" level=warn`, - }, - expectedLogLevel: constants.LogLevelWarn, - }, - { - name: "logfmt log line with a warn with camel case", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Warn`, - }, - expectedLogLevel: constants.LogLevelWarn, - }, - { - name: "logfmt log line with a trace", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Trace`, - }, - expectedLogLevel: constants.LogLevelTrace, - }, - { - name: "logfmt log line with some other level returns unknown log level", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword but it should not get picked up" level=NA`, - }, - expectedLogLevel: constants.LogLevelUnknown, - }, - { - name: "logfmt log line with label Severity is allowed for level detection", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword but it should not get picked up" severity=critical`, - }, - expectedLogLevel: constants.LogLevelCritical, - }, - { - name: "logfmt log line with label Severity with camelcase is allowed for level detection", - entry: logproto.Entry{ - Line: `Foo=bar MSG="Message with keyword but it should not get picked up" Severity=critical`, - }, - expectedLogLevel: constants.LogLevelCritical, - }, - { - name: "logfmt log line with a info with non standard case", - entry: logproto.Entry{ - Line: `foo=bar msg="message with keyword error but it should not get picked up" level=inFO`, - }, - expectedLogLevel: constants.LogLevelInfo, - }, - { - name: "logfmt log line with a info with non block case for level", - entry: logproto.Entry{ - Line: `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO`, - }, - expectedLogLevel: constants.LogLevelInfo, - }, - } { - t.Run(tc.name, func(t *testing.T) { - detectedLogLevel := detectLogLevelFromLogEntry(tc.entry, logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata)) - require.Equal(t, tc.expectedLogLevel, detectedLogLevel) - }) - } -} - -func Benchmark_extractLogLevelFromLogLine(b *testing.B) { - // looks scary, but it is some random text of about 1000 chars from charset a-zA-Z0-9 - logLine := "dGzJ6rKk Zj U04SWEqEK4Uwho8 DpNyLz0 Nfs61HJ fz5iKVigg 44 kabOz7ghviGmVONriAdz4lA 7Kis1OTvZGT3 " + - "ZB6ioK4fgJLbzm AuIcbnDZKx3rZ aeZJQzRb3zhrn vok8Efav6cbyzbRUQ PYsEdQxCpdCDcGNsKG FVwe61 nhF06t9hXSNySEWa " + - "gBAXP1J8oEL grep1LfeKjA23ntszKA A772vNyxjQF SjWfJypwI7scxk oLlqRzrDl ostO4CCwx01wDB7Utk0 64A7p5eQDITE6zc3 " + - "rGL DrPnD K2oj Vro2JEvI2YScstnMx SVu H o GUl8fxZJJ1HY0 C QOA HNJr5XtsCNRrLi 0w C0Pd8XWbVZyQkSlsRm zFw1lW " + - "c8j6JFQuQnnB EyL20z0 2Duo0dvynnAGD 45ut2Z Jrz8Nd7Pmg 5oQ09r9vnmy U2 mKHO5uBfndPnbjbr mzOvQs9bM1 9e " + - "yvNSfcbPyhuWvB VKJt2kp8IoTVc XCe Uva5mp9NrGh3TEbjQu1 C Zvdk uPr7St2m kwwMRcS9eC aS6ZuL48eoQUiKo VBPd4m49ymr " + - "eQZ0fbjWpj6qA A6rYs4E 58dqh9ntu8baziDJ4c 1q6aVEig YrMXTF hahrlt 6hKVHfZLFZ V 9hEVN0WKgcpu6L zLxo6YC57 XQyfAGpFM " + - "Wm3 S7if5qCXPzvuMZ2 gNHdst Z39s9uNc58QBDeYRW umyIF BDqEdqhE tAs2gidkqee3aux8b NLDb7 ZZLekc0cQZ GUKQuBg2pL2y1S " + - "RJtBuW ABOqQHLSlNuUw ZlM2nGS2 jwA7cXEOJhY 3oPv4gGAz Uqdre16MF92C06jOH dayqTCK8XmIilT uvgywFSfNadYvRDQa " + - "iUbswJNcwqcr6huw LAGrZS8NGlqqzcD2wFU rm Uqcrh3TKLUCkfkwLm 5CIQbxMCUz boBrEHxvCBrUo YJoF2iyif4xq3q yk " - - for i := 0; i < b.N; i++ { - level := extractLogLevelFromLogLine(logLine) - require.Equal(b, constants.LogLevelUnknown, level) - } -} - -func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) { - logLine := `{"msg": "something" , "level": "error", "id": "1"}` - - for i := 0; i < b.N; i++ { - level := extractLogLevelFromLogLine(logLine) - require.Equal(b, constants.LogLevelError, level) - } -} - -func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) { - logLine := `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO` - - for i := 0; i < b.N; i++ { - level := extractLogLevelFromLogLine(logLine) - require.Equal(b, constants.LogLevelInfo, level) - } -} diff --git a/pkg/distributor/level_detection.go b/pkg/distributor/level_detection.go new file mode 100644 index 0000000000..0a80e67e38 --- /dev/null +++ b/pkg/distributor/level_detection.go @@ -0,0 +1,226 @@ +package distributor + +import ( + "bytes" + "strconv" + "strings" + "unicode" + "unsafe" + + "github.com/buger/jsonparser" + "github.com/prometheus/prometheus/model/labels" + "go.opentelemetry.io/collector/pdata/plog" + + "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/logql/log/logfmt" + "github.com/grafana/loki/v3/pkg/util/constants" +) + +var ( + trace = []byte("trace") + traceAbbrv = []byte("trc") + debug = []byte("debug") + debugAbbrv = []byte("dbg") + info = []byte("info") + infoAbbrv = []byte("inf") + warn = []byte("warn") + warnAbbrv = []byte("wrn") + warning = []byte("warning") + errorStr = []byte("error") + errorAbbrv = []byte("err") + critical = []byte("critical") + fatal = []byte("fatal") +) + +func allowedLabelsForLevel(allowedFields []string) map[string]struct{} { + if len(allowedFields) == 0 { + return map[string]struct{}{ + "level": {}, "LEVEL": {}, "Level": {}, + "severity": {}, "SEVERITY": {}, "Severity": {}, + "lvl": {}, "LVL": {}, "Lvl": {}, + } + } + allowedFieldsMap := make(map[string]struct{}, len(allowedFields)) + for _, field := range allowedFields { + allowedFieldsMap[field] = struct{}{} + } + return allowedFieldsMap +} + +type LevelDetector struct { + validationContext validationContext + allowedLabels map[string]struct{} +} + +func newLevelDetector(validationContext validationContext) *LevelDetector { + logLevelFields := validationContext.logLevelFields + return &LevelDetector{ + validationContext: validationContext, + allowedLabels: allowedLabelsForLevel(logLevelFields), + } +} + +func (l *LevelDetector) shouldDiscoverLogLevels() bool { + return l.validationContext.allowStructuredMetadata && l.validationContext.discoverLogLevels +} + +func (l *LevelDetector) extractLogLevel(labels labels.Labels, structuredMetadata labels.Labels, entry logproto.Entry) (logproto.LabelAdapter, bool) { + levelFromLabel, hasLevelLabel := l.hasAnyLevelLabels(labels) + var logLevel string + if hasLevelLabel { + logLevel = levelFromLabel + } else if levelFromMetadata, ok := l.hasAnyLevelLabels(structuredMetadata); ok { + logLevel = levelFromMetadata + } else { + logLevel = l.detectLogLevelFromLogEntry(entry, structuredMetadata) + } + + if logLevel == "" { + return logproto.LabelAdapter{}, false + } + return logproto.LabelAdapter{ + Name: constants.LevelLabel, + Value: logLevel, + }, true +} + +func (l *LevelDetector) hasAnyLevelLabels(labels labels.Labels) (string, bool) { + for lbl := range l.allowedLabels { + if labels.Has(lbl) { + return labels.Get(lbl), true + } + } + return "", false +} + +func (l *LevelDetector) detectLogLevelFromLogEntry(entry logproto.Entry, structuredMetadata labels.Labels) string { + // otlp logs have a severity number, using which we are defining the log levels. + // Significance of severity number is explained in otel docs here https://opentelemetry.io/docs/specs/otel/logs/data-model/#field-severitynumber + if otlpSeverityNumberTxt := structuredMetadata.Get(push.OTLPSeverityNumber); otlpSeverityNumberTxt != "" { + otlpSeverityNumber, err := strconv.Atoi(otlpSeverityNumberTxt) + if err != nil { + return constants.LogLevelInfo + } + if otlpSeverityNumber == int(plog.SeverityNumberUnspecified) { + return constants.LogLevelUnknown + } else if otlpSeverityNumber <= int(plog.SeverityNumberTrace4) { + return constants.LogLevelTrace + } else if otlpSeverityNumber <= int(plog.SeverityNumberDebug4) { + return constants.LogLevelDebug + } else if otlpSeverityNumber <= int(plog.SeverityNumberInfo4) { + return constants.LogLevelInfo + } else if otlpSeverityNumber <= int(plog.SeverityNumberWarn4) { + return constants.LogLevelWarn + } else if otlpSeverityNumber <= int(plog.SeverityNumberError4) { + return constants.LogLevelError + } else if otlpSeverityNumber <= int(plog.SeverityNumberFatal4) { + return constants.LogLevelFatal + } + return constants.LogLevelUnknown + } + + return l.extractLogLevelFromLogLine(entry.Line) +} + +func (l *LevelDetector) extractLogLevelFromLogLine(log string) string { + logSlice := unsafe.Slice(unsafe.StringData(log), len(log)) + var v []byte + if isJSON(log) { + v = l.getValueUsingJSONParser(logSlice) + } else if isLogFmt(logSlice) { + v = l.getValueUsingLogfmtParser(logSlice) + } else { + return detectLevelFromLogLine(log) + } + + switch { + case bytes.EqualFold(v, trace), bytes.EqualFold(v, traceAbbrv): + return constants.LogLevelTrace + case bytes.EqualFold(v, debug), bytes.EqualFold(v, debugAbbrv): + return constants.LogLevelDebug + case bytes.EqualFold(v, info), bytes.EqualFold(v, infoAbbrv): + return constants.LogLevelInfo + case bytes.EqualFold(v, warn), bytes.EqualFold(v, warnAbbrv), bytes.EqualFold(v, warning): + return constants.LogLevelWarn + case bytes.EqualFold(v, errorStr), bytes.EqualFold(v, errorAbbrv): + return constants.LogLevelError + case bytes.EqualFold(v, critical): + return constants.LogLevelCritical + case bytes.EqualFold(v, fatal): + return constants.LogLevelFatal + default: + return detectLevelFromLogLine(log) + } +} + +func (l *LevelDetector) getValueUsingLogfmtParser(line []byte) []byte { + d := logfmt.NewDecoder(line) + for !d.EOL() && d.ScanKeyval() { + if _, ok := l.allowedLabels[string(d.Key())]; ok { + return d.Value() + } + } + return nil +} + +func (l *LevelDetector) getValueUsingJSONParser(log []byte) []byte { + for allowedLabel := range l.allowedLabels { + l, _, _, err := jsonparser.Get(log, allowedLabel) + if err == nil { + return l + } + } + return nil +} + +func isLogFmt(line []byte) bool { + equalIndex := bytes.Index(line, []byte("=")) + if len(line) == 0 || equalIndex == -1 { + return false + } + return true +} + +func isJSON(line string) bool { + var firstNonSpaceChar rune + for _, char := range line { + if !unicode.IsSpace(char) { + firstNonSpaceChar = char + break + } + } + + var lastNonSpaceChar rune + for i := len(line) - 1; i >= 0; i-- { + char := rune(line[i]) + if !unicode.IsSpace(char) { + lastNonSpaceChar = char + break + } + } + + return firstNonSpaceChar == '{' && lastNonSpaceChar == '}' +} + +func detectLevelFromLogLine(log string) string { + if strings.Contains(log, "info:") || strings.Contains(log, "INFO:") || + strings.Contains(log, "info") || strings.Contains(log, "INFO") { + return constants.LogLevelInfo + } + if strings.Contains(log, "err:") || strings.Contains(log, "ERR:") || + strings.Contains(log, "error") || strings.Contains(log, "ERROR") { + return constants.LogLevelError + } + if strings.Contains(log, "warn:") || strings.Contains(log, "WARN:") || + strings.Contains(log, "warning") || strings.Contains(log, "WARNING") { + return constants.LogLevelWarn + } + if strings.Contains(log, "CRITICAL:") || strings.Contains(log, "critical:") { + return constants.LogLevelCritical + } + if strings.Contains(log, "debug:") || strings.Contains(log, "DEBUG:") { + return constants.LogLevelDebug + } + return constants.LogLevelUnknown +} diff --git a/pkg/distributor/level_detection_test.go b/pkg/distributor/level_detection_test.go new file mode 100644 index 0000000000..a9fb0d36b6 --- /dev/null +++ b/pkg/distributor/level_detection_test.go @@ -0,0 +1,436 @@ +package distributor + +import ( + "fmt" + "testing" + + "github.com/grafana/dskit/flagext" + ring_client "github.com/grafana/dskit/ring/client" + "github.com/stretchr/testify/require" + "go.opentelemetry.io/collector/pdata/plog" + + loghttp_push "github.com/grafana/loki/v3/pkg/loghttp/push" + "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/util/constants" + "github.com/grafana/loki/v3/pkg/validation" + + "github.com/grafana/loki/pkg/push" +) + +func Test_DetectLogLevels(t *testing.T) { + setup := func(discoverLogLevels bool) (*validation.Limits, *mockIngester) { + limits := &validation.Limits{} + flagext.DefaultValues(limits) + + limits.DiscoverLogLevels = discoverLogLevels + limits.AllowStructuredMetadata = true + return limits, &mockIngester{} + } + + t.Run("log level detection disabled", func(t *testing.T) { + limits, ingester := setup(false) + distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 0) + }) + + t.Run("log level detection enabled but level cannot be detected", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Len(t, topVal.Streams[0].Entries[0].StructuredMetadata, 1) + }) + + t.Run("log level detection enabled and warn logs", func(t *testing.T) { + for _, level := range []string{"warn", "Wrn", "WARNING"} { + limits, ingester := setup(true) + distributors, _ := prepare( + t, + 1, + 5, + limits, + func(_ string) (ring_client.PoolClient, error) { return ingester, nil }, + ) + + writeReq := makeWriteRequestWithLabelsWithLevel(1, 10, []string{`{foo="bar"}`}, level) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + require.Equal(t, push.LabelsAdapter{ + { + Name: constants.LevelLabel, + Value: constants.LogLevelWarn, + }, + }, topVal.Streams[0].Entries[0].StructuredMetadata, fmt.Sprintf("level: %s", level)) + } + }) + + t.Run("log level detection enabled but log level already present in stream", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar", level="debug"}`}) + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar", level="debug"}`, topVal.Streams[0].Labels) + sm := topVal.Streams[0].Entries[0].StructuredMetadata + require.Len(t, sm, 1) + require.Equal(t, sm[0].Name, constants.LevelLabel) + require.Equal(t, sm[0].Value, constants.LogLevelDebug) + }) + + t.Run("log level detection enabled but log level already present as structured metadata", func(t *testing.T) { + limits, ingester := setup(true) + distributors, _ := prepare(t, 1, 5, limits, func(_ string) (ring_client.PoolClient, error) { return ingester, nil }) + + writeReq := makeWriteRequestWithLabels(1, 10, []string{`{foo="bar"}`}) + writeReq.Streams[0].Entries[0].StructuredMetadata = push.LabelsAdapter{ + { + Name: "severity", + Value: constants.LogLevelWarn, + }, + } + _, err := distributors[0].Push(ctx, writeReq) + require.NoError(t, err) + topVal := ingester.Peek() + require.Equal(t, `{foo="bar"}`, topVal.Streams[0].Labels) + sm := topVal.Streams[0].Entries[0].StructuredMetadata + require.Equal(t, push.LabelsAdapter{ + { + Name: "severity", + Value: constants.LogLevelWarn, + }, { + Name: constants.LevelLabel, + Value: constants.LogLevelWarn, + }, + }, sm) + }) +} + +func Test_detectLogLevelFromLogEntry(t *testing.T) { + ld := newLevelDetector( + validationContext{ + discoverLogLevels: true, + allowStructuredMetadata: true, + logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}, + }) + + for _, tc := range []struct { + name string + entry logproto.Entry + expectedLogLevel string + }{ + { + name: "use severity number from otlp logs", + entry: logproto.Entry{ + Line: "error", + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: fmt.Sprintf("%d", plog.SeverityNumberDebug3), + }, + }, + }, + expectedLogLevel: constants.LogLevelDebug, + }, + { + name: "invalid severity number should not cause any issues", + entry: logproto.Entry{ + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: "foo", + }, + }, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + { + name: "non otlp without any of the log level keywords in log line", + entry: logproto.Entry{ + Line: "foo", + }, + expectedLogLevel: constants.LogLevelUnknown, + }, + { + name: "non otlp with log level keywords in log line", + entry: logproto.Entry{ + Line: "this is a warning log", + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "json log line with an error", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword error but it should not get picked up","level":"critical"}`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "json log line with an error", + entry: logproto.Entry{ + Line: `{"FOO":"bar","MSG":"message with keyword error but it should not get picked up","LEVEL":"Critical"}`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "json log line with an warning", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"warn"}`, + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "json log line with an warning", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","SEVERITY":"FATAL"}`, + }, + expectedLogLevel: constants.LogLevelFatal, + }, + { + name: "json log line with an error in block case", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","level":"ERR"}`, + }, + expectedLogLevel: constants.LogLevelError, + }, + { + name: "json log line with an INFO in block case", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword INFO get picked up"}`, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + { + name: "logfmt log line with an INFO and not level returns info log level", + entry: logproto.Entry{ + Line: `foo=bar msg="message with info and not level should get picked up"`, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + { + name: "logfmt log line with a warn", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword error but it should not get picked up" level=warn`, + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "logfmt log line with a warn with camel case", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Warn`, + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "logfmt log line with a trace", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword error but it should not get picked up" level=Trace`, + }, + expectedLogLevel: constants.LogLevelTrace, + }, + { + name: "logfmt log line with some other level returns unknown log level", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword but it should not get picked up" level=NA`, + }, + expectedLogLevel: constants.LogLevelUnknown, + }, + { + name: "logfmt log line with label Severity is allowed for level detection", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword but it should not get picked up" severity=critical`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "logfmt log line with label Severity with camelcase is allowed for level detection", + entry: logproto.Entry{ + Line: `Foo=bar MSG="Message with keyword but it should not get picked up" Severity=critical`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "logfmt log line with a info with non standard case", + entry: logproto.Entry{ + Line: `foo=bar msg="message with keyword error but it should not get picked up" level=inFO`, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + { + name: "logfmt log line with a info with non block case for level", + entry: logproto.Entry{ + Line: `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO`, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + } { + t.Run(tc.name, func(t *testing.T) { + detectedLogLevel := ld.detectLogLevelFromLogEntry(tc.entry, logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata)) + require.Equal(t, tc.expectedLogLevel, detectedLogLevel) + }) + } +} + +func Test_detectLogLevelFromLogEntryWithCustomLabels(t *testing.T) { + ld := newLevelDetector( + validationContext{ + discoverLogLevels: true, + allowStructuredMetadata: true, + logLevelFields: []string{"log_level", "logging_level", "LOGGINGLVL", "lvl"}, + }) + + for _, tc := range []struct { + name string + entry logproto.Entry + expectedLogLevel string + }{ + { + name: "use severity number from otlp logs", + entry: logproto.Entry{ + Line: "error", + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: fmt.Sprintf("%d", plog.SeverityNumberDebug3), + }, + }, + }, + expectedLogLevel: constants.LogLevelDebug, + }, + { + name: "invalid severity number should not cause any issues", + entry: logproto.Entry{ + StructuredMetadata: push.LabelsAdapter{ + { + Name: loghttp_push.OTLPSeverityNumber, + Value: "foo", + }, + }, + }, + expectedLogLevel: constants.LogLevelInfo, + }, + { + name: "non otlp without any of the log level keywords in log line", + entry: logproto.Entry{ + Line: "foo", + }, + expectedLogLevel: constants.LogLevelUnknown, + }, + { + name: "non otlp with log level keywords in log line", + entry: logproto.Entry{ + Line: "this is a warning log", + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "json log line with an error", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword error but it should not get picked up","log_level":"critical"}`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "json log line with an error", + entry: logproto.Entry{ + Line: `{"FOO":"bar","MSG":"message with keyword error but it should not get picked up","LOGGINGLVL":"Critical"}`, + }, + expectedLogLevel: constants.LogLevelCritical, + }, + { + name: "json log line with an warning", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","lvl":"warn"}`, + }, + expectedLogLevel: constants.LogLevelWarn, + }, + { + name: "json log line with an warning", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","LOGGINGLVL":"FATAL"}`, + }, + expectedLogLevel: constants.LogLevelFatal, + }, + { + name: "json log line with an error in block case", + entry: logproto.Entry{ + Line: `{"foo":"bar","msg":"message with keyword warn but it should not get picked up","logging_level":"ERR"}`, + }, + expectedLogLevel: constants.LogLevelError, + }, + } { + t.Run(tc.name, func(t *testing.T) { + detectedLogLevel := ld.detectLogLevelFromLogEntry(tc.entry, logproto.FromLabelAdaptersToLabels(tc.entry.StructuredMetadata)) + require.Equal(t, tc.expectedLogLevel, detectedLogLevel) + }) + } +} + +func Benchmark_extractLogLevelFromLogLine(b *testing.B) { + // looks scary, but it is some random text of about 1000 chars from charset a-zA-Z0-9 + logLine := "dGzJ6rKk Zj U04SWEqEK4Uwho8 DpNyLz0 Nfs61HJ fz5iKVigg 44 kabOz7ghviGmVONriAdz4lA 7Kis1OTvZGT3 " + + "ZB6ioK4fgJLbzm AuIcbnDZKx3rZ aeZJQzRb3zhrn vok8Efav6cbyzbRUQ PYsEdQxCpdCDcGNsKG FVwe61 nhF06t9hXSNySEWa " + + "gBAXP1J8oEL grep1LfeKjA23ntszKA A772vNyxjQF SjWfJypwI7scxk oLlqRzrDl ostO4CCwx01wDB7Utk0 64A7p5eQDITE6zc3 " + + "rGL DrPnD K2oj Vro2JEvI2YScstnMx SVu H o GUl8fxZJJ1HY0 C QOA HNJr5XtsCNRrLi 0w C0Pd8XWbVZyQkSlsRm zFw1lW " + + "c8j6JFQuQnnB EyL20z0 2Duo0dvynnAGD 45ut2Z Jrz8Nd7Pmg 5oQ09r9vnmy U2 mKHO5uBfndPnbjbr mzOvQs9bM1 9e " + + "yvNSfcbPyhuWvB VKJt2kp8IoTVc XCe Uva5mp9NrGh3TEbjQu1 C Zvdk uPr7St2m kwwMRcS9eC aS6ZuL48eoQUiKo VBPd4m49ymr " + + "eQZ0fbjWpj6qA A6rYs4E 58dqh9ntu8baziDJ4c 1q6aVEig YrMXTF hahrlt 6hKVHfZLFZ V 9hEVN0WKgcpu6L zLxo6YC57 XQyfAGpFM " + + "Wm3 S7if5qCXPzvuMZ2 gNHdst Z39s9uNc58QBDeYRW umyIF BDqEdqhE tAs2gidkqee3aux8b NLDb7 ZZLekc0cQZ GUKQuBg2pL2y1S " + + "RJtBuW ABOqQHLSlNuUw ZlM2nGS2 jwA7cXEOJhY 3oPv4gGAz Uqdre16MF92C06jOH dayqTCK8XmIilT uvgywFSfNadYvRDQa " + + "iUbswJNcwqcr6huw LAGrZS8NGlqqzcD2wFU rm Uqcrh3TKLUCkfkwLm 5CIQbxMCUz boBrEHxvCBrUo YJoF2iyif4xq3q yk " + ld := &LevelDetector{ + validationContext: validationContext{ + discoverLogLevels: true, + allowStructuredMetadata: true, + logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}, + }, + } + for i := 0; i < b.N; i++ { + level := ld.extractLogLevelFromLogLine(logLine) + require.Equal(b, constants.LogLevelUnknown, level) + } +} + +func Benchmark_optParseExtractLogLevelFromLogLineJson(b *testing.B) { + logLine := `{"msg": "something" , "level": "error", "id": "1"}` + + ld := newLevelDetector( + validationContext{ + discoverLogLevels: true, + allowStructuredMetadata: true, + logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}, + }) + + for i := 0; i < b.N; i++ { + level := ld.extractLogLevelFromLogLine(logLine) + require.Equal(b, constants.LogLevelError, level) + } +} + +func Benchmark_optParseExtractLogLevelFromLogLineLogfmt(b *testing.B) { + logLine := `FOO=bar MSG="message with keyword error but it should not get picked up" LEVEL=inFO` + ld := newLevelDetector( + validationContext{ + discoverLogLevels: true, + allowStructuredMetadata: true, + logLevelFields: []string{"level", "LEVEL", "Level", "severity", "SEVERITY", "Severity", "lvl", "LVL", "Lvl"}, + }) + + for i := 0; i < b.N; i++ { + level := ld.extractLogLevelFromLogLine(logLine) + require.Equal(b, constants.LogLevelInfo, level) + } +} diff --git a/pkg/distributor/limits.go b/pkg/distributor/limits.go index c72eb1939a..22ae540bea 100644 --- a/pkg/distributor/limits.go +++ b/pkg/distributor/limits.go @@ -24,6 +24,7 @@ type Limits interface { IncrementDuplicateTimestamps(userID string) bool DiscoverServiceName(userID string) []string DiscoverLogLevels(userID string) bool + LogLevelFields(userID string) []string ShardStreams(userID string) shardstreams.Config IngestionRateStrategy() string diff --git a/pkg/distributor/validator.go b/pkg/distributor/validator.go index 61ecb175ac..67c16f14b0 100644 --- a/pkg/distributor/validator.go +++ b/pkg/distributor/validator.go @@ -46,6 +46,7 @@ type validationContext struct { incrementDuplicateTimestamps bool discoverServiceName []string discoverLogLevels bool + logLevelFields []string allowStructuredMetadata bool maxStructuredMetadataSize int @@ -71,6 +72,7 @@ func (v Validator) getValidationContextForTime(now time.Time, userID string) val incrementDuplicateTimestamps: v.IncrementDuplicateTimestamps(userID), discoverServiceName: v.DiscoverServiceName(userID), discoverLogLevels: v.DiscoverLogLevels(userID), + logLevelFields: v.LogLevelFields(userID), allowStructuredMetadata: v.AllowStructuredMetadata(userID), maxStructuredMetadataSize: v.MaxStructuredMetadataSize(userID), maxStructuredMetadataCount: v.MaxStructuredMetadataCount(userID), diff --git a/pkg/validation/limits.go b/pkg/validation/limits.go index 353a02a6ad..7387cc567c 100644 --- a/pkg/validation/limits.go +++ b/pkg/validation/limits.go @@ -86,6 +86,7 @@ type Limits struct { IncrementDuplicateTimestamp bool `yaml:"increment_duplicate_timestamp" json:"increment_duplicate_timestamp"` DiscoverServiceName []string `yaml:"discover_service_name" json:"discover_service_name"` DiscoverLogLevels bool `yaml:"discover_log_levels" json:"discover_log_levels"` + LogLevelFields []string `yaml:"log_level_fields" json:"log_level_fields"` // Ingester enforced limits. UseOwnedStreamCount bool `yaml:"use_owned_stream_count" json:"use_owned_stream_count"` @@ -287,6 +288,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { } f.Var((*dskit_flagext.StringSlice)(&l.DiscoverServiceName), "validation.discover-service-name", "If no service_name label exists, Loki maps a single label from the configured list to service_name. If none of the configured labels exist in the stream, label is set to unknown_service. Empty list disables setting the label.") f.BoolVar(&l.DiscoverLogLevels, "validation.discover-log-levels", true, "Discover and add log levels during ingestion, if not present already. Levels would be added to Structured Metadata with name level/LEVEL/Level/Severity/severity/SEVERITY/lvl/LVL/Lvl (case-sensitive) and one of the values from 'trace', 'debug', 'info', 'warn', 'error', 'critical', 'fatal' (case insensitive).") + l.LogLevelFields = []string{"level", "LEVEL", "Level", "Severity", "severity", "SEVERITY", "lvl", "LVL", "Lvl"} + f.Var((*dskit_flagext.StringSlice)(&l.LogLevelFields), "validation.log-level-fields", "Field name to use for log levels. If not set, log level would be detected based on pre-defined labels as mentioned above.") _ = l.RejectOldSamplesMaxAge.Set("7d") f.Var(&l.RejectOldSamplesMaxAge, "validation.reject-old-samples.max-age", "Maximum accepted sample age before rejecting.") @@ -999,6 +1002,10 @@ func (o *Overrides) DiscoverLogLevels(userID string) bool { return o.getOverridesForUser(userID).DiscoverLogLevels } +func (o *Overrides) LogLevelFields(userID string) []string { + return o.getOverridesForUser(userID).LogLevelFields +} + // VolumeEnabled returns whether volume endpoints are enabled for a user. func (o *Overrides) VolumeEnabled(userID string) bool { return o.getOverridesForUser(userID).VolumeEnabled diff --git a/pkg/validation/limits_test.go b/pkg/validation/limits_test.go index 71252c3b6c..ce412305ce 100644 --- a/pkg/validation/limits_test.go +++ b/pkg/validation/limits_test.go @@ -216,6 +216,7 @@ ruler_remote_write_headers: exp: Limits{ RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"foo": "bar"}}, DiscoverServiceName: []string{}, + LogLevelFields: []string{}, // Rest from new defaults StreamRetention: []StreamRetention{ @@ -234,7 +235,7 @@ ruler_remote_write_headers: `, exp: Limits{ DiscoverServiceName: []string{}, - + LogLevelFields: []string{}, // Rest from new defaults StreamRetention: []StreamRetention{ { @@ -254,6 +255,7 @@ retention_stream: `, exp: Limits{ DiscoverServiceName: []string{}, + LogLevelFields: []string{}, StreamRetention: []StreamRetention{ { Period: model.Duration(24 * time.Hour), @@ -274,6 +276,7 @@ reject_old_samples: true exp: Limits{ RejectOldSamples: true, DiscoverServiceName: []string{}, + LogLevelFields: []string{}, // Rest from new defaults RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}}, @@ -293,7 +296,9 @@ query_timeout: 5m `, exp: Limits{ DiscoverServiceName: []string{}, - QueryTimeout: model.Duration(5 * time.Minute), + LogLevelFields: []string{}, + + QueryTimeout: model.Duration(5 * time.Minute), // Rest from new defaults. RulerRemoteWriteHeaders: OverwriteMarshalingStringMap{map[string]string{"a": "b"}},