The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/storage/unified/sql/backend.go

862 lines
24 KiB

package sql
import (
"context"
"database/sql"
"errors"
"fmt"
"math"
"sync"
"time"
"github.com/go-sql-driver/mysql"
"github.com/google/uuid"
unifiedbackend "github.com/grafana/grafana/pkg/storage/unified/backend"
"github.com/jackc/pgx/v5/pgconn"
"github.com/mattn/go-sqlite3"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"google.golang.org/protobuf/proto"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/storage/unified/resource"
"github.com/grafana/grafana/pkg/storage/unified/sql/db"
"github.com/grafana/grafana/pkg/storage/unified/sql/dbutil"
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate"
"github.com/grafana/grafana/pkg/util/debouncer"
)
const tracePrefix = "sql.resource."
const defaultPollingInterval = 100 * time.Millisecond
const defaultWatchBufferSize = 100 // number of events to buffer in the watch stream
const defaultPrunerHistoryLimit = 20
type Backend interface {
resource.StorageBackend
resource.DiagnosticsServer
resource.LifecycleHooks
}
type BackendOptions struct {
DBProvider db.DBProvider
Tracer trace.Tracer
Reg prometheus.Registerer
PollingInterval time.Duration
WatchBufferSize int
IsHA bool
storageMetrics *resource.StorageMetrics
// If true, the backend will prune history on write events.
// Will be removed once fully rolled out.
withPruner bool
// testing
SimulatedNetworkLatency time.Duration // slows down the create transactions by a fixed amount
}
func NewBackend(opts BackendOptions) (Backend, error) {
if opts.DBProvider == nil {
return nil, errors.New("no db provider")
}
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("sql-backend")
}
ctx, cancel := context.WithCancel(context.Background())
if opts.PollingInterval == 0 {
opts.PollingInterval = defaultPollingInterval
}
if opts.WatchBufferSize == 0 {
opts.WatchBufferSize = defaultWatchBufferSize
}
return &backend{
isHA: opts.IsHA,
done: ctx.Done(),
cancel: cancel,
log: log.New("sql-resource-server"),
tracer: opts.Tracer,
reg: opts.Reg,
dbProvider: opts.DBProvider,
pollingInterval: opts.PollingInterval,
watchBufferSize: opts.WatchBufferSize,
storageMetrics: opts.storageMetrics,
bulkLock: &bulkLock{running: make(map[string]bool)},
simulatedNetworkLatency: opts.SimulatedNetworkLatency,
withPruner: opts.withPruner,
}, nil
}
// pruningKey is a comparable key for pruning history.
type pruningKey struct {
namespace string
group string
resource string
name string
}
// Small abstraction to allow for different pruner implementations.
// This can be removed once the debouncer is deployed.
type pruner interface {
Add(key pruningKey) error
Start(ctx context.Context)
}
type noopPruner struct{}
func (p *noopPruner) Add(key pruningKey) error {
return nil
}
func (p *noopPruner) Start(ctx context.Context) {}
type backend struct {
//general
isHA bool
// server lifecycle
done <-chan struct{}
cancel context.CancelFunc
initOnce sync.Once
initErr error
// o11y
log log.Logger
tracer trace.Tracer
reg prometheus.Registerer
storageMetrics *resource.StorageMetrics
// database
dbProvider db.DBProvider
db db.DB
dialect sqltemplate.Dialect
bulkLock *bulkLock
// watch streaming
//stream chan *resource.WatchEvent
pollingInterval time.Duration
watchBufferSize int
notifier eventNotifier
// resource version manager
rvManager *resourceVersionManager
// testing
simulatedNetworkLatency time.Duration
historyPruner pruner
withPruner bool
}
func (b *backend) Init(ctx context.Context) error {
b.initOnce.Do(func() {
b.initErr = b.initLocked(ctx)
})
return b.initErr
}
func (b *backend) initLocked(ctx context.Context) error {
dbConn, err := b.dbProvider.Init(ctx)
if err != nil {
return fmt.Errorf("initialize resource DB: %w", err)
}
if err := dbConn.PingContext(ctx); err != nil {
return fmt.Errorf("ping resource DB: %w", err)
}
b.db = dbConn
driverName := dbConn.DriverName()
b.dialect = sqltemplate.DialectForDriver(driverName)
if b.dialect == nil {
return fmt.Errorf("no dialect for driver %q", driverName)
}
// Initialize ResourceVersionManager
rvManager, err := NewResourceVersionManager(ResourceManagerOptions{
Dialect: b.dialect,
DB: b.db,
Tracer: b.tracer,
})
if err != nil {
return fmt.Errorf("failed to create resource version manager: %w", err)
}
b.rvManager = rvManager
// Initialize notifier after dialect is set up
notifier, err := newNotifier(b)
if err != nil {
return fmt.Errorf("failed to create notifier: %w", err)
}
b.notifier = notifier
if err := b.initPruner(ctx); err != nil {
return fmt.Errorf("failed to create pruner: %w", err)
}
return nil
}
func (b *backend) initPruner(ctx context.Context) error {
if !b.withPruner {
b.log.Debug("using noop history pruner")
b.historyPruner = &noopPruner{}
return nil
}
b.log.Debug("using debounced history pruner")
// Initialize history pruner.
pruner, err := debouncer.NewGroup(debouncer.DebouncerOpts[pruningKey]{
Name: "history_pruner",
BufferSize: 1000,
MinWait: time.Second * 30,
MaxWait: time.Minute * 5,
ProcessHandler: func(ctx context.Context, key pruningKey) error {
return b.db.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
res, err := dbutil.Exec(ctx, tx, sqlResourceHistoryPrune, &sqlPruneHistoryRequest{
SQLTemplate: sqltemplate.New(b.dialect),
HistoryLimit: defaultPrunerHistoryLimit,
Key: &resource.ResourceKey{
Namespace: key.namespace,
Group: key.group,
Resource: key.resource,
Name: key.name,
},
})
if err != nil {
return fmt.Errorf("failed to prune history: %w", err)
}
rows, err := res.RowsAffected()
if err != nil {
return fmt.Errorf("failed to get rows affected: %w", err)
}
b.log.Debug("pruned history successfully",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"name", key.name,
"rows", rows)
return nil
})
},
ErrorHandler: func(key pruningKey, err error) {
b.log.Error("failed to prune history",
"namespace", key.namespace,
"group", key.group,
"resource", key.resource,
"name", key.name,
"error", err)
},
Reg: b.reg,
})
if err != nil {
return err
}
b.historyPruner = pruner
b.historyPruner.Start(ctx)
return nil
}
func (b *backend) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
// ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
if err := b.db.PingContext(ctx); err != nil {
return nil, err
}
return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
}
func (b *backend) Stop(_ context.Context) error {
b.cancel()
return nil
}
// GetResourceStats implements Backend.
func (b *backend) GetResourceStats(ctx context.Context, namespace string, minCount int) ([]resource.ResourceStats, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+".GetResourceStats")
defer span.End()
req := &sqlStatsRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Namespace: namespace,
MinCount: minCount, // not used in query... yet?
}
res := make([]resource.ResourceStats, 0, 100)
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceStats, req)
if err != nil {
return err
}
for rows.Next() {
row := resource.ResourceStats{}
err = rows.Scan(&row.Namespace, &row.Group, &row.Resource, &row.Count, &row.ResourceVersion)
if err != nil {
return err
}
if row.Count > int64(minCount) {
res = append(res, row)
}
}
return err
})
return res, err
}
func (b *backend) WriteEvent(ctx context.Context, event resource.WriteEvent) (int64, error) {
_, span := b.tracer.Start(ctx, tracePrefix+"WriteEvent")
defer span.End()
// TODO: validate key ?
switch event.Type {
case resource.WatchEvent_ADDED:
return b.create(ctx, event)
case resource.WatchEvent_MODIFIED:
return b.update(ctx, event)
case resource.WatchEvent_DELETED:
return b.delete(ctx, event)
default:
return 0, fmt.Errorf("unsupported event type")
}
}
func (b *backend) create(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Create")
defer span.End()
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. Insert into resource
if _, err := dbutil.Exec(ctx, tx, sqlResourceInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
if isRowAlreadyExistsError(err) {
return guid, unifiedbackend.ErrResourceAlreadyExists
}
return guid, fmt.Errorf("insert into resource: %w", err)
}
// 2. Insert into resource history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
})
if b.simulatedNetworkLatency > 0 {
time.Sleep(b.simulatedNetworkLatency)
}
return guid, nil
})
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: rv,
Folder: folder,
})
return rv, nil
}
// isRowAlreadyExistsError checks if the error is the result of the row inserted already existing.
func isRowAlreadyExistsError(err error) bool {
var sqlite sqlite3.Error
if errors.As(err, &sqlite) {
return sqlite.ExtendedCode == sqlite3.ErrConstraintUnique
}
var pg *pgconn.PgError
if errors.As(err, &pg) {
// https://www.postgresql.org/docs/current/errcodes-appendix.html
return pg.Code == "23505" // unique_violation
}
var mysqlerr *mysql.MySQLError
if errors.As(err, &mysqlerr) {
// https://dev.mysql.com/doc/mysql-errors/8.0/en/server-error-reference.html
return mysqlerr.Number == 1062 // ER_DUP_ENTRY
}
return false
}
func (b *backend) update(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Update")
defer span.End()
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
// Use rvManager.ExecWithRV instead of direct transaction
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. Update resource
_, err := dbutil.Exec(ctx, tx, sqlResourceUpdate, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
})
if err != nil {
return guid, fmt.Errorf("resource update: %w", err)
}
// 2. Insert into resource history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
})
return guid, nil
})
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: rv,
Folder: folder,
})
return rv, nil
}
func (b *backend) delete(ctx context.Context, event resource.WriteEvent) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"Delete")
defer span.End()
guid := uuid.New().String()
folder := ""
if event.Object != nil {
folder = event.Object.GetFolder()
}
rv, err := b.rvManager.ExecWithRV(ctx, event.Key, func(tx db.Tx) (string, error) {
// 1. delete from resource
_, err := dbutil.Exec(ctx, tx, sqlResourceDelete, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
GUID: guid,
})
if err != nil {
return guid, fmt.Errorf("delete resource: %w", err)
}
// 2. Add event to resource history
if _, err := dbutil.Exec(ctx, tx, sqlResourceHistoryInsert, sqlResourceRequest{
SQLTemplate: sqltemplate.New(b.dialect),
WriteEvent: event,
Folder: folder,
GUID: guid,
}); err != nil {
return guid, fmt.Errorf("insert into resource history: %w", err)
}
_ = b.historyPruner.Add(pruningKey{
namespace: event.Key.Namespace,
group: event.Key.Group,
resource: event.Key.Resource,
name: event.Key.Name,
})
return guid, nil
})
if err != nil {
return 0, err
}
b.notifier.send(ctx, &resource.WrittenEvent{
Type: event.Type,
Key: event.Key,
PreviousRV: event.PreviousRV,
Value: event.Value,
ResourceVersion: rv,
Folder: folder,
})
return rv, nil
}
func (b *backend) ReadResource(ctx context.Context, req *resource.ReadRequest) *resource.BackendReadResponse {
_, span := b.tracer.Start(ctx, tracePrefix+".Read")
defer span.End()
// TODO: validate key ?
readReq := &sqlResourceReadRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: req,
Response: NewReadResponse(),
}
sr := sqlResourceRead
if req.ResourceVersion > 0 {
// read a specific version
sr = sqlResourceHistoryRead
}
var res *resource.BackendReadResponse
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
res, err = dbutil.QueryRow(ctx, tx, sr, readReq)
return err
})
if errors.Is(err, sql.ErrNoRows) {
return &resource.BackendReadResponse{
Error: resource.NewNotFoundError(req.Key),
}
} else if err != nil {
return &resource.BackendReadResponse{Error: resource.AsErrorResult(err)}
}
return res
}
func (b *backend) ListIterator(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"List")
defer span.End()
if err := resource.MigrateListRequestVersionMatch(req, b.log); err != nil {
return 0, err
}
if req.Options == nil || req.Options.Key.Group == "" || req.Options.Key.Resource == "" {
return 0, fmt.Errorf("missing group or resource")
}
if req.Source != resource.ListRequest_STORE {
return b.getHistory(ctx, req, cb)
}
// TODO: think about how to handler VersionMatch. We should be able to use latest for the first page (only).
// TODO: add support for RemainingItemCount
if req.ResourceVersion > 0 || req.NextPageToken != "" {
return b.listAtRevision(ctx, req, cb)
}
return b.listLatest(ctx, req, cb)
}
type listIter struct {
rows db.Rows
offset int64
listRV int64
sortAsc bool
// any error
err error
// The row
rv int64
value []byte
namespace string
name string
folder string
}
// ContinueToken implements resource.ListIterator.
func (l *listIter) ContinueToken() string {
return resource.ContinueToken{ResourceVersion: l.listRV, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
}
func (l *listIter) ContinueTokenWithCurrentRV() string {
return resource.ContinueToken{ResourceVersion: l.rv, StartOffset: l.offset, SortAscending: l.sortAsc}.String()
}
func (l *listIter) Error() error {
return l.err
}
func (l *listIter) Name() string {
return l.name
}
func (l *listIter) Namespace() string {
return l.namespace
}
func (l *listIter) Folder() string {
return l.folder
}
// ResourceVersion implements resource.ListIterator.
func (l *listIter) ResourceVersion() int64 {
return l.rv
}
// Value implements resource.ListIterator.
func (l *listIter) Value() []byte {
return l.value
}
// Next implements resource.ListIterator.
func (l *listIter) Next() bool {
if l.rows.Next() {
l.offset++
l.err = l.rows.Scan(&l.rv, &l.namespace, &l.name, &l.folder, &l.value)
return true
}
return false
}
var _ resource.ListIterator = (*listIter)(nil)
// listLatest fetches the resources from the resource table.
func (b *backend) listLatest(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatest")
defer span.End()
if req.NextPageToken != "" {
return 0, fmt.Errorf("only works for the first page")
}
if req.ResourceVersion > 0 {
return 0, fmt.Errorf("only works for the 'latest' resource version")
}
iter := &listIter{sortAsc: false}
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
if err != nil {
return err
}
listReq := sqlResourceListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: new(resource.ListRequest),
}
listReq.Request = proto.Clone(req).(*resource.ListRequest)
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceList, listReq)
if rows != nil {
defer func() {
if err := rows.Close(); err != nil {
b.log.Warn("listLatest error closing rows", "error", err)
}
}()
}
if err != nil {
return err
}
iter.rows = rows
return cb(iter)
})
return iter.listRV, err
}
// listAtRevision fetches the resources from the resource_history table at a specific revision.
func (b *backend) listAtRevision(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"listAtRevision")
defer span.End()
// Get the RV
iter := &listIter{listRV: req.ResourceVersion, sortAsc: false}
if req.NextPageToken != "" {
continueToken, err := resource.GetContinueToken(req.NextPageToken)
if err != nil {
return 0, fmt.Errorf("get continue token: %w", err)
}
iter.listRV = continueToken.ResourceVersion
iter.offset = continueToken.StartOffset
if req.ResourceVersion != 0 && req.ResourceVersion != iter.listRV {
return 0, apierrors.NewBadRequest("request resource version does not math token")
}
}
if iter.listRV < 1 {
return 0, apierrors.NewBadRequest("expecting an explicit resource version query")
}
// The query below has the potential to be EXTREMELY slow if the resource_history table is big. May be helpful to know
// which stack is calling this.
b.log.Debug("listAtRevision", "ns", req.Options.Key.Namespace, "group", req.Options.Key.Group, "resource", req.Options.Key.Resource, "rv", iter.listRV)
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
limit := int64(0) // ignore limit
if iter.offset > 0 {
limit = math.MaxInt64 // a limit is required for offset
}
listReq := sqlResourceHistoryListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Request: &historyListRequest{
ResourceVersion: iter.listRV,
Limit: limit,
Offset: iter.offset,
Options: req.Options,
},
}
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryList, listReq)
if rows != nil {
defer func() {
if err := rows.Close(); err != nil {
b.log.Warn("listAtRevision error closing rows", "error", err)
}
}()
}
if err != nil {
return err
}
iter.rows = rows
return cb(iter)
})
return iter.listRV, err
}
// getHistory fetches the resources from the resource table.
func (b *backend) getHistory(ctx context.Context, req *resource.ListRequest, cb func(resource.ListIterator) error) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"getHistory")
defer span.End()
listReq := sqlGetHistoryRequest{
SQLTemplate: sqltemplate.New(b.dialect),
Key: req.Options.Key,
Trash: req.Source == resource.ListRequest_TRASH,
}
// We are assuming that users want history in ascending order
// when they are using NotOlderThan matching, and descending order
// for Unset (default) and Exact matching.
listReq.SortAscending = req.GetVersionMatchV2() == resource.ResourceVersionMatchV2_NotOlderThan
iter := &listIter{}
if req.NextPageToken != "" {
continueToken, err := resource.GetContinueToken(req.NextPageToken)
if err != nil {
return 0, fmt.Errorf("get continue token: %w", err)
}
listReq.StartRV = continueToken.ResourceVersion
listReq.SortAscending = continueToken.SortAscending
}
iter.sortAsc = listReq.SortAscending
// Set ExactRV when using Exact matching
if req.VersionMatchV2 == resource.ResourceVersionMatchV2_Exact {
if req.ResourceVersion <= 0 {
return 0, fmt.Errorf("expecting an explicit resource version query when using Exact matching")
}
listReq.ExactRV = req.ResourceVersion
}
// Set MinRV when using NotOlderThan matching to filter at the database level
if req.ResourceVersion > 0 && req.VersionMatchV2 == resource.ResourceVersionMatchV2_NotOlderThan {
listReq.MinRV = req.ResourceVersion
}
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
iter.listRV, err = b.fetchLatestRV(ctx, tx, b.dialect, req.Options.Key.Group, req.Options.Key.Resource)
if err != nil {
return err
}
rows, err := dbutil.QueryRows(ctx, tx, sqlResourceHistoryGet, listReq)
if rows != nil {
defer func() {
if err := rows.Close(); err != nil {
b.log.Warn("listLatest error closing rows", "error", err)
}
}()
}
if err != nil {
return err
}
iter.rows = rows
return cb(iter)
})
return iter.listRV, err
}
func (b *backend) WatchWriteEvents(ctx context.Context) (<-chan *resource.WrittenEvent, error) {
return b.notifier.notify(ctx)
}
// listLatestRVs returns the latest resource version for each (Group, Resource) pair.
func (b *backend) listLatestRVs(ctx context.Context) (groupResourceRV, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"listLatestRVs")
defer span.End()
var grvs []*groupResourceVersion
err := b.db.WithTx(ctx, ReadCommittedRO, func(ctx context.Context, tx db.Tx) error {
var err error
grvs, err = dbutil.Query(ctx, tx, sqlResourceVersionList, &sqlResourceVersionListRequest{
SQLTemplate: sqltemplate.New(b.dialect),
groupResourceVersion: new(groupResourceVersion),
})
return err
})
if err != nil {
return nil, err
}
since := groupResourceRV{}
for _, grv := range grvs {
if since[grv.Group] == nil {
since[grv.Group] = map[string]int64{}
}
since[grv.Group][grv.Resource] = grv.ResourceVersion
}
return since, nil
}
// fetchLatestRV returns the current maximum RV in the resource table
func (b *backend) fetchLatestRV(ctx context.Context, x db.ContextExecer, d sqltemplate.Dialect, group, resource string) (int64, error) {
ctx, span := b.tracer.Start(ctx, tracePrefix+"fetchLatestRV")
defer span.End()
res, err := dbutil.QueryRow(ctx, x, sqlResourceVersionGet, sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(d),
Group: group,
Resource: resource,
ReadOnly: true,
Response: new(resourceVersionResponse),
})
if errors.Is(err, sql.ErrNoRows) {
return 1, nil
} else if err != nil {
return 0, fmt.Errorf("get resource version: %w", err)
}
return res.ResourceVersion, nil
}