diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index ce3ae1caf5..91155814e0 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -116,7 +116,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) userID, _ := tenant.TenantID(r.Context()) - req, err := push.ParseRequest(logger, userID, t.config.MaxSendMsgSize, r, push.EmptyLimits{}, push.ParseLokiRequest, nil, nil, false) + req, _, err := push.ParseRequest(logger, userID, t.config.MaxSendMsgSize, r, push.EmptyLimits{}, nil, push.ParseLokiRequest, nil, nil, "") if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 09143ac722..350585d8aa 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -4431,11 +4431,27 @@ These are values which allow you to control aspects of Loki's operation, most co # CLI flag: -operation-config.log-push-request [log_push_request: | default = false] +# Log a commutative hash of the labels for all streams in a push request. In +# some cases this can potentially be used as an identifier of the agent sending +# the stream. Calculating hashes is expensive so only enable as needed. +# CLI flag: -operation-config.log-hash-of-labels +[log_hash_of_labels: | default = false] + # Log every stream in a push request (very verbose, recommend to enable via # runtime config only). # CLI flag: -operation-config.log-push-request-streams [log_push_request_streams: | default = false] +# Only show streams that match a provided IP address, LogPushRequestStreams must +# be enabled. 
Can be used multiple times to filter by multiple IPs. +# CLI flag: -operation-config.filter-push-request-streams-ips +[filter_push_request_streams_ips: | default = []] + +# Log service name discovery (very verbose, recommend to enable via runtime +# config only). +# CLI flag: -operation-config.log-service-name-discovery +[log_service_name_discovery: | default = false] + # Log metrics for duplicate lines received. # CLI flag: -operation-config.log-duplicate-metrics [log_duplicate_metrics: | default = false] diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 81b860e678..4de2ddf3df 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -4,8 +4,11 @@ import ( "errors" "fmt" "net/http" + "slices" "strings" + "time" + "github.com/dustin/go-humanize" "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" @@ -45,7 +48,9 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe streamResolver := newRequestScopedStreamResolver(tenantID, d.validator.Limits, logger) logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID) - req, err := push.ParseRequest(logger, tenantID, d.cfg.MaxRecvMsgSize, r, d.validator.Limits, pushRequestParser, d.usageTracker, streamResolver, logPushRequestStreams) + filterPushRequestStreamsIPs := d.tenantConfigs.FilterPushRequestStreamsIPs(tenantID) + presumedAgentIP := extractPresumedAgentIP(r) + req, pushStats, err := push.ParseRequest(logger, tenantID, d.cfg.MaxRecvMsgSize, r, d.validator.Limits, d.tenantConfigs, pushRequestParser, d.usageTracker, streamResolver, presumedAgentIP) if err != nil { switch { case errors.Is(err, push.ErrRequestBodyTooLarge): @@ -102,14 +107,33 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe } if logPushRequestStreams { - var sb strings.Builder - for _, s := range req.Streams { - sb.WriteString(s.Labels) + shouldLog := true + if len(filterPushRequestStreamsIPs) > 0 { + // if there are filter IP's set, 
we only want to log if the presumed agent IP is in the list + // this would also then exclude any requests that don't have a presumed agent IP + shouldLog = slices.Contains(filterPushRequestStreamsIPs, presumedAgentIP) + } + + if shouldLog { + for _, s := range req.Streams { + logValues := []interface{}{ + "msg", "push request streams", + "stream", s.Labels, + "streamLabelsHash", util.HashedQuery(s.Labels), // this is to make it easier to do searching and grouping + "streamSizeBytes", humanize.Bytes(uint64(pushStats.StreamSizeBytes[s.Labels])), + } + if timestamp, ok := pushStats.MostRecentEntryTimestampPerStream[s.Labels]; ok { + logValues = append(logValues, "mostRecentLagMs", time.Since(timestamp).Milliseconds()) + } + if presumedAgentIP != "" { + logValues = append(logValues, "presumedAgentIp", presumedAgentIP) + } + if pushStats.HashOfAllStreams != 0 { + logValues = append(logValues, "hashOfAllStreams", pushStats.HashOfAllStreams) + } + level.Debug(logger).Log(logValues...) + } } - level.Debug(logger).Log( - "msg", "push request streams", - "streams", sb.String(), - ) } _, err = d.PushWithResolver(r.Context(), req, streamResolver) @@ -170,3 +194,9 @@ func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request) { ` util.WriteHTMLResponse(w, noRingPage) } + +func extractPresumedAgentIP(r *http.Request) string { + // X-Forwarded-For header may have 2 or more comma-separated addresses: the 2nd (and additional) are typically appended by proxies which handled the traffic. 
+ // Therefore, if the header is included, only log the first address + return strings.Split(r.Header.Get("X-Forwarded-For"), ",")[0] +} diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index 2ea863ef01..6121a21493 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/v3/pkg/loghttp/push" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/require" @@ -171,10 +172,10 @@ func (p *fakeParser) parseRequest( _ string, _ *http.Request, _ push.Limits, + _ *runtime.TenantConfigs, _ int, _ push.UsageTracker, _ push.StreamResolver, - _ bool, _ log.Logger, ) (*logproto.PushRequest, *push.Stats, error) { return &logproto.PushRequest{}, &push.Stats{}, p.parseErr diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 3e973e3eb3..40c6560030 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -24,6 +24,7 @@ import ( "github.com/grafana/loki/pkg/push" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/runtime" loki_util "github.com/grafana/loki/v3/pkg/util" ) @@ -38,14 +39,14 @@ const ( messageSizeLargerErrFmt = "%w than max (%d vs %d)" ) -func ParseOTLPRequest(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { stats := NewPushStats() otlpLogs, err := extractLogs(r, maxRecvMsgSize, stats) if err != nil { return nil, nil, err } - req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, limits.OTLPConfig(userID), 
limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger, streamResolver) + req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, limits.OTLPConfig(userID), tenantConfigs, limits.DiscoverServiceName(userID), tracker, stats, logger, streamResolver) return req, stats, nil } @@ -104,7 +105,7 @@ func extractLogs(r *http.Request, maxRecvMsgSize int, pushStats *Stats) (plog.Lo return req.Logs(), nil } -func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger, streamResolver StreamResolver) *logproto.PushRequest { +func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otlpConfig OTLPConfig, tenantConfigs *runtime.TenantConfigs, discoverServiceName []string, tracker UsageTracker, stats *Stats, logger log.Logger, streamResolver StreamResolver) *logproto.PushRequest { if ld.LogRecordCount() == 0 { return &logproto.PushRequest{} } @@ -115,6 +116,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl // Track if request used the Loki OTLP exporter label var usingLokiExporter bool + logServiceNameDiscovery := false + logPushRequestStreams := false + if tenantConfigs != nil { + logServiceNameDiscovery = tenantConfigs.LogServiceNameDiscovery(userID) + logPushRequestStreams = tenantConfigs.LogPushRequestStreams(userID) + } + + mostRecentEntryTimestamp := time.Time{} for i := 0; i < rls.Len(); i++ { sls := rls.At(i).ScopeLogs() res := rls.At(i).Resource() @@ -123,7 +132,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len()) streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size var pushedLabels model.LabelSet - if logPushRequestStreams { + if logServiceNameDiscovery 
{ pushedLabels = make(model.LabelSet, 30) } @@ -142,7 +151,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl if action == IndexLabel { for _, lbl := range attributeAsLabels { streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) - if logPushRequestStreams && pushedLabels != nil { + if logServiceNameDiscovery && pushedLabels != nil { pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) } @@ -168,7 +177,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl } // this must be pushed to the end after log lines are also evaluated - if logPushRequestStreams { + if logServiceNameDiscovery { var sb strings.Builder sb.WriteString("{") labels := make([]string, 0, len(pushedLabels)) @@ -283,7 +292,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl log := logs.At(k) // Use the existing function that already handles log attributes properly - logLabels, entry := otlpLogToPushEntry(log, otlpConfig, logPushRequestStreams, pushedLabels) + logLabels, entry := otlpLogToPushEntry(log, otlpConfig, logServiceNameDiscovery, pushedLabels) + if entry.Timestamp.After(mostRecentEntryTimestamp) { + mostRecentEntryTimestamp = entry.Timestamp + } // Combine resource labels with log labels if any log attributes were indexed var entryLabelsStr string @@ -366,6 +378,8 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl } } + stats.MostRecentEntryTimestamp = mostRecentEntryTimestamp + pr := &push.PushRequest{ Streams: make([]push.Stream, 0, len(pushRequestsByStream)), } @@ -375,6 +389,20 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl if len(stream.Entries) > 0 || len(stream.Labels) > 0 { pr.Streams = append(pr.Streams, stream) } + if logPushRequestStreams { + mostRecentEntryTimestamp := time.Time{} + streamSizeBytes := int64(0) + // It's difficult to calculate these values inline when we process 
the payload because promotion of resource attributes or log attributes to labels can change the stream with each entry. + // So for simplicity and because this logging is typically disabled, we iterate on the entries to calculate these values here. + for _, entry := range stream.Entries { + streamSizeBytes += int64(len(entry.Line)) + int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata)) + if entry.Timestamp.After(mostRecentEntryTimestamp) { + mostRecentEntryTimestamp = entry.Timestamp + } + } + stats.MostRecentEntryTimestampPerStream[stream.Labels] = mostRecentEntryTimestamp + stats.StreamSizeBytes[stream.Labels] = streamSizeBytes + } } // Increment exporter streams metric once per request if seen @@ -386,7 +414,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl } // otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry. -func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushRequestStreams bool, pushedLabels model.LabelSet) (model.LabelSet, push.Entry) { +func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logServiceNameDiscovery bool, pushedLabels model.LabelSet) (model.LabelSet, push.Entry) { // copy log attributes and all the fields from log(except log.Body) to structured metadata logAttrs := log.Attributes() structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7) @@ -406,7 +434,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushReques if action == IndexLabel { for _, lbl := range attributeAsLabels { logLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) - if logPushRequestStreams && pushedLabels != nil { + if logServiceNameDiscovery && pushedLabels != nil { pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value) } } @@ -433,7 +461,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushReques // Add severity_text as an index label if configured if otlpConfig.SeverityTextAsLabel { 
logLabels[model.LabelName(OTLPSeverityText)] = model.LabelValue(severityText) - if logPushRequestStreams && pushedLabels != nil { + if logServiceNameDiscovery && pushedLabels != nil { pushedLabels[model.LabelName(OTLPSeverityText)] = model.LabelValue(severityText) } } diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index 626d128035..1581aa01f4 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -111,8 +111,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { time.Hour: nil, }, }, - StreamLabelsSize: 21, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 21, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, { @@ -158,8 +160,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { time.Hour: nil, }, }, - StreamLabelsSize: 27, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 27, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, { @@ -205,8 +209,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { time.Hour: nil, }, }, - StreamLabelsSize: 47, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 47, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, { @@ -295,8 +301,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, }, - StreamLabelsSize: 21, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 21, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, { @@ -394,8 +402,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, }, - StreamLabelsSize: 21, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 21, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, { @@ -554,8 
+564,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { }, }, }, - StreamLabelsSize: 42, - MostRecentEntryTimestamp: now, + StreamLabelsSize: 42, + MostRecentEntryTimestamp: now, + StreamSizeBytes: map[string]int64{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, }, }, } { @@ -575,10 +587,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) { tc.generateLogs(), "foo", tc.otlpConfig, + nil, defaultServiceDetection, tracker, stats, - false, log.NewNopLogger(), streamResolver, ) @@ -911,10 +923,10 @@ func TestOTLPLogAttributesAsIndexLabels(t *testing.T) { generateLogs(), "test-user", customOTLPConfig, + nil, []string{}, // No service name discovery needed tracker, stats, - false, log.NewNopLogger(), streamResolver, ) @@ -1031,10 +1043,10 @@ func TestOTLPSeverityTextAsLabel(t *testing.T) { generateLogs(), "test-user", customOTLPConfig, + nil, []string{}, // No service name discovery needed tracker, stats, - false, log.NewNopLogger(), streamResolver, ) diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 1b2abf03cd..ba1927d85a 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -30,6 +30,7 @@ import ( "github.com/grafana/loki/v3/pkg/loghttp" "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/logql/syntax" + "github.com/grafana/loki/v3/pkg/runtime" "github.com/grafana/loki/v3/pkg/util" loki_util "github.com/grafana/loki/v3/pkg/util" "github.com/grafana/loki/v3/pkg/util/constants" @@ -111,7 +112,7 @@ type StreamResolver interface { } type ( - RequestParser func(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) + RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) 
RequestParserWrapper func(inner RequestParser) RequestParser ErrorWriter func(w http.ResponseWriter, errorStr string, code int, logger log.Logger) ) @@ -120,23 +121,28 @@ type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64 func NewPushStats() *Stats { return &Stats{ - LogLinesBytes: map[string]map[time.Duration]int64{}, - StructuredMetadataBytes: map[string]map[time.Duration]int64{}, - PolicyNumLines: map[string]int64{}, - ResourceAndSourceMetadataLabels: map[string]map[time.Duration]push.LabelsAdapter{}, + LogLinesBytes: map[string]map[time.Duration]int64{}, + StructuredMetadataBytes: map[string]map[time.Duration]int64{}, + PolicyNumLines: map[string]int64{}, + ResourceAndSourceMetadataLabels: map[string]map[time.Duration]push.LabelsAdapter{}, + MostRecentEntryTimestampPerStream: map[string]time.Time{}, + StreamSizeBytes: map[string]int64{}, } } type Stats struct { - Errs []error - PolicyNumLines map[string]int64 - LogLinesBytes PolicyWithRetentionWithBytes - StructuredMetadataBytes PolicyWithRetentionWithBytes - ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter - StreamLabelsSize int64 - MostRecentEntryTimestamp time.Time - ContentType string - ContentEncoding string + Errs []error + PolicyNumLines map[string]int64 + LogLinesBytes PolicyWithRetentionWithBytes + StructuredMetadataBytes PolicyWithRetentionWithBytes + ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter + StreamLabelsSize int64 + MostRecentEntryTimestamp time.Time + MostRecentEntryTimestampPerStream map[string]time.Time + StreamSizeBytes map[string]int64 + HashOfAllStreams uint64 + ContentType string + ContentEncoding string BodySize int64 // Extra is a place for a wrapped perser to record any interesting stats as key-value pairs to be logged @@ -145,13 +151,13 @@ type Stats struct { IsAggregatedMetric bool } -func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.Request, limits Limits, 
pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool) (*logproto.PushRequest, error) { - req, pushStats, err := pushRequestParser(userID, r, limits, maxRecvMsgSize, tracker, streamResolver, logPushRequestStreams, logger) +func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP string) (*logproto.PushRequest, *Stats, error) { + req, pushStats, err := pushRequestParser(userID, r, limits, tenantConfigs, maxRecvMsgSize, tracker, streamResolver, logger) if err != nil && !errors.Is(err, ErrAllLogsFiltered) { if errors.Is(err, loki_util.ErrMessageSizeTooLarge) { - return nil, fmt.Errorf("%w: %s", ErrRequestBodyTooLarge, err.Error()) + return nil, nil, fmt.Errorf("%w: %s", ErrRequestBodyTooLarge, err.Error()) } - return nil, err + return nil, nil, err } var ( @@ -231,11 +237,8 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http. "mostRecentLagMs", time.Since(pushStats.MostRecentEntryTimestamp).Milliseconds(), } - // X-Forwarded-For header may have 2 or more comma-separated addresses: the 2nd (and additional) are typically appended by proxies which handled the traffic. - // Therefore, if the header is included, only log the first address - agentIP := strings.Split(r.Header.Get("X-Forwarded-For"), ",")[0] - if agentIP != "" { - logValues = append(logValues, "presumedAgentIp", strings.TrimSpace(agentIP)) + if presumedAgentIP != "" { + logValues = append(logValues, "presumedAgentIp", presumedAgentIP) } userAgent := r.Header.Get("User-Agent") @@ -243,13 +246,30 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http. 
logValues = append(logValues, "userAgent", strings.TrimSpace(userAgent)) } + if tenantConfigs != nil && tenantConfigs.LogHashOfLabels(userID) { + resultHash := uint64(0) + for _, stream := range req.Streams { + // I don't believe a hash will be set, but if it is, use it. + hash := stream.Hash + if hash == 0 { + // calculate an fnv32 hash of the stream labels + // reusing our query hash function for simplicity + hash = uint64(util.HashedQuery(stream.Labels)) + } + // xor the hash with the result hash, this will result in the same hash regardless of the order of the streams + resultHash ^= hash + } + logValues = append(logValues, "hashOfLabels", resultHash) + pushStats.HashOfAllStreams = resultHash + } + logValues = append(logValues, pushStats.Extra...) level.Debug(logger).Log(logValues...) - return req, err + return req, pushStats, err } -func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -319,6 +339,14 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS pushStats.ContentEncoding = contentEncoding discoverServiceName := limits.DiscoverServiceName(userID) + + logServiceNameDiscovery := false + logPushRequestStreams := false + if tenantConfigs != nil { + logServiceNameDiscovery = tenantConfigs.LogServiceNameDiscovery(userID) + logPushRequestStreams = tenantConfigs.LogPushRequestStreams(userID) + } + for i := range req.Streams { s := req.Streams[i] pushStats.StreamLabelsSize += int64(len(s.Labels)) @@ -333,7 +361,7 @@ func 
ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS } var beforeServiceName string - if logPushRequestStreams { + if logServiceNameDiscovery { beforeServiceName = lbs.String() } @@ -351,7 +379,7 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS s.Labels = lbs.String() } - if logPushRequestStreams { + if logServiceNameDiscovery { level.Debug(logger).Log( "msg", "push request stream before service name discovery", "labels", beforeServiceName, @@ -374,10 +402,15 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS pushStats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64) } + // These two variables are used to track the most recent entry timestamp and the size of the stream. + // They are only used when logPushRequestStreams is true. + mostRecentEntryTimestamp := time.Time{} + streamSizeBytes := int64(0) for _, e := range s.Entries { pushStats.PolicyNumLines[policy]++ entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata)) pushStats.LogLinesBytes[policy][retentionPeriod] += int64(len(e.Line)) + streamSizeBytes += int64(len(e.Line)) + entryLabelsSize pushStats.StructuredMetadataBytes[policy][retentionPeriod] += entryLabelsSize totalBytesReceived += int64(len(e.Line)) totalBytesReceived += entryLabelsSize @@ -385,6 +418,16 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS if e.Timestamp.After(pushStats.MostRecentEntryTimestamp) { pushStats.MostRecentEntryTimestamp = e.Timestamp } + + if e.Timestamp.After(mostRecentEntryTimestamp) { + mostRecentEntryTimestamp = e.Timestamp + } + } + + // Only populate this map if we are going to log it. 
+ if logPushRequestStreams { + pushStats.MostRecentEntryTimestampPerStream[s.Labels] = mostRecentEntryTimestamp + pushStats.StreamSizeBytes[s.Labels] = streamSizeBytes } if tracker != nil && !pushStats.IsAggregatedMetric { diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 6e2f83c410..38081f1712 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/dskit/flagext" "github.com/grafana/loki/v3/pkg/logproto" + "github.com/grafana/loki/v3/pkg/runtime" util_log "github.com/grafana/loki/v3/pkg/util/log" ) @@ -311,16 +312,17 @@ func TestParseRequest(t *testing.T) { } tracker := NewMockTracker() - data, err := ParseRequest( + data, _, err := ParseRequest( util_log.Logger, "fake", 100<<20, request, test.fakeLimits, + nil, ParseLokiRequest, tracker, streamResolver, - false, + "", ) structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived @@ -452,7 +454,7 @@ func Test_ServiceDetection(t *testing.T) { limits := &fakeLimits{enabled: true, labels: []string{"foo"}} streamResolver := newMockStreamResolver("fake", limits) - data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseLokiRequest, tracker, streamResolver, false) + data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseLokiRequest, tracker, streamResolver, "") require.NoError(t, err) require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) @@ -464,7 +466,7 @@ func Test_ServiceDetection(t *testing.T) { limits := &fakeLimits{enabled: true} streamResolver := newMockStreamResolver("fake", limits) - data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false) + data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, 
tracker, streamResolver, "") require.NoError(t, err) require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels) }) @@ -479,7 +481,7 @@ func Test_ServiceDetection(t *testing.T) { indexAttributes: []string{"special"}, } streamResolver := newMockStreamResolver("fake", limits) - data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false) + data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, tracker, streamResolver, "") require.NoError(t, err) require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels) }) @@ -494,7 +496,7 @@ func Test_ServiceDetection(t *testing.T) { indexAttributes: []string{}, } streamResolver := newMockStreamResolver("fake", limits) - data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false) + data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, tracker, streamResolver, "") require.NoError(t, err) require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels) }) @@ -596,7 +598,7 @@ func TestNegativeSizeHandling(t *testing.T) { linesIngested.Reset() // Create a custom request parser that will generate negative sizes - var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ int, _ UsageTracker, _ StreamResolver, _ bool, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) { + var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ *runtime.TenantConfigs, _ int, _ UsageTracker, _ StreamResolver, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) { // Create a minimal valid request req := &logproto.PushRequest{ Streams: []logproto.Stream{ @@ -635,16 +637,17 @@ func TestNegativeSizeHandling(t *testing.T) { streamResolver := 
newMockStreamResolver("fake", &fakeLimits{}) // This should not panic with our guard clauses in place - _, err := ParseRequest( + _, _, err := ParseRequest( util_log.Logger, "fake", 100<<20, request, &fakeLimits{}, + nil, mockParser, NewMockTracker(), streamResolver, - false, + "", ) // No error should be returned diff --git a/pkg/runtime/config.go b/pkg/runtime/config.go index 1655789dae..04b7f00231 100644 --- a/pkg/runtime/config.go +++ b/pkg/runtime/config.go @@ -2,14 +2,19 @@ package runtime import ( "flag" + + "github.com/grafana/dskit/flagext" ) type Config struct { - LogStreamCreation bool `yaml:"log_stream_creation"` - LogPushRequest bool `yaml:"log_push_request"` - LogPushRequestStreams bool `yaml:"log_push_request_streams"` - LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"` - LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"` + LogStreamCreation bool `yaml:"log_stream_creation"` + LogPushRequest bool `yaml:"log_push_request"` + LogHashOfLabels bool `yaml:"log_hash_of_labels"` + LogPushRequestStreams bool `yaml:"log_push_request_streams"` + FilterPushRequestStreamsIPs []string `yaml:"filter_push_request_streams_ips"` + LogServiceNameDiscovery bool `yaml:"log_service_name_discovery"` + LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"` + LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"` // LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace. 
LimitedLogPushErrors bool `yaml:"limited_log_push_errors"` @@ -19,7 +24,10 @@ type Config struct { func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.LogStreamCreation, "operation-config.log-stream-creation", false, "Log every new stream created by a push request (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogPushRequest, "operation-config.log-push-request", false, "Log every push request (very verbose, recommend to enable via runtime config only).") + f.BoolVar(&cfg.LogHashOfLabels, "operation-config.log-hash-of-labels", false, "Log a commutative hash of the labels for all streams in a push request. In some cases this can potentially be used as an identifier of the agent sending the stream. Calculating hashes is epensive so only enable as needed.") f.BoolVar(&cfg.LogPushRequestStreams, "operation-config.log-push-request-streams", false, "Log every stream in a push request (very verbose, recommend to enable via runtime config only).") + f.Var((*flagext.StringSlice)(&cfg.FilterPushRequestStreamsIPs), "operation-config.filter-push-request-streams-ips", "Only show streams that match a provided IP address, LogPushRequestStreams must be enabled. 
Can be used multiple times to filter by multiple IPs.") + f.BoolVar(&cfg.LogServiceNameDiscovery, "operation-config.log-service-name-discovery", false, "Log service name discovery (very verbose, recommend to enable via runtime config only).") f.BoolVar(&cfg.LogDuplicateMetrics, "operation-config.log-duplicate-metrics", false, "Log metrics for duplicate lines received.") f.BoolVar(&cfg.LogDuplicateStreamInfo, "operation-config.log-duplicate-stream-info", false, "Log stream info for duplicate lines received") f.BoolVar(&cfg.LimitedLogPushErrors, "operation-config.limited-log-push-errors", true, "Log push errors with a rate limited logger, will show client push errors without overly spamming logs.") @@ -94,10 +102,22 @@ func (o *TenantConfigs) LogPushRequest(userID string) bool { return o.getOverridesForUser(userID).LogPushRequest } +func (o *TenantConfigs) LogHashOfLabels(userID string) bool { + return o.getOverridesForUser(userID).LogHashOfLabels +} + func (o *TenantConfigs) LogPushRequestStreams(userID string) bool { return o.getOverridesForUser(userID).LogPushRequestStreams } +func (o *TenantConfigs) FilterPushRequestStreamsIPs(userID string) []string { + return o.getOverridesForUser(userID).FilterPushRequestStreamsIPs +} + +func (o *TenantConfigs) LogServiceNameDiscovery(userID string) bool { + return o.getOverridesForUser(userID).LogServiceNameDiscovery +} + func (o *TenantConfigs) LogDuplicateMetrics(userID string) bool { return o.getOverridesForUser(userID).LogDuplicateMetrics }