Merge branch 'main' into roidelapluie/fix_ui_updates

Signed-off-by: Julien <291750+roidelapluie@users.noreply.github.com>
pull/17713/head
Julien 2 weeks ago committed by GitHub
commit 68e2c8e3d0
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      .github/workflows/buf-lint.yml
  2. 4
      .github/workflows/buf.yml
  3. 2
      .github/workflows/check_release_notes.yml
  4. 37
      .github/workflows/ci.yml
  5. 8
      .github/workflows/codeql-analysis.yml
  6. 4
      .github/workflows/container_description.yml
  7. 4
      .github/workflows/fuzzing.yml
  8. 2
      .github/workflows/repo_sync.yml
  9. 4
      .github/workflows/scorecards.yml
  10. 2
      .github/workflows/stale.yml
  11. 2
      config/config.go
  12. 7
      docs/feature_flags.md
  13. 2
      go.mod
  14. 4
      go.sum
  15. 4
      internal/tools/go.mod
  16. 8
      internal/tools/go.sum
  17. 2
      model/histogram/float_histogram.go
  18. 2
      model/histogram/histogram.go
  19. 5
      promql/durations.go
  20. 33
      promql/durations_test.go
  21. 2
      promql/engine.go
  22. 6
      promql/parser/ast.go
  23. 39
      promql/parser/generated_parser.y
  24. 971
      promql/parser/generated_parser.y.go
  25. 6
      promql/parser/lex.go
  26. 100
      promql/parser/parse_test.go
  27. 2
      promql/parser/printer.go
  28. 15
      promql/parser/printer_test.go
  29. 4
      promql/promqltest/test.go
  30. 25
      promql/promqltest/testdata/duration_expression.test
  31. 313
      scrape/helpers_test.go
  32. 22
      scrape/manager.go
  33. 140
      scrape/manager_test.go
  34. 675
      scrape/scrape.go
  35. 2612
      scrape/scrape_test.go
  36. 2
      scrape/target.go
  37. 24
      scrape/target_test.go
  38. 6
      scripts/golangci-lint.yml
  39. 7
      tsdb/fileutil/mmap_windows.go
  40. 9
      util/httputil/compression.go
  41. 399
      util/teststorage/appender.go
  42. 131
      util/teststorage/appender_test.go
  43. 17
      web/ui/package-lock.json

@ -12,7 +12,7 @@ jobs:
name: lint
runs-on: ubuntu-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: bufbuild/buf-setup-action@a47c93e0b1648d5651a065437926377d060baa99 # v1.50.0

@ -12,7 +12,7 @@ jobs:
runs-on: ubuntu-latest
if: github.repository_owner == 'prometheus'
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: bufbuild/buf-setup-action@a47c93e0b1648d5651a065437926377d060baa99 # v1.50.0
@ -25,7 +25,7 @@ jobs:
with:
input: 'prompb'
against: 'https://github.com/prometheus/prometheus.git#branch=main,ref=HEAD~1,subdir=prompb'
- uses: bufbuild/buf-push-action@a654ff18effe4641ebea4a4ce242c49800728459 # v1.1.1
- uses: bufbuild/buf-push-action@1c45f6a21ec277ee4c1fa2772e49b9541ea17f38 # v1.1.1
with:
input: 'prompb'
buf_token: ${{ secrets.BUF_TOKEN }}

