diff --git a/packages/grafana-data/src/types/featureToggles.gen.ts b/packages/grafana-data/src/types/featureToggles.gen.ts index 033b7c30e05..2658b66d145 100644 --- a/packages/grafana-data/src/types/featureToggles.gen.ts +++ b/packages/grafana-data/src/types/featureToggles.gen.ts @@ -259,4 +259,5 @@ export interface FeatureToggles { extraLanguages?: boolean; noBackdropBlur?: boolean; alertingMigrationUI?: boolean; + unifiedStorageHistoryPruner?: boolean; } diff --git a/pkg/services/featuremgmt/registry.go b/pkg/services/featuremgmt/registry.go index c6abe10a09a..eb027590f7c 100644 --- a/pkg/services/featuremgmt/registry.go +++ b/pkg/services/featuremgmt/registry.go @@ -1820,6 +1820,14 @@ var ( HideFromAdminPage: true, HideFromDocs: true, }, + { + Name: "unifiedStorageHistoryPruner", + Description: "Enables the unified storage history pruner", + Stage: FeatureStageExperimental, + Owner: grafanaSearchAndStorageSquad, + HideFromAdminPage: true, + HideFromDocs: true, + }, } ) diff --git a/pkg/services/featuremgmt/toggles_gen.csv b/pkg/services/featuremgmt/toggles_gen.csv index f2c41c26500..8bdbad38ba8 100644 --- a/pkg/services/featuremgmt/toggles_gen.csv +++ b/pkg/services/featuremgmt/toggles_gen.csv @@ -240,3 +240,4 @@ inviteUserExperimental,experimental,@grafana/sharing-squad,false,false,true extraLanguages,experimental,@grafana/grafana-frontend-platform,false,false,true noBackdropBlur,experimental,@grafana/grafana-frontend-platform,false,false,true alertingMigrationUI,experimental,@grafana/alerting-squad,false,false,true +unifiedStorageHistoryPruner,experimental,@grafana/search-and-storage,false,false,false diff --git a/pkg/services/featuremgmt/toggles_gen.go b/pkg/services/featuremgmt/toggles_gen.go index 3fcc06d9d94..1f4dee5e21d 100644 --- a/pkg/services/featuremgmt/toggles_gen.go +++ b/pkg/services/featuremgmt/toggles_gen.go @@ -970,4 +970,8 @@ const ( // FlagAlertingMigrationUI // Enables the alerting migration UI, to migrate datasource-managed rules to Grafana-managed rules FlagAlertingMigrationUI = "alertingMigrationUI" + + // FlagUnifiedStorageHistoryPruner + // Enables the unified storage history pruner + FlagUnifiedStorageHistoryPruner = "unifiedStorageHistoryPruner" ) diff --git a/pkg/services/featuremgmt/toggles_gen.json b/pkg/services/featuremgmt/toggles_gen.json index ce2d9c3dcfa..7caf31a6d24 100644 --- a/pkg/services/featuremgmt/toggles_gen.json +++ b/pkg/services/featuremgmt/toggles_gen.json @@ -4195,6 +4195,20 @@ "codeowner": "@grafana/search-and-storage" } }, + { + "metadata": { + "name": "unifiedStorageHistoryPruner", + "resourceVersion": "1742163088045", + "creationTimestamp": "2025-03-16T22:11:28Z" + }, + "spec": { + "description": "Enables the unified storage history pruner", + "stage": "experimental", + "codeowner": "@grafana/search-and-storage", + "hideFromAdminPage": true, + "hideFromDocs": true + } + }, { "metadata": { "name": "unifiedStorageSearch", diff --git a/pkg/storage/unified/client.go b/pkg/storage/unified/client.go index 3eebaffeac8..c16fbc92ad3 100644 --- a/pkg/storage/unified/client.go +++ b/pkg/storage/unified/client.go @@ -136,7 +136,7 @@ func newClient(opts options.StorageOptions, if err != nil { return nil, err } - server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics, indexMetrics) + server, err := sql.NewResourceServer(db, cfg, tracer, reg, authzc, searchOptions, storageMetrics, indexMetrics, features) if err != nil { return nil, err } diff --git a/pkg/storage/unified/sql/backend.go b/pkg/storage/unified/sql/backend.go index 9220633ad7b..53cf85ef9f5 100644 --- a/pkg/storage/unified/sql/backend.go +++ b/pkg/storage/unified/sql/backend.go @@ -10,6 +10,7 @@ import ( "time" "github.com/google/uuid" + "github.com/prometheus/client_golang/prometheus" "go.opentelemetry.io/otel/trace" "go.opentelemetry.io/otel/trace/noop" "google.golang.org/protobuf/proto" @@ -20,6 +21,7 @@ import ( "github.com/grafana/grafana/pkg/storage/unified/sql/db" "github.com/grafana/grafana/pkg/storage/unified/sql/dbutil" "github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" + "github.com/grafana/grafana/pkg/util/debouncer" ) const tracePrefix = "sql.resource." @@ -35,11 +37,16 @@ type Backend interface { type BackendOptions struct { DBProvider db.DBProvider Tracer trace.Tracer + Reg prometheus.Registerer PollingInterval time.Duration WatchBufferSize int IsHA bool storageMetrics *resource.StorageMetrics + // If true, the backend will prune history on write events. + // Will be removed once fully rolled out. + withPruner bool + // testing SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount } @@ -65,15 +72,39 @@ func NewBackend(opts BackendOptions) (Backend, error) { cancel: cancel, log: log.New("sql-resource-server"), tracer: opts.Tracer, + reg: opts.Reg, dbProvider: opts.DBProvider, pollingInterval: opts.PollingInterval, watchBufferSize: opts.WatchBufferSize, storageMetrics: opts.storageMetrics, bulkLock: &bulkLock{running: make(map[string]bool)}, simulatedNetworkLatency: opts.SimulatedNetworkLatency, + withPruner: opts.withPruner, }, nil } +// pruningKey is a comparable key for pruning history. +type pruningKey struct { + namespace string + group string + resource string +} + +// Small abstraction to allow for different pruner implementations. +// This can be removed once the debouncer is deployed. +type pruner interface { + Add(key pruningKey) error + Start(ctx context.Context) +} + +type noopPruner struct{} + +func (p *noopPruner) Add(key pruningKey) error { + return nil +} + +func (p *noopPruner) Start(ctx context.Context) {} + type backend struct { //general isHA bool @@ -87,6 +118,7 @@ type backend struct { // o11y log log.Logger tracer trace.Tracer + reg prometheus.Registerer storageMetrics *resource.StorageMetrics // database @@ -106,6 +138,9 @@ type backend struct { // testing simulatedNetworkLatency time.Duration + + historyPruner pruner + withPruner bool } func (b *backend) Init(ctx context.Context) error { @@ -116,13 +151,18 @@ func (b *backend) Init(ctx context.Context) error { } func (b *backend) initLocked(ctx context.Context) error { - db, err := b.dbProvider.Init(ctx) + dbConn, err := b.dbProvider.Init(ctx) if err != nil { return fmt.Errorf("initialize resource DB: %w", err) } - b.db = db - driverName := db.DriverName() + if err := dbConn.PingContext(ctx); err != nil { + return fmt.Errorf("ping resource DB: %w", err) + } + + b.db = dbConn + + driverName := dbConn.DriverName() b.dialect = sqltemplate.DialectForDriver(driverName) if b.dialect == nil { return fmt.Errorf("no dialect for driver %q", driverName) @@ -146,7 +186,68 @@ func (b *backend) initLocked(ctx context.Context) error { } b.notifier = notifier - return b.db.PingContext(ctx) + if err := b.initPruner(ctx); err != nil { + return fmt.Errorf("failed to create pruner: %w", err) + } + + return nil +} + +func (b *backend) initPruner(ctx context.Context) error { + if !b.withPruner { + b.log.Debug("using noop history pruner") + b.historyPruner = &noopPruner{} + return nil + } + b.log.Debug("using debounced history pruner") + // Initialize history pruner. + pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{ + Name: "history_pruner", + BufferSize: 1000, + MinWait: time.Second * 30, + MaxWait: time.Minute * 5, + ProcessHandler: func(ctx context.Context, key pruningKey) error { + return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error { + res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{ + SQLTemplate: sqltemplate.New(b.dialect), + HistoryLimit: 100, + Key: &resource.ResourceKey{ + Namespace: key.namespace, + Group: key.group, + Resource: key.resource, + }, + }) + if err != nil { + return fmt.Errorf("failed to prune history: %w", err) + } + rows, err := res.RowsAffected() + if err != nil { + return fmt.Errorf("failed to get rows affected: %w", err) + } + b.log.Debug("pruned history successfully", + "namespace", key.namespace, + "group", key.group, + "resource", key.resource, + "rows", rows) + return nil + }) + }, + ErrorHandler: func(key pruningKey, err error) { + b.log.Error("failed to prune history", + "namespace", key.namespace, + "group", key.group, + "resource", key.resource, + "error", err) + }, + Reg: b.reg, + }) + if err != nil { + return err + } + + b.historyPruner = pruner + b.historyPruner.Start(ctx) + return nil } func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) { @@ -246,6 +347,7 @@ func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, }); err != nil { return guid, fmt.Errorf("insert into resource history: %w", err) } + _ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource}) if b.simulatedNetworkLatency > 0 { time.Sleep(b.simulatedNetworkLatency) } @@ -299,6 +401,7 @@ func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, }); err != nil { return guid, fmt.Errorf("insert into resource history: %w", err) } + _ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource}) return guid, nil }) @@ -346,6 +449,7 @@ func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, }); err != nil { return guid, fmt.Errorf("insert into resource history: %w", err) } + _ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource}) return guid, nil }) @@ -394,6 +498,7 @@ func (b *backend) restore(ctx context.Context, event resource.WriteEvent) (int64 }); err != nil { return guid, fmt.Errorf("insert into resource history: %w", err) } + _ = b.historyPruner.Add(pruningKey{namespace: event.Key.Namespace, group: event.Key.Group, resource: event.Key.Resource}) // 3. Update all resource history entries with the new UID // Note: we do not update any history entries that have a deletion timestamp included. This will become diff --git a/pkg/storage/unified/sql/data/resource_history_prune.sql b/pkg/storage/unified/sql/data/resource_history_prune.sql new file mode 100644 index 00000000000..e636e9dc73c --- /dev/null +++ b/pkg/storage/unified/sql/data/resource_history_prune.sql @@ -0,0 +1,22 @@ +DELETE FROM {{ .Ident "resource_history" }} +WHERE {{ .Ident "guid" }} IN ( + SELECT {{ .Ident "guid" }} + FROM ( + SELECT + {{ .Ident "guid" }}, + ROW_NUMBER() OVER ( + PARTITION BY + {{ .Ident "namespace" }}, + {{ .Ident "group" }}, + {{ .Ident "resource" }}, + {{ .Ident "name" }} + ORDER BY {{ .Ident "resource_version" }} DESC + ) AS {{ .Ident "rn" }} + FROM {{ .Ident "resource_history" }} + WHERE + {{ .Ident "namespace" }} = {{ .Arg .Key.Namespace }} + AND {{ .Ident "group" }} = {{ .Arg .Key.Group }} + AND {{ .Ident "resource" }} = {{ .Arg .Key.Resource }} + ) AS {{ .Ident "ranked" }} + WHERE {{ .Ident "rn" }} > {{ .Arg .HistoryLimit }} +); diff --git a/pkg/storage/unified/sql/queries.go b/pkg/storage/unified/sql/queries.go index 3902ee8ecc3..3ea34472024 100644 --- a/pkg/storage/unified/sql/queries.go +++ b/pkg/storage/unified/sql/queries.go @@ -44,6 +44,7 @@ var ( sqlResourceHistoryPoll = mustTemplate("resource_history_poll.sql") sqlResourceHistoryGet = mustTemplate("resource_history_get.sql") sqlResourceHistoryDelete = mustTemplate("resource_history_delete.sql") + sqlResourceHistoryPrune = mustTemplate("resource_history_prune.sql") sqlResourceInsertFromHistory = mustTemplate("resource_insert_from_history.sql") // sqlResourceLabelsInsert = mustTemplate("resource_labels_insert.sql") @@ -252,6 +253,32 @@ func (r sqlGetHistoryRequest) Validate() error { return nil // TODO } +// prune resource history +type sqlPruneHistoryRequest struct { + sqltemplate.SQLTemplate + Key *resource.ResourceKey + HistoryLimit int64 +} + +func (r *sqlPruneHistoryRequest) Validate() error { + if r.HistoryLimit <= 0 { + return fmt.Errorf("history limit must be greater than zero") + } + if r.Key == nil { + return fmt.Errorf("missing key") + } + if r.Key.Namespace == "" { + return fmt.Errorf("missing namespace") + } + if r.Key.Group == "" { + return fmt.Errorf("missing group") + } + if r.Key.Resource == "" { + return fmt.Errorf("missing resource") + } + return nil +} + // update resource history type sqlResourceHistoryUpdateRequest struct { diff --git a/pkg/storage/unified/sql/queries_test.go b/pkg/storage/unified/sql/queries_test.go index c8662ad310a..4dfb7e13327 100644 --- a/pkg/storage/unified/sql/queries_test.go +++ b/pkg/storage/unified/sql/queries_test.go @@ -255,6 +255,21 @@ func TestUnifiedStorageQueries(t *testing.T) { }, }, + sqlResourceHistoryPrune: { + { + Name: "simple", + Data: &sqlPruneHistoryRequest{ + SQLTemplate: mocks.NewTestingSQLTemplate(), + Key: &resource.ResourceKey{ + Namespace: "nn", + Group: "gg", + Resource: "rr", + }, + HistoryLimit: 100, + }, + }, + }, + sqlResourceVersionGet: { { Name: "single path", diff --git a/pkg/storage/unified/sql/server.go b/pkg/storage/unified/sql/server.go index 330e265d9f7..0a6664187cc 100644 --- a/pkg/storage/unified/sql/server.go +++ b/pkg/storage/unified/sql/server.go @@ -10,6 +10,7 @@ import ( infraDB "github.com/grafana/grafana/pkg/infra/db" "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" "github.com/grafana/grafana/pkg/setting" "github.com/grafana/grafana/pkg/storage/unified/resource" @@ -18,7 +19,9 @@ import ( // Creates a new ResourceServer func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, - tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics, indexMetrics *resource.BleveIndexMetrics) (resource.ResourceServer, error) { + tracer tracing.Tracer, reg prometheus.Registerer, ac types.AccessClient, + searchOptions resource.SearchOptions, storageMetrics *resource.StorageMetrics, + indexMetrics *resource.BleveIndexMetrics, features featuremgmt.FeatureToggles) (resource.ResourceServer, error) { apiserverCfg := cfg.SectionWithEnvOverrides("grafana-apiserver") opts := resource.ResourceServerOptions{ Tracer: tracer, @@ -46,8 +49,16 @@ func NewResourceServer(db infraDB.DB, cfg *setting.Cfg, } isHA := isHighAvailabilityEnabled(cfg.SectionWithEnvOverrides("database")) + withPruner := features.IsEnabledGlobally(featuremgmt.FlagUnifiedStorageHistoryPruner) - store, err := NewBackend(BackendOptions{DBProvider: eDB, Tracer: tracer, IsHA: isHA, storageMetrics: storageMetrics}) + store, err := NewBackend(BackendOptions{ + DBProvider: eDB, + Tracer: tracer, + Reg: reg, + IsHA: isHA, + withPruner: withPruner, + storageMetrics: storageMetrics, + }) if err != nil { return nil, err } diff --git a/pkg/storage/unified/sql/service.go b/pkg/storage/unified/sql/service.go index 6b4e3dc2a87..8148111e017 100644 --- a/pkg/storage/unified/sql/service.go +++ b/pkg/storage/unified/sql/service.go @@ -118,7 +118,7 @@ func (s *service) start(ctx context.Context) error { return err } - server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics, s.indexMetrics) + server, err := NewResourceServer(s.db, s.cfg, s.tracing, s.reg, authzClient, searchOptions, s.storageMetrics, s.indexMetrics, s.features) if err != nil { return err } diff --git a/pkg/storage/unified/sql/testdata/mysql--resource_history_prune-simple.sql b/pkg/storage/unified/sql/testdata/mysql--resource_history_prune-simple.sql new file mode 100755 index 00000000000..fa50d446b4e --- /dev/null +++ b/pkg/storage/unified/sql/testdata/mysql--resource_history_prune-simple.sql @@ -0,0 +1,22 @@ +DELETE FROM `resource_history` +WHERE `guid` IN ( + SELECT `guid` + FROM ( + SELECT + `guid`, + ROW_NUMBER() OVER ( + PARTITION BY + `namespace`, + `group`, + `resource`, + `name` + ORDER BY `resource_version` DESC + ) AS `rn` + FROM `resource_history` + WHERE + `namespace` = 'nn' + AND `group` = 'gg' + AND `resource` = 'rr' + ) AS `ranked` + WHERE `rn` > 100 +); diff --git a/pkg/storage/unified/sql/testdata/postgres--resource_history_prune-simple.sql b/pkg/storage/unified/sql/testdata/postgres--resource_history_prune-simple.sql new file mode 100755 index 00000000000..9994708de39 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/postgres--resource_history_prune-simple.sql @@ -0,0 +1,22 @@ +DELETE FROM "resource_history" +WHERE "guid" IN ( + SELECT "guid" + FROM ( + SELECT + "guid", + ROW_NUMBER() OVER ( + PARTITION BY + "namespace", + "group", + "resource", + "name" + ORDER BY "resource_version" DESC + ) AS "rn" + FROM "resource_history" + WHERE + "namespace" = 'nn' + AND "group" = 'gg' + AND "resource" = 'rr' + ) AS "ranked" + WHERE "rn" > 100 +); diff --git a/pkg/storage/unified/sql/testdata/sqlite--resource_history_prune-simple.sql b/pkg/storage/unified/sql/testdata/sqlite--resource_history_prune-simple.sql new file mode 100755 index 00000000000..9994708de39 --- /dev/null +++ b/pkg/storage/unified/sql/testdata/sqlite--resource_history_prune-simple.sql @@ -0,0 +1,22 @@ +DELETE FROM "resource_history" +WHERE "guid" IN ( + SELECT "guid" + FROM ( + SELECT + "guid", + ROW_NUMBER() OVER ( + PARTITION BY + "namespace", + "group", + "resource", + "name" + ORDER BY "resource_version" DESC + ) AS "rn" + FROM "resource_history" + WHERE + "namespace" = 'nn' + AND "group" = 'gg' + AND "resource" = 'rr' + ) AS "ranked" + WHERE "rn" > 100 +);