mirror of https://github.com/grafana/grafana
Secrets: make operations sync (#107732)
* Secrets: make operations sync * k8s gen / update query to list secure values to include the version * always store new version of a secret * make update-workspace * go mod tidy * update queries * update queries * improve and use testutils in decrypt_store_test * fix broken test * make update-workspace * ./hack/update-codegen.sh secret * update Test_SecureValueMetadataStorage_CreateAndRead * undo dependency changes * linter: fix remaining errors --------- Co-authored-by: Matheus Macabu <macabu.matheus@gmail.com> Co-authored-by: Matheus Macabu <macabu@users.noreply.github.com>pull/107888/head
parent
ded7912ea3
commit
8283d35e56
@ -1,64 +0,0 @@ |
||||
package contracts |
||||
|
||||
import ( |
||||
"context" |
||||
) |
||||
|
||||
type contextRequestIdKey struct{} |
||||
|
||||
type OutboxMessageType string |
||||
|
||||
func GetRequestId(ctx context.Context) string { |
||||
v := ctx.Value(contextRequestIdKey{}) |
||||
requestId, ok := v.(string) |
||||
if !ok { |
||||
return "" |
||||
} |
||||
|
||||
return requestId |
||||
} |
||||
|
||||
func ContextWithRequestID(ctx context.Context, requestId string) context.Context { |
||||
return context.WithValue(ctx, contextRequestIdKey{}, requestId) |
||||
} |
||||
|
||||
const ( |
||||
CreateSecretOutboxMessage OutboxMessageType = "create" |
||||
UpdateSecretOutboxMessage OutboxMessageType = "update" |
||||
DeleteSecretOutboxMessage OutboxMessageType = "delete" |
||||
) |
||||
|
||||
type AppendOutboxMessage struct { |
||||
RequestID string |
||||
Type OutboxMessageType |
||||
Name string |
||||
Namespace string |
||||
EncryptedSecret string |
||||
KeeperName *string |
||||
ExternalID *string |
||||
} |
||||
|
||||
type OutboxMessage struct { |
||||
RequestID string |
||||
Type OutboxMessageType |
||||
MessageID int64 |
||||
Name string |
||||
Namespace string |
||||
EncryptedSecret string |
||||
KeeperName *string |
||||
ExternalID *string |
||||
// How many times this message has been received
|
||||
ReceiveCount int |
||||
Created int64 |
||||
} |
||||
|
||||
type OutboxQueue interface { |
||||
// Appends a message to the outbox queue
|
||||
Append(ctx context.Context, message AppendOutboxMessage) (int64, error) |
||||
// Receives at most n messages from the outbox queue
|
||||
ReceiveN(ctx context.Context, n uint) ([]OutboxMessage, error) |
||||
// Deletes a message from the outbox queue
|
||||
Delete(ctx context.Context, messageID int64) error |
||||
// Increments the number of times each message has been received by 1. Must be atomic.
|
||||
IncrementReceiveCount(ctx context.Context, messageIDs []int64) error |
||||
} |
@ -0,0 +1,94 @@ |
||||
package service_test |
||||
|
||||
import ( |
||||
"testing" |
||||
|
||||
"github.com/grafana/grafana/pkg/apis/secret/v0alpha1" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/testutils" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/xkube" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestCrud(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
t.Run("creating a secure value creates new versions", func(t *testing.T) { |
||||
t.Parallel() |
||||
sut := testutils.Setup(t) |
||||
|
||||
sv1, err := sut.CreateSv(t.Context()) |
||||
require.NoError(t, err) |
||||
|
||||
// Create the same secure value twice
|
||||
input := sv1.DeepCopy() |
||||
input.Spec.Description = "d2" |
||||
input.Spec.Value = v0alpha1.NewExposedSecureValue("v2") |
||||
|
||||
sv2, err := sut.CreateSv(t.Context(), testutils.CreateSvWithSv(input)) |
||||
require.NoError(t, err) |
||||
require.True(t, sv2.Status.Version > sv1.Status.Version) |
||||
|
||||
// Read the secure value
|
||||
sv, err := sut.SecureValueService.Read(t.Context(), xkube.Namespace(sv2.Namespace), sv2.Name) |
||||
require.NoError(t, err) |
||||
|
||||
// It should be the latest version
|
||||
require.Equal(t, sv2.Namespace, sv.Namespace) |
||||
require.Equal(t, sv2.Name, sv.Name) |
||||
require.Equal(t, "d2", sv.Spec.Description) |
||||
require.Equal(t, sv2.Status.Version, sv.Status.Version) |
||||
}) |
||||
|
||||
t.Run("updating a secure value creates new versions", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
|
||||
// Create a secure value
|
||||
sv1, err := sut.CreateSv(t.Context()) |
||||
require.NoError(t, err) |
||||
|
||||
ns := sv1.Namespace |
||||
name := sv1.Name |
||||
|
||||
// Update the secure value
|
||||
input := sv1.DeepCopy() |
||||
input.Spec.Description = "d2" |
||||
sv2, err := sut.UpdateSv(t.Context(), input) |
||||
require.NoError(t, err) |
||||
|
||||
// Read the secure value
|
||||
sv3, err := sut.SecureValueService.Read(t.Context(), xkube.Namespace(ns), name) |
||||
require.NoError(t, err) |
||||
|
||||
// Nothing has changed except for the updated field.
|
||||
require.Equal(t, ns, sv2.Namespace) |
||||
require.Equal(t, name, sv2.Name) |
||||
require.True(t, sv2.Status.Version > sv1.Status.Version) |
||||
require.Equal(t, "d2", sv2.Spec.Description) |
||||
|
||||
require.Equal(t, ns, sv3.Namespace) |
||||
require.Equal(t, name, sv3.Name) |
||||
require.Equal(t, sv2.Status.Version, sv3.Status.Version) |
||||
require.Equal(t, "d2", sv3.Spec.Description) |
||||
}) |
||||
|
||||
t.Run("deleting secure values", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
|
||||
sv1, err := sut.CreateSv(t.Context()) |
||||
require.NoError(t, err) |
||||
|
||||
sv2, err := sut.DeleteSv(t.Context(), sv1.Namespace, sv1.Name) |
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, sv1.Namespace, sv2.Namespace) |
||||
require.Equal(t, sv1.Name, sv2.Name) |
||||
|
||||
_, err = sut.SecureValueMetadataStorage.Read(t.Context(), xkube.Namespace(sv1.Namespace), sv1.Name, contracts.ReadOpts{}) |
||||
require.ErrorIs(t, err, contracts.ErrSecureValueNotFound) |
||||
}) |
||||
} |
@ -1,44 +0,0 @@ |
||||
package worker |
||||
|
||||
import ( |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
) |
||||
|
||||
const ( |
||||
namespace = "grafana_secrets_manager" |
||||
subsystem = "outbox_worker" |
||||
) |
||||
|
||||
// OutboxMetrics is a struct that contains all the metrics for an implementation of the secrets service.
|
||||
type OutboxMetrics struct { |
||||
OutboxMessageProcessingDuration *prometheus.HistogramVec |
||||
} |
||||
|
||||
func newOutboxMetrics() *OutboxMetrics { |
||||
return &OutboxMetrics{ |
||||
OutboxMessageProcessingDuration: prometheus.NewHistogramVec(prometheus.HistogramOpts{ |
||||
Namespace: namespace, |
||||
Subsystem: subsystem, |
||||
Name: "message_processing_duration_seconds", |
||||
Help: "Duration of outbox message processing", |
||||
Buckets: prometheus.DefBuckets, |
||||
}, []string{"message_type", "keeper_type"}), |
||||
} |
||||
} |
||||
|
||||
// NewOutboxMetrics creates a new SecretsMetrics struct containing registered metrics
|
||||
func NewOutboxMetrics(reg prometheus.Registerer) *OutboxMetrics { |
||||
m := newOutboxMetrics() |
||||
|
||||
if reg != nil { |
||||
reg.MustRegister( |
||||
m.OutboxMessageProcessingDuration, |
||||
) |
||||
} |
||||
|
||||
return m |
||||
} |
||||
|
||||
func NewTestMetrics() *OutboxMetrics { |
||||
return newOutboxMetrics() |
||||
} |
@ -1,274 +0,0 @@ |
||||
package worker |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana-app-sdk/logging" |
||||
secretv0alpha1 "github.com/grafana/grafana/pkg/apis/secret/v0alpha1" |
||||
"github.com/grafana/grafana/pkg/registry" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/tracectx" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/xkube" |
||||
"github.com/grafana/grafana/pkg/services/featuremgmt" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/trace" |
||||
) |
||||
|
||||
// Consumes and processes messages from the secure value outbox queue
|
||||
type Worker struct { |
||||
config Config |
||||
tracer trace.Tracer |
||||
database contracts.Database |
||||
outboxQueue contracts.OutboxQueue |
||||
secureValueMetadataStorage contracts.SecureValueMetadataStorage |
||||
keeperMetadataStorage contracts.KeeperMetadataStorage |
||||
keeperService contracts.KeeperService |
||||
encryptionManager contracts.EncryptionManager |
||||
metrics *OutboxMetrics |
||||
enabled bool |
||||
} |
||||
|
||||
// DefaultConfig for the secure value outbox worker.
|
||||
var DefaultConfig = Config{ |
||||
BatchSize: 20, |
||||
ReceiveTimeout: 5 * time.Second, |
||||
PollingInterval: 100 * time.Millisecond, |
||||
MaxMessageProcessingAttempts: 10, |
||||
} |
||||
|
||||
// ProvideWorkerConfig used for wire.
|
||||
func ProvideWorkerConfig() Config { |
||||
return DefaultConfig |
||||
} |
||||
|
||||
type Config struct { |
||||
// The max number of messages to fetch from the outbox queue in a batch
|
||||
BatchSize uint |
||||
// How long to wait for a request to fetch messages from the outbox queue
|
||||
ReceiveTimeout time.Duration |
||||
// How often to poll the outbox queue for new messages
|
||||
PollingInterval time.Duration |
||||
// How many tries to try to process a message before marking the operation as failed
|
||||
MaxMessageProcessingAttempts uint |
||||
} |
||||
|
||||
func NewWorker( |
||||
config Config, |
||||
tracer trace.Tracer, |
||||
database contracts.Database, |
||||
outboxQueue contracts.OutboxQueue, |
||||
secureValueMetadataStorage contracts.SecureValueMetadataStorage, |
||||
keeperMetadataStorage contracts.KeeperMetadataStorage, |
||||
keeperService contracts.KeeperService, |
||||
encryptionManager contracts.EncryptionManager, |
||||
features featuremgmt.FeatureToggles, |
||||
reg prometheus.Registerer, |
||||
) (*Worker, error) { |
||||
if config.BatchSize == 0 { |
||||
return nil, fmt.Errorf("config.BatchSize is required") |
||||
} |
||||
if config.ReceiveTimeout == 0 { |
||||
return nil, fmt.Errorf("config.ReceiveTimeout is required") |
||||
} |
||||
if config.PollingInterval == 0 { |
||||
return nil, fmt.Errorf("config.PollingInterval is required") |
||||
} |
||||
if config.MaxMessageProcessingAttempts == 0 { |
||||
return nil, fmt.Errorf("config.MaxMessageProcessingAttempts is required") |
||||
} |
||||
|
||||
// Require both features to be enabled for the worker to run.
|
||||
enabled := features.IsEnabledGlobally(featuremgmt.FlagGrafanaAPIServerWithExperimentalAPIs) && features.IsEnabledGlobally(featuremgmt.FlagSecretsManagementAppPlatform) |
||||
|
||||
return &Worker{ |
||||
config: config, |
||||
tracer: tracer, |
||||
database: database, |
||||
outboxQueue: outboxQueue, |
||||
secureValueMetadataStorage: secureValueMetadataStorage, |
||||
keeperMetadataStorage: keeperMetadataStorage, |
||||
keeperService: keeperService, |
||||
encryptionManager: encryptionManager, |
||||
metrics: NewOutboxMetrics(reg), |
||||
enabled: enabled, |
||||
}, nil |
||||
} |
||||
|
||||
// Ensure that Worker implements the BackgroundService interface, so we can start it as a background service.
|
||||
var _ registry.BackgroundService = (*Worker)(nil) |
||||
|
||||
// Run is the main method to drive the worker
|
||||
func (w *Worker) Run(ctx context.Context) error { |
||||
if !w.enabled { |
||||
return nil |
||||
} |
||||
|
||||
logging.FromContext(ctx).Debug("starting worker control loop") |
||||
|
||||
t := time.NewTicker(w.config.PollingInterval) |
||||
defer t.Stop() |
||||
|
||||
for { |
||||
select { |
||||
// If the context was canceled
|
||||
case <-ctx.Done(): |
||||
// return the reason it was canceled
|
||||
return ctx.Err() |
||||
|
||||
// Otherwise try to receive messages
|
||||
case <-t.C: |
||||
if ctx.Err() != nil { |
||||
return ctx.Err() |
||||
} |
||||
|
||||
if err := w.ReceiveAndProcessMessages(ctx); err != nil { |
||||
logging.FromContext(ctx).Error("receiving outbox messages", "err", err.Error()) |
||||
} |
||||
} |
||||
} |
||||
} |
||||
|
||||
// TODO: don't rollback every message when a single error happens
|
||||
func (w *Worker) ReceiveAndProcessMessages(ctx context.Context) error { |
||||
messageIDs := make([]int64, 0) |
||||
|
||||
txErr := w.database.Transaction(ctx, func(ctx context.Context) error { |
||||
timeoutCtx, cancel := context.WithTimeout(ctx, w.config.ReceiveTimeout) |
||||
messages, err := w.outboxQueue.ReceiveN(timeoutCtx, w.config.BatchSize) |
||||
cancel() |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
for _, message := range messages { |
||||
messageIDs = append(messageIDs, message.MessageID) |
||||
if err := w.processMessage(ctx, message); err != nil { |
||||
return fmt.Errorf("processing message: %+v %w", message, err) |
||||
} |
||||
} |
||||
return nil |
||||
}) |
||||
|
||||
// This call is made outside the transaction to make sure the receive count is updated on rollbacks.
|
||||
incrementErr := w.outboxQueue.IncrementReceiveCount(ctx, messageIDs) |
||||
if incrementErr != nil { |
||||
incrementErr = fmt.Errorf("incrementing receive count for outbox message: %w", incrementErr) |
||||
} |
||||
|
||||
return errors.Join(txErr, incrementErr) |
||||
} |
||||
|
||||
func (w *Worker) processMessage(ctx context.Context, message contracts.OutboxMessage) error { |
||||
start := time.Now() |
||||
keeperType := "unknown" |
||||
defer func() { |
||||
w.metrics.OutboxMessageProcessingDuration.WithLabelValues(string(message.Type), keeperType).Observe(time.Since(start).Seconds()) |
||||
}() |
||||
logging.FromContext(ctx).Debug("processing message", "type", message.Type, "name", message.Name, "namespace", message.Namespace, "receiveCount", message.ReceiveCount) |
||||
|
||||
opts := []trace.SpanStartOption{} |
||||
// If there's no request ID in the message, start a new root span and log an error.
|
||||
ctx, err := tracectx.HexDecodeTraceIntoContext(ctx, message.RequestID) |
||||
if err != nil { |
||||
opts = append(opts, trace.WithNewRoot()) |
||||
logging.FromContext(ctx).Error("decoding trace context from message", "err", err.Error(), "message.requestID", message.RequestID) |
||||
} |
||||
|
||||
opts = append(opts, trace.WithAttributes( |
||||
attribute.String("message.requestID", message.RequestID), |
||||
attribute.Int64("message.id", message.MessageID), |
||||
attribute.String("message.type", string(message.Type)), |
||||
attribute.String("message.namespace", message.Namespace), |
||||
attribute.String("message.secureValue.name", message.Name), |
||||
attribute.Int("message.receive.count", message.ReceiveCount), |
||||
)) |
||||
|
||||
ctx, span := w.tracer.Start(ctx, "Worker.ProcessMessage", opts...) |
||||
defer span.End() |
||||
|
||||
if message.ReceiveCount >= int(w.config.MaxMessageProcessingAttempts) { |
||||
if err := w.secureValueMetadataStorage.SetStatus(ctx, xkube.Namespace(message.Namespace), message.Name, secretv0alpha1.SecureValueStatus{Phase: secretv0alpha1.SecureValuePhaseFailed, Message: fmt.Sprintf("Reached max number of attempts to complete operation: %s", message.Type)}); err != nil { |
||||
return fmt.Errorf("setting secret metadata status to Succeeded: message=%+v", message) |
||||
} |
||||
if err := w.outboxQueue.Delete(ctx, message.MessageID); err != nil { |
||||
return fmt.Errorf("deleting message from outbox queue: %w", err) |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
keeperCfg, err := w.keeperMetadataStorage.GetKeeperConfig(ctx, message.Namespace, message.KeeperName, contracts.ReadOpts{ForUpdate: true}) |
||||
if err != nil { |
||||
return fmt.Errorf("fetching keeper config: namespace=%+v keeperName=%+v %w", message.Namespace, message.KeeperName, err) |
||||
} |
||||
keeperType = string(keeperCfg.Type()) |
||||
|
||||
keeper, err := w.keeperService.KeeperForConfig(keeperCfg) |
||||
if err != nil { |
||||
return fmt.Errorf("getting keeper for config: namespace=%+v keeperName=%+v %w", message.Namespace, message.KeeperName, err) |
||||
} |
||||
logging.FromContext(ctx).Debug("retrieved keeper", "namespace", message.Namespace, "keeperName", message.KeeperName, "type", keeperCfg.Type()) |
||||
|
||||
switch message.Type { |
||||
case contracts.CreateSecretOutboxMessage: |
||||
rawSecret, err := w.encryptionManager.Decrypt(ctx, message.Namespace, []byte(message.EncryptedSecret)) |
||||
if err != nil { |
||||
return fmt.Errorf("decrypting secure value secret: %w", err) |
||||
} |
||||
|
||||
externalID, err := keeper.Store(ctx, keeperCfg, message.Namespace, string(rawSecret)) |
||||
if err != nil { |
||||
return fmt.Errorf("storing secret: message=%+v %w", message, err) |
||||
} |
||||
|
||||
if err := w.secureValueMetadataStorage.SetExternalID(ctx, xkube.Namespace(message.Namespace), message.Name, externalID); err != nil { |
||||
return fmt.Errorf("setting secret metadata externalID: externalID=%+v message=%+v %w", externalID, message, err) |
||||
} |
||||
|
||||
// Setting the status to Succeeded must be the last action
|
||||
// since it acts as a fence to clients.
|
||||
if err := w.secureValueMetadataStorage.SetStatus(ctx, xkube.Namespace(message.Namespace), message.Name, secretv0alpha1.SecureValueStatus{Phase: secretv0alpha1.SecureValuePhaseSucceeded}); err != nil { |
||||
return fmt.Errorf("setting secret metadata status to Succeeded: message=%+v %w", message, err) |
||||
} |
||||
|
||||
case contracts.UpdateSecretOutboxMessage: |
||||
rawSecret, err := w.encryptionManager.Decrypt(ctx, message.Namespace, []byte(message.EncryptedSecret)) |
||||
if err != nil { |
||||
return fmt.Errorf("decrypting secure value secret: %w", err) |
||||
} |
||||
|
||||
if err := keeper.Update(ctx, keeperCfg, message.Namespace, contracts.ExternalID(*message.ExternalID), string(rawSecret)); err != nil { |
||||
return fmt.Errorf("calling keeper to update secret: %w", err) |
||||
} |
||||
|
||||
// Setting the status to Succeeded must be the last action
|
||||
// since it acts as a fence to clients.
|
||||
if err := w.secureValueMetadataStorage.SetStatus(ctx, xkube.Namespace(message.Namespace), message.Name, secretv0alpha1.SecureValueStatus{Phase: secretv0alpha1.SecureValuePhaseSucceeded}); err != nil { |
||||
return fmt.Errorf("setting secret metadata status to Succeeded: message=%+v", message) |
||||
} |
||||
|
||||
case contracts.DeleteSecretOutboxMessage: |
||||
if err := keeper.Delete(ctx, keeperCfg, message.Namespace, contracts.ExternalID(*message.ExternalID)); err != nil { |
||||
return fmt.Errorf("calling keeper to delete secret: %w", err) |
||||
} |
||||
if err := w.secureValueMetadataStorage.Delete(ctx, xkube.Namespace(message.Namespace), message.Name); err != nil { |
||||
return fmt.Errorf("deleting secure value metadata: %+w", err) |
||||
} |
||||
|
||||
default: |
||||
return fmt.Errorf("unhandled message type: %s", message.Type) |
||||
} |
||||
|
||||
// Delete the message from the queue after completing all operations because
|
||||
// if the message is deleted first, the response may be lost,
|
||||
// resulting in an error, but since the message was actually deleted
|
||||
// the worker would never retry.
|
||||
if err := w.outboxQueue.Delete(ctx, message.MessageID); err != nil { |
||||
return fmt.Errorf("deleting message from outbox queue: %w", err) |
||||
} |
||||
|
||||
return nil |
||||
} |
@ -1,246 +0,0 @@ |
||||
package worker_test |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"testing" |
||||
"time" |
||||
|
||||
secretv0alpha1 "github.com/grafana/grafana/pkg/apis/secret/v0alpha1" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/testutils" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/worker" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/xkube" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
type fakeKeeperService struct { |
||||
keeperForConfigFunc func(cfg secretv0alpha1.KeeperConfig) (contracts.Keeper, error) |
||||
} |
||||
|
||||
func newFakeKeeperService(keeperForConfigFunc func(cfg secretv0alpha1.KeeperConfig) (contracts.Keeper, error)) *fakeKeeperService { |
||||
return &fakeKeeperService{keeperForConfigFunc: keeperForConfigFunc} |
||||
} |
||||
|
||||
func (s *fakeKeeperService) KeeperForConfig(cfg secretv0alpha1.KeeperConfig) (contracts.Keeper, error) { |
||||
return s.keeperForConfigFunc(cfg) |
||||
} |
||||
|
||||
func TestProcessMessage(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
t.Run("secure value metadata status is set to Failed when processing a message fails too many times", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
// Given a worker that will attempt to process a message N times
|
||||
workerCfg := worker.Config{ |
||||
BatchSize: 10, |
||||
ReceiveTimeout: 1 * time.Second, |
||||
PollingInterval: time.Millisecond, |
||||
MaxMessageProcessingAttempts: 2, |
||||
} |
||||
|
||||
// And an error that keeps happening
|
||||
keeperService := newFakeKeeperService(func(cfg secretv0alpha1.KeeperConfig) (contracts.Keeper, error) { |
||||
return nil, fmt.Errorf("oops") |
||||
}) |
||||
|
||||
sut := testutils.Setup(t, testutils.WithWorkerConfig(workerCfg), testutils.WithKeeperService(keeperService)) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
|
||||
for range workerCfg.MaxMessageProcessingAttempts + 1 { |
||||
// The secure value status should be Pending while the worker is trying to process the message
|
||||
sv, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhasePending, sv.Status.Phase) |
||||
|
||||
// Worker tries to process messages
|
||||
_ = sut.Worker.ReceiveAndProcessMessages(ctx) |
||||
} |
||||
|
||||
// After the worker fails to process a message too many times,
|
||||
// the secure value status is changed to Failed
|
||||
sv, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhaseFailed, sv.Status.Phase) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
}) |
||||
|
||||
t.Run("create sv: secure value metadata status is set to Succeeded when message is processed successfully", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
|
||||
// Worker receives and processes the message
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
// and sets the secure value status to Succeeded
|
||||
sv, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhaseSucceeded, sv.Status.Phase) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
}) |
||||
|
||||
t.Run("update sv: secure value metadata status is set to Succeeded when message is processed successfully", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
|
||||
// Worker receives and processes the message
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
// and sets the secure value status to Succeeded
|
||||
sv, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhaseSucceeded, sv.Status.Phase) |
||||
|
||||
sv.Spec.Description = "desc2" |
||||
sv.Spec.Value = secretv0alpha1.NewExposedSecureValue("v2") |
||||
|
||||
// Queue an update operation
|
||||
sv, err = sut.UpdateSv(ctx, sv) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhasePending, sv.Status.Phase) |
||||
|
||||
// Worker receives and processes the message
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
updatedSv, err := sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhaseSucceeded, updatedSv.Status.Phase) |
||||
require.Equal(t, sv.Spec.Description, updatedSv.Spec.Description) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
}) |
||||
|
||||
t.Run("delete sv: secure value metadata is deleted", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
|
||||
// Worker receives and processes the message
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
// and sets the secure value status to Succeeded
|
||||
sv, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhaseSucceeded, sv.Status.Phase) |
||||
|
||||
// Queue a delete operation
|
||||
updatedSv, err := sut.DeleteSv(ctx, sv.Namespace, sv.Name) |
||||
require.NoError(t, err) |
||||
require.Equal(t, secretv0alpha1.SecureValuePhasePending, updatedSv.Status.Phase) |
||||
|
||||
// Worker receives and processes the message
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
// The secure value has been deleted
|
||||
_, err = sut.SecureValueMetadataStorage.Read(ctx, xkube.Namespace(sv.Namespace), sv.Name, contracts.ReadOpts{}) |
||||
require.ErrorIs(t, err, contracts.ErrSecureValueNotFound) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
}) |
||||
|
||||
t.Run("when creating a secure value, the secret is encrypted before it is added to the outbox queue", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
var secret string |
||||
_, err := sut.CreateSv(ctx, func(cfg *testutils.CreateSvConfig) { |
||||
secret = string(cfg.Sv.Spec.Value) |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(messages)) |
||||
|
||||
encryptedSecret := messages[0].EncryptedSecret |
||||
require.NotEmpty(t, secret) |
||||
require.NotEmpty(t, encryptedSecret) |
||||
require.NotEqual(t, secret, encryptedSecret) |
||||
}) |
||||
|
||||
t.Run("when updating a secure value, the secret is encrypted before it is added to the outbox queue", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
sv.Spec.Value = secretv0alpha1.NewExposedSecureValue("v2") |
||||
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
newValue := "v2" |
||||
sv.Spec.Value = secretv0alpha1.NewExposedSecureValue(newValue) |
||||
|
||||
// Queue an update secure value operation
|
||||
_, err = sut.UpdateSv(ctx, sv) |
||||
require.NoError(t, err) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(messages)) |
||||
|
||||
encryptedSecret := messages[0].EncryptedSecret |
||||
require.NotEmpty(t, encryptedSecret) |
||||
require.NotEqual(t, newValue, encryptedSecret) |
||||
}) |
||||
|
||||
t.Run("when deleting a secure value, no value is added to the outbox message", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
sut := testutils.Setup(t) |
||||
ctx := context.Background() |
||||
|
||||
// Queue a create secure value operation
|
||||
sv, err := sut.CreateSv(ctx) |
||||
require.NoError(t, err) |
||||
sv.Spec.Value = secretv0alpha1.NewExposedSecureValue("v2") |
||||
|
||||
require.NoError(t, sut.Worker.ReceiveAndProcessMessages(ctx)) |
||||
|
||||
// Queue a delete secure value operation
|
||||
_, err = sut.DeleteSv(ctx, sv.Namespace, sv.Name) |
||||
require.NoError(t, err) |
||||
|
||||
messages, err := sut.OutboxQueue.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(messages)) |
||||
require.Empty(t, messages[0].EncryptedSecret) |
||||
}) |
||||
} |
File diff suppressed because one or more lines are too long
@ -1,4 +0,0 @@ |
||||
DELETE FROM {{ .Ident "secret_secure_value" }} |
||||
WHERE {{ .Ident "namespace" }} = {{ .Arg .Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Name }} |
||||
; |
@ -0,0 +1,10 @@ |
||||
SELECT |
||||
{{ .Ident "version" }} |
||||
FROM |
||||
{{ .Ident "secret_secure_value" }} |
||||
WHERE |
||||
{{ .Ident "namespace" }} = {{ .Arg .Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Name }} |
||||
ORDER BY {{ .Ident "version" }} DESC |
||||
LIMIT 1 |
||||
; |
@ -1,33 +0,0 @@ |
||||
INSERT INTO {{ .Ident "secret_secure_value_outbox" }} ( |
||||
{{ .Ident "request_id" }}, |
||||
{{ .Ident "message_type" }}, |
||||
{{ .Ident "name" }}, |
||||
{{ .Ident "namespace" }}, |
||||
{{ if .Row.EncryptedSecret.Valid }} |
||||
{{ .Ident "encrypted_secret" }}, |
||||
{{ end }} |
||||
{{ if .Row.KeeperName.Valid }} |
||||
{{ .Ident "keeper_name" }}, |
||||
{{ end }} |
||||
{{ if .Row.ExternalID.Valid }} |
||||
{{ .Ident "external_id" }}, |
||||
{{ end }} |
||||
{{ .Ident "receive_count" }}, |
||||
{{ .Ident "created" }} |
||||
) VALUES ( |
||||
{{ .Arg .Row.RequestID }}, |
||||
{{ .Arg .Row.MessageType }}, |
||||
{{ .Arg .Row.Name }}, |
||||
{{ .Arg .Row.Namespace }}, |
||||
{{ if .Row.EncryptedSecret.Valid }} |
||||
{{ .Arg .Row.EncryptedSecret.String }}, |
||||
{{ end }} |
||||
{{ if .Row.KeeperName.Valid }} |
||||
{{ .Arg .Row.KeeperName.String }}, |
||||
{{ end }} |
||||
{{ if .Row.ExternalID.Valid }} |
||||
{{ .Arg .Row.ExternalID.String }}, |
||||
{{ end }} |
||||
{{ .Arg .Row.ReceiveCount }}, |
||||
{{ .Arg .Row.Created }} |
||||
); |
@ -1,5 +0,0 @@ |
||||
DELETE FROM |
||||
{{ .Ident "secret_secure_value_outbox" }} |
||||
WHERE |
||||
{{ .Ident "id" }} = {{ .Arg .MessageID }} |
||||
; |
@ -1,6 +0,0 @@ |
||||
SELECT |
||||
{{ .Ident "id" }} |
||||
FROM {{ .Ident "secret_secure_value_outbox" }} |
||||
ORDER BY id ASC |
||||
LIMIT {{ .Arg .ReceiveLimit }} |
||||
; |
@ -1,8 +0,0 @@ |
||||
SELECT |
||||
{{ .Ident "created" }}, |
||||
{{ .Ident "message_type" }} |
||||
FROM |
||||
{{ .Ident "secret_secure_value_outbox" }} |
||||
WHERE |
||||
{{ .Ident "id" }} = {{ .Arg .MessageID }} |
||||
; |
@ -1,19 +0,0 @@ |
||||
SELECT |
||||
{{ .Ident "request_id" }}, |
||||
{{ .Ident "id" }}, |
||||
{{ .Ident "message_type" }}, |
||||
{{ .Ident "name" }}, |
||||
{{ .Ident "namespace" }}, |
||||
{{ .Ident "encrypted_secret" }}, |
||||
{{ .Ident "keeper_name" }}, |
||||
{{ .Ident "external_id" }}, |
||||
{{ .Ident "receive_count" }}, |
||||
{{ .Ident "created" }} |
||||
FROM |
||||
{{ .Ident "secret_secure_value_outbox" }} |
||||
WHERE |
||||
{{ .Ident "id" }} IN ({{ .ArgList .MessageIDs }}) |
||||
ORDER BY |
||||
{{ .Ident "id" }} ASC |
||||
{{ .SelectFor "UPDATE SKIP LOCKED" }} |
||||
; |
@ -1,7 +0,0 @@ |
||||
UPDATE |
||||
{{ .Ident "secret_secure_value_outbox" }} |
||||
SET |
||||
{{ .Ident "receive_count" }} = {{ .Ident "receive_count" }} + 1 |
||||
WHERE |
||||
{{ .Ident "id" }} IN ({{ .ArgList .MessageIDs }}) |
||||
; |
@ -0,0 +1,8 @@ |
||||
UPDATE |
||||
{{ .Ident "secret_secure_value" }} |
||||
SET |
||||
{{ .Ident "active" }} = ({{ .Ident "version" }} = {{ .Arg .Version}}) |
||||
WHERE |
||||
{{ .Ident "namespace" }} = {{ .Arg .Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Name }} |
||||
; |
@ -0,0 +1,9 @@ |
||||
UPDATE |
||||
{{ .Ident "secret_secure_value" }} |
||||
SET |
||||
{{ .Ident "active" }} = false |
||||
WHERE |
||||
{{ .Ident "namespace" }} = {{ .Arg .Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Name }} AND |
||||
{{ .Ident "version" }} = {{ .Arg .Version }} |
||||
; |
@ -1,30 +0,0 @@ |
||||
UPDATE |
||||
{{ .Ident "secret_secure_value" }} |
||||
SET |
||||
{{ .Ident "guid" }} = {{ .Arg .Row.GUID }}, |
||||
{{ .Ident "name" }} = {{ .Arg .Row.Name }}, |
||||
{{ .Ident "namespace" }} = {{ .Arg .Row.Namespace }}, |
||||
{{ .Ident "annotations" }} = {{ .Arg .Row.Annotations }}, |
||||
{{ .Ident "labels" }} = {{ .Arg .Row.Labels }}, |
||||
{{ .Ident "created" }} = {{ .Arg .Row.Created }}, |
||||
{{ .Ident "created_by" }} = {{ .Arg .Row.CreatedBy }}, |
||||
{{ .Ident "updated" }} = {{ .Arg .Row.Updated }}, |
||||
{{ .Ident "updated_by" }} = {{ .Arg .Row.UpdatedBy }}, |
||||
{{ .Ident "status_phase" }} = {{ .Arg .Row.Phase }}, |
||||
{{ if .Row.Message.Valid }} |
||||
{{ .Ident "status_message" }} = {{ .Arg .Row.Message.String }}, |
||||
{{ end }} |
||||
{{ .Ident "description" }} = {{ .Arg .Row.Description }}, |
||||
{{ if .Row.Keeper.Valid }} |
||||
{{ .Ident "keeper" }} = {{ .Arg .Row.Keeper.String }}, |
||||
{{ end }} |
||||
{{ if .Row.Decrypters.Valid }} |
||||
{{ .Ident "decrypters" }} = {{ .Arg .Row.Decrypters.String }}, |
||||
{{ end }} |
||||
{{ if .Row.Ref.Valid }} |
||||
{{ .Ident "ref" }} = {{ .Arg .Row.Ref.String }}, |
||||
{{ end }} |
||||
{{ .Ident "external_id" }} = {{ .Arg .Row.ExternalID }} |
||||
WHERE {{ .Ident "namespace" }} = {{ .Arg .Row.Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Row.Name }} |
||||
; |
@ -1,8 +0,0 @@ |
||||
UPDATE |
||||
{{ .Ident "secret_secure_value" }} |
||||
SET |
||||
{{ .Ident "status_phase" }} = {{ .Arg .Phase }}, |
||||
{{ .Ident "status_message" }} = {{ .Arg .Message }} |
||||
WHERE {{ .Ident "namespace" }} = {{ .Arg .Namespace }} AND |
||||
{{ .Ident "name" }} = {{ .Arg .Name }} |
||||
; |
@ -1,397 +0,0 @@ |
||||
package metadata |
||||
|
||||
import ( |
||||
"context" |
||||
"database/sql" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana/pkg/storage/secret/metadata/metrics" |
||||
unifiedsql "github.com/grafana/grafana/pkg/storage/unified/sql" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"go.opentelemetry.io/otel/attribute" |
||||
"go.opentelemetry.io/otel/codes" |
||||
"go.opentelemetry.io/otel/trace" |
||||
|
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/assert" |
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts" |
||||
"github.com/grafana/grafana/pkg/storage/unified/sql/sqltemplate" |
||||
) |
||||
|
||||
type outboxStore struct { |
||||
db contracts.Database |
||||
dialect sqltemplate.Dialect |
||||
tracer trace.Tracer |
||||
metrics *metrics.StorageMetrics |
||||
} |
||||
|
||||
func ProvideOutboxQueue( |
||||
db contracts.Database, |
||||
tracer trace.Tracer, |
||||
reg prometheus.Registerer, |
||||
) contracts.OutboxQueue { |
||||
return &outboxStore{ |
||||
db: db, |
||||
dialect: sqltemplate.DialectForDriver(db.DriverName()), |
||||
metrics: metrics.NewStorageMetrics(reg), |
||||
tracer: tracer, |
||||
} |
||||
} |
||||
|
||||
type outboxMessageDB struct { |
||||
RequestID string |
||||
MessageID int64 |
||||
MessageType contracts.OutboxMessageType |
||||
Name string |
||||
Namespace string |
||||
EncryptedSecret sql.NullString |
||||
KeeperName sql.NullString |
||||
ExternalID sql.NullString |
||||
ReceiveCount int |
||||
Created int64 |
||||
} |
||||
|
||||
func (s *outboxStore) Append(ctx context.Context, input contracts.AppendOutboxMessage) (messageID int64, err error) { |
||||
ctx, span := s.tracer.Start(ctx, "outboxStore.Append", trace.WithAttributes( |
||||
attribute.String("name", input.Name), |
||||
attribute.String("namespace", input.Namespace), |
||||
attribute.String("type", string(input.Type)), |
||||
attribute.String("requestID", input.RequestID), |
||||
)) |
||||
defer span.End() |
||||
|
||||
defer func() { |
||||
if err != nil { |
||||
span.SetStatus(codes.Error, "failed to append outbox message") |
||||
span.RecordError(err) |
||||
} |
||||
|
||||
if messageID != 0 { |
||||
span.SetAttributes(attribute.Int64("messageID", messageID)) |
||||
} |
||||
}() |
||||
|
||||
assert.True(input.Type != "", "outboxStore.Append: outbox message type is required") |
||||
|
||||
start := time.Now() |
||||
messageID, err = s.insertMessage(ctx, input) |
||||
if err != nil { |
||||
return messageID, fmt.Errorf("inserting message into outbox table: %+w", err) |
||||
} |
||||
|
||||
s.metrics.OutboxAppendDuration.WithLabelValues(string(input.Type)).Observe(time.Since(start).Seconds()) |
||||
s.metrics.OutboxAppendCount.WithLabelValues(string(input.Type)).Inc() |
||||
|
||||
return messageID, nil |
||||
} |
||||
|
||||
func (s *outboxStore) insertMessage(ctx context.Context, input contracts.AppendOutboxMessage) (int64, error) { |
||||
keeperName := sql.NullString{} |
||||
if input.KeeperName != nil { |
||||
keeperName = sql.NullString{ |
||||
Valid: true, |
||||
String: *input.KeeperName, |
||||
} |
||||
} |
||||
|
||||
externalID := sql.NullString{} |
||||
if input.ExternalID != nil { |
||||
externalID = sql.NullString{ |
||||
Valid: true, |
||||
String: *input.ExternalID, |
||||
} |
||||
} |
||||
|
||||
encryptedSecret := sql.NullString{} |
||||
if input.Type == contracts.CreateSecretOutboxMessage || input.Type == contracts.UpdateSecretOutboxMessage { |
||||
encryptedSecret = sql.NullString{ |
||||
Valid: true, |
||||
String: input.EncryptedSecret, |
||||
} |
||||
} |
||||
|
||||
req := appendSecureValueOutbox{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
Row: &outboxMessageDB{ |
||||
RequestID: input.RequestID, |
||||
MessageType: input.Type, |
||||
Name: input.Name, |
||||
Namespace: input.Namespace, |
||||
EncryptedSecret: encryptedSecret, |
||||
KeeperName: keeperName, |
||||
ExternalID: externalID, |
||||
ReceiveCount: 0, |
||||
Created: time.Now().UTC().UnixMilli(), |
||||
}, |
||||
} |
||||
|
||||
query, err := sqltemplate.Execute(sqlSecureValueOutboxAppend, req) |
||||
if err != nil { |
||||
return 0, fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxAppend.Name(), err) |
||||
} |
||||
|
||||
result, err := s.db.ExecContext(ctx, query, req.GetArgs()...) |
||||
if err != nil { |
||||
if unifiedsql.IsRowAlreadyExistsError(err) { |
||||
return 0, contracts.ErrSecureValueOperationInProgress |
||||
} |
||||
return 0, fmt.Errorf("inserting message into secure value outbox table: %w", err) |
||||
} |
||||
|
||||
rowsAffected, err := result.RowsAffected() |
||||
if err != nil { |
||||
return 0, fmt.Errorf("get rows affected: %w", err) |
||||
} |
||||
|
||||
if rowsAffected != 1 { |
||||
return 0, fmt.Errorf("expected to affect 1 row, but affected %d", rowsAffected) |
||||
} |
||||
|
||||
id, err := result.LastInsertId() |
||||
if err != nil { |
||||
return id, fmt.Errorf("fetching last inserted id: %w", err) |
||||
} |
||||
|
||||
return id, nil |
||||
} |
||||
|
||||
func (s *outboxStore) ReceiveN(ctx context.Context, limit uint) ([]contracts.OutboxMessage, error) { |
||||
messageIDs, err := s.fetchMessageIdsInQueue(ctx, limit) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("fetching message ids from queue: %w", err) |
||||
} |
||||
// If queue is empty
|
||||
if len(messageIDs) == 0 { |
||||
return nil, nil |
||||
} |
||||
req := receiveNSecureValueOutbox{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
MessageIDs: messageIDs, |
||||
} |
||||
|
||||
start := time.Now() |
||||
query, err := sqltemplate.Execute(sqlSecureValueOutboxReceiveN, req) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxReceiveN.Name(), err) |
||||
} |
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, req.GetArgs()...) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("fetching rows from secure value outbox table: %w", err) |
||||
} |
||||
defer func() { _ = rows.Close() }() |
||||
|
||||
messages := make([]contracts.OutboxMessage, 0) |
||||
|
||||
for rows.Next() { |
||||
var row outboxMessageDB |
||||
if err := rows.Scan( |
||||
&row.RequestID, |
||||
&row.MessageID, |
||||
&row.MessageType, |
||||
&row.Name, |
||||
&row.Namespace, |
||||
&row.EncryptedSecret, |
||||
&row.KeeperName, |
||||
&row.ExternalID, |
||||
&row.ReceiveCount, |
||||
&row.Created, |
||||
); err != nil { |
||||
return nil, fmt.Errorf("scanning row from secure value outbox table: %w", err) |
||||
} |
||||
|
||||
var keeperName *string |
||||
if row.KeeperName.Valid { |
||||
keeperName = &row.KeeperName.String |
||||
} |
||||
|
||||
var externalID *string |
||||
if row.ExternalID.Valid { |
||||
externalID = &row.ExternalID.String |
||||
} |
||||
|
||||
msg := contracts.OutboxMessage{ |
||||
RequestID: row.RequestID, |
||||
Type: row.MessageType, |
||||
MessageID: row.MessageID, |
||||
Name: row.Name, |
||||
Namespace: row.Namespace, |
||||
KeeperName: keeperName, |
||||
ExternalID: externalID, |
||||
ReceiveCount: row.ReceiveCount, |
||||
Created: row.Created, |
||||
} |
||||
|
||||
if row.MessageType != contracts.DeleteSecretOutboxMessage && row.EncryptedSecret.Valid { |
||||
msg.EncryptedSecret = row.EncryptedSecret.String |
||||
} |
||||
|
||||
messages = append(messages, msg) |
||||
} |
||||
|
||||
if err := rows.Err(); err != nil { |
||||
return messages, fmt.Errorf("reading rows: %w", err) |
||||
} |
||||
|
||||
s.metrics.OutboxReceiveDuration.Observe(time.Since(start).Seconds()) |
||||
s.metrics.OutboxReceiveCount.Add(float64(len(messages))) |
||||
|
||||
return messages, nil |
||||
} |
||||
|
||||
func (s *outboxStore) fetchMessageIdsInQueue(ctx context.Context, limit uint) ([]int64, error) { |
||||
req := fetchMessageIDsOutbox{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
ReceiveLimit: limit, |
||||
} |
||||
|
||||
query, err := sqltemplate.Execute(sqlSecureValueOutboxFetchMessageIDs, req) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxFetchMessageIDs.Name(), err) |
||||
} |
||||
|
||||
rows, err := s.db.QueryContext(ctx, query, req.GetArgs()...) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("fetching rows from secure value outbox table: %w", err) |
||||
} |
||||
defer func() { _ = rows.Close() }() |
||||
|
||||
messageIDs := make([]int64, 0, limit) |
||||
|
||||
for rows.Next() { |
||||
var id int64 |
||||
if err := rows.Scan(&id); err != nil { |
||||
return nil, fmt.Errorf("scanning row; %w", err) |
||||
} |
||||
messageIDs = append(messageIDs, id) |
||||
} |
||||
|
||||
if err := rows.Err(); err != nil { |
||||
return nil, fmt.Errorf("reading rows: %w", err) |
||||
} |
||||
|
||||
return messageIDs, nil |
||||
} |
||||
|
||||
func (s *outboxStore) Delete(ctx context.Context, messageID int64) (err error) { |
||||
ctx, span := s.tracer.Start(ctx, "outboxStore.Append", trace.WithAttributes( |
||||
attribute.Int64("messageID", messageID), |
||||
)) |
||||
defer span.End() |
||||
|
||||
defer func() { |
||||
if err != nil { |
||||
span.SetStatus(codes.Error, "failed to delete message from outbox") |
||||
span.RecordError(err) |
||||
} |
||||
}() |
||||
|
||||
assert.True(messageID != 0, "outboxStore.Delete: messageID is required") |
||||
|
||||
start := time.Now() |
||||
if err := s.deleteMessage(ctx, messageID); err != nil { |
||||
return fmt.Errorf("deleting message from outbox table %+w", err) |
||||
} |
||||
|
||||
s.metrics.OutboxDeleteDuration.Observe(time.Since(start).Seconds()) |
||||
s.metrics.OutboxDeleteCount.Inc() |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (s *outboxStore) deleteMessage(ctx context.Context, messageID int64) error { |
||||
tsReq := getOutboxMessageTimestamp{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
MessageID: messageID, |
||||
} |
||||
|
||||
// First query the object so we can get the timestamp and calculate the total lifetime
|
||||
timestampQuery, err := sqltemplate.Execute(sqlSecureValueOutboxQueryTimestamp, tsReq) |
||||
if err != nil { |
||||
return fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxQueryTimestamp.Name(), err) |
||||
} |
||||
|
||||
rows, err := s.db.QueryContext(ctx, timestampQuery, tsReq.GetArgs()...) |
||||
if err != nil { |
||||
return fmt.Errorf("querying timestamp from secure value outbox table: %w", err) |
||||
} |
||||
|
||||
if !rows.Next() { |
||||
_ = rows.Close() |
||||
return fmt.Errorf("no row found for message id=%v", messageID) |
||||
} |
||||
|
||||
var timestamp int64 |
||||
var messageType string |
||||
if err := rows.Scan(×tamp, &messageType); err != nil { |
||||
_ = rows.Close() |
||||
return fmt.Errorf("scanning timestamp: %w", err) |
||||
} |
||||
|
||||
// Explicitly close rows and check for errors before proceeding
|
||||
if err := rows.Close(); err != nil { |
||||
return fmt.Errorf("closing rows: %w", err) |
||||
} |
||||
|
||||
if err := rows.Err(); err != nil { |
||||
return fmt.Errorf("rows error: %w", err) |
||||
} |
||||
|
||||
totalLifetime := time.Since(time.UnixMilli(timestamp)) |
||||
s.metrics.OutboxTotalMessageLifetimeDuration.WithLabelValues(messageType).Observe(totalLifetime.Seconds()) |
||||
|
||||
// Then delete the object
|
||||
delReq := deleteSecureValueOutbox{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
MessageID: messageID, |
||||
} |
||||
|
||||
query, err := sqltemplate.Execute(sqlSecureValueOutboxDelete, delReq) |
||||
if err != nil { |
||||
return fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxDelete.Name(), err) |
||||
} |
||||
|
||||
result, err := s.db.ExecContext(ctx, query, delReq.GetArgs()...) |
||||
if err != nil { |
||||
return fmt.Errorf("deleting message id=%v from secure value outbox table: %w", messageID, err) |
||||
} |
||||
|
||||
rowsAffected, err := result.RowsAffected() |
||||
if err != nil { |
||||
return fmt.Errorf("get rows affected: %w", err) |
||||
} |
||||
|
||||
// TODO: Presumably it's a bug if we delete 0 rows?
|
||||
if rowsAffected > 1 { |
||||
return fmt.Errorf("bug: deleted more than one row from the outbox table, should delete only one at a time: deleted=%v", rowsAffected) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (s *outboxStore) IncrementReceiveCount(ctx context.Context, messageIDs []int64) error { |
||||
if len(messageIDs) == 0 { |
||||
return nil |
||||
} |
||||
|
||||
req := incrementReceiveCountOutbox{ |
||||
SQLTemplate: sqltemplate.New(s.dialect), |
||||
MessageIDs: messageIDs, |
||||
} |
||||
|
||||
start := time.Now() |
||||
query, err := sqltemplate.Execute(sqlSecureValueOutboxUpdateReceiveCount, req) |
||||
if err != nil { |
||||
return fmt.Errorf("execute template %q: %w", sqlSecureValueOutboxUpdateReceiveCount.Name(), err) |
||||
} |
||||
|
||||
_, err = s.db.ExecContext(ctx, query, req.GetArgs()...) |
||||
if err != nil { |
||||
return fmt.Errorf("updating outbox messages receive count: %w", err) |
||||
} |
||||
|
||||
s.metrics.OutboxIncrementReceiveCountDuration.Observe(time.Since(start).Seconds()) |
||||
s.metrics.OutboxIncrementReceiveCountCount.Add(float64(len(messageIDs))) |
||||
|
||||
return nil |
||||
} |
@ -1,269 +0,0 @@ |
||||
package metadata |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"math/rand" |
||||
"slices" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/grafana/pkg/registry/apis/secret/contracts" |
||||
"github.com/grafana/grafana/pkg/services/sqlstore" |
||||
"github.com/grafana/grafana/pkg/storage/secret/database" |
||||
"github.com/grafana/grafana/pkg/storage/secret/migrator" |
||||
"github.com/stretchr/testify/require" |
||||
"go.opentelemetry.io/otel/trace/noop" |
||||
) |
||||
|
||||
type outboxStoreModel struct { |
||||
rows []contracts.OutboxMessage |
||||
} |
||||
|
||||
func newOutboxStoreModel() *outboxStoreModel { |
||||
return &outboxStoreModel{} |
||||
} |
||||
|
||||
func (model *outboxStoreModel) Append(messageID int64, message contracts.AppendOutboxMessage) { |
||||
model.rows = append(model.rows, contracts.OutboxMessage{ |
||||
Type: message.Type, |
||||
MessageID: messageID, |
||||
Name: message.Name, |
||||
Namespace: message.Namespace, |
||||
EncryptedSecret: message.EncryptedSecret, |
||||
KeeperName: message.KeeperName, |
||||
ExternalID: message.ExternalID, |
||||
}) |
||||
} |
||||
|
||||
func (model *outboxStoreModel) ReceiveN(n uint) []contracts.OutboxMessage { |
||||
maxMessages := min(len(model.rows), int(n)) |
||||
if maxMessages == 0 { |
||||
return nil |
||||
} |
||||
return model.rows[:maxMessages] |
||||
} |
||||
|
||||
func (model *outboxStoreModel) Delete(messageID int64) { |
||||
oldLen := len(model.rows) |
||||
model.rows = slices.DeleteFunc(model.rows, func(m contracts.OutboxMessage) bool { |
||||
return m.MessageID == messageID |
||||
}) |
||||
if len(model.rows) != oldLen-1 { |
||||
panic("Delete: deleted more than one message") |
||||
} |
||||
} |
||||
|
||||
func TestOutboxStoreModel(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
model := newOutboxStoreModel() |
||||
|
||||
require.Empty(t, model.ReceiveN(10)) |
||||
|
||||
appendOutboxMessage := contracts.AppendOutboxMessage{ |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "s-1", |
||||
Namespace: "n-1", |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
|
||||
outboxMessage1 := contracts.OutboxMessage{ |
||||
MessageID: 1, |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "s-1", |
||||
Namespace: "n-1", |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
|
||||
outboxMessage2 := contracts.OutboxMessage{ |
||||
MessageID: 2, |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "s-1", |
||||
Namespace: "n-1", |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
|
||||
model.Append(1, appendOutboxMessage) |
||||
|
||||
require.Equal(t, []contracts.OutboxMessage{outboxMessage1}, model.ReceiveN(10)) |
||||
|
||||
model.Append(2, appendOutboxMessage) |
||||
|
||||
require.Equal(t, []contracts.OutboxMessage{outboxMessage1, outboxMessage2}, model.ReceiveN(10)) |
||||
|
||||
model.Delete(outboxMessage1.MessageID) |
||||
|
||||
require.Equal(t, []contracts.OutboxMessage{outboxMessage2}, model.ReceiveN(5)) |
||||
|
||||
model.Delete(outboxMessage2.MessageID) |
||||
|
||||
require.Empty(t, model.ReceiveN(1)) |
||||
} |
||||
|
||||
func TestOutboxStoreSecureValueOperationInProgress(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
t.Run("Append returns error when the queue already contains an operation for the secure value", func(t *testing.T) { |
||||
t.Parallel() |
||||
|
||||
testDB := sqlstore.NewTestStore(t, sqlstore.WithMigrator(migrator.New())) |
||||
tracer := noop.NewTracerProvider().Tracer("test") |
||||
|
||||
ctx := context.Background() |
||||
|
||||
outbox := ProvideOutboxQueue(database.ProvideDatabase(testDB, tracer), tracer, nil) |
||||
|
||||
_, err := outbox.Append(ctx, contracts.AppendOutboxMessage{ |
||||
RequestID: "1", |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "name1", |
||||
Namespace: "ns1", |
||||
EncryptedSecret: "v1", |
||||
KeeperName: nil, |
||||
ExternalID: nil, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
_, err = outbox.Append(ctx, contracts.AppendOutboxMessage{ |
||||
RequestID: "1", |
||||
Type: contracts.UpdateSecretOutboxMessage, |
||||
Name: "name1", |
||||
Namespace: "ns1", |
||||
EncryptedSecret: "v1", |
||||
KeeperName: nil, |
||||
ExternalID: nil, |
||||
}) |
||||
|
||||
require.ErrorIs(t, err, contracts.ErrSecureValueOperationInProgress) |
||||
}) |
||||
} |
||||
|
||||
func TestOutboxStore(t *testing.T) { |
||||
testDB := sqlstore.NewTestStore(t, sqlstore.WithMigrator(migrator.New())) |
||||
tracer := noop.NewTracerProvider().Tracer("test") |
||||
|
||||
ctx := context.Background() |
||||
|
||||
outbox := ProvideOutboxQueue(database.ProvideDatabase(testDB, tracer), tracer, nil) |
||||
|
||||
m1 := contracts.AppendOutboxMessage{ |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "s-1", |
||||
Namespace: "n-1", |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
m2 := contracts.AppendOutboxMessage{ |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: "s-1", |
||||
Namespace: "n-2", |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
|
||||
messages, err := outbox.ReceiveN(ctx, 10) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
|
||||
messageID1, err := outbox.Append(ctx, m1) |
||||
require.NoError(t, err) |
||||
|
||||
for range 2 { |
||||
messages, err = outbox.ReceiveN(ctx, 10) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(messages)) |
||||
require.Equal(t, messageID1, messages[0].MessageID) |
||||
} |
||||
|
||||
messageID2, err := outbox.Append(ctx, m2) |
||||
require.NoError(t, err) |
||||
|
||||
messages, err = outbox.ReceiveN(ctx, 3) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 2, len(messages)) |
||||
require.Equal(t, messageID1, messages[0].MessageID) |
||||
require.Equal(t, messageID2, messages[1].MessageID) |
||||
|
||||
require.NoError(t, outbox.Delete(ctx, messageID1)) |
||||
messages, err = outbox.ReceiveN(ctx, 10) |
||||
require.NoError(t, err) |
||||
require.Equal(t, 1, len(messages)) |
||||
require.Equal(t, messageID2, messages[0].MessageID) |
||||
|
||||
require.NoError(t, outbox.Delete(ctx, messageID2)) |
||||
|
||||
messages, err = outbox.ReceiveN(ctx, 100) |
||||
require.NoError(t, err) |
||||
require.Empty(t, messages) |
||||
} |
||||
|
||||
func TestOutboxStoreProperty(t *testing.T) { |
||||
seed := time.Now().UnixMicro() |
||||
rng := rand.New(rand.NewSource(seed)) |
||||
|
||||
defer func() { |
||||
if t.Failed() { |
||||
fmt.Printf("TestOutboxStoreProperty: SEED=%+v\n\n", seed) |
||||
} |
||||
}() |
||||
|
||||
// The number of iterations was decided arbitrarily based on the time the test takes to run
|
||||
for range 10 { |
||||
testDB := sqlstore.NewTestStore(t, sqlstore.WithMigrator(migrator.New())) |
||||
tracer := noop.NewTracerProvider().Tracer("test") |
||||
|
||||
outbox := ProvideOutboxQueue(database.ProvideDatabase(testDB, tracer), tracer, nil) |
||||
|
||||
model := newOutboxStoreModel() |
||||
|
||||
ctx := context.Background() |
||||
|
||||
for i := range 100 { |
||||
n := rng.Intn(3) |
||||
switch n { |
||||
case 0: |
||||
time.Sleep(1 * time.Microsecond) |
||||
message := contracts.AppendOutboxMessage{ |
||||
Type: contracts.CreateSecretOutboxMessage, |
||||
Name: fmt.Sprintf("s-%d", i), |
||||
Namespace: fmt.Sprintf("n-%d", i), |
||||
EncryptedSecret: "value", |
||||
ExternalID: nil, |
||||
} |
||||
messageID, err := outbox.Append(ctx, message) |
||||
require.NoError(t, err) |
||||
|
||||
model.Append(messageID, message) |
||||
|
||||
case 1: |
||||
n := uint(rng.Intn(10)) |
||||
messages, err := outbox.ReceiveN(ctx, n) |
||||
require.NoError(t, err) |
||||
|
||||
modelMessages := model.ReceiveN(n) |
||||
|
||||
require.Equal(t, len(modelMessages), len(messages)) |
||||
for i := range len(modelMessages) { |
||||
require.Equal(t, modelMessages[i].MessageID, messages[i].MessageID) |
||||
} |
||||
|
||||
case 2: |
||||
if len(model.rows) == 0 { |
||||
continue |
||||
} |
||||
|
||||
message := model.rows[rng.Intn(len(model.rows))] |
||||
|
||||
model.Delete(message.MessageID) |
||||
require.NoError(t, outbox.Delete(ctx, message.MessageID)) |
||||
|
||||
default: |
||||
panic(fmt.Sprintf("unhandled action: %+v", n)) |
||||
} |
||||
} |
||||
} |
||||
} |
@ -1,4 +0,0 @@ |
||||
DELETE FROM `secret_secure_value` |
||||
WHERE `namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
; |
@ -0,0 +1,10 @@ |
||||
SELECT |
||||
`version` |
||||
FROM |
||||
`secret_secure_value` |
||||
WHERE |
||||
`namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
ORDER BY `version` DESC |
||||
LIMIT 1 |
||||
; |
@ -1,21 +0,0 @@ |
||||
INSERT INTO `secret_secure_value_outbox` ( |
||||
`request_id`, |
||||
`message_type`, |
||||
`name`, |
||||
`namespace`, |
||||
`encrypted_secret`, |
||||
`keeper_name`, |
||||
`external_id`, |
||||
`receive_count`, |
||||
`created` |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
'', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO `secret_secure_value_outbox` ( |
||||
`request_id`, |
||||
`message_type`, |
||||
`name`, |
||||
`namespace`, |
||||
`keeper_name`, |
||||
`external_id`, |
||||
`receive_count`, |
||||
`created` |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'keeper', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO `secret_secure_value_outbox` ( |
||||
`request_id`, |
||||
`message_type`, |
||||
`name`, |
||||
`namespace`, |
||||
`encrypted_secret`, |
||||
`keeper_name`, |
||||
`receive_count`, |
||||
`created` |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO `secret_secure_value_outbox` ( |
||||
`request_id`, |
||||
`message_type`, |
||||
`name`, |
||||
`namespace`, |
||||
`encrypted_secret`, |
||||
`external_id`, |
||||
`receive_count`, |
||||
`created` |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,5 +0,0 @@ |
||||
DELETE FROM |
||||
`secret_secure_value_outbox` |
||||
WHERE |
||||
`id` = 1 |
||||
; |
@ -1,6 +0,0 @@ |
||||
SELECT |
||||
`id` |
||||
FROM `secret_secure_value_outbox` |
||||
ORDER BY id ASC |
||||
LIMIT 10 |
||||
; |
@ -1,19 +0,0 @@ |
||||
SELECT |
||||
`request_id`, |
||||
`id`, |
||||
`message_type`, |
||||
`name`, |
||||
`namespace`, |
||||
`encrypted_secret`, |
||||
`keeper_name`, |
||||
`external_id`, |
||||
`receive_count`, |
||||
`created` |
||||
FROM |
||||
`secret_secure_value_outbox` |
||||
WHERE |
||||
`id` IN (1, 2, 3) |
||||
ORDER BY |
||||
`id` ASC |
||||
FOR UPDATE SKIP LOCKED |
||||
; |
@ -1,7 +0,0 @@ |
||||
UPDATE |
||||
`secret_secure_value_outbox` |
||||
SET |
||||
`receive_count` = `receive_count` + 1 |
||||
WHERE |
||||
`id` IN (1, 2, 3) |
||||
; |
@ -0,0 +1,8 @@ |
||||
UPDATE |
||||
`secret_secure_value` |
||||
SET |
||||
`active` = (`version` = 1) |
||||
WHERE |
||||
`namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
; |
@ -1,22 +0,0 @@ |
||||
UPDATE |
||||
`secret_secure_value` |
||||
SET |
||||
`guid` = 'abc', |
||||
`name` = 'name', |
||||
`namespace` = 'ns', |
||||
`annotations` = '{"x":"XXXX"}', |
||||
`labels` = '{"a":"AAA", "b", "BBBB"}', |
||||
`created` = 1234, |
||||
`created_by` = 'user:ryan', |
||||
`updated` = 5678, |
||||
`updated_by` = 'user:cameron', |
||||
`status_phase` = 'creating', |
||||
`status_message` = 'message_test', |
||||
`description` = 'description', |
||||
`keeper` = 'keeper_test', |
||||
`decrypters` = 'decrypters_test', |
||||
`ref` = 'ref_test', |
||||
`external_id` = 'extId' |
||||
WHERE `namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
; |
@ -1,18 +0,0 @@ |
||||
UPDATE |
||||
`secret_secure_value` |
||||
SET |
||||
`guid` = 'abc', |
||||
`name` = 'name', |
||||
`namespace` = 'ns', |
||||
`annotations` = '{"x":"XXXX"}', |
||||
`labels` = '{"a":"AAA", "b", "BBBB"}', |
||||
`created` = 1234, |
||||
`created_by` = 'user:ryan', |
||||
`updated` = 5678, |
||||
`updated_by` = 'user:cameron', |
||||
`status_phase` = 'creating', |
||||
`description` = 'description', |
||||
`external_id` = 'extId' |
||||
WHERE `namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
; |
@ -1,8 +0,0 @@ |
||||
UPDATE |
||||
`secret_secure_value` |
||||
SET |
||||
`status_phase` = 'Succeeded', |
||||
`status_message` = 'message-1' |
||||
WHERE `namespace` = 'ns' AND |
||||
`name` = 'name' |
||||
; |
@ -1,4 +0,0 @@ |
||||
DELETE FROM "secret_secure_value" |
||||
WHERE "namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -0,0 +1,10 @@ |
||||
SELECT |
||||
"version" |
||||
FROM |
||||
"secret_secure_value" |
||||
WHERE |
||||
"namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
ORDER BY "version" DESC |
||||
LIMIT 1 |
||||
; |
@ -1,21 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"keeper_name", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
'', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"keeper_name", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'keeper', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"keeper_name", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,5 +0,0 @@ |
||||
DELETE FROM |
||||
"secret_secure_value_outbox" |
||||
WHERE |
||||
"id" = 1 |
||||
; |
@ -1,6 +0,0 @@ |
||||
SELECT |
||||
"id" |
||||
FROM "secret_secure_value_outbox" |
||||
ORDER BY id ASC |
||||
LIMIT 10 |
||||
; |
@ -1,19 +0,0 @@ |
||||
SELECT |
||||
"request_id", |
||||
"id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"keeper_name", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
FROM |
||||
"secret_secure_value_outbox" |
||||
WHERE |
||||
"id" IN (1, 2, 3) |
||||
ORDER BY |
||||
"id" ASC |
||||
FOR UPDATE SKIP LOCKED |
||||
; |
@ -1,7 +0,0 @@ |
||||
UPDATE |
||||
"secret_secure_value_outbox" |
||||
SET |
||||
"receive_count" = "receive_count" + 1 |
||||
WHERE |
||||
"id" IN (1, 2, 3) |
||||
; |
@ -0,0 +1,8 @@ |
||||
UPDATE |
||||
"secret_secure_value" |
||||
SET |
||||
"active" = ("version" = 1) |
||||
WHERE |
||||
"namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -1,22 +0,0 @@ |
||||
UPDATE |
||||
"secret_secure_value" |
||||
SET |
||||
"guid" = 'abc', |
||||
"name" = 'name', |
||||
"namespace" = 'ns', |
||||
"annotations" = '{"x":"XXXX"}', |
||||
"labels" = '{"a":"AAA", "b", "BBBB"}', |
||||
"created" = 1234, |
||||
"created_by" = 'user:ryan', |
||||
"updated" = 5678, |
||||
"updated_by" = 'user:cameron', |
||||
"status_phase" = 'creating', |
||||
"status_message" = 'message_test', |
||||
"description" = 'description', |
||||
"keeper" = 'keeper_test', |
||||
"decrypters" = 'decrypters_test', |
||||
"ref" = 'ref_test', |
||||
"external_id" = 'extId' |
||||
WHERE "namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -1,18 +0,0 @@ |
||||
UPDATE |
||||
"secret_secure_value" |
||||
SET |
||||
"guid" = 'abc', |
||||
"name" = 'name', |
||||
"namespace" = 'ns', |
||||
"annotations" = '{"x":"XXXX"}', |
||||
"labels" = '{"a":"AAA", "b", "BBBB"}', |
||||
"created" = 1234, |
||||
"created_by" = 'user:ryan', |
||||
"updated" = 5678, |
||||
"updated_by" = 'user:cameron', |
||||
"status_phase" = 'creating', |
||||
"description" = 'description', |
||||
"external_id" = 'extId' |
||||
WHERE "namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -1,8 +0,0 @@ |
||||
UPDATE |
||||
"secret_secure_value" |
||||
SET |
||||
"status_phase" = 'Succeeded', |
||||
"status_message" = 'message-1' |
||||
WHERE "namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -1,4 +0,0 @@ |
||||
DELETE FROM "secret_secure_value" |
||||
WHERE "namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
; |
@ -0,0 +1,10 @@ |
||||
SELECT |
||||
"version" |
||||
FROM |
||||
"secret_secure_value" |
||||
WHERE |
||||
"namespace" = 'ns' AND |
||||
"name" = 'name' |
||||
ORDER BY "version" DESC |
||||
LIMIT 1 |
||||
; |
@ -1,21 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"keeper_name", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
'', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"keeper_name", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'keeper', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"keeper_name", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'keeper', |
||||
0, |
||||
1234 |
||||
); |
@ -1,19 +0,0 @@ |
||||
INSERT INTO "secret_secure_value_outbox" ( |
||||
"request_id", |
||||
"message_type", |
||||
"name", |
||||
"namespace", |
||||
"encrypted_secret", |
||||
"external_id", |
||||
"receive_count", |
||||
"created" |
||||
) VALUES ( |
||||
'', |
||||
'some-type', |
||||
'name', |
||||
'namespace', |
||||
'encrypted', |
||||
'external-id', |
||||
0, |
||||
1234 |
||||
); |
@ -1,5 +0,0 @@ |
||||
DELETE FROM |
||||
"secret_secure_value_outbox" |
||||
WHERE |
||||
"id" = 1 |
||||
; |
Some files were not shown because too many files have changed in this diff Show More
Loading…
Reference in new issue