feat(unified-storage): prune history table based on limits (#101970)

pull/101347/head^2
Jean-Philippe Quéméner 3 months ago committed by GitHub
parent cacdf00067
commit 1700a8aa9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      packages/grafana-data/src/types/featureToggles.gen.ts
  2. 8
      pkg/services/featuremgmt/registry.go
  3. 1
      pkg/services/featuremgmt/toggles_gen.csv
  4. 4
      pkg/services/featuremgmt/toggles_gen.go
  5. 14
      pkg/services/featuremgmt/toggles_gen.json
  6. 2
      pkg/storage/unified/client.go
  7. 113
      pkg/storage/unified/sql/backend.go
  8. 22
      pkg/storage/unified/sql/data/resource_history_prune.sql
  9. 27
      pkg/storage/unified/sql/queries.go
  10. 15
      pkg/storage/unified/sql/queries_test.go
  11. 15
      pkg/storage/unified/sql/server.go
  12. 2
      pkg/storage/unified/sql/service.go
  13. 22
      pkg/storage/unified/sql/testdata/mysql--resource_history_prune-simple.sql
  14. 22
      pkg/storage/unified/sql/testdata/postgres--resource_history_prune-simple.sql
  15. 22
      pkg/storage/unified/sql/testdata/sqlite--resource_history_prune-simple.sql

@ -259,4 +259,5 @@ export interface FeatureToggles {
extraLanguages?: boolean;
noBackdropBlur?: boolean;
alertingMigrationUI?: boolean;
unifiedStorageHistoryPruner?: boolean;
}

@ -1820,6 +1820,14 @@ var (
HideFromAdminPage: true,
HideFromDocs: true,
},
{
Name: "unifiedStorageHistoryPruner",
Description: "Enables the unified storage history pruner",
Stage: FeatureStageExperimental,
Owner: grafanaSearchAndStorageSquad,
HideFromAdminPage: true,
HideFromDocs: true,
},
}
)

@ -240,3 +240,4 @@ inviteUserExperimental,experimental,@grafana/sharing-squad,false,false,true
extraLanguages,experimental,@grafana/grafana-frontend-platform,false,false,true
noBackdropBlur,experimental,@grafana/grafana-frontend-platform,false,false,true
alertingMigrationUI,experimental,@grafana/alerting-squad,false,false,true
unifiedStorageHistoryPruner,experimental,@grafana/search-and-storage,false,false,false

1 Name Stage Owner requiresDevMode RequiresRestart FrontendOnly
240 extraLanguages experimental @grafana/grafana-frontend-platform false false true
241 noBackdropBlur experimental @grafana/grafana-frontend-platform false false true
242 alertingMigrationUI experimental @grafana/alerting-squad false false true
243 unifiedStorageHistoryPruner experimental @grafana/search-and-storage false false false

@ -970,4 +970,8 @@ const (
// FlagAlertingMigrationUI
// Enables the alerting migration UI, to migrate datasource-managed rules to Grafana-managed rules
FlagAlertingMigrationUI = "alertingMigrationUI"
// FlagUnifiedStorageHistoryPruner
// Enables the unified storage history pruner
FlagUnifiedStorageHistoryPruner = "unifiedStorageHistoryPruner"
)

