From bbff4331ead29cbe15dd8e7a970bd754e73a0bd8 Mon Sep 17 00:00:00 2001 From: Assel Meher Date: Tue, 10 Feb 2026 10:34:50 +0100 Subject: [PATCH] fix: reject streams with labels exceeding 16MB (#20719) --- pkg/loghttp/push/otlp.go | 8 ++ pkg/loghttp/push/push.go | 10 +++ pkg/loghttp/push/push_test.go | 136 ++++++++++++++++++++++++++++++++++ 3 files changed, 154 insertions(+) diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index e6aac98b0f..158c5525c2 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -10,6 +10,7 @@ import ( "strings" "time" + "github.com/dustin/go-humanize" "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/klauspost/compress/zstd" @@ -242,6 +243,10 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl } labelsStr := streamLabels.String() + if len(labelsStr) > maxStreamLabelsSize { + return nil, fmt.Errorf("%w: stream labels size %s exceeds limit of %s", ErrRequestBodyTooLarge, humanize.Bytes(uint64(len(labelsStr))), humanize.Bytes(maxStreamLabelsSize)) + } + lbs := modelLabelsSetToLabelsList(streamLabels) totalBytesReceived := int64(0) @@ -370,6 +375,9 @@ func otlpToLokiPushRequest(ctx context.Context, ld plog.Logs, userID string, otl } entryLabelsStr = combinedLabels.String() + if len(entryLabelsStr) > maxStreamLabelsSize { + return nil, fmt.Errorf("%w: stream labels size %s exceeds limit of %s", ErrRequestBodyTooLarge, humanize.Bytes(uint64(len(entryLabelsStr))), humanize.Bytes(maxStreamLabelsSize)) + } entryLbs = modelLabelsSetToLabelsList(combinedLabels) if _, ok := pushRequestsByStream[entryLabelsStr]; !ok { diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 18456badbc..72115ab305 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -79,6 +79,12 @@ const ( applicationJSON = "application/json" LabelServiceName = "service_name" ServiceUnknown = "unknown_service" + + // maxStreamLabelsSize is the maximum allowed size of a single stream's labels string. + // Prometheus' label parser panics when encoding labels that exceed 16MB (2^24 bytes). + // We check the total labels string size per stream before parsing to prevent this panic. + // See: https://github.com/prometheus/prometheus/issues/17993 + maxStreamLabelsSize = 1 << 24 // 16MB ) var ( @@ -409,6 +415,10 @@ func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfi for i := range req.Streams { s := req.Streams[i] + if len(s.Labels) > maxStreamLabelsSize { + return nil, nil, fmt.Errorf("%w: stream labels size %s exceeds limit of %s", ErrRequestBodyTooLarge, humanize.Bytes(uint64(len(s.Labels))), humanize.Bytes(maxStreamLabelsSize)) + } + lbs, err := syntax.ParseLabels(s.Labels) if err != nil { return nil, nil, fmt.Errorf("couldn't parse labels: %w", err) diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index d200b18546..098043083e 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -946,3 +946,139 @@ func (t *MockCustomTracker) DiscardedBytesAdd(_ context.Context, _, _ string, la func (t *MockCustomTracker) ReceivedBytesAdd(_ context.Context, _ string, _ time.Duration, labels labels.Labels, value float64, _ string) { t.receivedBytes[labels.String()] += value } + +// TestRequestParser_StreamLabelsExceed16MB tests that both ParseLokiRequest and +// ParseOTLPRequest reject streams where the total labels string per streamexceeds 16MB +// with ErrRequestBodyTooLarge. +func TestRequestParser_StreamLabelsExceed16MB(t *testing.T) { + const testMaxLabelSize = 1 << 24 // 16MB + + // runParserTest calls the parser and asserts the expected error. + // Pass expectedErr=nil to assert no error, or a sentinel error to assert it's in the chain. + runParserTest := func(t *testing.T, parser RequestParser, request *http.Request, limits *fakeLimits, expectedErr error) { + t.Helper() + if limits == nil { + limits = &fakeLimits{} + } + streamResolver := newMockStreamResolver("fake", limits) + + _, _, err := parser( + "fake", + request, + limits, + nil, // tenantConfigs + 100<<20, // maxRecvMsgSize (100MB - high so transport checks don't trigger first) + 100<<20, // maxDecompressedSize (100MB) + nil, // tracker + streamResolver, + util_log.Logger, + ) + + if expectedErr == nil { + require.NoError(t, err, "parser should not return an error") + } else { + require.Error(t, err, "parser should return an error, not panic") + require.ErrorIs(t, err, expectedErr) + } + } + + t.Run("ParseLokiRequest", func(t *testing.T) { + t.Run("json_single_label_over_16MB", func(t *testing.T) { + longValue := strings.Repeat("a", testMaxLabelSize+1) + jsonBody := fmt.Sprintf(`{"streams": [{ "stream": { "foo": "%s" }, "values": [ [ "1570818238000000000", "test" ] ] }]}`, longValue) + + request := httptest.NewRequest("POST", "/loki/api/v1/push", strings.NewReader(jsonBody)) + request.Header.Add("Content-Type", "application/json") + runParserTest(t, ParseLokiRequest, request, nil, ErrRequestBodyTooLarge) + }) + + t.Run("protobuf_single_label_over_16MB", func(t *testing.T) { + longValue := strings.Repeat("a", testMaxLabelSize+1) + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{foo="%s"}`, longValue), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1570818238000000000), Line: "test"}, + }, + }, + }, + } + request := httptest.NewRequest("POST", "/loki/api/v1/push", strings.NewReader(snappyString(marshalProto(req)))) + request.Header.Add("Content-Type", "application/x-protobuf") + runParserTest(t, ParseLokiRequest, request, nil, ErrRequestBodyTooLarge) + }) + + t.Run("multiple_streams_each_under_16MB_is_ok", func(t *testing.T) { + // Two streams, each with a ~15MB label. Both are individually under 16MB + // so both should succeed, even though combined they exceed 16MB. + value := strings.Repeat("a", 15<<20) + req := &logproto.PushRequest{ + Streams: []logproto.Stream{ + { + Labels: fmt.Sprintf(`{foo="%s"}`, value), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1570818238000000000), Line: "test1"}, + }, + }, + { + Labels: fmt.Sprintf(`{bar="%s"}`, value), + Entries: []logproto.Entry{ + {Timestamp: time.Unix(0, 1570818238000000000), Line: "test2"}, + }, + }, + }, + } + request := httptest.NewRequest("POST", "/loki/api/v1/push", strings.NewReader(snappyString(marshalProto(req)))) + request.Header.Add("Content-Type", "application/x-protobuf") + runParserTest(t, ParseLokiRequest, request, nil, nil) + }) + }) + + t.Run("ParseOTLPRequest", func(t *testing.T) { + t.Run("single_resource_attribute_over_16MB", func(t *testing.T) { + longValue := strings.Repeat("a", testMaxLabelSize+1) + + ld := plog.NewLogs() + rl := ld.ResourceLogs().AppendEmpty() + rl.Resource().Attributes().PutStr("service.name", longValue) + rl.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body") + rl.ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(time.Now().UnixNano())) + + jsonMarshaller := plog.JSONMarshaler{} + body, err := jsonMarshaller.MarshalLogs(ld) + require.NoError(t, err) + + request := httptest.NewRequest("POST", "/otlp/v1/push", bytes.NewReader(body)) + request.Header.Add("Content-Type", "application/json") + runParserTest(t, ParseOTLPRequest, request, nil, ErrRequestBodyTooLarge) + }) + + t.Run("multiple_resource_logs_each_under_16MB_is_ok", func(t *testing.T) { + // Two resource logs, each with a ~15MB attribute as an indexed label. + // Each stream's labels are under 16MB so both should succeed. + value1 := strings.Repeat("a", 15<<20) + value2 := strings.Repeat("b", 15<<20) + + ld := plog.NewLogs() + rl1 := ld.ResourceLogs().AppendEmpty() + rl1.Resource().Attributes().PutStr("label1", value1) + rl1.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body 1") + rl1.ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(time.Now().UnixNano())) + + rl2 := ld.ResourceLogs().AppendEmpty() + rl2.Resource().Attributes().PutStr("label1", value2) + rl2.ScopeLogs().AppendEmpty().LogRecords().AppendEmpty().Body().SetStr("test body 2") + rl2.ScopeLogs().At(0).LogRecords().At(0).SetTimestamp(pcommon.Timestamp(time.Now().UnixNano())) + + jsonMarshaller := plog.JSONMarshaler{} + body, err := jsonMarshaller.MarshalLogs(ld) + require.NoError(t, err) + + limits := &fakeLimits{indexAttributes: []string{"label1"}} + request := httptest.NewRequest("POST", "/otlp/v1/push", bytes.NewReader(body)) + request.Header.Add("Content-Type", "application/json") + runParserTest(t, ParseOTLPRequest, request, limits, nil) + }) + }) +}