diff --git a/.github/workflows/build-loki-binary.yml b/.github/workflows/build-loki-binary.yml new file mode 100644 index 0000000000..5279a853b8 --- /dev/null +++ b/.github/workflows/build-loki-binary.yml @@ -0,0 +1,49 @@ +# Test workflow to verify Loki binaries compile successfully for multiple Linux architectures. +# This ensures PRs don't break cross-platform builds. +name: Test Build Loki Binaries + +on: + pull_request: {} + +permissions: + contents: read + +env: + GO_VERSION: "1.25.5" + +jobs: + build: + name: Test Build ${{ matrix.binary.name }} (${{ matrix.goos }}/${{ matrix.goarch }}) + runs-on: ubuntu-latest + strategy: + fail-fast: false + matrix: + goos: [linux] + goarch: [amd64, arm64, arm] + binary: + - name: loki + path: ./cmd/loki + - name: logcli + path: ./cmd/logcli + - name: querytee + path: ./cmd/querytee + - name: promtail + path: ./clients/cmd/promtail + steps: + - name: Checkout code + uses: actions/checkout@v4 + + - name: Set up Go + uses: actions/setup-go@v5 + with: + go-version: ${{ env.GO_VERSION }} + cache: true + + - name: Build ${{ matrix.binary.name }} + env: + CGO_ENABLED: "0" + GOOS: ${{ matrix.goos }} + GOARCH: ${{ matrix.goarch }} + run: | + echo "Building ${{ matrix.binary.name }} for $GOOS/$GOARCH" + go build -v -o ${{ matrix.binary.name }}-${{ matrix.goos }}-${{ matrix.goarch }} ${{ matrix.binary.path }} diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 9e7fc4bf0a..052de6df41 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -87,8 +87,8 @@ type Config struct { PushWorkerCount int `yaml:"push_worker_count"` // Request parser - MaxRecvMsgSize int `yaml:"max_recv_msg_size"` - MaxDecompressedSize int `yaml:"max_decompressed_size"` + MaxRecvMsgSize int `yaml:"max_recv_msg_size"` + MaxDecompressedSize int64 `yaml:"max_decompressed_size"` // For testing. factory ring_client.PoolFactory `yaml:"-"` @@ -122,7 +122,7 @@ func (cfg *Config) RegisterFlags(fs *flag.FlagSet) { cfg.RateStore.RegisterFlagsWithPrefix("distributor.rate-store", fs) cfg.WriteFailuresLogging.RegisterFlagsWithPrefix("distributor.write-failures-logging", fs) fs.IntVar(&cfg.MaxRecvMsgSize, "distributor.max-recv-msg-size", 100<<20, "The maximum size of a received message.") - fs.IntVar(&cfg.MaxDecompressedSize, "distributor.max-decompressed-size", 5000<<20, "The maximum size of a decompressed message. Defaults to 50x max-recv-msg-size.") + fs.Int64Var(&cfg.MaxDecompressedSize, "distributor.max-decompressed-size", 5000<<20, "The maximum size of a decompressed message. Defaults to 50x max-recv-msg-size.") fs.IntVar(&cfg.PushWorkerCount, "distributor.push-worker-count", 256, "Number of workers to push batches to ingesters.") fs.BoolVar(&cfg.KafkaEnabled, "distributor.kafka-writes-enabled", false, "Enable writes to Kafka during Push requests.") fs.BoolVar(&cfg.IngesterEnabled, "distributor.ingester-writes-enabled", true, "Enable writes to Ingesters during Push requests. Defaults to true.") @@ -139,7 +139,7 @@ func (cfg *Config) Validate() error { } // Set default maxDecompressedSize if not configured (50x maxRecvMsgSize) if cfg.MaxDecompressedSize == 0 && cfg.MaxRecvMsgSize > 0 { - cfg.MaxDecompressedSize = cfg.MaxRecvMsgSize * 50 + cfg.MaxDecompressedSize = int64(cfg.MaxRecvMsgSize) * 50 } return nil } diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 4a11e5d691..774102ba63 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -2701,7 +2701,7 @@ func TestConfig_Validate(t *testing.T) { tests := []struct { name string cfg Config - expectedMaxDecompressedSize int + expectedMaxDecompressedSize int64 expectedError string }{ { diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index 035ef0df34..a64c200706 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -176,7 +176,7 @@ func (p *fakeParser) parseRequest( _ push.Limits, _ *runtime.TenantConfigs, _ int, - _ int, + _ int64, _ push.UsageTracker, _ push.StreamResolver, _ log.Logger, diff --git a/pkg/loghttp/push/otlp.go b/pkg/loghttp/push/otlp.go index 41e230d89f..e6aac98b0f 100644 --- a/pkg/loghttp/push/otlp.go +++ b/pkg/loghttp/push/otlp.go @@ -44,7 +44,7 @@ const ( messageSizeLargerErrFmt = "%w than max (%d vs %d)" ) -func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize, maxDecompressedSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { +func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { stats := NewPushStats() otlpLogs, err := extractLogs(r, maxRecvMsgSize, maxDecompressedSize, stats) if err != nil { @@ -55,7 +55,7 @@ func ParseOTLPRequest(userID string, r *http.Request, limits Limits, tenantConfi return req, stats, err } -func extractLogs(r *http.Request, maxRecvMsgSize, maxDecompressedSize int, pushStats *Stats) (plog.Logs, error) { +func extractLogs(r *http.Request, maxRecvMsgSize int, maxDecompressedSize int64, pushStats *Stats) (plog.Logs, error) { pushStats.ContentEncoding = r.Header.Get(contentEnc) // bodySize should always reflect the compressed size of the request body bodySize := loki_util.NewSizeReader(r.Body) @@ -76,7 +76,7 @@ func extractLogs(r *http.Request, maxRecvMsgSize, maxDecompressedSize int, pushS _ = reader.Close() }(r) if maxDecompressedSize > 0 { - body = io.LimitReader(body, int64(maxDecompressedSize)+1) + body = io.LimitReader(body, maxDecompressedSize+1) } case zstdContentEncoding: @@ -86,12 +86,12 @@ func extractLogs(r *http.Request, maxRecvMsgSize, maxDecompressedSize int, pushS return plog.NewLogs(), err } if maxDecompressedSize > 0 { - body = io.LimitReader(body, int64(maxDecompressedSize)+1) + body = io.LimitReader(body, maxDecompressedSize+1) } case lz4ContentEncoding: body = io.NopCloser(lz4.NewReader(body)) if maxDecompressedSize > 0 { - body = io.LimitReader(body, int64(maxDecompressedSize)+1) + body = io.LimitReader(body, maxDecompressedSize+1) } case "": // no content encoding, use the body as is @@ -108,7 +108,7 @@ func extractLogs(r *http.Request, maxRecvMsgSize, maxDecompressedSize int, pushS return plog.NewLogs(), fmt.Errorf(messageSizeLargerErrFmt, loki_util.ErrMessageSizeTooLarge, size, maxRecvMsgSize) } // Check the size of the decompressed body - if len(buf) > maxDecompressedSize && maxDecompressedSize > 0 { + if int64(len(buf)) > maxDecompressedSize && maxDecompressedSize > 0 { return plog.NewLogs(), fmt.Errorf(messageSizeLargerErrFmt, loki_util.ErrMessageDecompressedSizeTooLarge, len(buf), maxDecompressedSize) } diff --git a/pkg/loghttp/push/otlp_test.go b/pkg/loghttp/push/otlp_test.go index adf1fa82e6..92962e61bb 100644 --- a/pkg/loghttp/push/otlp_test.go +++ b/pkg/loghttp/push/otlp_test.go @@ -1442,7 +1442,7 @@ func TestContentEncodingAndLength(t *testing.T) { expectedErrorMessage string expectedLogs plog.Logs maxRecvMsgSize int - maxDecompressedSize int + maxDecompressedSize int64 }{ { name: "identity_valid_json", @@ -1722,7 +1722,7 @@ func TestContentEncodingAndLength(t *testing.T) { } if maxDecompressedSize == 0 && !zeroMaxDecompressedSizeTests[tc.name] { if tc.maxRecvMsgSize > 0 { - maxDecompressedSize = tc.maxRecvMsgSize * 50 // 50x default + maxDecompressedSize = int64(tc.maxRecvMsgSize) * 50 // 50x default } else { maxDecompressedSize = 5000 << 20 // 5000 MB fallback default (50x 100MB) } diff --git a/pkg/loghttp/push/push.go b/pkg/loghttp/push/push.go index 4a27a83be9..18456badbc 100644 --- a/pkg/loghttp/push/push.go +++ b/pkg/loghttp/push/push.go @@ -118,7 +118,7 @@ type StreamResolver interface { } type ( - RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize, maxDecompressedSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) + RequestParser func(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) RequestParserWrapper func(inner RequestParser) RequestParser ErrorWriter func(w http.ResponseWriter, errorStr string, code int, logger log.Logger) ) @@ -171,7 +171,7 @@ type Stats struct { HasInternalStreams bool // True if any of the streams has aggregated metrics or is a pattern stream } -func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize, maxDecompressedSize int, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP, format string) (*logproto.PushRequest, *Stats, error) { +func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize int, maxDecompressedSize int64, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, pushRequestParser RequestParser, tracker UsageTracker, streamResolver StreamResolver, presumedAgentIP, format string) (*logproto.PushRequest, *Stats, error) { req, pushStats, err := pushRequestParser(userID, r, limits, tenantConfigs, maxRecvMsgSize, maxDecompressedSize, tracker, streamResolver, logger) if err != nil && !errors.Is(err, ErrAllLogsFiltered) { if errors.Is(err, util.ErrMessageSizeTooLarge) { @@ -304,7 +304,7 @@ func ParseRequest(logger log.Logger, userID string, maxRecvMsgSize, maxDecompres // parsePushRequestBody returns logproto.PushRequest from http.Request body, deserialized according to specified content type. // It also modifies pushStats. -func parsePushRequestBody(r *http.Request, maxRecvMsgSize, maxDecompressedSize int, pushStats *Stats) (*logproto.PushRequest, error) { +func parsePushRequestBody(r *http.Request, maxRecvMsgSize int, maxDecompressedSize int64, pushStats *Stats) (*logproto.PushRequest, error) { // Body var body io.Reader // bodySize should always reflect the compressed size of the request body @@ -333,7 +333,7 @@ func parsePushRequestBody(r *http.Request, maxRecvMsgSize, maxDecompressedSize i }(gzipReader) body = gzipReader if maxDecompressedSize > 0 { - body = io.LimitReader(body, int64(maxDecompressedSize)+1) + body = io.LimitReader(body, maxDecompressedSize+1) } case "deflate": flateReader := flate.NewReader(body) @@ -342,7 +342,7 @@ func parsePushRequestBody(r *http.Request, maxRecvMsgSize, maxDecompressedSize i }(flateReader) body = flateReader if maxDecompressedSize > 0 { - body = io.LimitReader(body, int64(maxDecompressedSize)+1) + body = io.LimitReader(body, maxDecompressedSize+1) } default: return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding) @@ -391,7 +391,7 @@ func parsePushRequestBody(r *http.Request, maxRecvMsgSize, maxDecompressedSize i return &req, nil } -func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize, maxDecompressedSize int, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { +func ParseLokiRequest(userID string, r *http.Request, limits Limits, tenantConfigs *runtime.TenantConfigs, maxRecvMsgSize int, maxDecompressedSize int64, tracker UsageTracker, streamResolver StreamResolver, logger log.Logger) (*logproto.PushRequest, *Stats, error) { pushStats := NewPushStats() req, err := parsePushRequestBody(r, maxRecvMsgSize, maxDecompressedSize, pushStats) diff --git a/pkg/loghttp/push/push_test.go b/pkg/loghttp/push/push_test.go index 88879f18ea..d200b18546 100644 --- a/pkg/loghttp/push/push_test.go +++ b/pkg/loghttp/push/push_test.go @@ -655,7 +655,7 @@ func TestNegativeSizeHandling(t *testing.T) { linesIngested.Reset() // Create a custom request parser that will generate negative sizes - var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ *runtime.TenantConfigs, _ int, _ int, _ UsageTracker, _ StreamResolver, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) { + var mockParser RequestParser = func(_ string, _ *http.Request, _ Limits, _ *runtime.TenantConfigs, _ int, _ int64, _ UsageTracker, _ StreamResolver, _ kitlog.Logger) (*logproto.PushRequest, *Stats, error) { // Create a minimal valid request req := &logproto.PushRequest{ Streams: []logproto.Stream{ @@ -732,7 +732,7 @@ func TestParseRequestWithZeroMaxDecompressedSize(t *testing.T) { contentType string contentEncoding string maxRecvMsgSize int - maxDecompressedSize int + maxDecompressedSize int64 expectedError bool expectedErrorMessage string }{ diff --git a/pkg/util/http.go b/pkg/util/http.go index c6687421c6..9c36f49457 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -169,12 +169,12 @@ const ( // ParseProtoReader parses a compressed proto from an io.Reader. // Deprecated: Use ParseProtoReaderWithLimits for separate compressed/decompressed limits. func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) error { - return ParseProtoReaderWithLimits(ctx, reader, expectedSize, maxSize, maxSize, req, compression) + return ParseProtoReaderWithLimits(ctx, reader, expectedSize, maxSize, int64(maxSize), req, compression) } // ParseProtoReaderWithLimits parses a compressed proto from an io.Reader with separate size limits. // maxCompressedSize limits the compressed input size, maxDecompressedSize limits the decompressed output size. -func ParseProtoReaderWithLimits(ctx context.Context, reader io.Reader, expectedSize, maxCompressedSize, maxDecompressedSize int, req proto.Message, compression CompressionType) error { +func ParseProtoReaderWithLimits(ctx context.Context, reader io.Reader, expectedSize, maxCompressedSize int, maxDecompressedSize int64, req proto.Message, compression CompressionType) error { sp := trace.SpanFromContext(ctx) sp.AddEvent("util.ParseProtoRequest[start reading]") body, err := decompressRequest(reader, expectedSize, maxCompressedSize, maxDecompressedSize, compression, sp) @@ -199,9 +199,9 @@ func ParseProtoReaderWithLimits(ctx context.Context, reader io.Reader, expectedS return nil } -func decompressRequest(reader io.Reader, expectedSize, maxCompressedSize, maxDecompressedSize int, compression CompressionType, sp trace.Span) (body []byte, err error) { +func decompressRequest(reader io.Reader, expectedSize, maxCompressedSize int, maxDecompressedSize int64, compression CompressionType, sp trace.Span) (body []byte, err error) { defer func() { - if err != nil && maxDecompressedSize > 0 && len(body) > maxDecompressedSize { + if err != nil && maxDecompressedSize > 0 && int64(len(body)) > maxDecompressedSize { err = fmt.Errorf(messageSizeLargerErrFmt, ErrMessageDecompressedSizeTooLarge, len(body), maxDecompressedSize) } }() @@ -217,7 +217,7 @@ func decompressRequest(reader io.Reader, expectedSize, maxCompressedSize, maxDec return } -func decompressFromReader(reader io.Reader, expectedSize, maxCompressedSize, maxDecompressedSize int, compression CompressionType, sp trace.Span) ([]byte, error) { +func decompressFromReader(reader io.Reader, expectedSize, maxCompressedSize int, maxDecompressedSize int64, compression CompressionType, sp trace.Span) ([]byte, error) { var ( buf bytes.Buffer body []byte @@ -243,7 +243,7 @@ func decompressFromReader(reader io.Reader, expectedSize, maxCompressedSize, max return body, err } -func decompressFromBuffer(buffer *bytes.Buffer, maxCompressedSize, maxDecompressedSize int, compression CompressionType, sp trace.Span) ([]byte, error) { +func decompressFromBuffer(buffer *bytes.Buffer, maxCompressedSize int, maxDecompressedSize int64, compression CompressionType, sp trace.Span) ([]byte, error) { bufBytes := buffer.Bytes() // Check compressed size if len(bufBytes) > maxCompressedSize { @@ -261,7 +261,7 @@ func decompressFromBuffer(buffer *bytes.Buffer, maxCompressedSize, maxDecompress return nil, err } // Check decompressed size (only if limit is set) - if maxDecompressedSize > 0 && size > maxDecompressedSize { + if maxDecompressedSize > 0 && int64(size) > maxDecompressedSize { return nil, fmt.Errorf(messageSizeLargerErrFmt, ErrMessageDecompressedSizeTooLarge, size, maxDecompressedSize) } body, err := snappy.Decode(nil, bufBytes)