@ -4195,6 +4195,20 @@
"codeowner": "@grafana/search-and-storage"
}
},
{
"metadata": {
"name": "unifiedStorageHistoryPruner",
"resourceVersion": "1742163088045",
"creationTimestamp": "2025-03-16T22:11:28Z"
},
"spec": {
"description": "Enables the unified storage history pruner",
"stage": "experimental",
"codeowner": "@grafana/search-and-storage",
"hideFromAdminPage": true,
"hideFromDocs": true
}
},
{
"metadata": {
"name": "unifiedStorageSearch",

@ -136,7 +136,7 @@ func newClient(opts options.StorageOptions,
if err != nil {
return nil, err
}
server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics, indexMetrics)
server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics, indexMetrics, features)
if err != nil {
return nil, err
}

@ -10,6 +10,7 @@ import (
"time"
"github.com/google/uuid"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
@ -20,6 +21,7 @@ import (
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/util/debouncer"
)
const tracePrefix = "sql.resource."
@ -35,11 +37,16 @@ type Backend interface {
type BackendOptions struct {
DBProvider db.DBProvider
Tracer trace.Tracer
Reg prometheus.Registerer
PollingInterval time.Duration
WatchBufferSize int
IsHA bool
storageMetrics *resource.StorageMetrics
// If true, the backend will prune history on write events.
// Will be removed once fully rolled out.
withPruner bool
// testing
SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount
}
@ -65,15 +72,39 @@ func NewBackend(opts BackendOptions) (Backend, error) {
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
reg: opts.Reg,
dbProvider: opts.DBProvider,
pollingInterval: opts.PollingInterval,
watchBufferSize: opts.WatchBufferSize,
storageMetrics: opts.storageMetrics,
bulkLock: &bulkLock{running: make(map[string]bool)},
simulatedNetworkLatency: opts.SimulatedNetworkLatency,
withPruner: opts.withPruner,
}, nil
}
// pruningKey is a comparable key for pruning history.
type pruningKey struct {
namespace string
group string
resource string
}
// Small abstraction to allow for different pruner implementations.
// This can be removed once the debouncer is deployed.
type pruner interface {
Add(key pruningKey) error
Start(ctx context.Context)
}
type noopPruner struct{}
func (p *noopPruner) Add(key pruningKey) error {
return nil
}
func (p *noopPruner) Start(ctx context.Context) {}
type backend struct {
//general
isHA bool
@ -87,6 +118,7 @@ type backend struct {
// o11y
log log.Logger
tracer trace.Tracer
reg prometheus.Registerer
storageMetrics *resource.StorageMetrics
// database
@ -106,6 +138,9 @@ type backend struct {
// testing
simulatedNetworkLatency time.Duration
historyPruner pruner
withPruner bool
}
func (b *backend) Init(ctx context.Context) error {
@ -116,13 +151,18 @@ func (b *backend) Init(ctx context.Context) error {
}
func (b *backend) initLocked(ctx context.Context) error {
db, err := b.dbProvider.Init(ctx)
dbConn, err := b.dbProvider.Init(ctx)
if err != nil {
return fmt.Errorf("initialize resource DB: %w", err)
}
b.db = db
driverName := db.DriverName()
if err := dbConn.PingContext(ctx); err != nil {
return fmt.Errorf("ping resource DB: %w", err)
}
b.db = dbConn
driverName := dbConn.DriverName()
b.dialect = sqltemplate.DialectForDriver(driverName)
if b.dialect == nil {
return fmt.Errorf("no dialect for driver %q", driverName)
@ -146,7 +186,68 @@ func (b *backend) initLocked(ctx context.Context) error {
}
b.notifier = notifier
return b.db.PingContext(ctx)
if err := b.initPruner(ctx); err != nil {
return fmt.Errorf("failed to create pruner: %w", err)
}
return nil
}
func (b *backend) initPruner(ctx context.Context) error {
if !b.withPruner {
b.log.Debug("using noop history pruner")
b.historyPruner = &noopPruner{}
return nil
}
b.log.Debug("using debounced history pruner")
// Initialize history pruner.
pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{
Name: "history_pruner",
BufferSize: 1000,
MinWait: time.Second * 30,
MaxWait: time.Minute * 5,
ProcessHandler: func(ctx context.Context, key pruningKey) error {
return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{
SQLTemplate: sqltemplate.New(b.dialect),
HistoryLimit: 100,
Key: &resource.ResourceKey{
Namespace: key.namespace,
Group: key.group,
Resource: key.resource,
},
})
if err != nil {
return fmt.Errorf("failed to prune history: %w", err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
b.log.Debug("pruned history successfully",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"rows", rows)
return nil
})
},
ErrorHandler: func(key pruningKey, err error) {
b.log.Error("failed to prune history",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"error", err)
},
Reg: b.reg,
})
if err != nil {
return err
}
b.historyPruner = pruner
b.historyPruner.Start(ctx)
return nil
}
func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
@ -246,6 +347,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource})
if b.simulatedNetworkLatency > 0 {
time.Sleep(b.simulatedNetworkLatency)
}
@ -299,6 +401,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource})
return guid, nil
})
@ -346,6 +449,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource})
return guid, nil
})
@ -394,6 +498,7 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource})
// 3. Update all resource history entries with the new UID
// Note: we do not update any history entries that have a deletion timestamp included. This will become

@ -0,0 +1,22 @@
DELETE FROM {{ .Ident "resource_history" }}
WHERE {{ .Ident "guid" }} IN (
SELECT {{ .Ident "guid" }}
FROM (
SELECT
{{ .Ident "guid" }},
ROW_NUMBER() OVER (
PARTITION BY
{{ .Ident "namespace" }},
{{ .Ident "group" }},
{{ .Ident "resource" }},
{{ .Ident "name" }}
ORDER BY {{ .Ident "resource_version" }} DESC
) AS {{ .Ident "rn" }}
FROM {{ .Ident "resource_history" }}
WHERE
{{ .Ident "namespace" }} = {{ .Arg .Key.Namespace }}
AND {{ .Ident "group" }} = {{ .Arg .Key.Group }}
AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }}
) AS {{ .Ident "ranked" }}
WHERE {{ .Ident "rn" }} > {{ .Arg .HistoryLimit }}
);

