chore: improve logging of push request streams (#17872)

Signed-off-by: Ed Welch <ed@edjusted.com>
Co-authored-by: Paul Rogers <129207811+paul1r@users.noreply.github.com>
pull/17694/head
Ed Welch 7 months ago committed by GitHub
parent da186e59b9
commit ad0bef31e7
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      clients/pkg/promtail/targets/lokipush/pushtarget.go
  2. 16
      docs/sources/shared/configuration.md
  3. 46
      pkg/distributor/http.go
  4. 3
      pkg/distributor/http_test.go
  5. 48
      pkg/loghttp/push/otlp.go
  6. 42
      pkg/loghttp/push/otlp_test.go
  7. 97
      pkg/loghttp/push/push.go
  8. 21
      pkg/loghttp/push/push_test.go
  9. 30
      pkg/runtime/config.go

@ -116,7 +116,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handleLoki(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, t.config.MaxSendMsgSize, r, push.EmptyLimits{}, push.ParseLokiRequest, nil, nil, false)
req, _, err := push.ParseRequest(logger, userID, t.config.MaxSendMsgSize, r, push.EmptyLimits{}, nil, push.ParseLokiRequest, nil, nil, "")
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)

@ -4431,11 +4431,27 @@ These are values which allow you to control aspects of Loki's operation, most co
# CLI flag: -operation-config.log-push-request
[log_push_request: <boolean> | default = false]
# Log a commutative hash of the labels for all streams in a push request. In
# some cases this can potentially be used as an identifier of the agent sending
# the stream. Calculating hashes is epensive so only enable as needed.
# CLI flag: -operation-config.log-hash-of-labels
[log_hash_of_labels: <boolean> | default = false]
# Log every stream in a push request (very verbose, recommend to enable via
# runtime config only).
# CLI flag: -operation-config.log-push-request-streams
[log_push_request_streams: <boolean> | default = false]
# Only show streams that match a provided IP address, LogPushRequestStreams must
# be enabled. Can be used multiple times to filter by multiple IPs.
# CLI flag: -operation-config.filter-push-request-streams-ips
[filter_push_request_streams_ips: <list of strings> | default = []]
# Log service name discovery (very verbose, recommend to enable via runtime
# config only).
# CLI flag: -operation-config.log-service-name-discovery
[log_service_name_discovery: <boolean> | default = false]
# Log metrics for duplicate lines received.
# CLI flag: -operation-config.log-duplicate-metrics
[log_duplicate_metrics: <boolean> | default = false]