@ -20,7 +20,7 @@ jobs:
# Don't run it on dependabot PRs either as humans would take control in case a bump introduces a breaking change.
if: (github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community') && github.event.pull_request.user.login != 'dependabot[bot]'
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
- env:
PR_DESCRIPTION: ${{ github.event.pull_request.body }}
run: |

@ -16,7 +16,7 @@ jobs:
# should also be updated.
image: quay.io/prometheus/golang-builder:1.25-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -34,7 +34,7 @@ jobs:
container:
image: quay.io/prometheus/golang-builder:1.25-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -59,7 +59,7 @@ jobs:
# The go version in this image should be N-1 wrt test_go.
image: quay.io/prometheus/golang-builder:1.24-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- run: make build
@ -78,7 +78,7 @@ jobs:
image: quay.io/prometheus/golang-builder:1.25-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -97,10 +97,10 @@ jobs:
name: Go tests on Windows
runs-on: windows-latest
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
- uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: 1.25.x
- run: |
@ -116,7 +116,7 @@ jobs:
container:
image: quay.io/prometheus/golang-builder:1.25-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- run: go install ./cmd/promtool/.
@ -143,7 +143,7 @@ jobs:
matrix:
thread: [ 0, 1, 2 ]
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -170,7 +170,7 @@ jobs:
# Whenever the Go version is updated here, .promu.yml
# should also be updated.
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -208,7 +208,8 @@ jobs:
container:
image: quay.io/prometheus/golang-builder:1.25-base
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- name: Checkout repository
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -222,11 +223,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: 1.25.x
- name: Install snmp_exporter/generator dependencies
@ -236,18 +237,18 @@ jobs:
id: golangci-lint-version
run: echo "version=$(make print-golangci-lint-version)" >> $GITHUB_OUTPUT
- name: Lint
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
uses: golangci/golangci-lint-action@0a35821d5c230e903fcfe077583637dea1b27b47 # v9.0.0
with:
args: --verbose
version: ${{ steps.golangci-lint-version.outputs.version }}
- name: Lint with slicelabels
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
uses: golangci/golangci-lint-action@0a35821d5c230e903fcfe077583637dea1b27b47 # v9.0.0
with:
# goexperiment.synctest to ensure we don't miss files that depend on it.
args: --verbose --build-tags=slicelabels,goexperiment.synctest
version: ${{ steps.golangci-lint-version.outputs.version }}
- name: Lint with dedupelabels
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
uses: golangci/golangci-lint-action@0a35821d5c230e903fcfe077583637dea1b27b47 # v9.0.0
with:
args: --verbose --build-tags=dedupelabels
version: ${{ steps.golangci-lint-version.outputs.version }}
@ -266,7 +267,7 @@ jobs:
needs: [test_ui, test_go, test_go_more, test_go_oldest, test_windows, golangci, codeql, build_all]
if: github.event_name == 'push' && github.event.ref == 'refs/heads/main'
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -285,7 +286,7 @@ jobs:
||
(github.event_name == 'push' && startsWith(github.ref, 'refs/tags/v3.'))
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3
@ -302,7 +303,7 @@ jobs:
needs: [test_ui, codeql]
steps:
- name: Checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- uses: prometheus/promci@c0916f0a41f13444612a8f0f5e700ea34edd7c19 # v0.5.3

@ -24,17 +24,17 @@ jobs:
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Initialize CodeQL
uses: github/codeql-action/init@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
uses: github/codeql-action/init@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v4.31.9
with:
languages: ${{ matrix.language }}
- name: Autobuild
uses: github/codeql-action/autobuild@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
uses: github/codeql-action/autobuild@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v4.31.9
- name: Perform CodeQL Analysis
uses: github/codeql-action/analyze@0499de31b99561a6d14a36a5f662c2a54f91beee # v4.31.2
uses: github/codeql-action/analyze@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v4.31.9

@ -18,7 +18,7 @@ jobs:
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
steps:
- name: git checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Set docker hub repo name
@ -42,7 +42,7 @@ jobs:
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
steps:
- name: git checkout
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Set quay.io org name

@ -10,12 +10,12 @@ jobs:
steps:
- name: Build Fuzzers
id: build
uses: google/oss-fuzz/infra/cifuzz/actions/build_fuzzers@537c8005ba4c9de026b2fa3550663280d25d6175 # master
uses: google/oss-fuzz/infra/cifuzz/actions/build_fuzzers@4bf20ff8dfda18ad651583ebca9fb17a7ce1940a # master
with:
oss-fuzz-project-name: "prometheus"
dry-run: false
- name: Run Fuzzers
uses: google/oss-fuzz/infra/cifuzz/actions/run_fuzzers@537c8005ba4c9de026b2fa3550663280d25d6175 # master
uses: google/oss-fuzz/infra/cifuzz/actions/run_fuzzers@4bf20ff8dfda18ad651583ebca9fb17a7ce1940a # master
# Note: Regularly check for updates to the pinned commit hash at:
# https://github.com/google/oss-fuzz/tree/master/infra/cifuzz/actions/run_fuzzers
with:

@ -14,7 +14,7 @@ jobs:
container:
image: quay.io/prometheus/golang-builder
steps:
- uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
- uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- run: ./scripts/sync_repo_files.sh

@ -21,7 +21,7 @@ jobs:
steps:
- name: "Checkout code"
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # tag=v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # tag=v6.0.1
with:
persist-credentials: false
@ -45,6 +45,6 @@ jobs:
# Upload the results to GitHub's code scanning dashboard.
- name: "Upload to code-scanning"
uses: github/codeql-action/upload-sarif@0499de31b99561a6d14a36a5f662c2a54f91beee # tag=v4.31.2
uses: github/codeql-action/upload-sarif@5d4e8d1aca955e8d8589aabd499c5cae939e33c7 # v4.31.9
with:
sarif_file: results.sarif

@ -11,7 +11,7 @@ jobs:
if: github.repository_owner == 'prometheus' || github.repository_owner == 'prometheus-community' # Don't run this workflow on forks.
runs-on: ubuntu-latest
steps:
- uses: actions/stale@5f858e3efba33a5ca4407a664cc011ad407f2008 # v10.1.0
- uses: actions/stale@997185467fa4f803885201cee163a9f38240193d # v10.1.1
with:
repo-token: ${{ secrets.GITHUB_TOKEN }}
# opt out of defaults to avoid marking issues as stale and closing them

@ -1022,7 +1022,7 @@ func ToEscapingScheme(s string, v model.ValidationScheme) (model.EscapingScheme,
case model.LegacyValidation:
return model.UnderscoreEscaping, nil
case model.UnsetValidation:
return model.NoEscaping, fmt.Errorf("v is unset: %s", v)
return model.NoEscaping, fmt.Errorf("ValidationScheme is unset: %s", v)
default:
panic(fmt.Errorf("unhandled validation scheme: %s", v))
}

@ -197,7 +197,12 @@ the offset calculation.
`step()` can be used in duration expressions.
For a **range query**, it resolves to the step width of the range query.
For an **instant query**, it resolves to `0s`.
For an **instant query**, it resolves to `0s`.
`range()` can be used in duration expressions.
For a **range query**, it resolves to the full range of the query (end time - start time).
For an **instant query**, it resolves to `0s`.
This is particularly useful in combination with `@end()` to look back over the entire query range, e.g., `max_over_time(metric[range()] @ end())`.
`min(<duration>, <duration>)` and `max(<duration>, <duration>)` can be used to find the minimum or maximum of two duration expressions.

@ -39,7 +39,7 @@ require (
github.com/gophercloud/gophercloud/v2 v2.9.0
github.com/grafana/regexp v0.0.0-20250905093917-f7b3be9d1853
github.com/hashicorp/consul/api v1.32.1
github.com/hashicorp/nomad/api v0.0.0-20251216171439-1dee0671280e
github.com/hashicorp/nomad/api v0.0.0-20251222083347-1355d4cb1671
github.com/hetznercloud/hcloud-go/v2 v2.32.0
github.com/ionos-cloud/sdk-go/v6 v6.3.5
github.com/json-iterator/go v1.1.12

@ -307,8 +307,8 @@ github.com/hashicorp/mdns v1.0.4/go.mod h1:mtBihi+LeNXGtG8L9dX59gAEa12BDtBQSp4v/
github.com/hashicorp/memberlist v0.5.0/go.mod h1:yvyXLpo0QaGE59Y7hDTsTzDD25JYBZ4mHgHUZ8lrOI0=
github.com/hashicorp/memberlist v0.5.3 h1:tQ1jOCypD0WvMemw/ZhhtH+PWpzcftQvgCorLu0hndk=
github.com/hashicorp/memberlist v0.5.3/go.mod h1:h60o12SZn/ua/j0B6iKAZezA4eDaGsIuPO70eOaJ6WE=
github.com/hashicorp/nomad/api v0.0.0-20251216171439-1dee0671280e h1:wGl06iy/H90NSbWjfXWeRwk9SJOks0u4voIryeJFlSA=
github.com/hashicorp/nomad/api v0.0.0-20251216171439-1dee0671280e/go.mod h1:sldFTIgs+FsUeKU3LwVjviAIuksxD8TzDOn02MYwslE=
github.com/hashicorp/nomad/api v0.0.0-20251222083347-1355d4cb1671 h1:4NbynIRljuOUvAQNLLJA1yuWcoL5EC3Qn4c7HCngUds=
github.com/hashicorp/nomad/api v0.0.0-20251222083347-1355d4cb1671/go.mod h1:sldFTIgs+FsUeKU3LwVjviAIuksxD8TzDOn02MYwslE=
github.com/hashicorp/serf v0.10.1 h1:Z1H2J60yRKvfDYAOZLd2MU0ND4AH/WDz7xYHDWQsIPY=
github.com/hashicorp/serf v0.10.1/go.mod h1:yL2t6BqATOLGc5HF7qbFkTfXoPIY0WZdWHfEvMqbG+4=
github.com/hetznercloud/hcloud-go/v2 v2.32.0 h1:BRe+k7ESdYv3xQLBGdKUfk+XBFRJNGKzq70nJI24ciM=

@ -67,8 +67,8 @@ require (
github.com/opencontainers/image-spec v1.1.1 // indirect
github.com/petermattis/goid v0.0.0-20250904145737-900bdf8bb490 // indirect
github.com/pkg/errors v0.9.1 // indirect
github.com/quic-go/qpack v0.5.1 // indirect
github.com/quic-go/quic-go v0.56.0 // indirect
github.com/quic-go/qpack v0.6.0 // indirect
github.com/quic-go/quic-go v0.57.0 // indirect
github.com/rivo/uniseg v0.4.7 // indirect
github.com/rs/cors v1.11.1 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect

@ -154,10 +154,10 @@ github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZb
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9 h1:arwj11zP0yJIxIRiDn22E0H8PxfF7TsTrc2wIPFIsf4=
github.com/protocolbuffers/protoscope v0.0.0-20221109213918-8e7a6aafa2c9/go.mod h1:SKZx6stCn03JN3BOWTwvVIO2ajMkb/zQdTceXYhKw/4=
github.com/quic-go/qpack v0.5.1 h1:giqksBPnT/HDtZ6VhtFKgoLOWmlyo9Ei6u9PqzIMbhI=
github.com/quic-go/qpack v0.5.1/go.mod h1:+PC4XFrEskIVkcLzpEkbLqq1uCoxPhQuvK5rH1ZgaEg=
github.com/quic-go/quic-go v0.56.0 h1:q/TW+OLismmXAehgFLczhCDTYB3bFmua4D9lsNBWxvY=
github.com/quic-go/quic-go v0.56.0/go.mod h1:9gx5KsFQtw2oZ6GZTyh+7YEvOxWCL9WZAepnHxgAo6c=
github.com/quic-go/qpack v0.6.0 h1:g7W+BMYynC1LbYLSqRt8PBg5Tgwxn214ZZR34VIOjz8=
github.com/quic-go/qpack v0.6.0/go.mod h1:lUpLKChi8njB4ty2bFLX2x4gzDqXwUpaO1DP9qMDZII=
github.com/quic-go/quic-go v0.57.0 h1:AsSSrrMs4qI/hLrKlTH/TGQeTMY0ib1pAOX7vA3AdqE=
github.com/quic-go/quic-go v0.57.0/go.mod h1:ly4QBAjHA2VhdnxhojRsCUOeJwKYg+taDlos92xb1+s=
github.com/rivo/uniseg v0.4.7 h1:WUdvkW8uEhrYfLC4ZzdpI2ztxP1I582+49Oc5Mq64VQ=
github.com/rivo/uniseg v0.4.7/go.mod h1:FN3SvrM+Zdj16jyLfmOkMNblXMcoc8DfTHruCPUcx88=
github.com/rodaine/protogofakeit v0.1.1 h1:ZKouljuRM3A+TArppfBqnH8tGZHOwM/pjvtXe9DaXH8=

@ -484,7 +484,7 @@ func (h *FloatHistogram) Sub(other *FloatHistogram) (res *FloatHistogram, counte
// supposed to be used according to the schema.
func (h *FloatHistogram) Equals(h2 *FloatHistogram) bool {
if h2 == nil {
return false
return h == nil
}
if h.Schema != h2.Schema ||

@ -247,7 +247,7 @@ func (h *Histogram) CumulativeBucketIterator() BucketIterator[uint64] {
// supposed to be used according to the schema.
func (h *Histogram) Equals(h2 *Histogram) bool {
if h2 == nil {
return false
return h == nil
}
if h.Schema != h2.Schema || h.Count != h2.Count ||

@ -28,7 +28,8 @@ import (
// in OriginalOffsetExpr representing (1h / 2). This visitor evaluates
// such duration expression, setting OriginalOffset to 30m.
type durationVisitor struct {
step time.Duration
step time.Duration
queryRange time.Duration
}
// Visit finds any duration expressions in AST Nodes and modifies the Node to
@ -121,6 +122,8 @@ func (v *durationVisitor) evaluateDurationExpr(expr parser.Expr) (float64, error
switch n.Op {
case parser.STEP:
return float64(v.step.Seconds()), nil
case parser.RANGE:
return float64(v.queryRange.Seconds()), nil
case parser.MIN:
return math.Min(lhs, rhs), nil
case parser.MAX:

@ -213,6 +213,37 @@ func TestCalculateDuration(t *testing.T) {
},
expected: 3 * time.Second,
},
{
name: "range",
expr: &parser.DurationExpr{
Op: parser.RANGE,
},
expected: 5 * time.Minute,
},
{
name: "range division",
expr: &parser.DurationExpr{
LHS: &parser.DurationExpr{
Op: parser.RANGE,
},
RHS: &parser.NumberLiteral{Val: 2},
Op: parser.DIV,
},
expected: 150 * time.Second,
},
{
name: "max of step and range",
expr: &parser.DurationExpr{
LHS: &parser.DurationExpr{
Op: parser.STEP,
},
RHS: &parser.DurationExpr{
Op: parser.RANGE,
},
Op: parser.MAX,
},
expected: 5 * time.Minute,
},
{
name: "division by zero",
expr: &parser.DurationExpr{
@ -243,7 +274,7 @@ func TestCalculateDuration(t *testing.T) {
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
v := &durationVisitor{step: 1 * time.Second}
v := &durationVisitor{step: 1 * time.Second, queryRange: 5 * time.Minute}
result, err := v.calculateDuration(tt.expr, tt.allowedNegative)
if tt.errorMessage != "" {
require.Error(t, err)

@ -4057,7 +4057,7 @@ func unwrapStepInvariantExpr(e parser.Expr) parser.Expr {
func PreprocessExpr(expr parser.Expr, start, end time.Time, step time.Duration) (parser.Expr, error) {
detectHistogramStatsDecoding(expr)
if err := parser.Walk(&durationVisitor{step: step}, expr, nil); err != nil {
if err := parser.Walk(&durationVisitor{step: step, queryRange: end.Sub(start)}, expr, nil); err != nil {
return nil, err
}

@ -116,8 +116,8 @@ type DurationExpr struct {
LHS, RHS Expr // The operands on the respective sides of the operator.
Wrapped bool // Set when the duration is wrapped in parentheses.
StartPos posrange.Pos // For unary operations and step(), the start position of the operator.
EndPos posrange.Pos // For step(), the end position of the operator.
StartPos posrange.Pos // For unary operations, step(), and range(), the start position of the operator.
EndPos posrange.Pos // For step() and range(), the end position of the operator.
}
// Call represents a function call.
@ -474,7 +474,7 @@ func (e *BinaryExpr) PositionRange() posrange.PositionRange {
}
func (e *DurationExpr) PositionRange() posrange.PositionRange {
if e.Op == STEP {
if e.Op == STEP || e.Op == RANGE {
return posrange.PositionRange{
Start: e.StartPos,
End: e.EndPos,

@ -153,6 +153,7 @@ WITHOUT
START
END
STEP
RANGE
%token preprocessorEnd
// Counter reset hints.
@ -465,7 +466,7 @@ offset_expr: expr OFFSET offset_duration_expr
$$ = $1
}
| expr OFFSET error
{ yylex.(*parser).unexpected("offset", "number, duration, or step()"); $$ = $1 }
{ yylex.(*parser).unexpected("offset", "number, duration, step(), or range()"); $$ = $1 }
;
/*
@ -575,11 +576,11 @@ subquery_expr : expr LEFT_BRACKET positive_duration_expr COLON positive_durati
| expr LEFT_BRACKET positive_duration_expr COLON positive_duration_expr error
{ yylex.(*parser).unexpected("subquery selector", "\"]\""); $$ = $1 }
| expr LEFT_BRACKET positive_duration_expr COLON error
{ yylex.(*parser).unexpected("subquery selector", "number, duration, or step() or \"]\""); $$ = $1 }
{ yylex.(*parser).unexpected("subquery selector", "number, duration, step(), range(), or \"]\""); $$ = $1 }
| expr LEFT_BRACKET positive_duration_expr error
{ yylex.(*parser).unexpected("subquery or range", "\":\" or \"]\""); $$ = $1 }
| expr LEFT_BRACKET error
{ yylex.(*parser).unexpected("subquery or range selector", "number, duration, or step()"); $$ = $1 }
{ yylex.(*parser).unexpected("subquery or range selector", "number, duration, step(), or range()"); $$ = $1 }
;
/*
@ -696,7 +697,7 @@ metric : metric_identifier label_set
;
metric_identifier: AVG | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | IDENTIFIER | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | QUANTILE | STDDEV | STDVAR | SUM | TOPK | WITHOUT | START | END | LIMITK | LIMIT_RATIO | STEP | ANCHORED | SMOOTHED;
metric_identifier: AVG | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | IDENTIFIER | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | QUANTILE | STDDEV | STDVAR | SUM | TOPK | WITHOUT | START | END | LIMITK | LIMIT_RATIO | STEP | RANGE | ANCHORED | SMOOTHED;
label_set : LEFT_BRACE label_set_list RIGHT_BRACE
{ $$ = labels.New($2...) }
@ -953,7 +954,7 @@ counter_reset_hint : UNKNOWN_COUNTER_RESET | COUNTER_RESET | NOT_COUNTER_RESET |
aggregate_op : AVG | BOTTOMK | COUNT | COUNT_VALUES | GROUP | MAX | MIN | QUANTILE | STDDEV | STDVAR | SUM | TOPK | LIMITK | LIMIT_RATIO;
// Inside of grouping options label names can be recognized as keywords by the lexer. This is a list of keywords that could also be a label name.
maybe_label : AVG | BOOL | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | GROUP_LEFT | GROUP_RIGHT | IDENTIFIER | IGNORING | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | ON | QUANTILE | STDDEV | STDVAR | SUM | TOPK | START | END | ATAN2 | LIMITK | LIMIT_RATIO | STEP | ANCHORED | SMOOTHED;
maybe_label : AVG | BOOL | BOTTOMK | BY | COUNT | COUNT_VALUES | GROUP | GROUP_LEFT | GROUP_RIGHT | IDENTIFIER | IGNORING | LAND | LOR | LUNLESS | MAX | METRIC_IDENTIFIER | MIN | OFFSET | ON | QUANTILE | STDDEV | STDVAR | SUM | TOPK | START | END | ATAN2 | LIMITK | LIMIT_RATIO | STEP | RANGE | ANCHORED | SMOOTHED;
unary_op : ADD | SUB;
@ -1088,6 +1089,14 @@ offset_duration_expr : number_duration_literal
EndPos: $3.PositionRange().End,
}
}
| RANGE LEFT_PAREN RIGHT_PAREN
{
$$ = &DurationExpr{
Op: RANGE,
StartPos: $1.PositionRange().Start,
EndPos: $3.PositionRange().End,
}
}
| unary_op STEP LEFT_PAREN RIGHT_PAREN
{
$$ = &DurationExpr{
@ -1100,6 +1109,18 @@ offset_duration_expr : number_duration_literal
StartPos: $1.Pos,
}
}
| unary_op RANGE LEFT_PAREN RIGHT_PAREN
{
$$ = &DurationExpr{
Op: $1.Typ,
RHS: &DurationExpr{
Op: RANGE,
StartPos: $2.PositionRange().Start,
EndPos: $4.PositionRange().End,
},
StartPos: $1.Pos,
}
}
| min_max LEFT_PAREN duration_expr COMMA duration_expr RIGHT_PAREN
{
$$ = &DurationExpr{
@ -1234,6 +1255,14 @@ duration_expr : number_duration_literal
EndPos: $3.PositionRange().End,
}
}
| RANGE LEFT_PAREN RIGHT_PAREN
{
$$ = &DurationExpr{
Op: RANGE,
StartPos: $1.PositionRange().Start,
EndPos: $3.PositionRange().End,
}
}
| min_max LEFT_PAREN duration_expr COMMA duration_expr RIGHT_PAREN
{
$$ = &DurationExpr{

File diff suppressed because it is too large Load Diff

@ -143,6 +143,7 @@ var key = map[string]ItemType{
"start": START,
"end": END,
"step": STEP,
"range": RANGE,
}
var histogramDesc = map[string]ItemType{
@ -915,6 +916,9 @@ func (l *Lexer) scanDurationKeyword() bool {
case "step":
l.emit(STEP)
return true
case "range":
l.emit(RANGE)
return true
case "min":
l.emit(MIN)
return true
@ -1175,7 +1179,7 @@ func lexDurationExpr(l *Lexer) stateFn {
case r == ',':
l.emit(COMMA)
return lexDurationExpr
case r == 's' || r == 'S' || r == 'm' || r == 'M':
case r == 's' || r == 'S' || r == 'm' || r == 'M' || r == 'r' || r == 'R':
if l.scanDurationKeyword() {
return lexDurationExpr
}

@ -2708,7 +2708,7 @@ var testExpr = []struct {
errors: ParseErrors{
ParseErr{
PositionRange: posrange.PositionRange{Start: 4, End: 5},
Err: errors.New("unexpected \"]\" in subquery or range selector, expected number, duration, or step()"),
Err: errors.New("unexpected \"]\" in subquery or range selector, expected number, duration, step(), or range()"),
Query: `foo[]`,
},
},
@ -2741,7 +2741,7 @@ var testExpr = []struct {
errors: ParseErrors{
ParseErr{
PositionRange: posrange.PositionRange{Start: 22, End: 22},
Err: errors.New("unexpected end of input in offset, expected number, duration, or step()"),
Err: errors.New("unexpected end of input in offset, expected number, duration, step(), or range()"),
Query: `some_metric[5m] OFFSET`,
},
},
@ -4698,6 +4698,100 @@ var testExpr = []struct {
},
},
},
{
input: `foo[range()]`,
expected: &MatrixSelector{
VectorSelector: &VectorSelector{
Name: "foo",
LabelMatchers: []*labels.Matcher{
MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
},
PosRange: posrange.PositionRange{Start: 0, End: 3},
},
RangeExpr: &DurationExpr{
Op: RANGE,
StartPos: 4,
EndPos: 11,
},
EndPos: 12,
},
},
{
input: `foo[-range()]`,
expected: &MatrixSelector{
VectorSelector: &VectorSelector{
Name: "foo",
LabelMatchers: []*labels.Matcher{
MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
},
PosRange: posrange.PositionRange{Start: 0, End: 3},
},
RangeExpr: &DurationExpr{
Op: SUB,
StartPos: 4,
RHS: &DurationExpr{Op: RANGE, StartPos: 5, EndPos: 12},
},
EndPos: 13,
},
},
{
input: `foo offset range()`,
expected: &VectorSelector{
Name: "foo",
LabelMatchers: []*labels.Matcher{
MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
},
PosRange: posrange.PositionRange{Start: 0, End: 18},
OriginalOffsetExpr: &DurationExpr{
Op: RANGE,
StartPos: 11,
EndPos: 18,
},
},
},
{
input: `foo offset -range()`,
expected: &VectorSelector{
Name: "foo",
LabelMatchers: []*labels.Matcher{
MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
},
PosRange: posrange.PositionRange{Start: 0, End: 19},
OriginalOffsetExpr: &DurationExpr{
Op: SUB,
RHS: &DurationExpr{Op: RANGE, StartPos: 12, EndPos: 19},
StartPos: 11,
},
},
},
{
input: `foo[max(range(),5s)]`,
expected: &MatrixSelector{
VectorSelector: &VectorSelector{
Name: "foo",
LabelMatchers: []*labels.Matcher{
MustLabelMatcher(labels.MatchEqual, model.MetricNameLabel, "foo"),
},
PosRange: posrange.PositionRange{Start: 0, End: 3},
},
RangeExpr: &DurationExpr{
Op: MAX,
LHS: &DurationExpr{
Op: RANGE,
StartPos: 8,
EndPos: 15,
},
RHS: &NumberLiteral{
Val: 5,
Duration: true,
PosRange: posrange.PositionRange{Start: 16, End: 18},
},
StartPos: 4,
EndPos: 19,
},
EndPos: 20,
},
},
{
input: `foo[4s+4s:1s*2] offset (5s-8)`,
expected: &SubqueryExpr{
@ -4942,7 +5036,7 @@ var testExpr = []struct {
errors: ParseErrors{
ParseErr{
PositionRange: posrange.PositionRange{Start: 8, End: 9},
Err: errors.New(`unexpected "]" in subquery or range selector, expected number, duration, or step()`),
Err: errors.New(`unexpected "]" in subquery or range selector, expected number, duration, step(), or range()`),
Query: `foo[step]`,
},
},

@ -182,6 +182,8 @@ func (node *DurationExpr) writeTo(b *bytes.Buffer) {
switch {
case node.Op == STEP:
b.WriteString("step()")
case node.Op == RANGE:
b.WriteString("range()")
case node.Op == MIN:
b.WriteString("min(")
b.WriteString(node.LHS.String())

@ -266,6 +266,21 @@ func TestExprString(t *testing.T) {
{
in: "foo[200 - min(step() + 10s, -max(step() ^ 2, 3))]",
},
{
in: "foo[range()]",
},
{
in: "foo[-range()]",
},
{
in: "foo offset range()",
},
{
in: "foo offset -range()",
},
{
in: "foo[max(range(), 5s)]",
},
{
in: `predict_linear(foo[1h], 3000)`,
},

@ -1519,6 +1519,10 @@ func (t *test) runInstantQuery(iq atModifierTestCase, cmd *evalCmd, engine promq
// Check query returns same result in range mode,
// by checking against the middle step.
// Skip this check for queries containing range() since it would resolve differently.
if strings.Contains(iq.expr, "range()") {
return nil
}
q, err = engine.NewRangeQuery(t.context, t.storage, nil, iq.expr, iq.evalTime.Add(-time.Minute), iq.evalTime.Add(time.Minute), time.Minute)
if err != nil {
return fmt.Errorf("error creating range query for %q (line %d): %w", cmd.expr, cmd.line, err)

@ -225,4 +225,27 @@ eval range from 50s to 60s step 5s metric1_total offset max(3s,min(step(), 1s))+
{} 8047 8052 8057
eval range from 50s to 60s step 5s metric1_total offset -(min(step(), 2s)-5)+8000
{} 8047 8052 8057
{} 8047 8052 8057
# Test range() function - resolves to query range (end - start).
# For a range query from 50s to 60s, range() = 10s.
eval range from 50s to 60s step 10s count_over_time(metric1_total[range()])
{} 10 10
eval range from 50s to 60s step 5s count_over_time(metric1_total[range()])
{} 10 10 10
eval range from 50s to 60s step 5s metric1_total offset range()
metric1_total{} 40 45 50
eval range from 50s to 60s step 5s metric1_total offset min(range(), 8s)
metric1_total{} 42 47 52
clear
load 1s
metric1_total 0+1x100
# For an instant query (start == end), range() = 0s, offset 0s.
eval instant at 50s metric1_total offset range()
metric1_total{} 50

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -17,240 +17,127 @@ import (
"bytes"
"context"
"encoding/binary"
"fmt"
"math"
"strings"
"sync"
"net/http"
"testing"
"time"
"github.com/gogo/protobuf/proto"
dto "github.com/prometheus/client_model/go"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/config"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/pool"
"github.com/prometheus/prometheus/util/teststorage"
)
// nopAppendable is a storage.Appendable stub whose appender discards all data.
type nopAppendable struct{}

// sample is aliased from the test-storage package for readability below.
type sample = teststorage.Sample
// Appender implements storage.Appendable; the context is ignored and a
// no-op appender is always returned.
func (nopAppendable) Appender(context.Context) storage.Appender {
	var app nopAppender
	return app
}
// nopAppender is a storage.Appender that discards everything it is given.
// Each append method returns a distinct constant ref — presumably so callers
// in tests can tell the append paths apart; confirm before relying on the
// specific values.
type nopAppender struct{}

// SetOptions is a no-op.
func (nopAppender) SetOptions(*storage.AppendOptions) {}

// Append drops the float sample and returns a fixed ref.
func (nopAppender) Append(storage.SeriesRef, labels.Labels, int64, float64) (storage.SeriesRef, error) {
	return 1, nil
}

// AppendExemplar drops the exemplar and returns a fixed ref.
func (nopAppender) AppendExemplar(storage.SeriesRef, labels.Labels, exemplar.Exemplar) (storage.SeriesRef, error) {
	return 2, nil
}

// AppendHistogram drops the histogram sample and returns a fixed ref.
func (nopAppender) AppendHistogram(storage.SeriesRef, labels.Labels, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
	return 3, nil
}

// AppendHistogramSTZeroSample drops the zero sample and returns a zero ref.
func (nopAppender) AppendHistogramSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64, *histogram.Histogram, *histogram.FloatHistogram) (storage.SeriesRef, error) {
	return 0, nil
}

// UpdateMetadata drops the metadata update and returns a fixed ref.
func (nopAppender) UpdateMetadata(storage.SeriesRef, labels.Labels, metadata.Metadata) (storage.SeriesRef, error) {
	return 4, nil
}

// AppendSTZeroSample drops the zero sample and returns a fixed ref.
func (nopAppender) AppendSTZeroSample(storage.SeriesRef, labels.Labels, int64, int64) (storage.SeriesRef, error) {
	return 5, nil
}

// Commit always succeeds.
func (nopAppender) Commit() error { return nil }

// Rollback always succeeds.
func (nopAppender) Rollback() error { return nil }
// floatSample is one recorded float append: series labels, timestamp and value.
type floatSample struct {
	metric labels.Labels // Series labels.
	t      int64         // Timestamp of the sample.
	f      float64       // Sample value.
}
// equalFloatSamples reports whether two recorded float samples are identical.
// Values are compared via Float64bits so NaN values which are exactly the
// same will compare equal.
func equalFloatSamples(a, b floatSample) bool {
	if a.t != b.t {
		return false
	}
	if math.Float64bits(a.f) != math.Float64bits(b.f) {
		return false
	}
	return labels.Equal(a.metric, b.metric)
}
// histogramSample is one recorded histogram append. Presumably only one of
// h or fh is set per sample — confirm against the appending call sites.
type histogramSample struct {
	metric labels.Labels
	t      int64
	h      *histogram.Histogram      // Integer histogram, if any.
	fh     *histogram.FloatHistogram // Float histogram, if any.
}
// metadataEntry pairs a recorded metadata update with its series labels.
type metadataEntry struct {
	m      metadata.Metadata
	metric labels.Labels
}
// metadataEntryEqual reports whether two metadata entries carry the same
// labels and the same Type, Unit and Help fields.
func metadataEntryEqual(a, b metadataEntry) bool {
	return labels.Equal(a.metric, b.metric) &&
		a.m.Type == b.m.Type &&
		a.m.Unit == b.m.Unit &&
		a.m.Help == b.m.Help
}
// collectResultAppendable adapts a collectResultAppender to the
// storage.Appendable interface by handing out itself as the appender.
type collectResultAppendable struct {
	*collectResultAppender
}

// Appender implements storage.Appendable; the context is ignored.
func (a *collectResultAppendable) Appender(context.Context) storage.Appender {
	return a
}
// collectResultAppender records all samples that were added through the appender.
// It can be used as its zero value or be backed by another appender it writes samples through.
type collectResultAppender struct {
	mtx sync.Mutex // Guards all slices below.

	next storage.Appender // Optional backing appender; nil means record-only.

	resultFloats         []floatSample // Floats committed via Commit.
	pendingFloats        []floatSample // Floats appended but not yet committed.
	rolledbackFloats     []floatSample // Floats discarded by the last Rollback.
	resultHistograms     []histogramSample
	pendingHistograms    []histogramSample
	rolledbackHistograms []histogramSample
	resultExemplars      []exemplar.Exemplar
	pendingExemplars     []exemplar.Exemplar
	resultMetadata       []metadataEntry
	pendingMetadata      []metadataEntry
}
func (*collectResultAppender) SetOptions(*storage.AppendOptions) {}
func (a *collectResultAppender) Append(ref storage.SeriesRef, lset labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
a.mtx.Lock()
defer a.mtx.Unlock()
a.pendingFloats = append(a.pendingFloats, floatSample{
metric: lset,
t: t,
f: v,
})
if a.next == nil {
if ref == 0 {
// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
ref = storage.SeriesRef(lset.Hash())
}
return ref, nil
}
ref, err := a.next.Append(ref, lset, t, v)
if err != nil {
return 0, err
func withCtx(ctx context.Context) func(sl *scrapeLoop) {
return func(sl *scrapeLoop) {
sl.ctx = ctx
}
return ref, nil
}
// AppendExemplar records the exemplar as pending and, when a backing
// appender is configured, forwards the call to it.
func (a *collectResultAppender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	a.pendingExemplars = append(a.pendingExemplars, e)
	if next := a.next; next != nil {
		return next.AppendExemplar(ref, l, e)
	}
	return 0, nil
}
// AppendHistogram records the histogram sample as pending and, when a
// backing appender is configured, forwards the call to it.
func (a *collectResultAppender) AppendHistogram(ref storage.SeriesRef, l labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	rec := histogramSample{metric: l, t: t, h: h, fh: fh}
	a.pendingHistograms = append(a.pendingHistograms, rec)
	if next := a.next; next != nil {
		return next.AppendHistogram(ref, l, t, h, fh)
	}
	return 0, nil
}
func (a *collectResultAppender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
if h != nil {
return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
func withAppendable(appendable storage.Appendable) func(sl *scrapeLoop) {
return func(sl *scrapeLoop) {
sl.appendable = appendable
}
return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{})
}
// UpdateMetadata records the metadata update as pending and, when a backing
// appender is configured, forwards the call to it. Without a backing
// appender a zero ref is replaced by a ref derived from the label hash.
func (a *collectResultAppender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	a.pendingMetadata = append(a.pendingMetadata, metadataEntry{metric: l, m: m})
	if next := a.next; next != nil {
		return next.UpdateMetadata(ref, l, m)
	}
	if ref == 0 {
		// Use the labels hash as a stand-in for a unique series reference.
		ref = storage.SeriesRef(l.Hash())
	}
	return ref, nil
}
// AppendSTZeroSample records a zero-valued float sample at the start
// timestamp st by delegating to Append; the regular timestamp argument is
// ignored.
func (a *collectResultAppender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
	const zero = 0.0
	return a.Append(ref, l, st, zero)
}
// Commit promotes all pending data into the committed result slices and
// clears the pending state. When a backing appender is configured its
// Commit result is returned; otherwise Commit always succeeds.
func (a *collectResultAppender) Commit() error {
	a.mtx.Lock()
	defer a.mtx.Unlock()
	a.resultFloats = append(a.resultFloats, a.pendingFloats...)
	a.resultExemplars = append(a.resultExemplars, a.pendingExemplars...)
	a.resultHistograms = append(a.resultHistograms, a.pendingHistograms...)
	a.resultMetadata = append(a.resultMetadata, a.pendingMetadata...)
	a.pendingFloats, a.pendingExemplars = nil, nil
	a.pendingHistograms, a.pendingMetadata = nil, nil
	if next := a.next; next != nil {
		return next.Commit()
	}
	return nil
}
func (a *collectResultAppender) Rollback() error {
a.mtx.Lock()
defer a.mtx.Unlock()
a.rolledbackFloats = a.pendingFloats
a.rolledbackHistograms = a.pendingHistograms
a.pendingFloats = nil
a.pendingHistograms = nil
if a.next == nil {
return nil
// newTestScrapeLoop is the initial scrape loop for all tests.
// It returns scrapeLoop and mock scraper you can customize.
//
// It's recommended to use withXYZ functions for simple option customizations, e.g:
//
// appTest := teststorage.NewAppendable()
// sl, _ := newTestScrapeLoop(t, withAppendable(appTest))
//
// However, when changing more than one scrapeLoop options it's more readable to have one explicit opt function:
//
// ctx, cancel := context.WithCancel(t.Context())
// appTest := teststorage.NewAppendable()
// sl, scraper := newTestScrapeLoop(t, func(sl *scrapeLoop) {
// sl.ctx = ctx
// sl.appendable = appTest
// // Since we're writing samples directly below we need to provide a protocol fallback.
// sl.fallbackScrapeProtocol = "text/plain"
// })
//
// NOTE: Try to NOT add more parameters to this function. Try to NOT add more
// newTestScrapeLoop-like constructors. It should be flexible enough with scrapeLoop
// used for initial options.
func newTestScrapeLoop(t testing.TB, opts ...func(sl *scrapeLoop)) (_ *scrapeLoop, scraper *testScraper) {
metrics := newTestScrapeMetrics(t)
sl := &scrapeLoop{
stopped: make(chan struct{}),
l: promslog.NewNopLogger(),
cache: newScrapeCache(metrics),
interval: 10 * time.Millisecond,
timeout: 1 * time.Hour,
sampleMutator: nopMutator,
reportSampleMutator: nopMutator,
appendable: teststorage.NewAppendable(),
buffers: pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) }),
metrics: metrics,
maxSchema: histogram.ExponentialSchemaMax,
honorTimestamps: true,
enableCompression: true,
validationScheme: model.UTF8Validation,
symbolTable: labels.NewSymbolTable(),
appendMetadataToWAL: true, // Tests assumes it's enabled, unless explicitly turned off.
}
return a.next.Rollback()
}
func (a *collectResultAppender) String() string {
var sb strings.Builder
for _, s := range a.resultFloats {
sb.WriteString(fmt.Sprintf("committed: %s %f %d\n", s.metric, s.f, s.t))
for _, o := range opts {
o(sl)
}
for _, s := range a.pendingFloats {
sb.WriteString(fmt.Sprintf("pending: %s %f %d\n", s.metric, s.f, s.t))
// Validate user opts for convenience.
require.Nil(t, sl.parentCtx, "newTestScrapeLoop does not support injecting non-nil parent context")
require.Nil(t, sl.appenderCtx, "newTestScrapeLoop does not support injecting non-nil appender context")
require.Nil(t, sl.cancel, "newTestScrapeLoop does not support injecting custom cancel function")
require.Nil(t, sl.scraper, "newTestScrapeLoop does not support injecting scraper, it's mocked, use the returned scraper")
rootCtx := t.Context()
// Use sl.ctx for context injection.
// True contexts (sl.appenderCtx, sl.parentCtx, sl.ctx) are populated from it
if sl.ctx != nil {
rootCtx = sl.ctx
}
for _, s := range a.rolledbackFloats {
sb.WriteString(fmt.Sprintf("rolledback: %s %f %d\n", s.metric, s.f, s.t))
ctx, cancel := context.WithCancel(rootCtx)
sl.ctx = ctx
sl.cancel = cancel
sl.appenderCtx = rootCtx
sl.parentCtx = rootCtx
scraper = &testScraper{}
sl.scraper = scraper
return sl, scraper
}
// newTestScrapePool returns a scrapePool wired with inert defaults suitable
// for tests: a test-scoped context, a no-op cancel func, an in-memory
// appendable, empty target/loop maps, and the given injectNewLoop hook used
// in place of the real loop constructor.
//
// Fix: removed the stray, unreachable `return sb.String()` that followed the
// struct-literal return and referenced an undefined identifier.
func newTestScrapePool(t *testing.T, injectNewLoop func(options scrapeLoopOptions) loop) *scrapePool {
	return &scrapePool{
		ctx:               t.Context(),
		cancel:            func() {},
		logger:            promslog.NewNopLogger(),
		config:            &config.ScrapeConfig{},
		options:           &Options{},
		client:            http.DefaultClient,
		activeTargets:     map[uint64]*Target{},
		loops:             map[uint64]loop{},
		injectTestNewLoop: injectNewLoop,
		appendable:        teststorage.NewAppendable(),
		symbolTable:       labels.NewSymbolTable(),
		metrics:           newTestScrapeMetrics(t),
	}
}
// protoMarshalDelimited marshals a MetricFamily into a delimited

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -39,8 +39,8 @@ import (
"github.com/prometheus/prometheus/util/pool"
)
// NewManager is the Manager constructor.
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), app storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
// NewManager is the Manager constructor using Appendable.
func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(string) (*logging.JSONFileLogger, error), appendable storage.Appendable, registerer prometheus.Registerer) (*Manager, error) {
if o == nil {
o = &Options{}
}
@ -54,7 +54,7 @@ func NewManager(o *Options, logger *slog.Logger, newScrapeFailureLogger func(str
}
m := &Manager{
append: app,
appendable: appendable,
opts: o,
logger: logger,
newScrapeFailureLogger: newScrapeFailureLogger,
@ -87,15 +87,15 @@ type Options struct {
// Option to enable appending of scraped Metadata to the TSDB/other appenders. Individual appenders
// can decide what to do with metadata, but for practical purposes this flag exists so that metadata
// can be written to the WAL and thus read for remote write.
// TODO: implement some form of metadata storage
AppendMetadata bool
// Option to increase the interval used by scrape manager to throttle target groups updates.
DiscoveryReloadInterval model.Duration
// Option to enable the ingestion of the created timestamp as a synthetic zero sample.
// See: https://github.com/prometheus/proposals/blob/main/proposals/2023-06-13_created-timestamp.md
EnableStartTimestampZeroIngestion bool
// EnableTypeAndUnitLabels
// EnableTypeAndUnitLabels represents type-and-unit-labels feature flag.
EnableTypeAndUnitLabels bool
// Optional HTTP client options to use when scraping.
@ -111,9 +111,11 @@ type Options struct {
// Manager maintains a set of scrape pools and manages start/stop cycles
// when receiving new target groups from the discovery manager.
type Manager struct {
opts *Options
logger *slog.Logger
append storage.Appendable
opts *Options
logger *slog.Logger
appendable storage.Appendable
graceShut chan struct{}
offsetSeed uint64 // Global offsetSeed seed is used to spread scrape workload across HA setup.
@ -194,7 +196,7 @@ func (m *Manager) reload() {
continue
}
m.metrics.targetScrapePools.Inc()
sp, err := newScrapePool(scrapeConfig, m.append, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
sp, err := newScrapePool(scrapeConfig, m.appendable, m.offsetSeed, m.logger.With("scrape_pool", setName), m.buffers, m.opts, m.metrics)
if err != nil {
m.metrics.targetScrapePoolsFailed.Inc()
m.logger.Error("error creating new scrape pool", "err", err, "scrape_pool", setName)

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -51,6 +51,7 @@ import (
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/tsdb/tsdbutil"
"github.com/prometheus/prometheus/util/runutil"
"github.com/prometheus/prometheus/util/teststorage"
"github.com/prometheus/prometheus/util/testutil"
)
@ -527,21 +528,12 @@ scrape_configs:
ch <- struct{}{}
return noopLoop()
}
sp := &scrapePool{
appendable: &nopAppendable{},
activeTargets: map[uint64]*Target{
1: {},
},
loops: map[uint64]loop{
1: noopLoop(),
},
newLoop: newLoop,
logger: nil,
config: cfg1.ScrapeConfigs[0],
client: http.DefaultClient,
metrics: scrapeManager.metrics,
symbolTable: labels.NewSymbolTable(),
}
sp := newTestScrapePool(t, newLoop)
sp.activeTargets[1] = &Target{}
sp.loops[1] = noopLoop()
sp.config = cfg1.ScrapeConfigs[0]
sp.metrics = scrapeManager.metrics
scrapeManager.scrapePools = map[string]*scrapePool{
"job1": sp,
}
@ -691,18 +683,11 @@ scrape_configs:
for _, sc := range cfg.ScrapeConfigs {
_, cancel := context.WithCancel(context.Background())
defer cancel()
sp := &scrapePool{
appendable: &nopAppendable{},
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{
1: noopLoop(),
},
newLoop: newLoop,
logger: nil,
config: sc,
client: http.DefaultClient,
cancel: cancel,
}
sp := newTestScrapePool(t, newLoop)
sp.loops[1] = noopLoop()
sp.config = cfg1.ScrapeConfigs[0]
sp.metrics = scrapeManager.metrics
for _, c := range sc.ServiceDiscoveryConfigs {
staticConfig := c.(discovery.StaticConfig)
for _, group := range staticConfig {
@ -764,7 +749,7 @@ func TestManagerSTZeroIngestion(t *testing.T) {
for _, testWithST := range []bool{false, true} {
t.Run(fmt.Sprintf("withST=%v", testWithST), func(t *testing.T) {
for _, testSTZeroIngest := range []bool{false, true} {
t.Run(fmt.Sprintf("ctZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
t.Run(fmt.Sprintf("stZeroIngest=%v", testSTZeroIngest), func(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
@ -777,11 +762,11 @@ func TestManagerSTZeroIngestion(t *testing.T) {
// TODO(bwplotka): Add more types than just counter?
encoded := prepareTestEncodedCounter(t, testFormat, expectedMetricName, expectedSampleValue, sampleTs, stTs)
app := &collectResultAppender{}
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: testSTZeroIngest,
skipOffsetting: true,
}, &collectResultAppendable{app})
}, app)
defer scrapeManager.Stop()
server := setupTestServer(t, config.ScrapeProtocolsHeaders[testFormat], encoded)
@ -806,11 +791,8 @@ scrape_configs:
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant samples.
if len(app.resultFloats) > 0 {
if len(app.ResultSamples()) > 0 {
return nil
}
return errors.New("expected some float samples, got none")
@ -818,32 +800,32 @@ scrape_configs:
// Verify results.
// Verify what we got vs expectations around ST injection.
samples := findSamplesForMetric(app.resultFloats, expectedMetricName)
got := findSamplesForMetric(app.ResultSamples(), expectedMetricName)
if testWithST && testSTZeroIngest {
require.Len(t, samples, 2)
require.Equal(t, 0.0, samples[0].f)
require.Equal(t, timestamp.FromTime(stTs), samples[0].t)
require.Equal(t, expectedSampleValue, samples[1].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[1].t)
require.Len(t, got, 2)
require.Equal(t, 0.0, got[0].V)
require.Equal(t, timestamp.FromTime(stTs), got[0].T)
require.Equal(t, expectedSampleValue, got[1].V)
require.Equal(t, timestamp.FromTime(sampleTs), got[1].T)
} else {
require.Len(t, samples, 1)
require.Equal(t, expectedSampleValue, samples[0].f)
require.Equal(t, timestamp.FromTime(sampleTs), samples[0].t)
require.Len(t, got, 1)
require.Equal(t, expectedSampleValue, got[0].V)
require.Equal(t, timestamp.FromTime(sampleTs), got[0].T)
}
// Verify what we got vs expectations around additional _created series for OM text.
// enableSTZeroInjection also kills that _created line.
createdSeriesSamples := findSamplesForMetric(app.resultFloats, expectedCreatedMetricName)
gotSTSeries := findSamplesForMetric(app.ResultSamples(), expectedCreatedMetricName)
if testFormat == config.OpenMetricsText1_0_0 && testWithST && !testSTZeroIngest {
// For OM Text, when counter has ST, and feature flag disabled we should see _created lines.
require.Len(t, createdSeriesSamples, 1)
require.Len(t, gotSTSeries, 1)
// Conversion taken from common/expfmt.writeOpenMetricsFloat.
// We don't check the st timestamp as explicit ts was not implemented in expfmt.Encoder,
// but exists in OM https://github.com/prometheus/OpenMetrics/blob/v1.0.0/specification/OpenMetrics.md#:~:text=An%20example%20with%20a%20Metric%20with%20no%20labels%2C%20and%20a%20MetricPoint%20with%20a%20timestamp%20and%20a%20created
// We can implement this, but we want to potentially get rid of OM 1.0 ST lines
require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, createdSeriesSamples[0].f)
require.Equal(t, float64(timestamppb.New(stTs).AsTime().UnixNano())/1e9, gotSTSeries[0].V)
} else {
require.Empty(t, createdSeriesSamples)
require.Empty(t, gotSTSeries)
}
})
}
@ -885,9 +867,9 @@ func prepareTestEncodedCounter(t *testing.T, format config.ScrapeProtocol, mName
}
}
func findSamplesForMetric(floats []floatSample, metricName string) (ret []floatSample) {
func findSamplesForMetric(floats []sample, metricName string) (ret []sample) {
for _, f := range floats {
if f.metric.Get(model.MetricNameLabel) == metricName {
if f.L.Get(model.MetricNameLabel) == metricName {
ret = append(ret, f)
}
}
@ -964,11 +946,11 @@ func TestManagerSTZeroIngestionHistogram(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
app := &collectResultAppender{}
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: tc.enableSTZeroIngestion,
skipOffsetting: true,
}, &collectResultAppendable{app})
}, app)
defer scrapeManager.Stop()
once := sync.Once{}
@ -1012,43 +994,33 @@ scrape_configs:
`, serverURL.Host)
applyConfig(t, testConfig, scrapeManager, discoveryManager)
var got []histogramSample
// Wait for one scrape.
ctx, cancel = context.WithTimeout(ctx, 1*time.Minute)
defer cancel()
require.NoError(t, runutil.Retry(100*time.Millisecond, ctx.Done(), func() error {
app.mtx.Lock()
defer app.mtx.Unlock()
// Check if scrape happened and grab the relevant histograms, they have to be there - or it's a bug
// and it's not worth waiting.
for _, h := range app.resultHistograms {
if h.metric.Get(model.MetricNameLabel) == mName {
got = append(got, h)
}
}
if len(app.resultHistograms) > 0 {
if len(app.ResultSamples()) > 0 {
return nil
}
return errors.New("expected some histogram samples, got none")
}), "after 1 minute")
got := findSamplesForMetric(app.ResultSamples(), mName)
// Check for zero samples, assuming we only injected always one histogram sample.
// Did it contain ST to inject? If yes, was ST zero enabled?
if tc.inputHistSample.CreatedTimestamp.IsValid() && tc.enableSTZeroIngestion {
require.Len(t, got, 2)
// Zero sample.
require.Equal(t, histogram.Histogram{}, *got[0].h)
require.Equal(t, histogram.Histogram{}, *got[0].H)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].h.Sum)
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[1].H.Sum)
return
}
// Expect only one, valid sample.
require.Len(t, got, 1)
// Quick soft check to make sure it's the same sample or at least not zero.
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].h.Sum)
require.Equal(t, tc.inputHistSample.GetSampleSum(), got[0].H.Sum)
})
}
}
@ -1083,11 +1055,11 @@ func TestNHCBAndSTZeroIngestion(t *testing.T) {
ctx := t.Context()
app := &collectResultAppender{}
app := teststorage.NewAppendable()
discoveryManager, scrapeManager := runManagers(t, ctx, &Options{
EnableStartTimestampZeroIngestion: true,
skipOffsetting: true,
}, &collectResultAppendable{app})
}, app)
defer scrapeManager.Stop()
once := sync.Once{}
@ -1146,33 +1118,19 @@ scrape_configs:
return exists
}, 5*time.Second, 100*time.Millisecond, "scrape pool should be created for job 'test'")
// Helper function to get matching histograms to avoid race conditions.
getMatchingHistograms := func() []histogramSample {
app.mtx.Lock()
defer app.mtx.Unlock()
var got []histogramSample
for _, h := range app.resultHistograms {
if h.metric.Get(model.MetricNameLabel) == mName {
got = append(got, h)
}
}
return got
}
require.Eventually(t, func() bool {
return len(getMatchingHistograms()) > 0
return len(app.ResultSamples()) > 0
}, 1*time.Minute, 100*time.Millisecond, "expected histogram samples, got none")
// Verify that samples were ingested (proving both features work together).
got := getMatchingHistograms()
got := findSamplesForMetric(app.ResultSamples(), mName)
// With ST zero ingestion enabled and a created timestamp present, we expect 2 samples:
// one zero sample and one actual sample.
require.Len(t, got, 2, "expected 2 histogram samples (zero sample + actual sample)")
require.Equal(t, histogram.Histogram{}, *got[0].h, "first sample should be zero sample")
require.InDelta(t, expectedHistogramSum, got[1].h.Sum, 1e-9, "second sample should retain the expected sum")
require.Len(t, app.resultExemplars, 2, "expected 2 exemplars from histogram buckets")
require.Equal(t, histogram.Histogram{}, *got[0].H, "first sample should be zero sample")
require.InDelta(t, expectedHistogramSum, got[1].H.Sum, 1e-9, "second sample should retain the expected sum")
require.Len(t, got[1].ES, 2, "expected 2 exemplars on second histogram")
}
func applyConfig(
@ -1203,7 +1161,7 @@ func runManagers(t *testing.T, ctx context.Context, opts *Options, app storage.A
}
opts.DiscoveryReloadInterval = model.Duration(100 * time.Millisecond)
if app == nil {
app = nopAppendable{}
app = teststorage.NewAppendable()
}
reg := prometheus.NewRegistry()
@ -1601,7 +1559,7 @@ scrape_configs:
cfg := loadConfiguration(t, cfgText)
m, err := NewManager(&Options{}, nil, nil, &nopAppendable{}, prometheus.NewRegistry())
m, err := NewManager(&Options{}, nil, nil, teststorage.NewAppendable(), prometheus.NewRegistry())
require.NoError(t, err)
defer m.Stop()
require.NoError(t, m.ApplyConfig(cfg))

@ -1,4 +1,4 @@
// Copyright 2016 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -59,6 +59,8 @@ import (
"github.com/prometheus/prometheus/util/pool"
)
var aOptionRejectEarlyOOO = storage.AppendOptions{DiscardOutOfOrder: true}
// ScrapeTimestampTolerance is the tolerance for scrape appends timestamps
// alignment, to enable better compression at the TSDB level.
// See https://github.com/prometheus/prometheus/issues/7846
@ -67,7 +69,7 @@ var ScrapeTimestampTolerance = 2 * time.Millisecond
// AlignScrapeTimestamps enables the tolerance for scrape appends timestamps described above.
var AlignScrapeTimestamps = true
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", labels.MetricName)
var errNameLabelMandatory = fmt.Errorf("missing metric name (%s label)", model.MetricNameLabel)
var _ FailureLogger = (*logging.JSONFileLogger)(nil)
@ -82,8 +84,9 @@ type FailureLogger interface {
type scrapePool struct {
appendable storage.Appendable
logger *slog.Logger
ctx context.Context
cancel context.CancelFunc
httpOpts []config_util.HTTPClientOption
options *Options
// mtx must not be taken after targetMtx.
mtx sync.Mutex
@ -102,16 +105,15 @@ type scrapePool struct {
droppedTargets []*Target // Subject to KeepDroppedTargets limit.
droppedTargetsCount int // Count of all dropped targets.
// Constructor for new scrape loops. This is settable for testing convenience.
newLoop func(scrapeLoopOptions) loop
// newLoop injection for testing purposes.
injectTestNewLoop func(scrapeLoopOptions) loop
metrics *scrapeMetrics
metrics *scrapeMetrics
buffers *pool.Pool
offsetSeed uint64
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
validationScheme model.ValidationScheme
escapingScheme model.EscapingScheme
}
type labelLimits struct {
@ -120,118 +122,80 @@ type labelLimits struct {
labelValueLengthLimit int
}
type scrapeLoopOptions struct {
target *Target
scraper scraper
sampleLimit int
bucketLimit int
maxSchema int32
labelLimits *labelLimits
honorLabels bool
honorTimestamps bool
trackTimestampsStaleness bool
interval time.Duration
timeout time.Duration
scrapeNativeHist bool
alwaysScrapeClassicHist bool
convertClassicHistToNHCB bool
fallbackScrapeProtocol string
mrc []*relabel.Config
cache *scrapeCache
enableCompression bool
}
const maxAheadTime = 10 * time.Minute
// returning an empty label set is interpreted as "drop".
type labelsMutator func(labels.Labels) labels.Labels
func newScrapePool(cfg *config.ScrapeConfig, app storage.Appendable, offsetSeed uint64, logger *slog.Logger, buffers *pool.Pool, options *Options, metrics *scrapeMetrics) (*scrapePool, error) {
// scrapeLoopAppendAdapter allows support for multiple storage.Appender versions.
type scrapeLoopAppendAdapter interface {
Commit() error
Rollback() error
addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) error
append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error)
}
func newScrapePool(
cfg *config.ScrapeConfig,
appendable storage.Appendable,
offsetSeed uint64,
logger *slog.Logger,
buffers *pool.Pool,
options *Options,
metrics *scrapeMetrics,
) (*scrapePool, error) {
if logger == nil {
logger = promslog.NewNopLogger()
}
if buffers == nil {
buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
}
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, options.HTTPClientOptions...)
if err != nil {
return nil, err
}
// Validate scheme so we don't need to do it later.
// We also do it on scrapePool.reload(...)
// TODO(bwplotka): Can we move it to scrape config validation?
if err := namevalidationutil.CheckNameValidationScheme(cfg.MetricNameValidationScheme); err != nil {
return nil, errors.New("newScrapePool: MetricNameValidationScheme must be set in scrape configuration")
}
var escapingScheme model.EscapingScheme
escapingScheme, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme)
if err != nil {
if _, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme); err != nil {
return nil, fmt.Errorf("invalid metric name escaping scheme, %w", err)
}
symbols := labels.NewSymbolTable()
ctx, cancel := context.WithCancel(context.Background())
sp := &scrapePool{
appendable: appendable,
logger: logger,
ctx: ctx,
cancel: cancel,
appendable: app,
options: options,
config: cfg,
client: client,
activeTargets: map[uint64]*Target{},
loops: map[uint64]loop{},
symbolTable: labels.NewSymbolTable(),
symbolTable: symbols,
lastSymbolTableCheck: time.Now(),
logger: logger,
activeTargets: map[uint64]*Target{},
metrics: metrics,
httpOpts: options.HTTPClientOptions,
validationScheme: cfg.MetricNameValidationScheme,
escapingScheme: escapingScheme,
}
sp.newLoop = func(opts scrapeLoopOptions) loop {
// Update the targets retrieval function for metadata to a new scrape cache.
cache := opts.cache
if cache == nil {
cache = newScrapeCache(metrics)
}
opts.target.SetMetadataStore(cache)
return newScrapeLoop(
ctx,
opts.scraper,
logger.With("target", opts.target),
buffers,
func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, opts.target, opts.honorLabels, opts.mrc)
},
func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
func(ctx context.Context) storage.Appender { return app.Appender(ctx) },
cache,
sp.symbolTable,
offsetSeed,
opts.honorTimestamps,
opts.trackTimestampsStaleness,
opts.enableCompression,
opts.sampleLimit,
opts.bucketLimit,
opts.maxSchema,
opts.labelLimits,
opts.interval,
opts.timeout,
opts.alwaysScrapeClassicHist,
opts.convertClassicHistToNHCB,
cfg.ScrapeNativeHistogramsEnabled(),
options.EnableStartTimestampZeroIngestion,
options.EnableTypeAndUnitLabels,
options.ExtraMetrics,
options.AppendMetadata,
opts.target,
options.PassMetadataInContext,
metrics,
options.skipOffsetting,
sp.validationScheme,
sp.escapingScheme,
opts.fallbackScrapeProtocol,
)
buffers: buffers,
offsetSeed: offsetSeed,
}
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
return sp, nil
}
// newLoop builds a scrape loop from opts, honoring the test-injection hook
// when one is installed on the pool.
func (sp *scrapePool) newLoop(opts scrapeLoopOptions) loop {
	if inject := sp.injectTestNewLoop; inject != nil {
		return inject(opts)
	}
	return newScrapeLoop(opts)
}
func (sp *scrapePool) ActiveTargets() []*Target {
sp.targetMtx.Lock()
defer sp.targetMtx.Unlock()
@ -323,7 +287,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.metrics.targetScrapePoolReloads.Inc()
start := time.Now()
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, sp.httpOpts...)
client, err := newScrapeClient(cfg.HTTPClientConfig, cfg.JobName, sp.options.HTTPClientOptions...)
if err != nil {
sp.metrics.targetScrapePoolReloadsFailed.Inc()
return err
@ -333,17 +297,14 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
sp.config = cfg
oldClient := sp.client
sp.client = client
// Validate scheme so we don't need to do it later.
if err := namevalidationutil.CheckNameValidationScheme(cfg.MetricNameValidationScheme); err != nil {
return errors.New("scrapePool.reload: MetricNameValidationScheme must be set in scrape configuration")
}
sp.validationScheme = cfg.MetricNameValidationScheme
var escapingScheme model.EscapingScheme
escapingScheme, err = model.ToEscapingScheme(cfg.MetricNameEscapingScheme)
if err != nil {
return fmt.Errorf("invalid metric name escaping scheme, %w", err)
if _, err = config.ToEscapingScheme(cfg.MetricNameEscapingScheme, cfg.MetricNameValidationScheme); err != nil {
return fmt.Errorf("scrapePool.reload: invalid metric name escaping scheme, %w", err)
}
sp.escapingScheme = escapingScheme
sp.metrics.targetScrapePoolTargetLimit.WithLabelValues(sp.config.JobName).Set(float64(sp.config.TargetLimit))
sp.restartLoops(reuseCache)
@ -355,30 +316,7 @@ func (sp *scrapePool) reload(cfg *config.ScrapeConfig) error {
}
func (sp *scrapePool) restartLoops(reuseCache bool) {
var (
wg sync.WaitGroup
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
maxSchema = pickSchema(sp.config.NativeHistogramMinBucketFactor)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
enableCompression = sp.config.EnableCompression
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
mrc = sp.config.MetricRelabelConfigs
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
scrapeNativeHist = sp.config.ScrapeNativeHistogramsEnabled()
alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistogramsEnabled()
convertClassicHistToNHCB = sp.config.ConvertClassicHistogramsToNHCBEnabled()
)
var wg sync.WaitGroup
sp.targetMtx.Lock()
forcedErr := sp.refreshTargetLimitErr()
@ -392,38 +330,27 @@ func (sp *scrapePool) restartLoops(reuseCache bool) {
}
t := sp.activeTargets[fp]
targetInterval, targetTimeout, err := t.intervalAndTimeout(interval, timeout)
var (
s = &targetScraper{
targetInterval, targetTimeout, err := t.intervalAndTimeout(
time.Duration(sp.config.ScrapeInterval),
time.Duration(sp.config.ScrapeTimeout),
)
escapingScheme, _ := config.ToEscapingScheme(sp.config.MetricNameEscapingScheme, sp.config.MetricNameValidationScheme)
newLoop := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: &targetScraper{
Target: t,
client: sp.client,
timeout: targetTimeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, sp.escapingScheme),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
bodySizeLimit: int64(sp.config.BodySizeLimit),
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, escapingScheme),
acceptEncodingHeader: acceptEncodingHeader(sp.config.EnableCompression),
metrics: sp.metrics,
}
newLoop = sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
maxSchema: maxSchema,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
enableCompression: enableCompression,
trackTimestampsStaleness: trackTimestampsStaleness,
mrc: mrc,
cache: cache,
interval: targetInterval,
timeout: targetTimeout,
fallbackScrapeProtocol: fallbackScrapeProtocol,
scrapeNativeHist: scrapeNativeHist,
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
convertClassicHistToNHCB: convertClassicHistToNHCB,
})
)
},
cache: cache,
interval: targetInterval,
timeout: targetTimeout,
sp: sp,
})
if err != nil {
newLoop.setForcedError(err)
}
@ -516,31 +443,10 @@ func (sp *scrapePool) Sync(tgs []*targetgroup.Group) {
// scrape loops for new targets, and stops scrape loops for disappeared targets.
// It returns after all stopped scrape loops terminated.
func (sp *scrapePool) sync(targets []*Target) {
var (
uniqueLoops = make(map[uint64]loop)
interval = time.Duration(sp.config.ScrapeInterval)
timeout = time.Duration(sp.config.ScrapeTimeout)
bodySizeLimit = int64(sp.config.BodySizeLimit)
sampleLimit = int(sp.config.SampleLimit)
bucketLimit = int(sp.config.NativeHistogramBucketLimit)
maxSchema = pickSchema(sp.config.NativeHistogramMinBucketFactor)
labelLimits = &labelLimits{
labelLimit: int(sp.config.LabelLimit),
labelNameLengthLimit: int(sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(sp.config.LabelValueLengthLimit),
}
honorLabels = sp.config.HonorLabels
honorTimestamps = sp.config.HonorTimestamps
enableCompression = sp.config.EnableCompression
trackTimestampsStaleness = sp.config.TrackTimestampsStaleness
mrc = sp.config.MetricRelabelConfigs
fallbackScrapeProtocol = sp.config.ScrapeFallbackProtocol.HeaderMediaType()
scrapeNativeHist = sp.config.ScrapeNativeHistogramsEnabled()
alwaysScrapeClassicHist = sp.config.AlwaysScrapeClassicHistogramsEnabled()
convertClassicHistToNHCB = sp.config.ConvertClassicHistogramsToNHCBEnabled()
)
uniqueLoops := make(map[uint64]loop)
sp.targetMtx.Lock()
escapingScheme, _ := config.ToEscapingScheme(sp.config.MetricNameEscapingScheme, sp.config.MetricNameValidationScheme)
for _, t := range targets {
hash := t.hash()
@ -549,34 +455,25 @@ func (sp *scrapePool) sync(targets []*Target) {
// so whether changed via relabeling or not, they'll exist and hold the correct values
// for every target.
var err error
interval, timeout, err = t.intervalAndTimeout(interval, timeout)
s := &targetScraper{
Target: t,
client: sp.client,
timeout: timeout,
bodySizeLimit: bodySizeLimit,
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, sp.escapingScheme),
acceptEncodingHeader: acceptEncodingHeader(enableCompression),
metrics: sp.metrics,
}
targetInterval, targetTimeout, err := t.intervalAndTimeout(
time.Duration(sp.config.ScrapeInterval),
time.Duration(sp.config.ScrapeTimeout),
)
l := sp.newLoop(scrapeLoopOptions{
target: t,
scraper: s,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
maxSchema: maxSchema,
labelLimits: labelLimits,
honorLabels: honorLabels,
honorTimestamps: honorTimestamps,
enableCompression: enableCompression,
trackTimestampsStaleness: trackTimestampsStaleness,
mrc: mrc,
interval: interval,
timeout: timeout,
scrapeNativeHist: scrapeNativeHist,
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
convertClassicHistToNHCB: convertClassicHistToNHCB,
fallbackScrapeProtocol: fallbackScrapeProtocol,
target: t,
scraper: &targetScraper{
Target: t,
client: sp.client,
timeout: targetTimeout,
bodySizeLimit: int64(sp.config.BodySizeLimit),
acceptHeader: acceptHeader(sp.config.ScrapeProtocols, escapingScheme),
acceptEncodingHeader: acceptEncodingHeader(sp.config.EnableCompression),
metrics: sp.metrics,
},
cache: newScrapeCache(sp.metrics),
interval: targetInterval,
timeout: targetTimeout,
sp: sp,
})
if err != nil {
l.setForcedError(err)
@ -661,7 +558,7 @@ func verifyLabelLimits(lset labels.Labels, limits *labelLimits) error {
return nil
}
met := lset.Get(labels.MetricName)
met := lset.Get(model.MetricNameLabel)
if limits.labelLimit > 0 {
nbLabels := lset.Len()
if nbLabels > limits.labelLimit {
@ -749,8 +646,8 @@ func mutateReportSampleLabels(lset labels.Labels, target *Target) labels.Labels
return lb.Labels()
}
// appender returns an appender for ingested samples from the target.
func appender(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
// appenderWithLimits returns an appender with additional validation.
func appenderWithLimits(app storage.Appender, sampleLimit, bucketLimit int, maxSchema int32) storage.Appender {
app = &timeLimitAppender{
Appender: app,
maxTime: timestamp.FromTime(time.Now().Add(maxAheadTime)),
@ -927,55 +824,63 @@ type cacheEntry struct {
}
type scrapeLoop struct {
scraper scraper
l *slog.Logger
scrapeFailureLogger FailureLogger
scrapeFailureLoggerMtx sync.RWMutex
cache *scrapeCache
lastScrapeSize int
buffers *pool.Pool
offsetSeed uint64
honorTimestamps bool
trackTimestampsStaleness bool
enableCompression bool
forcedErr error
forcedErrMtx sync.Mutex
sampleLimit int
bucketLimit int
maxSchema int32
labelLimits *labelLimits
interval time.Duration
timeout time.Duration
validationScheme model.ValidationScheme
escapingScheme model.EscapingScheme
alwaysScrapeClassicHist bool
convertClassicHistToNHCB bool
enableSTZeroIngestion bool
enableTypeAndUnitLabels bool
fallbackScrapeProtocol string
enableNativeHistogramScraping bool
appender func(ctx context.Context) storage.Appender
symbolTable *labels.SymbolTable
sampleMutator labelsMutator
reportSampleMutator labelsMutator
parentCtx context.Context
appenderCtx context.Context
// Parameters.
ctx context.Context
cancel func()
stopped chan struct{}
parentCtx context.Context
appenderCtx context.Context
l *slog.Logger
cache *scrapeCache
disabledEndOfRunStalenessMarkers atomic.Bool
reportExtraMetrics bool
appendMetadataToWAL bool
metrics *scrapeMetrics
interval time.Duration
timeout time.Duration
sampleMutator labelsMutator
reportSampleMutator labelsMutator
scraper scraper
// Static params per scrapePool.
appendable storage.Appendable
buffers *pool.Pool
offsetSeed uint64
symbolTable *labels.SymbolTable
metrics *scrapeMetrics
// Options from config.ScrapeConfig.
sampleLimit int
bucketLimit int
maxSchema int32
labelLimits *labelLimits
honorLabels bool
honorTimestamps bool
trackTimestampsStaleness bool
enableNativeHistogramScraping bool
alwaysScrapeClassicHist bool
convertClassicHistToNHCB bool
fallbackScrapeProtocol string
enableCompression bool
mrc []*relabel.Config
validationScheme model.ValidationScheme
// Options from scrape.Options.
enableSTZeroIngestion bool
enableTypeAndUnitLabels bool
reportExtraMetrics bool
appendMetadataToWAL bool
passMetadataInContext bool
skipOffsetting bool // For testability.
// error injection through setForcedError.
forcedErr error
forcedErrMtx sync.Mutex
// Special logger set on setScrapeFailureLogger
scrapeFailureLoggerMtx sync.RWMutex
scrapeFailureLogger FailureLogger
skipOffsetting bool // For testability.
// Locally cached data.
lastScrapeSize int
disabledEndOfRunStalenessMarkers atomic.Bool
}
// scrapeCache tracks mappings of exposed metric strings to label sets and
@ -1000,8 +905,8 @@ type scrapeCache struct {
seriesCur map[storage.SeriesRef]*cacheEntry
seriesPrev map[storage.SeriesRef]*cacheEntry
// TODO(bwplotka): Consider moving Metadata API to use WAL instead of scrape loop to
// avoid locking (using metadata API can block scraping).
// TODO(bwplotka): Consider moving metadata caching to head. See
// https://github.com/prometheus/prometheus/issues/17619.
metaMtx sync.Mutex // Mutex is needed due to api touching it when metadata is queried.
metadata map[string]*metaEntry // metadata by metric family name.
@ -1236,99 +1141,87 @@ func (c *scrapeCache) LengthMetadata() int {
return len(c.metadata)
}
func newScrapeLoop(ctx context.Context,
sc scraper,
l *slog.Logger,
buffers *pool.Pool,
sampleMutator labelsMutator,
reportSampleMutator labelsMutator,
appender func(ctx context.Context) storage.Appender,
cache *scrapeCache,
symbolTable *labels.SymbolTable,
offsetSeed uint64,
honorTimestamps bool,
trackTimestampsStaleness bool,
enableCompression bool,
sampleLimit int,
bucketLimit int,
maxSchema int32,
labelLimits *labelLimits,
interval time.Duration,
timeout time.Duration,
alwaysScrapeClassicHist bool,
convertClassicHistToNHCB bool,
enableNativeHistogramScraping bool,
enableSTZeroIngestion bool,
enableTypeAndUnitLabels bool,
reportExtraMetrics bool,
appendMetadataToWAL bool,
target *Target,
passMetadataInContext bool,
metrics *scrapeMetrics,
skipOffsetting bool,
validationScheme model.ValidationScheme,
escapingScheme model.EscapingScheme,
fallbackScrapeProtocol string,
) *scrapeLoop {
if l == nil {
l = promslog.NewNopLogger()
}
if buffers == nil {
buffers = pool.New(1e3, 1e6, 3, func(sz int) any { return make([]byte, 0, sz) })
}
if cache == nil {
cache = newScrapeCache(metrics)
}
// scrapeLoopOptions contains static options that do not change per scrapePool lifecycle.
type scrapeLoopOptions struct {
target *Target
scraper scraper
cache *scrapeCache
interval, timeout time.Duration
sp *scrapePool
}
appenderCtx := ctx
// newScrapeLoop constructs new scrapeLoop.
// NOTE: Technically this could be a scrapePool method, but it's a standalone function to make it clear scrapeLoop
// can be used outside scrapePool lifecycle (e.g. in tests).
func newScrapeLoop(opts scrapeLoopOptions) *scrapeLoop {
// Update the targets retrieval function for metadata to a new target.
opts.target.SetMetadataStore(opts.cache)
if passMetadataInContext {
appenderCtx := opts.sp.ctx
if opts.sp.options.PassMetadataInContext {
// Store the cache and target in the context. This is then used by downstream OTel Collector
// to lookup the metadata required to process the samples. Not used by Prometheus itself.
// TODO(gouthamve) We're using a dedicated context because using the parentCtx caused a memory
// leak. We should ideally fix the main leak. See: https://github.com/prometheus/prometheus/pull/10590
appenderCtx = ContextWithMetricMetadataStore(appenderCtx, cache)
appenderCtx = ContextWithTarget(appenderCtx, target)
}
sl := &scrapeLoop{
scraper: sc,
buffers: buffers,
cache: cache,
appender: appender,
symbolTable: symbolTable,
sampleMutator: sampleMutator,
reportSampleMutator: reportSampleMutator,
stopped: make(chan struct{}),
offsetSeed: offsetSeed,
l: l,
parentCtx: ctx,
appenderCtx: appenderCtx,
honorTimestamps: honorTimestamps,
trackTimestampsStaleness: trackTimestampsStaleness,
enableCompression: enableCompression,
sampleLimit: sampleLimit,
bucketLimit: bucketLimit,
maxSchema: maxSchema,
labelLimits: labelLimits,
interval: interval,
timeout: timeout,
alwaysScrapeClassicHist: alwaysScrapeClassicHist,
convertClassicHistToNHCB: convertClassicHistToNHCB,
enableSTZeroIngestion: enableSTZeroIngestion,
enableTypeAndUnitLabels: enableTypeAndUnitLabels,
fallbackScrapeProtocol: fallbackScrapeProtocol,
enableNativeHistogramScraping: enableNativeHistogramScraping,
reportExtraMetrics: reportExtraMetrics,
appendMetadataToWAL: appendMetadataToWAL,
metrics: metrics,
skipOffsetting: skipOffsetting,
validationScheme: validationScheme,
escapingScheme: escapingScheme,
}
sl.ctx, sl.cancel = context.WithCancel(ctx)
return sl
// TODO(bwplotka): Remove once OpenTelemetry collector uses AppenderV2 (add issue)
appenderCtx = ContextWithMetricMetadataStore(appenderCtx, opts.cache)
appenderCtx = ContextWithTarget(appenderCtx, opts.target)
}
ctx, cancel := context.WithCancel(opts.sp.ctx)
return &scrapeLoop{
ctx: ctx,
cancel: cancel,
stopped: make(chan struct{}),
parentCtx: opts.sp.ctx,
appenderCtx: appenderCtx,
l: opts.sp.logger.With("target", opts.target),
cache: opts.cache,
interval: opts.interval,
timeout: opts.timeout,
sampleMutator: func(l labels.Labels) labels.Labels {
return mutateSampleLabels(l, opts.target, opts.sp.config.HonorLabels, opts.sp.config.MetricRelabelConfigs)
},
reportSampleMutator: func(l labels.Labels) labels.Labels { return mutateReportSampleLabels(l, opts.target) },
scraper: opts.scraper,
// Static params per scrapePool.
appendable: opts.sp.appendable,
buffers: opts.sp.buffers,
offsetSeed: opts.sp.offsetSeed,
symbolTable: opts.sp.symbolTable,
metrics: opts.sp.metrics,
// config.ScrapeConfig.
sampleLimit: int(opts.sp.config.SampleLimit),
bucketLimit: int(opts.sp.config.NativeHistogramBucketLimit),
maxSchema: pickSchema(opts.sp.config.NativeHistogramMinBucketFactor),
labelLimits: &labelLimits{
labelLimit: int(opts.sp.config.LabelLimit),
labelNameLengthLimit: int(opts.sp.config.LabelNameLengthLimit),
labelValueLengthLimit: int(opts.sp.config.LabelValueLengthLimit),
},
honorLabels: opts.sp.config.HonorLabels,
honorTimestamps: opts.sp.config.HonorTimestamps,
trackTimestampsStaleness: opts.sp.config.TrackTimestampsStaleness,
enableNativeHistogramScraping: opts.sp.config.ScrapeNativeHistogramsEnabled(),
alwaysScrapeClassicHist: opts.sp.config.AlwaysScrapeClassicHistogramsEnabled(),
convertClassicHistToNHCB: opts.sp.config.ConvertClassicHistogramsToNHCBEnabled(),
fallbackScrapeProtocol: opts.sp.config.ScrapeFallbackProtocol.HeaderMediaType(),
enableCompression: opts.sp.config.EnableCompression,
mrc: opts.sp.config.MetricRelabelConfigs,
validationScheme: opts.sp.config.MetricNameValidationScheme,
// scrape.Options.
enableSTZeroIngestion: opts.sp.options.EnableStartTimestampZeroIngestion,
enableTypeAndUnitLabels: opts.sp.options.EnableTypeAndUnitLabels,
reportExtraMetrics: opts.sp.options.ExtraMetrics,
appendMetadataToWAL: opts.sp.options.AppendMetadata,
passMetadataInContext: opts.sp.options.PassMetadataInContext,
skipOffsetting: opts.sp.options.skipOffsetting,
}
}
func (sl *scrapeLoop) setScrapeFailureLogger(l FailureLogger) {
@ -1407,6 +1300,11 @@ mainLoop:
}
}
func (sl *scrapeLoop) appender() scrapeLoopAppendAdapter {
// NOTE(bwplotka): Add AppenderV2 implementation, see https://github.com/prometheus/prometheus/issues/17632.
return &scrapeLoopAppender{scrapeLoop: sl, Appender: sl.appendable.Appender(sl.appenderCtx)}
}
// scrapeAndReport performs a scrape and then appends the result to the storage
// together with reporting metrics, by using as few appenders as possible.
// In the happy scenario, a single appender is used.
@ -1428,10 +1326,10 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
var total, added, seriesAdded, bytesRead int
var err, appErr, scrapeErr error
app := sl.appender(sl.appenderCtx)
app := sl.appender()
defer func() {
if err != nil {
app.Rollback()
_ = app.Rollback()
return
}
err = app.Commit()
@ -1449,9 +1347,9 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
if forcedErr := sl.getForcedError(); forcedErr != nil {
scrapeErr = forcedErr
// Add stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.appenderCtx)
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Append failed", "err", err)
}
if errc != nil {
@ -1507,16 +1405,16 @@ func (sl *scrapeLoop) scrapeAndReport(last, appendTime time.Time, errc chan<- er
// A failed scrape is the same as an empty scrape,
// we still call sl.append to trigger stale markers.
total, added, seriesAdded, appErr = sl.append(app, b, contentType, appendTime)
total, added, seriesAdded, appErr = app.append(b, contentType, appendTime)
if appErr != nil {
app.Rollback()
app = sl.appender(sl.appenderCtx)
_ = app.Rollback()
app = sl.appender()
sl.l.Debug("Append failed", "err", appErr)
// The append failed, probably due to a parse error or sample limit.
// Call sl.append again with an empty scrape to trigger stale markers.
if _, _, _, err := sl.append(app, []byte{}, "", appendTime); err != nil {
app.Rollback()
app = sl.appender(sl.appenderCtx)
if _, _, _, err := app.append([]byte{}, "", appendTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Append failed", "err", err)
}
}
@ -1586,11 +1484,11 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
// If the target has since been recreated and scraped, the
// stale markers will be out of order and ignored.
// sl.context would have been cancelled, hence using sl.appenderCtx.
app := sl.appender(sl.appenderCtx)
app := sl.appender()
var err error
defer func() {
if err != nil {
app.Rollback()
_ = app.Rollback()
return
}
err = app.Commit()
@ -1598,9 +1496,9 @@ func (sl *scrapeLoop) endOfRunStaleness(last time.Time, ticker *time.Ticker, int
sl.l.Warn("Stale commit failed", "err", err)
}
}()
if _, _, _, err = sl.append(app, []byte{}, "", staleTime); err != nil {
app.Rollback()
app = sl.appender(sl.appenderCtx)
if _, _, _, err = app.append([]byte{}, "", staleTime); err != nil {
_ = app.Rollback()
app = sl.appender()
sl.l.Warn("Stale append failed", "err", err)
}
if err = sl.reportStale(app, staleTime); err != nil {
@ -1634,7 +1532,7 @@ type appendErrors struct {
func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (err error) {
sl.cache.forEachStale(func(ref storage.SeriesRef, lset labels.Labels) bool {
// Series no longer exposed, mark it stale.
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
app.SetOptions(&aOptionRejectEarlyOOO)
_, err = app.Append(ref, lset, defTime, math.Float64frombits(value.StaleNaN))
app.SetOptions(nil)
switch {
@ -1648,12 +1546,20 @@ func (sl *scrapeLoop) updateStaleMarkers(app storage.Appender, defTime int64) (e
return err
}
func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
type scrapeLoopAppender struct {
*scrapeLoop
storage.Appender
}
var _ scrapeLoopAppendAdapter = &scrapeLoopAppender{}
func (sl *scrapeLoopAppender) append(b []byte, contentType string, ts time.Time) (total, added, seriesAdded int, err error) {
defTime := timestamp.FromTime(ts)
if len(b) == 0 {
// Empty scrape. Just update the stale makers and swap the cache (but don't flush it).
err = sl.updateStaleMarkers(app, defTime)
err = sl.updateStaleMarkers(sl.Appender, defTime)
sl.cache.iterDone(false)
return total, added, seriesAdded, err
}
@ -1696,7 +1602,7 @@ func (sl *scrapeLoop) append(app storage.Appender, b []byte, contentType string,
exemplars := make([]exemplar.Exemplar, 0, 1)
// Take an appender with limits.
app = appender(app, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
app := appenderWithLimits(sl.Appender, sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
defer func() {
if err != nil {
@ -1785,7 +1691,7 @@ loop:
continue
}
if !lset.Has(labels.MetricName) {
if !lset.Has(model.MetricNameLabel) {
err = errNameLabelMandatory
break loop
}
@ -1859,7 +1765,7 @@ loop:
// But make sure we only do this if we have a cache entry (ce) for our series.
sl.cache.trackStaleness(ref, ce)
}
if sampleAdded && sampleLimitErr == nil && bucketLimitErr == nil {
if sampleLimitErr == nil && bucketLimitErr == nil {
seriesAdded++
}
}
@ -1917,7 +1823,7 @@ loop:
// In majority cases we can trust that the current series/histogram is matching the lastMeta and lastMFName.
// However, optional TYPE etc metadata and broken OM text can break this, detect those cases here.
// TODO(bwplotka): Consider moving this to parser as many parser users end up doing this (e.g. ST and NHCB parsing).
if isSeriesPartOfFamily(lset.Get(labels.MetricName), lastMFName, lastMeta.Type) {
if isSeriesPartOfFamily(lset.Get(model.MetricNameLabel), lastMFName, lastMeta.Type) {
if _, merr := app.UpdateMetadata(ref, lset, lastMeta.Metadata); merr != nil {
// No need to fail the scrape on errors appending metadata.
sl.l.Debug("Error when appending metadata in scrape loop", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", lastMeta.Metadata), "err", merr)
@ -2029,7 +1935,7 @@ func isSeriesPartOfFamily(mName string, mfName []byte, typ model.MetricType) boo
// during normal operation (e.g., accidental cardinality explosion, sudden traffic spikes).
// Current case ordering prevents exercising other cases when limits are exceeded.
// Remaining error cases typically occur only a few times, often during initial setup.
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (bool, error) {
func (sl *scrapeLoop) checkAddError(met []byte, err error, sampleLimitErr, bucketLimitErr *error, appErrs *appendErrors) (sampleAdded bool, _ error) {
switch {
case err == nil:
return true, nil
@ -2141,7 +2047,7 @@ var (
}
)
func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
func (sl *scrapeLoop) report(app scrapeLoopAppendAdapter, start time.Time, duration time.Duration, scraped, added, seriesAdded, bytes int, scrapeErr error) (err error) {
sl.scraper.Report(start, duration, scrapeErr)
ts := timestamp.FromTime(start)
@ -2152,71 +2058,70 @@ func (sl *scrapeLoop) report(app storage.Appender, start time.Time, duration tim
}
b := labels.NewBuilderWithSymbolTable(sl.symbolTable)
if err = sl.addReportSample(app, scrapeHealthMetric, ts, health, b); err != nil {
if err = app.addReportSample(scrapeHealthMetric, ts, health, b, false); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeDurationMetric, ts, duration.Seconds(), b); err != nil {
if err = app.addReportSample(scrapeDurationMetric, ts, duration.Seconds(), b, false); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, float64(scraped), b); err != nil {
if err = app.addReportSample(scrapeSamplesMetric, ts, float64(scraped), b, false); err != nil {
return err
}
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, float64(added), b); err != nil {
if err = app.addReportSample(samplesPostRelabelMetric, ts, float64(added), b, false); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, float64(seriesAdded), b); err != nil {
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, float64(seriesAdded), b, false); err != nil {
return err
}
if sl.reportExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b); err != nil {
if err = app.addReportSample(scrapeTimeoutMetric, ts, sl.timeout.Seconds(), b, false); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b); err != nil {
if err = app.addReportSample(scrapeSampleLimitMetric, ts, float64(sl.sampleLimit), b, false); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, float64(bytes), b); err != nil {
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, float64(bytes), b, false); err != nil {
return err
}
}
return err
}
func (sl *scrapeLoop) reportStale(app storage.Appender, start time.Time) (err error) {
func (sl *scrapeLoop) reportStale(app scrapeLoopAppendAdapter, start time.Time) (err error) {
ts := timestamp.FromTime(start)
app.SetOptions(&storage.AppendOptions{DiscardOutOfOrder: true})
stale := math.Float64frombits(value.StaleNaN)
b := labels.NewBuilder(labels.EmptyLabels())
if err = sl.addReportSample(app, scrapeHealthMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeHealthMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeDurationMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeDurationMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSamplesMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeSamplesMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, samplesPostRelabelMetric, ts, stale, b); err != nil {
if err = app.addReportSample(samplesPostRelabelMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSeriesAddedMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeSeriesAddedMetric, ts, stale, b, true); err != nil {
return err
}
if sl.reportExtraMetrics {
if err = sl.addReportSample(app, scrapeTimeoutMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeTimeoutMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeSampleLimitMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeSampleLimitMetric, ts, stale, b, true); err != nil {
return err
}
if err = sl.addReportSample(app, scrapeBodySizeBytesMetric, ts, stale, b); err != nil {
if err = app.addReportSample(scrapeBodySizeBytesMetric, ts, stale, b, true); err != nil {
return err
}
}
return err
}
func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t int64, v float64, b *labels.Builder) error {
func (sl *scrapeLoopAppender) addReportSample(s reportSample, t int64, v float64, b *labels.Builder, rejectOOO bool) (err error) {
ce, ok, _ := sl.cache.get(s.name)
var ref storage.SeriesRef
var lset labels.Labels
@ -2228,18 +2133,26 @@ func (sl *scrapeLoop) addReportSample(app storage.Appender, s reportSample, t in
// with scraped metrics in the cache.
// We have to drop it when building the actual metric.
b.Reset(labels.EmptyLabels())
b.Set(labels.MetricName, string(s.name[:len(s.name)-1]))
b.Set(model.MetricNameLabel, string(s.name[:len(s.name)-1]))
lset = sl.reportSampleMutator(b.Labels())
}
ref, err := app.Append(ref, lset, t, v)
// This will be improved in AppenderV2.
if rejectOOO {
sl.SetOptions(&aOptionRejectEarlyOOO)
ref, err = sl.Append(ref, lset, t, v)
sl.SetOptions(nil)
} else {
ref, err = sl.Append(ref, lset, t, v)
}
switch {
case err == nil:
if !ok {
sl.cache.addRef(s.name, ref, lset, lset.Hash())
// We only need to add metadata once a scrape target appears.
if sl.appendMetadataToWAL {
if _, merr := app.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
if _, merr := sl.UpdateMetadata(ref, lset, s.Metadata); merr != nil {
sl.l.Debug("Error when appending metadata in addReportSample", "ref", fmt.Sprintf("%d", ref), "metadata", fmt.Sprintf("%+v", s.Metadata), "err", merr)
}
}

File diff suppressed because it is too large Load Diff

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at

@ -1,4 +1,4 @@
// Copyright 2013 The Prometheus Authors
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
@ -14,7 +14,6 @@
package scrape
import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
@ -36,7 +35,7 @@ import (
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/teststorage"
)
const (
@ -611,12 +610,12 @@ func TestBucketLimitAppender(t *testing.T) {
},
}
resApp := &collectResultAppender{}
appTest := teststorage.NewAppendable()
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &bucketLimitAppender{Appender: resApp, limit: c.limit}
app := &bucketLimitAppender{Appender: appTest.Appender(t.Context()), limit: c.limit}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
@ -697,12 +696,12 @@ func TestMaxSchemaAppender(t *testing.T) {
},
}
resApp := &collectResultAppender{}
appTest := teststorage.NewAppendable()
for _, c := range cases {
for _, floatHisto := range []bool{true, false} {
t.Run(fmt.Sprintf("floatHistogram=%t", floatHisto), func(t *testing.T) {
app := &maxSchemaAppender{Appender: resApp, maxSchema: c.maxSchema}
app := &maxSchemaAppender{Appender: appTest.Appender(t.Context()), maxSchema: c.maxSchema}
ts := int64(10 * time.Minute / time.Millisecond)
lbls := labels.FromStrings("__name__", "sparse_histogram_series")
var err error
@ -723,17 +722,12 @@ func TestMaxSchemaAppender(t *testing.T) {
}
}
// Test sample_limit when a scrape containst Native Histograms.
// Test sample_limit when a scrape contains Native Histograms.
func TestAppendWithSampleLimitAndNativeHistogram(t *testing.T) {
const sampleLimit = 2
resApp := &collectResultAppender{}
sl := newBasicScrapeLoop(t, context.Background(), nil, func(_ context.Context) storage.Appender {
return resApp
}, 0)
sl.sampleLimit = sampleLimit
appTest := teststorage.NewAppendable()
now := time.Now()
app := appender(sl.appender(context.Background()), sl.sampleLimit, sl.bucketLimit, sl.maxSchema)
app := appenderWithLimits(appTest.Appender(t.Context()), 2, 0, histogram.ExponentialSchemaMax)
// sample_limit is set to 2, so first two scrapes should work
_, err := app.Append(0, labels.FromStrings(model.MetricNameLabel, "foo"), timestamp.FromTime(now), 1)

@ -24,11 +24,11 @@ jobs:
runs-on: ubuntu-latest
steps:
- name: Checkout repository
uses: actions/checkout@11bd71901bbe5b1630ceea73d27597364c9af683 # v4.2.2
uses: actions/checkout@8e8c483db84b4bee98b60c0593521ed34d9990e8 # v6.0.1
with:
persist-credentials: false
- name: Install Go
uses: actions/setup-go@44694675825211faa026b3c33043df3e48a5fa00 # v6.0.0
uses: actions/setup-go@4dc6199c7b1a012772edbd06daecab0f50c9053c # v6.1.0
with:
go-version: 1.25.x
- name: Install snmp_exporter/generator dependencies
@ -38,7 +38,7 @@ jobs:
id: golangci-lint-version
run: echo "version=$(make print-golangci-lint-version)" >> $GITHUB_OUTPUT
- name: Lint
uses: golangci/golangci-lint-action@4afd733a84b1f43292c63897423277bb7f4313a9 # v8.0.0
uses: golangci/golangci-lint-action@0a35821d5c230e903fcfe077583637dea1b27b47 # v9.0.0
with:
args: --verbose
version: ${{ steps.golangci-lint-version.outputs.version }}

@ -27,14 +27,15 @@ func mmap(f *os.File, size int) ([]byte, error) {
}
addr, errno := syscall.MapViewOfFile(h, syscall.FILE_MAP_READ, 0, 0, uintptr(size))
if addr == 0 {
return nil, os.NewSyscallError("MapViewOfFile", errno)
}
if err := syscall.CloseHandle(syscall.Handle(h)); err != nil {
return nil, os.NewSyscallError("CloseHandle", err)
}
if addr == 0 {
return nil, os.NewSyscallError("MapViewOfFile", errno)
}
return (*[maxMapSize]byte)(unsafe.Pointer(addr))[:size], nil
}

@ -56,6 +56,7 @@ func (c *compressedResponseWriter) Close() {
// Constructs a new compressedResponseWriter based on client request headers.
func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request) *compressedResponseWriter {
writer.Header().Add("Vary", acceptEncodingHeader)
raw := req.Header.Get(acceptEncodingHeader)
var (
encoding string
@ -65,13 +66,17 @@ func newCompressedResponseWriter(writer http.ResponseWriter, req *http.Request)
encoding, raw, commaFound = strings.Cut(raw, ",")
switch strings.TrimSpace(encoding) {
case gzipEncoding:
writer.Header().Set(contentEncodingHeader, gzipEncoding)
h := writer.Header()
h.Del("Content-Length") // avoid stale length after compression
h.Set(contentEncodingHeader, gzipEncoding)
return &compressedResponseWriter{
ResponseWriter: writer,
writer: gzip.NewWriter(writer),
}
case deflateEncoding:
writer.Header().Set(contentEncodingHeader, deflateEncoding)
h := writer.Header()
h.Del("Content-Length")
h.Set(contentEncodingHeader, deflateEncoding)
return &compressedResponseWriter{
ResponseWriter: writer,
writer: zlib.NewWriter(writer),

@ -0,0 +1,399 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package teststorage
import (
"context"
"errors"
"fmt"
"math"
"slices"
"strings"
"sync"
"github.com/prometheus/common/model"
"go.uber.org/atomic"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/storage"
)
// Sample represents test, combined sample for mocking storage.AppenderV2.
type Sample struct {
	MF string            // Metric family name.
	L  labels.Labels     // Series labels.
	M  metadata.Metadata // Metadata (type/unit/help) attached via UpdateMetadata.
	// ST and T timestamps; ST is rendered as "st@" by String().
	// NOTE(review): ST presumably is the start timestamp — confirm against AppenderV2 semantics.
	ST, T int64
	V     float64                   // Float value (for Append / AppendSTZeroSample).
	H     *histogram.Histogram      // Integer histogram value, if any.
	FH    *histogram.FloatHistogram // Float histogram value, if any.
	ES    []exemplar.Exemplar       // Exemplars attached via AppendExemplar.
}
// String renders the sample roughly in an OpenMetrics-2.0-like text form for
// readable test failure output. All value kinds (float, histogram, float
// histogram) are printed together on purpose, so bugs that append multiple
// sample types at once become visible.
func (s Sample) String() string {
	var sb strings.Builder
	if s.M.Help != "" {
		fmt.Fprintf(&sb, "HELP %s\n", s.M.Help)
	}
	if s.M.Type != model.MetricTypeUnknown && s.M.Type != "" {
		fmt.Fprintf(&sb, "type@%s ", string(s.M.Type))
	}
	if s.M.Unit != "" {
		fmt.Fprintf(&sb, "unit@%s ", s.M.Unit)
	}
	var hStr, fhStr string
	if s.H != nil {
		hStr = s.H.String()
	}
	if s.FH != nil {
		fhStr = s.FH.String()
	}
	fmt.Fprintf(&sb, "%s %v%v%v st@%v t@%v\n", s.L.String(), s.V, hStr, fhStr, s.ST, s.T)
	return sb.String()
}
// Equals reports whether s and other carry identical data.
//
// Floats are compared via math.Float64bits so bit-identical NaN values compare
// equal; histograms, float histograms, metadata and exemplars are compared
// with their respective Equals methods.
func (s Sample) Equals(other Sample) bool {
	// Plain == is the idiomatic string equality in Go; strings.Compare is
	// documented as intended only for three-way ordering.
	return s.MF == other.MF &&
		labels.Equal(s.L, other.L) &&
		s.M.Equals(other.M) &&
		s.ST == other.ST &&
		s.T == other.T &&
		math.Float64bits(s.V) == math.Float64bits(other.V) && // Compare Float64bits so NaN values which are exactly the same will compare equal.
		s.H.Equals(other.H) &&
		s.FH.Equals(other.FH) &&
		slices.EqualFunc(s.ES, other.ES, exemplar.Exemplar.Equals)
}
// Appendable is a storage.Appendable mock.
// It allows recording all samples that were added through the appender and injecting errors.
// Appendable will panic if more than one Appender is open.
type Appendable struct {
	appendErrFn          func(ls labels.Labels) error // If non-nil, inject appender error on every Append, AppendHistogram and ST zero calls.
	appendExemplarsError error                        // If non-nil, inject exemplar error.
	commitErr            error                        // If non-nil, inject commit error.

	mtx           sync.Mutex   // Guards the recorded sample slices and appender.err.
	openAppenders atomic.Int32 // Guard against multi-appender use.

	// Recorded results.
	pendingSamples    []Sample // Appended but not yet committed/rolled back.
	resultSamples     []Sample // Committed samples.
	rolledbackSamples []Sample // Rolled-back samples.

	// Optional chain (Appender will collect samples, then run next).
	next storage.Appendable
}
// NewAppendable constructs an empty mock Appendable ready for use.
func NewAppendable() *Appendable {
	a := &Appendable{}
	return a
}
// Then chains another appendable: every Appender call first records samples
// on the mock and is then mirrored to appendable's appender.
// It returns the receiver for fluent use.
func (a *Appendable) Then(appendable storage.Appendable) *Appendable {
	a.next = appendable
	return a
}
// WithErrs configures error injection: appendErrFn fires on every sample
// append, appendExemplarsError on exemplar appends, and commitErr on Commit.
// It returns the receiver for fluent use.
func (a *Appendable) WithErrs(appendErrFn func(ls labels.Labels) error, appendExemplarsError, commitErr error) *Appendable {
	a.appendErrFn = appendErrFn
	a.appendExemplarsError = appendExemplarsError
	a.commitErr = commitErr
	return a
}
// PendingSamples returns a copy of the samples appended but not yet
// committed or rolled back.
func (a *Appendable) PendingSamples() []Sample {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	out := make([]Sample, len(a.pendingSamples))
	copy(out, a.pendingSamples)
	return out
}
// ResultSamples returns a copy of all committed samples.
func (a *Appendable) ResultSamples() []Sample {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	out := make([]Sample, len(a.resultSamples))
	copy(out, a.resultSamples)
	return out
}
// RolledbackSamples returns a copy of all rolled-back samples.
func (a *Appendable) RolledbackSamples() []Sample {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	out := make([]Sample, len(a.rolledbackSamples))
	copy(out, a.rolledbackSamples)
	return out
}
// ResultReset clears all recorded samples (pending, committed and rolled
// back), reusing the underlying slice capacity.
func (a *Appendable) ResultReset() {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	for _, buf := range []*[]Sample{&a.pendingSamples, &a.resultSamples, &a.rolledbackSamples} {
		*buf = (*buf)[:0]
	}
}
// ResultMetadata returns resultSamples with samples only containing L and M.
// This is for compatibility with tests that only focus on metadata.
// Samples with empty metadata are skipped.
//
// TODO: Rewrite tests to test metadata on resultSamples instead.
func (a *Appendable) ResultMetadata() []Sample {
	a.mtx.Lock()
	defer a.mtx.Unlock()

	var out []Sample
	for _, s := range a.resultSamples {
		if !s.M.IsEmpty() {
			out = append(out, Sample{L: s.L, M: s.M})
		}
	}
	return out
}
// String renders all recorded samples grouped into committed, pending and
// rolled-back sections, mainly for test failure output.
func (a *Appendable) String() string {
	var b strings.Builder
	sections := []struct {
		header  string
		samples []Sample
	}{
		{"committed:\n", a.resultSamples},
		{"pending:\n", a.pendingSamples},
		{"rolledback:\n", a.rolledbackSamples},
	}
	for _, sec := range sections {
		b.WriteString(sec.header)
		for _, s := range sec.samples {
			b.WriteString("\n")
			b.WriteString(s.String())
		}
	}
	return b.String()
}
// errClosedAppender is installed as appender.err once Commit or Rollback ran,
// making every later call on that appender fail.
var errClosedAppender = errors.New("appender was already committed/rolledback")

// appender is the storage.Appender returned by Appendable.Appender.
type appender struct {
	err  error            // Sticky error: concurrent-use error or errClosedAppender.
	next storage.Appender // Optional chained appender (from Appendable.Then).

	a *Appendable // Owning mock; holds the lock and the recorded samples.
}

// checkErr returns the sticky error, if any, under the owner's lock.
func (a *appender) checkErr() error {
	a.a.mtx.Lock()
	defer a.a.mtx.Unlock()
	return a.err
}
// Appender implements storage.Appendable.
//
// Only one appender may be open at a time: if a previous appender was not yet
// committed/rolled back, the returned appender carries a sticky error and
// every call on it fails (rather than panicking).
func (a *Appendable) Appender(ctx context.Context) storage.Appender {
	ret := &appender{a: a}
	if a.openAppenders.Inc() > 1 {
		ret.err = errors.New("teststorage.Appendable.Appender() concurrent use is not supported; attempted opening new Appender() without Commit/Rollback of the previous one. Extend the implementation if concurrent mock is needed")
	}
	if a.next != nil {
		// Open the chained appender up front so all calls can be mirrored.
		ret.next = a.next.Appender(ctx)
	}
	return ret
}
// SetOptions implements storage.Appender; the mock ignores append options.
func (*appender) SetOptions(*storage.AppendOptions) {}
// Append records a float sample as pending and, when chained, forwards the
// call to the next appender. Errors injected via WithErrs are returned before
// anything is recorded.
func (a *appender) Append(ref storage.SeriesRef, ls labels.Labels, t int64, v float64) (storage.SeriesRef, error) {
	if err := a.checkErr(); err != nil {
		return 0, err
	}
	if a.a.appendErrFn != nil {
		if err := a.a.appendErrFn(ls); err != nil {
			return 0, err
		}
	}
	a.a.mtx.Lock()
	a.a.pendingSamples = append(a.a.pendingSamples, Sample{L: ls, T: t, V: v})
	a.a.mtx.Unlock()

	if a.next != nil {
		// When chained, return the chained appender's ref so the caller and
		// the underlying storage agree on series references.
		return a.next.Append(ref, ls, t, v)
	}
	return computeOrCheckRef(ref, ls)
}
// computeOrCheckRef returns a stable stand-in series reference for ls when
// ref is zero, or validates that a non-zero ref matches ls.
func computeOrCheckRef(ref storage.SeriesRef, ls labels.Labels) (storage.SeriesRef, error) {
	h := ls.Hash()
	if ref == 0 {
		// Use labels hash as a stand-in for unique series reference, to avoid having to track all series.
		return storage.SeriesRef(h), nil
	}
	if storage.SeriesRef(h) != ref {
		// Check for buggy ref while we at it.
		return 0, errors.New("teststorage.appender: found input ref not matching labels; potential bug in Appendable user")
	}
	return ref, nil
}
// AppendHistogram records a (integer or float) histogram sample as pending
// and, when chained, forwards the call to the next appender. Errors injected
// via WithErrs are returned before anything is recorded.
func (a *appender) AppendHistogram(ref storage.SeriesRef, ls labels.Labels, t int64, h *histogram.Histogram, fh *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if err := a.checkErr(); err != nil {
		return 0, err
	}
	if a.a.appendErrFn != nil {
		if err := a.a.appendErrFn(ls); err != nil {
			return 0, err
		}
	}
	a.a.mtx.Lock()
	a.a.pendingSamples = append(a.a.pendingSamples, Sample{L: ls, T: t, H: h, FH: fh})
	a.a.mtx.Unlock()

	if a.next != nil {
		// Keep refs consistent with the chained storage.
		return a.next.AppendHistogram(ref, ls, t, h, fh)
	}
	return computeOrCheckRef(ref, ls)
}
// AppendExemplar attaches e to the most recent pending sample whose labels
// hash to ref. It errors when no matching pending sample exists or when an
// exemplar error was injected via WithErrs.
func (a *appender) AppendExemplar(ref storage.SeriesRef, l labels.Labels, e exemplar.Exemplar) (storage.SeriesRef, error) {
	if err := a.checkErr(); err != nil {
		return 0, err
	}
	if a.a.appendExemplarsError != nil {
		return 0, a.a.appendExemplarsError
	}
	a.a.mtx.Lock()
	// NOTE(bwplotka): Eventually exemplar has to be attached to a series and soon
	// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
	// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
	i := len(a.a.pendingSamples) - 1
	for ; i >= 0; i-- { // Attach exemplars to the last matching sample.
		if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
			a.a.pendingSamples[i].ES = append(a.a.pendingSamples[i].ES, e)
			break
		}
	}
	a.a.mtx.Unlock()
	// i stays negative only when the loop found no matching sample.
	if i < 0 {
		return 0, fmt.Errorf("teststorage.appender: exemplar appender without series; ref %v; l %v; exemplar: %v", ref, l, e)
	}
	if a.next != nil {
		return a.next.AppendExemplar(ref, l, e)
	}
	return computeOrCheckRef(ref, l)
}
// AppendSTZeroSample records the start-timestamp marker as an ordinary float
// sample with value 0 at st; the first timestamp argument is ignored.
func (a *appender) AppendSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64) (storage.SeriesRef, error) {
	return a.Append(ref, l, st, 0.0) // This will change soon with AppenderV2, but we already report ST as 0 samples.
}
// AppendHistogramSTZeroSample records the start-timestamp marker as an empty
// histogram sample at st. An empty integer histogram is used when h is
// non-nil, otherwise an empty float histogram; the original h/fh contents are
// intentionally not recorded.
func (a *appender) AppendHistogramSTZeroSample(ref storage.SeriesRef, l labels.Labels, _, st int64, h *histogram.Histogram, _ *histogram.FloatHistogram) (storage.SeriesRef, error) {
	if h != nil {
		return a.AppendHistogram(ref, l, st, &histogram.Histogram{}, nil)
	}
	return a.AppendHistogram(ref, l, st, nil, &histogram.FloatHistogram{}) // This will change soon with AppenderV2, but we already report ST as 0 histograms.
}
// UpdateMetadata attaches m to the most recent pending sample whose labels
// hash to ref, replacing any metadata already on that sample. It errors when
// no matching pending sample exists.
func (a *appender) UpdateMetadata(ref storage.SeriesRef, l labels.Labels, m metadata.Metadata) (storage.SeriesRef, error) {
	if err := a.checkErr(); err != nil {
		return 0, err
	}
	a.a.mtx.Lock()
	// NOTE(bwplotka): Eventually metadata has to be attached to a series and soon
	// the AppenderV2 will guarantee that for TSDB. Assume this from the mock perspective
	// with the naive attaching. See: https://github.com/prometheus/prometheus/issues/17632
	i := len(a.a.pendingSamples) - 1
	for ; i >= 0; i-- { // Attach metadata to the last matching sample.
		if ref == storage.SeriesRef(a.a.pendingSamples[i].L.Hash()) {
			a.a.pendingSamples[i].M = m
			break
		}
	}
	a.a.mtx.Unlock()
	// i stays negative only when the loop found no matching sample.
	if i < 0 {
		return 0, fmt.Errorf("teststorage.appender: metadata update without series; ref %v; l %v; m: %v", ref, l, m)
	}
	if a.next != nil {
		return a.next.UpdateMetadata(ref, l, m)
	}
	return computeOrCheckRef(ref, l)
}
// Commit moves all pending samples to the committed results and closes the
// appender (further calls return errClosedAppender). An injected commit error
// (WithErrs) is returned before anything is recorded; the open-appender guard
// is released in every path.
func (a *appender) Commit() error {
	if err := a.checkErr(); err != nil {
		return err
	}
	defer a.a.openAppenders.Dec()
	if a.a.commitErr != nil {
		return a.a.commitErr
	}
	a.a.mtx.Lock()
	a.a.resultSamples = append(a.a.resultSamples, a.a.pendingSamples...)
	a.a.pendingSamples = a.a.pendingSamples[:0]
	a.err = errClosedAppender
	a.a.mtx.Unlock()

	// Check a.next (this appender's own chain), not a.a.next: if Then() was
	// called after this appender was opened, a.a.next would be non-nil while
	// a.next is nil and a.next.Commit() would panic. This also matches
	// Rollback's check.
	if a.next != nil {
		return a.next.Commit()
	}
	return nil
}
// Rollback moves all pending samples to the rolled-back results and closes
// the appender (further calls return errClosedAppender). The open-appender
// guard is released in every path; the chained appender, if any, is rolled
// back as well.
func (a *appender) Rollback() error {
	if err := a.checkErr(); err != nil {
		return err
	}
	defer a.a.openAppenders.Dec()
	a.a.mtx.Lock()
	a.a.rolledbackSamples = append(a.a.rolledbackSamples, a.a.pendingSamples...)
	a.a.pendingSamples = a.a.pendingSamples[:0]
	a.err = errClosedAppender
	a.a.mtx.Unlock()

	if a.next != nil {
		return a.next.Rollback()
	}
	return nil
}

@ -0,0 +1,131 @@
// Copyright The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package teststorage
import (
"errors"
"fmt"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/stretchr/testify/require"
"github.com/prometheus/prometheus/model/exemplar"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/model/metadata"
"github.com/prometheus/prometheus/util/testutil"
)
// TestSample_RequireEqual ensures standard testutil.RequireEqual is enough for comparisons.
// This is thanks to the fact metadata has now Equals method.
//
// Each b-slice below mutates exactly one field relative to a and must compare
// unequal.
func TestSample_RequireEqual(t *testing.T) {
	a := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	// Sanity: a slice must equal itself.
	testutil.RequireEqual(t, a, a)

	b1 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2_diff", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123}, // test_metric2_diff is different.
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	requireNotEqual(t, a, b1)

	b2 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo2")}}}, // exemplar is different.
	}
	requireNotEqual(t, a, b2)

	b3 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123, T: 123}, // Timestamp is different.
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	requireNotEqual(t, a, b3)

	b4 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter", Unit: "metric", Help: "some help text"}},
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 456.456}, // Value is different.
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	requireNotEqual(t, a, b4)

	b5 := []Sample{
		{},
		{L: labels.FromStrings("__name__", "test_metric_total"), M: metadata.Metadata{Type: "counter2", Unit: "metric", Help: "some help text"}}, // Different type.
		{L: labels.FromStrings("__name__", "test_metric2", "foo", "bar"), M: metadata.Metadata{Type: "gauge", Unit: "", Help: "other help text"}, V: 123.123},
		{ES: []exemplar.Exemplar{{Labels: labels.FromStrings("__name__", "yolo")}}},
	}
	requireNotEqual(t, a, b5)
}
// requireNotEqual fails the test when a and b compare equal.
//
// TODO(bwplotka): While this mimics testutil.RequireEqual, just inverted, it does not literally test
// testutil.RequireEqual. Either build a test suite that mocks `testing.TB` or get rid of testutil.RequireEqual somehow.
func requireNotEqual(t testing.TB, a, b any) {
	t.Helper()
	// Uses the same labels.Equal comparer as the positive helper.
	if !cmp.Equal(a, b, cmp.Comparer(labels.Equal)) {
		return
	}
	require.Fail(t, fmt.Sprintf("Equal, but expected not: \n"+
		"a: %s\n"+
		"b: %s", a, b))
}
// TestConcurrentAppender_ReturnsErrAppender verifies the single-open-appender
// guard: sequential reuse works, double Commit/Rollback fails, injected
// commit errors surface, and a concurrently opened appender errors on every
// call instead of recording samples.
func TestConcurrentAppender_ReturnsErrAppender(t *testing.T) {
	a := NewAppendable()

	// Non-concurrent multiple use is fine.
	app := a.Appender(t.Context())
	require.Equal(t, int32(1), a.openAppenders.Load())
	require.NoError(t, app.Commit())
	// Repeated commit fails.
	require.Error(t, app.Commit())

	app = a.Appender(t.Context())
	require.NoError(t, app.Rollback())
	// Commit after rollback fails.
	require.Error(t, app.Commit())

	// Injected commit error is returned.
	a.WithErrs(
		nil,
		nil,
		errors.New("commit err"),
	)
	app = a.Appender(t.Context())
	require.Error(t, app.Commit())

	// Clearing the injected errors restores normal behavior.
	a.WithErrs(nil, nil, nil)
	app = a.Appender(t.Context())
	require.NoError(t, app.Commit())
	require.Equal(t, int32(0), a.openAppenders.Load())

	// Concurrent use should return appender that errors.
	_ = a.Appender(t.Context())
	app = a.Appender(t.Context())
	_, err := app.Append(0, labels.EmptyLabels(), 0, 0)
	require.Error(t, err)
	_, err = app.AppendHistogram(0, labels.EmptyLabels(), 0, nil, nil)
	require.Error(t, err)
	require.Error(t, app.Commit())
	require.Error(t, app.Rollback())
}

@ -24,7 +24,7 @@
},
"mantine-ui": {
"name": "@prometheus-io/mantine-ui",
"version": "0.308.0",
"version": "0.308.1",
"dependencies": {
"@codemirror/autocomplete": "^6.19.1",
"@codemirror/language": "^6.11.3",
@ -42,7 +42,7 @@
"@microsoft/fetch-event-source": "^2.0.1",
"@nexucis/fuzzy": "^0.5.1",
"@nexucis/kvsearch": "^0.9.1",
"@prometheus-io/codemirror-promql": "0.308.0",
"@prometheus-io/codemirror-promql": "0.308.1",
"@reduxjs/toolkit": "^2.10.1",
"@tabler/icons-react": "^3.35.0",
"@tanstack/react-query": "^5.90.7",
@ -88,10 +88,10 @@
},
"module/codemirror-promql": {
"name": "@prometheus-io/codemirror-promql",
"version": "0.308.0",
"version": "0.308.1",
"license": "Apache-2.0",
"dependencies": {
"@prometheus-io/lezer-promql": "0.308.0",
"@prometheus-io/lezer-promql": "0.308.1",
"lru-cache": "^11.2.2"
},
"devDependencies": {
@ -121,7 +121,7 @@
},
"module/lezer-promql": {
"name": "@prometheus-io/lezer-promql",
"version": "0.308.0",
"version": "0.308.1",
"license": "Apache-2.0",
"devDependencies": {
"@lezer/generator": "^1.8.0",
@ -8693,10 +8693,11 @@
}
},
"node_modules/ts-jest": {
"version": "29.4.5",
"resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.4.5.tgz",
"integrity": "sha512-HO3GyiWn2qvTQA4kTgjDcXiMwYQt68a1Y8+JuLRVpdIzm+UOLSHgl/XqR4c6nzJkq5rOkjc02O2I7P7l/Yof0Q==",
"version": "29.4.6",
"resolved": "https://registry.npmjs.org/ts-jest/-/ts-jest-29.4.6.tgz",
"integrity": "sha512-fSpWtOO/1AjSNQguk43hb/JCo16oJDnMJf3CdEGNkqsEX3t0KX96xvyX1D7PfLCpVoKu4MfVrqUkFyblYoY4lA==",
"dev": true,
"license": "MIT",
"dependencies": {
"bs-logger": "^0.2.6",
"fast-json-stable-stringify": "^2.1.0",

Loading…
Cancel
Save