@ -44,6 +44,7 @@ var (
sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql")
sqlResourceHistoryGet = mustTemplate("resource_history_get.sql")
sqlResourceHistoryDelete = mustTemplate("resource_history_delete.sql")
sqlResourceHistoryPrune = mustTemplate("resource_history_prune.sql")
sqlResourceInsertFromHistory = mustTemplate("resource_insert_from_history.sql")
// sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql")
@ -252,6 +253,32 @@ func (r sqlGetHistoryRequest) Validate() error {
return nil // TODO
}
// prune resource history
type sqlPruneHistoryRequest struct {
sqltemplate.SQLTemplate
Key *resource.ResourceKey
HistoryLimit int64
}
func (r *sqlPruneHistoryRequest) Validate() error {
if r.HistoryLimit <= 0 {
return fmt.Errorf("history limit must be greater than zero")
}
if r.Key == nil {
return fmt.Errorf("missing key")
}
if r.Key.Namespace == "" {
return fmt.Errorf("missing namespace")
}
if r.Key.Group == "" {
return fmt.Errorf("missing group")
}
if r.Key.Resource == "" {
return fmt.Errorf("missing resource")
}
return nil
}
// update resource history
type sqlResourceHistoryUpdateRequest struct {

@ -255,6 +255,21 @@ func TestUnifiedStorageQueries(t *testing.T) {
},
},
sqlResourceHistoryPrune: {
{
Name: "simple",
Data: &sqlPruneHistoryRequest{
SQLTemplate: mocks.NewTestingSQLTemplate(),
Key: &resource.ResourceKey{
Namespace: "nn",
Group: "gg",
Resource: "rr",
},
HistoryLimit: 100,
},
},
},
sqlResourceVersionGet: {
{
Name: "single path",

@ -10,6 +10,7 @@ import (
infraDB "github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/storage/unified/resource"
@ -18,7 +19,9 @@ import (
// Creates a new ResourceServer
func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics) (resource.ResourceServer, error) {
tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient,
searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics,
indexMetrics *resource.BleveIndexMetrics, features featuremgmt.FeatureToggles) (resource.ResourceServer, error) {
apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver")
opts := resource.ResourceServerOptions{
Tracer: tracer,
@ -46,8 +49,16 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg,
}
isHA := isHighAvailabilityEnabled(cfg.SectionWithEnvOverrides("database"))
withPruner := features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner)
store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA, storageMetrics: storageMetrics})
store, err := NewBackend(BackendOptions{
DBProvider: eDB,
Tracer: tracer,
Reg: reg,
IsHA: isHA,
withPruner: withPruner,
storageMetrics: storageMetrics,
})
if err != nil {
return nil, err
}

@ -118,7 +118,7 @@ func (s *service) start(ctx context.Context) error {
return err
}
server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics, s.indexMetrics)
server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics, s.indexMetrics, s.features)
if err != nil {
return err
}

@ -0,0 +1,22 @@
DELETE FROM `resource_history`
WHERE `guid` IN (
SELECT `guid`
FROM (
SELECT
`guid`,
ROW_NUMBER() OVER (
PARTITION BY
`namespace`,
`group`,
`resource`,
`name`
ORDER BY `resource_version` DESC
) AS `rn`
FROM `resource_history`
WHERE
`namespace` = 'nn'
AND `group` = 'gg'
AND `resource` = 'rr'
) AS `ranked`
WHERE `rn` > 100
);

@ -0,0 +1,22 @@
DELETE FROM "resource_history"
WHERE "guid" IN (
SELECT "guid"
FROM (
SELECT
"guid",
ROW_NUMBER() OVER (
PARTITION BY
"namespace",
"group",
"resource",
"name"
ORDER BY "resource_version" DESC
) AS "rn"
FROM "resource_history"
WHERE
"namespace" = 'nn'
AND "group" = 'gg'
AND "resource" = 'rr'
) AS "ranked"
WHERE "rn" > 100
);

@ -0,0 +1,22 @@
DELETE FROM "resource_history"
WHERE "guid" IN (
SELECT "guid"
FROM (
SELECT
"guid",
ROW_NUMBER() OVER (
PARTITION BY
"namespace",
"group",
"resource",
"name"
ORDER BY "resource_version" DESC
) AS "rn"
FROM "resource_history"
WHERE
"namespace" = 'nn'
AND "group" = 'gg'
AND "resource" = 'rr'
) AS "ranked"
WHERE "rn" > 100
);
Loading…
Cancel
Save