@ -4,8 +4,11 @@ import (
"errors"
"fmt"
"net/http"
"slices"
"strings"
"time"
"github.com/dustin/go-humanize"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
@ -45,7 +48,9 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
streamResolver := newRequestScopedStreamResolver(tenantID, d.validator.Limits, logger)
logPushRequestStreams := d.tenantConfigs.LogPushRequestStreams(tenantID)
req, err := push.ParseRequest(logger, tenantID, d.cfg.MaxRecvMsgSize, r, d.validator.Limits, pushRequestParser, d.usageTracker, streamResolver, logPushRequestStreams)
filterPushRequestStreamsIPs := d.tenantConfigs.FilterPushRequestStreamsIPs(tenantID)
presumedAgentIP := extractPresumedAgentIP(r)
req, pushStats, err := push.ParseRequest(logger, tenantID, d.cfg.MaxRecvMsgSize, r, d.validator.Limits, d.tenantConfigs, pushRequestParser, d.usageTracker, streamResolver, presumedAgentIP)
if err != nil {
switch {
case errors.Is(err, push.ErrRequestBodyTooLarge):
@ -102,14 +107,33 @@ func (d *Distributor) pushHandler(w http.ResponseWriter, r *http.Request, pushRe
}
if logPushRequestStreams {
var sb strings.Builder
for _, s := range req.Streams {
sb.WriteString(s.Labels)
shouldLog := true
if len(filterPushRequestStreamsIPs) > 0 {
// if there are filter IP's set, we only want to log if the presumed agent IP is in the list
// this would also then exclude any requests that don't have a presumed agent IP
shouldLog = slices.Contains(filterPushRequestStreamsIPs, presumedAgentIP)
}
if shouldLog {
for _, s := range req.Streams {
logValues := []interface{}{
"msg", "push request streams",
"stream", s.Labels,
"streamLabelsHash", util.HashedQuery(s.Labels), // this is to make it easier to do searching and grouping
"streamSizeBytes", humanize.Bytes(uint64(pushStats.StreamSizeBytes[s.Labels])),
}
if timestamp, ok := pushStats.MostRecentEntryTimestampPerStream[s.Labels]; ok {
logValues = append(logValues, "mostRecentLagMs", time.Since(timestamp).Milliseconds())
}
if presumedAgentIP != "" {
logValues = append(logValues, "presumedAgentIp", presumedAgentIP)
}
if pushStats.HashOfAllStreams != 0 {
logValues = append(logValues, "hashOfAllStreams", pushStats.HashOfAllStreams)
}
level.Debug(logger).Log(logValues...)
}
}
level.Debug(logger).Log(
"msg", "push request streams",
"streams", sb.String(),
)
}
_, err = d.PushWithResolver(r.Context(), req, streamResolver)
@ -170,3 +194,9 @@ func (d *Distributor) ServeHTTP(w http.ResponseWriter, r *http.Request) {
</html>`
util.WriteHTMLResponse(w, noRingPage)
}
func extractPresumedAgentIP(r *http.Request) string {
// X-Forwarded-For header may have 2 or more comma-separated addresses: the 2nd (and additional) are typically appended by proxies which handled the traffic.
// Therefore, if the header is included, only log the first address
return strings.Split(r.Header.Get("X-Forwarded-For"), ",")[0]
}

@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/dskit/flagext"
"github.com/stretchr/testify/require"
@ -171,10 +172,10 @@ func (p *fakeParser) parseRequest(
_ string,
_ *http.Request,
_ push.Limits,
_ *runtime.TenantConfigs,
_ int,
_ push.UsageTracker,
_ push.StreamResolver,
_ bool,
_ log.Logger,
) (*logproto.PushRequest, *push.Stats, error) {
return &logproto.PushRequest{}, &push.Stats{}, p.parseErr

@ -24,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"
loki_util "github.com/grafana/loki/v3/pkg/util"
)
@ -38,14 +39,14 @@ const (
messageSizeLargerErrFmt = "%w than max (%d vs %d)"
)
func ParseOTLPRequest(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
stats := NewPushStats()
otlpLogs, err := extractLogs(r, maxRecvMsgSize, stats)
if err != nil {
return nil, nil, err
}
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, limits.OTLPConfig(userID), limits.DiscoverServiceName(userID), tracker, stats, logPushRequestStreams, logger, streamResolver)
req := otlpToLokiPushRequest(r.Context(), otlpLogs, userID, limits.OTLPConfig(userID), tenantConfigs, limits.DiscoverServiceName(userID), tracker, stats, logger, streamResolver)
return req, stats, nil
}
@ -104,7 +105,7 @@ func extractLogs(r *http.Request, maxRecvMsgSize int, pushStats *Stats) (plog.Lo
return req.Logs(), nil
}
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otlpConfig OTLPConfig, discoverServiceName []string, tracker UsageTracker, stats *Stats, logPushRequestStreams bool, logger log.Logger, streamResolver StreamResolver) *logproto.PushRequest {
func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otlpConfig OTLPConfig, tenantConfigs *runtime.TenantConfigs, discoverServiceName []string, tracker UsageTracker, stats *Stats, logger log.Logger, streamResolver StreamResolver) *logproto.PushRequest {
if ld.LogRecordCount() == 0 {
return &logproto.PushRequest{}
}
@ -115,6 +116,14 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
// Track if request used the Loki OTLP exporter label
var usingLokiExporter bool
logServiceNameDiscovery := false
logPushRequestStreams := false
if tenantConfigs != nil {
logServiceNameDiscovery = tenantConfigs.LogServiceNameDiscovery(userID)
logPushRequestStreams = tenantConfigs.LogPushRequestStreams(userID)
}
mostRecentEntryTimestamp := time.Time{}
for i := 0; i < rls.Len(); i++ {
sls := rls.At(i).ScopeLogs()
res := rls.At(i).Resource()
@ -123,7 +132,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
resourceAttributesAsStructuredMetadata := make(push.LabelsAdapter, 0, resAttrs.Len())
streamLabels := make(model.LabelSet, 30) // we have a default labels limit of 30 so just initialize the map of same size
var pushedLabels model.LabelSet
if logPushRequestStreams {
if logServiceNameDiscovery {
pushedLabels = make(model.LabelSet, 30)
}
@ -142,7 +151,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
streamLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
if logPushRequestStreams && pushedLabels != nil {
if logServiceNameDiscovery && pushedLabels != nil {
pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
@ -168,7 +177,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
}
// this must be pushed to the end after log lines are also evaluated
if logPushRequestStreams {
if logServiceNameDiscovery {
var sb strings.Builder
sb.WriteString("{")
labels := make([]string, 0, len(pushedLabels))
@ -283,7 +292,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
log := logs.At(k)
// Use the existing function that already handles log attributes properly
logLabels, entry := otlpLogToPushEntry(log, otlpConfig, logPushRequestStreams, pushedLabels)
logLabels, entry := otlpLogToPushEntry(log, otlpConfig, logServiceNameDiscovery, pushedLabels)
if entry.Timestamp.After(mostRecentEntryTimestamp) {
mostRecentEntryTimestamp = entry.Timestamp
}
// Combine resource labels with log labels if any log attributes were indexed
var entryLabelsStr string
@ -366,6 +378,8 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
}
}
stats.MostRecentEntryTimestamp = mostRecentEntryTimestamp
pr := &push.PushRequest{
Streams: make([]push.Stream, 0, len(pushRequestsByStream)),
}
@ -375,6 +389,20 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
if len(stream.Entries) > 0 || len(stream.Labels) > 0 {
pr.Streams = append(pr.Streams, stream)
}
if logPushRequestStreams {
mostRecentEntryTimestamp := time.Time{}
streamSizeBytes := int64(0)
// It's difficult to calculate these values inline when we process the payload because promotion of resource attributes or log attributes to labels can change the stream with each entry.
// So for simplicity and because this logging is typically disabled, we iterate on the entries to calculate these values here.
for _, entry := range stream.Entries {
streamSizeBytes += int64(len(entry.Line)) + int64(loki_util.StructuredMetadataSize(entry.StructuredMetadata))
if entry.Timestamp.After(mostRecentEntryTimestamp) {
mostRecentEntryTimestamp = entry.Timestamp
}
}
stats.MostRecentEntryTimestampPerStream[stream.Labels] = mostRecentEntryTimestamp
stats.StreamSizeBytes[stream.Labels] = streamSizeBytes
}
}
// Increment exporter streams metric once per request if seen
@ -386,7 +414,7 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl
}
// otlpLogToPushEntry converts an OTLP log record to a Loki push.Entry.
func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushRequestStreams bool, pushedLabels model.LabelSet) (model.LabelSet, push.Entry) {
func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logServiceNameDiscovery bool, pushedLabels model.LabelSet) (model.LabelSet, push.Entry) {
// copy log attributes and all the fields from log(except log.Body) to structured metadata
logAttrs := log.Attributes()
structuredMetadata := make(push.LabelsAdapter, 0, logAttrs.Len()+7)
@ -406,7 +434,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushReques
if action == IndexLabel {
for _, lbl := range attributeAsLabels {
logLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
if logPushRequestStreams && pushedLabels != nil {
if logServiceNameDiscovery && pushedLabels != nil {
pushedLabels[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
}
@ -433,7 +461,7 @@ func otlpLogToPushEntry(log plog.LogRecord, otlpConfig OTLPConfig, logPushReques
// Add severity_text as an index label if configured
if otlpConfig.SeverityTextAsLabel {
logLabels[model.LabelName(OTLPSeverityText)] = model.LabelValue(severityText)
if logPushRequestStreams && pushedLabels != nil {
if logServiceNameDiscovery && pushedLabels != nil {
pushedLabels[model.LabelName(OTLPSeverityText)] = model.LabelValue(severityText)
}
}

@ -111,8 +111,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
time.Hour: nil,
},
},
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
{
@ -158,8 +160,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
time.Hour: nil,
},
},
StreamLabelsSize: 27,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 27,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
{
@ -205,8 +209,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
time.Hour: nil,
},
},
StreamLabelsSize: 47,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 47,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
{
@ -295,8 +301,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
},
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
{
@ -394,8 +402,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
},
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 21,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
{
@ -554,8 +564,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
},
},
},
StreamLabelsSize: 42,
MostRecentEntryTimestamp: now,
StreamLabelsSize: 42,
MostRecentEntryTimestamp: now,
StreamSizeBytes: map[string]int64{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
},
},
} {
@ -575,10 +587,10 @@ func TestOTLPToLokiPushRequest(t *testing.T) {
tc.generateLogs(),
"foo",
tc.otlpConfig,
nil,
defaultServiceDetection,
tracker,
stats,
false,
log.NewNopLogger(),
streamResolver,
)
@ -911,10 +923,10 @@ func TestOTLPLogAttributesAsIndexLabels(t *testing.T) {
generateLogs(),
"test-user",
customOTLPConfig,
nil,
[]string{}, // No service name discovery needed
tracker,
stats,
false,
log.NewNopLogger(),
streamResolver,
)
@ -1031,10 +1043,10 @@ func TestOTLPSeverityTextAsLabel(t *testing.T) {
generateLogs(),
"test-user",
customOTLPConfig,
nil,
[]string{}, // No service name discovery needed
tracker,
stats,
false,
log.NewNopLogger(),
streamResolver,
)

@ -30,6 +30,7 @@ import (
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/syntax"
"github.com/grafana/loki/v3/pkg/runtime"
"github.com/grafana/loki/v3/pkg/util"
loki_util "github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
@ -111,7 +112,7 @@ type StreamResolver interface {
}
type (
RequestParser func(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error)
RequestParserWrapper func(inner RequestParser) RequestParser
ErrorWriter func(w http.ResponseWriter, errorStr string, code int, logger log.Logger)
)
@ -120,23 +121,28 @@ type PolicyWithRetentionWithBytes map[string]map[time.Duration]int64
func NewPushStats() *Stats {
return &Stats{
LogLinesBytes: map[string]map[time.Duration]int64{},
StructuredMetadataBytes: map[string]map[time.Duration]int64{},
PolicyNumLines: map[string]int64{},
ResourceAndSourceMetadataLabels: map[string]map[time.Duration]push.LabelsAdapter{},
LogLinesBytes: map[string]map[time.Duration]int64{},
StructuredMetadataBytes: map[string]map[time.Duration]int64{},
PolicyNumLines: map[string]int64{},
ResourceAndSourceMetadataLabels: map[string]map[time.Duration]push.LabelsAdapter{},
MostRecentEntryTimestampPerStream: map[string]time.Time{},
StreamSizeBytes: map[string]int64{},
}
}
type Stats struct {
Errs []error
PolicyNumLines map[string]int64
LogLinesBytes PolicyWithRetentionWithBytes
StructuredMetadataBytes PolicyWithRetentionWithBytes
ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter
StreamLabelsSize int64
MostRecentEntryTimestamp time.Time
ContentType string
ContentEncoding string
Errs []error
PolicyNumLines map[string]int64
LogLinesBytes PolicyWithRetentionWithBytes
StructuredMetadataBytes PolicyWithRetentionWithBytes
ResourceAndSourceMetadataLabels map[string]map[time.Duration]push.LabelsAdapter
StreamLabelsSize int64
MostRecentEntryTimestamp time.Time
MostRecentEntryTimestampPerStream map[string]time.Time
StreamSizeBytes map[string]int64
HashOfAllStreams uint64
ContentType string
ContentEncoding string
BodySize int64
// Extra is a place for a wrapped perser to record any interesting stats as key-value pairs to be logged
@ -145,13 +151,13 @@ type Stats struct {
IsAggregatedMetric bool
}
func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.Request, limits Limits, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool) (*logproto.PushRequest, error) {
req, pushStats, err := pushRequestParser(userID, r, limits, maxRecvMsgSize, tracker, streamResolver, logPushRequestStreams, logger)
func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP string) (*logproto.PushRequest, *Stats, error) {
req, pushStats, err := pushRequestParser(userID, r, limits, tenantConfigs, maxRecvMsgSize, tracker, streamResolver, logger)
if err != nil && !errors.Is(err, ErrAllLogsFiltered) {
if errors.Is(err, loki_util.ErrMessageSizeTooLarge) {
return nil, fmt.Errorf("%w: %s", ErrRequestBodyTooLarge, err.Error())
return nil, nil, fmt.Errorf("%w: %s", ErrRequestBodyTooLarge, err.Error())
}
return nil, err
return nil, nil, err
}
var (
@ -231,11 +237,8 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
"mostRecentLagMs", time.Since(pushStats.MostRecentEntryTimestamp).Milliseconds(),
}
// X-Forwarded-For header may have 2 or more comma-separated addresses: the 2nd (and additional) are typically appended by proxies which handled the traffic.
// Therefore, if the header is included, only log the first address
agentIP := strings.Split(r.Header.Get("X-Forwarded-For"), ",")[0]
if agentIP != "" {
logValues = append(logValues, "presumedAgentIp", strings.TrimSpace(agentIP))
if presumedAgentIP != "" {
logValues = append(logValues, "presumedAgentIp", presumedAgentIP)
}
userAgent := r.Header.Get("User-Agent")
@ -243,13 +246,30 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, r *http.
logValues = append(logValues, "userAgent", strings.TrimSpace(userAgent))
}
if tenantConfigs != nil && tenantConfigs.LogHashOfLabels(userID) {
resultHash := uint64(0)
for _, stream := range req.Streams {
// I don't believe a hash will be set, but if it is, use it.
hash := stream.Hash
if hash == 0 {
// calculate an fnv32 hash of the stream labels
// reusing our query hash function for simplicity
hash = uint64(util.HashedQuery(stream.Labels))
}
// xor the hash with the result hash, this will result in the same hash regardless of the order of the streams
resultHash ^= hash
}
logValues = append(logValues, "hashOfLabels", resultHash)
pushStats.HashOfAllStreams = resultHash
}
logValues = append(logValues, pushStats.Extra...)
level.Debug(logger).Log(logValues...)
return req, err
return req, pushStats, err
}
func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logPushRequestStreams bool, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) {
// Body
var body io.Reader
// bodySize should always reflect the compressed size of the request body
@ -319,6 +339,14 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS
pushStats.ContentEncoding = contentEncoding
discoverServiceName := limits.DiscoverServiceName(userID)
logServiceNameDiscovery := false
logPushRequestStreams := false
if tenantConfigs != nil {
logServiceNameDiscovery = tenantConfigs.LogServiceNameDiscovery(userID)
logPushRequestStreams = tenantConfigs.LogPushRequestStreams(userID)
}
for i := range req.Streams {
s := req.Streams[i]
pushStats.StreamLabelsSize += int64(len(s.Labels))
@ -333,7 +361,7 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS
}
var beforeServiceName string
if logPushRequestStreams {
if logServiceNameDiscovery {
beforeServiceName = lbs.String()
}
@ -351,7 +379,7 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS
s.Labels = lbs.String()
}
if logPushRequestStreams {
if logServiceNameDiscovery {
level.Debug(logger).Log(
"msg", "push request stream before service name discovery",
"labels", beforeServiceName,
@ -374,10 +402,15 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS
pushStats.StructuredMetadataBytes[policy] = make(map[time.Duration]int64)
}
// These two variables are used to track the most recent entry timestamp and the size of the stream.
// They are only used when logPushRequestStreams is true.
mostRecentEntryTimestamp := time.Time{}
streamSizeBytes := int64(0)
for _, e := range s.Entries {
pushStats.PolicyNumLines[policy]++
entryLabelsSize := int64(util.StructuredMetadataSize(e.StructuredMetadata))
pushStats.LogLinesBytes[policy][retentionPeriod] += int64(len(e.Line))
streamSizeBytes += int64(len(e.Line)) + entryLabelsSize
pushStats.StructuredMetadataBytes[policy][retentionPeriod] += entryLabelsSize
totalBytesReceived += int64(len(e.Line))
totalBytesReceived += entryLabelsSize
@ -385,6 +418,16 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, maxRecvMsgS
if e.Timestamp.After(pushStats.MostRecentEntryTimestamp) {
pushStats.MostRecentEntryTimestamp = e.Timestamp
}
if e.Timestamp.After(mostRecentEntryTimestamp) {
mostRecentEntryTimestamp = e.Timestamp
}
}
// Only populate this map if we are going to log it.
if logPushRequestStreams {
pushStats.MostRecentEntryTimestampPerStream[s.Labels] = mostRecentEntryTimestamp
pushStats.StreamSizeBytes[s.Labels] = streamSizeBytes
}
if tracker != nil && !pushStats.IsAggregatedMetric {

@ -25,6 +25,7 @@ import (
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/runtime"
util_log "github.com/grafana/loki/v3/pkg/util/log"
)
@ -311,16 +312,17 @@ func TestParseRequest(t *testing.T) {
}
tracker := NewMockTracker()
data, err := ParseRequest(
data, _, err := ParseRequest(
util_log.Logger,
"fake",
100<<20,
request,
test.fakeLimits,
nil,
ParseLokiRequest,
tracker,
streamResolver,
false,
"",
)
structuredMetadataBytesReceived := int(structuredMetadataBytesReceivedStats.Value()["total"].(int64)) - previousStructuredMetadataBytesReceived
@ -452,7 +454,7 @@ func Test_ServiceDetection(t *testing.T) {
limits := &fakeLimits{enabled: true, labels: []string{"foo"}}
streamResolver := newMockStreamResolver("fake", limits)
data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseLokiRequest, tracker, streamResolver, false)
data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseLokiRequest, tracker, streamResolver, "")
require.NoError(t, err)
require.Equal(t, labels.FromStrings("foo", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
@ -464,7 +466,7 @@ func Test_ServiceDetection(t *testing.T) {
limits := &fakeLimits{enabled: true}
streamResolver := newMockStreamResolver("fake", limits)
data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false)
data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, tracker, streamResolver, "")
require.NoError(t, err)
require.Equal(t, labels.FromStrings("k8s_job_name", "bar", LabelServiceName, "bar").String(), data.Streams[0].Labels)
})
@ -479,7 +481,7 @@ func Test_ServiceDetection(t *testing.T) {
indexAttributes: []string{"special"},
}
streamResolver := newMockStreamResolver("fake", limits)
data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false)
data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, tracker, streamResolver, "")
require.NoError(t, err)
require.Equal(t, labels.FromStrings("special", "sauce", LabelServiceName, "sauce").String(), data.Streams[0].Labels)
})
@ -494,7 +496,7 @@ func Test_ServiceDetection(t *testing.T) {
indexAttributes: []string{},
}
streamResolver := newMockStreamResolver("fake", limits)
data, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, ParseOTLPRequest, tracker, streamResolver, false)
data, _, err := ParseRequest(util_log.Logger, "fake", 100<<20, request, limits, nil, ParseOTLPRequest, tracker, streamResolver, "")
require.NoError(t, err)
require.Equal(t, labels.FromStrings(LabelServiceName, ServiceUnknown).String(), data.Streams[0].Labels)
})
@ -596,7 +598,7 @@ func TestNegativeSizeHandling(t *testing.T) {
linesIngested.Reset()
// Create a custom request parser that will generate negative sizes
var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ int, _ UsageTracker, _ StreamResolver, _ bool, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) {
var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ *runtime.TenantConfigs, _ int, _ UsageTracker, _ StreamResolver, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) {
// Create a minimal valid request
req := &logproto.PushRequest{
Streams: []logproto.Stream{
@ -635,16 +637,17 @@ func TestNegativeSizeHandling(t *testing.T) {
streamResolver := newMockStreamResolver("fake", &fakeLimits{})
// This should not panic with our guard clauses in place
_, err := ParseRequest(
_, _, err := ParseRequest(
util_log.Logger,
"fake",
100<<20,
request,
&fakeLimits{},
nil,
mockParser,
NewMockTracker(),
streamResolver,
false,
"",
)
// No error should be returned

@ -2,14 +2,19 @@ package runtime
import (
"flag"
"github.com/grafana/dskit/flagext"
)
type Config struct {
LogStreamCreation bool `yaml:"log_stream_creation"`
LogPushRequest bool `yaml:"log_push_request"`
LogPushRequestStreams bool `yaml:"log_push_request_streams"`
LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"`
LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"`
LogStreamCreation bool `yaml:"log_stream_creation"`
LogPushRequest bool `yaml:"log_push_request"`
LogHashOfLabels bool `yaml:"log_hash_of_labels"`
LogPushRequestStreams bool `yaml:"log_push_request_streams"`
FilterPushRequestStreamsIPs []string `yaml:"filter_push_request_streams_ips"`
LogServiceNameDiscovery bool `yaml:"log_service_name_discovery"`
LogDuplicateMetrics bool `yaml:"log_duplicate_metrics"`
LogDuplicateStreamInfo bool `yaml:"log_duplicate_stream_info"`
// LimitedLogPushErrors is to be implemented and will allow logging push failures at a controlled pace.
LimitedLogPushErrors bool `yaml:"limited_log_push_errors"`
@ -19,7 +24,10 @@ type Config struct {
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.LogStreamCreation, "operation-config.log-stream-creation", false, "Log every new stream created by a push request (very verbose, recommend to enable via runtime config only).")
f.BoolVar(&cfg.LogPushRequest, "operation-config.log-push-request", false, "Log every push request (very verbose, recommend to enable via runtime config only).")
f.BoolVar(&cfg.LogHashOfLabels, "operation-config.log-hash-of-labels", false, "Log a commutative hash of the labels for all streams in a push request. In some cases this can potentially be used as an identifier of the agent sending the stream. Calculating hashes is epensive so only enable as needed.")
f.BoolVar(&cfg.LogPushRequestStreams, "operation-config.log-push-request-streams", false, "Log every stream in a push request (very verbose, recommend to enable via runtime config only).")
f.Var((*flagext.StringSlice)(&cfg.FilterPushRequestStreamsIPs), "operation-config.filter-push-request-streams-ips", "Only show streams that match a provided IP address, LogPushRequestStreams must be enabled. Can be used multiple times to filter by multiple IPs.")
f.BoolVar(&cfg.LogServiceNameDiscovery, "operation-config.log-service-name-discovery", false, "Log service name discovery (very verbose, recommend to enable via runtime config only).")
f.BoolVar(&cfg.LogDuplicateMetrics, "operation-config.log-duplicate-metrics", false, "Log metrics for duplicate lines received.")
f.BoolVar(&cfg.LogDuplicateStreamInfo, "operation-config.log-duplicate-stream-info", false, "Log stream info for duplicate lines received")
f.BoolVar(&cfg.LimitedLogPushErrors, "operation-config.limited-log-push-errors", true, "Log push errors with a rate limited logger, will show client push errors without overly spamming logs.")
@ -94,10 +102,22 @@ func (o *TenantConfigs) LogPushRequest(userID string) bool {
return o.getOverridesForUser(userID).LogPushRequest
}
func (o *TenantConfigs) LogHashOfLabels(userID string) bool {
return o.getOverridesForUser(userID).LogHashOfLabels
}
func (o *TenantConfigs) LogPushRequestStreams(userID string) bool {
return o.getOverridesForUser(userID).LogPushRequestStreams
}
func (o *TenantConfigs) FilterPushRequestStreamsIPs(userID string) []string {
return o.getOverridesForUser(userID).FilterPushRequestStreamsIPs
}
func (o *TenantConfigs) LogServiceNameDiscovery(userID string) bool {
return o.getOverridesForUser(userID).LogServiceNameDiscovery
}
func (o *TenantConfigs) LogDuplicateMetrics(userID string) bool {
return o.getOverridesForUser(userID).LogDuplicateMetrics
}

Loading…
Cancel
Save