The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/infra/serverlock/serverlock.go

401 lines
13 KiB

package serverlock
import (
"context"
"errors"
"fmt"
"math/rand"
"time"
"go.opentelemetry.io/otel/attribute"
"github.com/grafana/grafana/pkg/infra/db"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
)
func ProvideService(sqlStore db.DB, tracer tracing.Tracer) *ServerLockService {
return &ServerLockService{
SQLStore: sqlStore,
tracer: tracer,
log: log.New("infra.lockservice"),
}
}
// ServerLockService allows servers in HA mode to claim a lock and execute a function if the server was granted the lock
// It exposes 2 services LockAndExecute and LockExecuteAndRelease, which are intended to be used independently, don't mix
// them up (ie, use the same actionName for both of them).
type ServerLockService struct {
SQLStore db.DB
tracer tracing.Tracer
log log.Logger
}
// LockAndExecute try to create a lock for this server and only executes the
// `fn` function when successful. This should not be used at low internal. But services
// that needs to be run once every ex 10m.
func (sl *ServerLockService) LockAndExecute(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
start := time.Now()
ctx, span := sl.tracer.Start(ctx, "ServerLockService.LockAndExecute")
span.SetAttributes(attribute.String("serverlock.actionName", actionName))
defer span.End()
ctxLogger := sl.log.FromContext(ctx)
ctxLogger.Debug("Start LockAndExecute", "actionName", actionName)
// gets or creates a lockable row
rowLock, err := sl.getOrCreate(ctx, actionName)
if err != nil {
span.RecordError(err)
return err
}
// avoid execution if last lock happened less than `maxInterval` ago
if sl.isLockWithinInterval(rowLock, maxInterval) {
return nil
}
// try to get lock based on rowLock version
acquiredLock, err := sl.acquireLock(ctx, rowLock)
if err != nil {
span.RecordError(err)
return err
}
if acquiredLock {
sl.executeFunc(ctx, actionName, fn)
}
ctxLogger.Debug("LockAndExecute finished", "actionName", actionName, "acquiredLock", acquiredLock, "duration", time.Since(start))
return nil
}
func (sl *ServerLockService) acquireLock(ctx context.Context, serverLock *serverLock) (bool, error) {
ctx, span := sl.tracer.Start(ctx, "ServerLockService.acquireLock")
defer span.End()
var result bool
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *db.Session) error {
newVersion := serverLock.Version + 1
sql := `UPDATE server_lock SET
version = ?,
last_execution = ?
WHERE
operation_uid = ? AND version = ?`
res, err := dbSession.Exec(sql, newVersion, time.Now().Unix(),
serverLock.OperationUID, serverLock.Version)
if err != nil {
return err
}
affected, err := res.RowsAffected()
result = affected == 1
return err
})
return result, err
}
func (sl *ServerLockService) getOrCreate(ctx context.Context, actionName string) (*serverLock, error) {
ctx, span := sl.tracer.Start(ctx, "ServerLockService.getOrCreate")
defer span.End()
var result *serverLock
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *db.Session) error {
sqlRes := &serverLock{}
has, err := dbSession.SQL("SELECT * FROM server_lock WHERE operation_uid = ?",
actionName).Get(sqlRes)
if err != nil {
return err
}
if has {
result = sqlRes
return nil
}
lockRow := &serverLock{
OperationUID: actionName,
LastExecution: 0,
}
affected := int64(1)
rawSQL := `INSERT INTO server_lock (operation_uid, last_execution, version) VALUES (?, ?, ?)`
if sl.SQLStore.GetDBType() == migrator.Postgres {
rawSQL += ` RETURNING id`
var id int64
_, err := dbSession.SQL(rawSQL, lockRow.OperationUID, lockRow.LastExecution, 0).Get(&id)
if err != nil {
return err
}
lockRow.Id = id
} else {
res, err := dbSession.Exec(
rawSQL,
lockRow.OperationUID, lockRow.LastExecution, 0)
if err != nil {
return err
}
lastID, err := res.LastInsertId()
if err != nil {
fmt.Println("Error getting last insert id", err)
sl.log.FromContext(ctx).Error("Error getting last insert id", "actionName", actionName, "error", err)
}
lockRow.Id = lastID
affected, err = res.RowsAffected()
if err != nil {
sl.log.FromContext(ctx).Error("Error getting rows affected", "actionName", actionName, "error", err)
}
}
if affected != 1 || lockRow.Id == 0 {
// this means that there was no error but there is something not working correctly
sl.log.FromContext(ctx).Error("Expected rows affected to be 1 if there was no error",
"actionName", actionName,
"rowsAffected", affected,
"lockRow ID", lockRow.Id)
}
result = lockRow
return nil
})
return result, err
}
// LockExecuteAndRelease Creates the lock, executes the func, and then release the locks. The locking mechanism is
// based on the UNIQUE constraint of the actionName in the database (column operation_uid), so a new process can not insert
// a new operation if already exists one. The parameter 'maxInterval' is a timeout safeguard, if the LastExecution in the
// database is older than maxInterval, we will assume the lock as timeouted. The 'maxInterval' parameter should be so long
// that is impossible for 2 processes to run at the same time.
func (sl *ServerLockService) LockExecuteAndRelease(ctx context.Context, actionName string, maxInterval time.Duration, fn func(ctx context.Context)) error {
start := time.Now()
ctx, span := sl.tracer.Start(ctx, "ServerLockService.LockExecuteAndRelease")
span.SetAttributes(attribute.String("serverlock.actionName", actionName))
defer span.End()
ctxLogger := sl.log.FromContext(ctx)
ctxLogger.Debug("Start LockExecuteAndRelease", "actionName", actionName)
err := sl.acquireForRelease(ctx, actionName, maxInterval)
// could not get the lock, returning
if err != nil {
span.RecordError(err)
return err
}
sl.executeFunc(ctx, actionName, fn)
err = sl.releaseLock(ctx, actionName)
if err != nil {
span.RecordError(err)
ctxLogger.Error("Failed to release the lock", "error", err)
}
ctxLogger.Debug("LockExecuteAndRelease finished", "actionName", actionName, "duration", time.Since(start))
return nil
}
// RetryOpt is a callback function called after each failed lock acquisition try.
// It gets the number of tries passed as an arg.
type RetryOpt func(int) error
type LockTimeConfig struct {
MaxInterval time.Duration // Duration after which we consider a lock to be dead and overtake it. Make sure this is big enough so that a server cannot acquire the lock while another server is processing.
MinWait time.Duration // Minimum time to wait before retrying to acquire the lock.
MaxWait time.Duration // Maximum time to wait before retrying to acquire the lock.
}
// LockExecuteAndReleaseWithRetries mimics LockExecuteAndRelease but waits for the lock to be released if it is already taken.
func (sl *ServerLockService) LockExecuteAndReleaseWithRetries(ctx context.Context, actionName string, timeConfig LockTimeConfig, fn func(ctx context.Context), retryOpts ...RetryOpt) error {
start := time.Now()
ctx, span := sl.tracer.Start(ctx, "ServerLockService.LockExecuteAndReleaseWithRetries")
span.SetAttributes(attribute.String("serverlock.actionName", actionName))
defer span.End()
ctxLogger := sl.log.FromContext(ctx)
ctxLogger.Debug("Start LockExecuteAndReleaseWithRetries", "actionName", actionName)
lockChecks := 0
for {
lockChecks++
err := sl.acquireForRelease(ctx, actionName, timeConfig.MaxInterval)
// could not get the lock
if err != nil {
var lockedErr *ServerLockExistsError
if errors.As(err, &lockedErr) {
// if the lock is already taken, wait and try again
if lockChecks == 1 { // only warn on first lock check
ctxLogger.Warn("another instance has the lock, waiting for it to be released", "actionName", actionName)
}
for _, op := range retryOpts {
if err := op(lockChecks); err != nil {
return err
}
}
time.Sleep(lockWait(timeConfig.MinWait, timeConfig.MaxWait))
continue
}
span.RecordError(err)
return err
}
// lock was acquired and released successfully
break
}
sl.executeFunc(ctx, actionName, fn)
if err := sl.releaseLock(ctx, actionName); err != nil {
span.RecordError(err)
ctxLogger.Error("Failed to release the lock", "error", err)
}
ctxLogger.Debug("LockExecuteAndReleaseWithRetries finished", "actionName", actionName, "duration", time.Since(start))
return nil
}
// generate a random duration between minWait and maxWait to ensure instances unlock gradually
func lockWait(minWait time.Duration, maxWait time.Duration) time.Duration {
return time.Duration(rand.Int63n(int64(maxWait-minWait)) + int64(minWait))
}
// acquireForRelease will check if the lock is already on the database, if it is, will check with maxInterval if it is
// timeouted. Returns nil error if the lock was acquired correctly
func (sl *ServerLockService) acquireForRelease(ctx context.Context, actionName string, maxInterval time.Duration) error {
ctx, span := sl.tracer.Start(ctx, "ServerLockService.acquireForRelease")
defer span.End()
// getting the lock - as the action name has a Unique constraint, this will fail if the lock is already on the database
err := sl.SQLStore.WithTransactionalDbSession(ctx, func(dbSession *db.Session) error {
// we need to find if the lock is in the database
result := &serverLock{}
sqlRaw := `SELECT * FROM server_lock WHERE operation_uid = ?`
if sl.SQLStore.GetDBType() == migrator.MySQL || sl.SQLStore.GetDBType() == migrator.Postgres {
sqlRaw += ` FOR UPDATE`
}
has, err := dbSession.SQL(
sqlRaw,
actionName).Get(result)
if err != nil {
return err
}
ctxLogger := sl.log.FromContext(ctx)
if has {
if sl.isLockWithinInterval(result, maxInterval) {
return &ServerLockExistsError{actionName: actionName}
} else {
// lock has timed out, so we update the timestamp
result.LastExecution = time.Now().Unix()
res, err := dbSession.Exec("UPDATE server_lock SET last_execution = ? WHERE operation_uid = ?",
result.LastExecution, actionName)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
ctxLogger.Error("Error getting rows affected", "actionName", actionName, "error", err)
}
if affected != 1 {
ctxLogger.Error("Expected rows affected to be 1 if there was no error", "actionName", actionName, "rowsAffected", affected)
}
return nil
}
} else {
// lock not found, creating it
res, err := dbSession.Exec(
"INSERT INTO server_lock (operation_uid, last_execution, version) VALUES (?, ?, ?)",
actionName, time.Now().Unix(), 0)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
ctxLogger.Error("Error getting rows affected", "actionName", actionName, "error", err)
}
lastID, err := res.LastInsertId()
if err != nil {
ctxLogger.Error("Error getting last insert id", "actionName", actionName, "error", err)
}
if affected != 1 || lastID == 0 {
// this means that there was no error but there is something not working correctly
ctxLogger.Error("Expected rows affected to be 1 if there was no error",
"actionName", actionName,
"rowsAffected", affected,
"lastID", lastID)
}
}
return nil
})
return err
}
// releaseLock will delete the row at the database. This is only intended to be used within the scope of LockExecuteAndRelease
// method, but not as to manually release a Lock
func (sl *ServerLockService) releaseLock(ctx context.Context, actionName string) error {
ctx, span := sl.tracer.Start(ctx, "ServerLockService.releaseLock")
defer span.End()
err := sl.SQLStore.WithDbSession(ctx, func(dbSession *db.Session) error {
sql := `DELETE FROM server_lock WHERE operation_uid=? `
res, err := dbSession.Exec(sql, actionName)
if err != nil {
return err
}
affected, err := res.RowsAffected()
if err != nil {
sl.log.FromContext(ctx).Debug("Error getting rows affected", "actionName", actionName, "error", err)
}
if affected != 1 {
sl.log.FromContext(ctx).Debug("Error releasing lock", "actionName", actionName, "rowsAffected", affected)
}
return nil
})
return err
}
func (sl *ServerLockService) isLockWithinInterval(lock *serverLock, maxInterval time.Duration) bool {
if lock.LastExecution != 0 {
lastExecutionTime := time.Unix(lock.LastExecution, 0)
if time.Since(lastExecutionTime) < maxInterval {
return true
}
}
return false
}
func (sl *ServerLockService) executeFunc(ctx context.Context, actionName string, fn func(ctx context.Context)) {
start := time.Now()
ctx, span := sl.tracer.Start(ctx, "ServerLockService.executeFunc")
defer span.End()
ctxLogger := sl.log.FromContext(ctx)
ctxLogger.Debug("Start execution", "actionName", actionName)
fn(ctx)
ctxLogger.Debug("Execution finished", "actionName", actionName, "duration", time.Since(start))
}