unistore: add eventstore and notifier (#107182)

* Add datastore

* too many slashes

* lint

* add metadata store

* simplify meta

* Add eventstore

* golint

* lint

* Add datastore

* too many slashes

* lint

* pr comments

* extract ParseKey

* readcloser

* remove get prefix

* use dedicated keys

* parsekey

* sameresource

* unrelated

* name

* renmae tests

* add key validation

* fix tests

* refactor a bit

* lint

* allow empty ns

* get keys instead of list

* rename the functions

* refactor yield candidate

* update test

* lint

* missing err check

* address comments

* increase the timeout
pull/107303/head
Georges Chaudy 3 weeks ago committed by GitHub
parent 57c59cf3cb
commit 4a272fb61b
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 189
      pkg/storage/unified/resource/eventstore.go
  2. 398
      pkg/storage/unified/resource/eventstore_test.go
  3. 120
      pkg/storage/unified/resource/notifier.go
  4. 442
      pkg/storage/unified/resource/notifier_test.go

@ -0,0 +1,189 @@
package resource
import (
"bytes"
"context"
"encoding/json"
"fmt"
"iter"
"strconv"
"strings"
)
const (
eventsSection = "unified/events"
)
// eventStore is a store for events.
type eventStore struct {
kv KV
}
type EventKey struct {
Namespace string
Group string
Resource string
Name string
ResourceVersion int64
}
func (k EventKey) String() string {
return fmt.Sprintf("%d~%s~%s~%s~%s", k.ResourceVersion, k.Namespace, k.Group, k.Resource, k.Name)
}
func (k EventKey) Validate() error {
if k.Namespace == "" {
return fmt.Errorf("namespace cannot be empty")
}
if k.Group == "" {
return fmt.Errorf("group cannot be empty")
}
if k.Resource == "" {
return fmt.Errorf("resource cannot be empty")
}
if k.Name == "" {
return fmt.Errorf("name cannot be empty")
}
if k.ResourceVersion < 0 {
return fmt.Errorf("resource version must be non-negative")
}
// Validate each field against the naming rules (reusing the regex from datastore.go)
if !validNameRegex.MatchString(k.Namespace) {
return fmt.Errorf("namespace '%s' is invalid", k.Namespace)
}
if !validNameRegex.MatchString(k.Group) {
return fmt.Errorf("group '%s' is invalid", k.Group)
}
if !validNameRegex.MatchString(k.Resource) {
return fmt.Errorf("resource '%s' is invalid", k.Resource)
}
if !validNameRegex.MatchString(k.Name) {
return fmt.Errorf("name '%s' is invalid", k.Name)
}
return nil
}
type Event struct {
Namespace string `json:"namespace"`
Group string `json:"group"`
Resource string `json:"resource"`
Name string `json:"name"`
ResourceVersion int64 `json:"resource_version"`
Action DataAction `json:"action"`
Folder string `json:"folder"`
PreviousRV int64 `json:"previous_rv"`
}
func newEventStore(kv KV) *eventStore {
return &eventStore{
kv: kv,
}
}
// ParseEventKey parses a key string back into an EventKey struct
func ParseEventKey(key string) (EventKey, error) {
parts := strings.Split(key, "~")
if len(parts) != 5 {
return EventKey{}, fmt.Errorf("invalid key format: expected 5 parts, got %d", len(parts))
}
rv, err := strconv.ParseInt(parts[0], 10, 64)
if err != nil {
return EventKey{}, fmt.Errorf("invalid resource version: %w", err)
}
return EventKey{
ResourceVersion: rv,
Namespace: parts[1],
Group: parts[2],
Resource: parts[3],
Name: parts[4],
}, nil
}
// LastEventKey returns the Event Key of the event with the highest resource version.
// If no events are found, it returns ErrNotFound.
func (n *eventStore) LastEventKey(ctx context.Context) (EventKey, error) {
for key, err := range n.kv.Keys(ctx, eventsSection, ListOptions{Sort: SortOrderDesc, Limit: 1}) {
if err != nil {
return EventKey{}, err
}
eventKey, err := ParseEventKey(key)
if err != nil {
return EventKey{}, err
}
return eventKey, nil
}
return EventKey{}, ErrNotFound
}
// Save an event to the store.
func (n *eventStore) Save(ctx context.Context, event Event) error {
eventKey := EventKey{
Namespace: event.Namespace,
Group: event.Group,
Resource: event.Resource,
Name: event.Name,
ResourceVersion: event.ResourceVersion,
}
if err := eventKey.Validate(); err != nil {
return fmt.Errorf("invalid event key: %w", err)
}
var buf bytes.Buffer
encoder := json.NewEncoder(&buf)
if err := encoder.Encode(event); err != nil {
return err
}
return n.kv.Save(ctx, eventsSection, eventKey.String(), &buf)
}
func (n *eventStore) Get(ctx context.Context, key EventKey) (Event, error) {
if err := key.Validate(); err != nil {
return Event{}, fmt.Errorf("invalid event key: %w", err)
}
obj, err := n.kv.Get(ctx, eventsSection, key.String())
if err != nil {
return Event{}, err
}
var event Event
if err = json.NewDecoder(obj.Value).Decode(&event); err != nil {
_ = obj.Value.Close()
return Event{}, err
}
defer func() { _ = obj.Value.Close() }()
return event, nil
}
// ListSince returns a sequence of events since the given resource version.
func (n *eventStore) ListSince(ctx context.Context, sinceRV int64) iter.Seq2[Event, error] {
opts := ListOptions{
Sort: SortOrderAsc,
StartKey: EventKey{
ResourceVersion: sinceRV,
}.String(),
}
return func(yield func(Event, error) bool) {
for key, err := range n.kv.Keys(ctx, eventsSection, opts) {
if err != nil {
return
}
obj, err := n.kv.Get(ctx, eventsSection, key)
if err != nil {
return
}
var event Event
if err := json.NewDecoder(obj.Value).Decode(&event); err != nil {
return
}
if !yield(event, nil) {
return
}
}
}
}

@ -0,0 +1,398 @@
package resource
import (
"context"
"encoding/json"
"testing"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func setupTestEventStore(t *testing.T) *eventStore {
db := setupTestBadgerDB(t)
t.Cleanup(func() {
err := db.Close()
require.NoError(t, err)
})
kv := NewBadgerKV(db)
return newEventStore(kv)
}
func TestNewEventStore(t *testing.T) {
store := setupTestEventStore(t)
assert.NotNil(t, store.kv)
}
func TestEventKey_String(t *testing.T) {
tests := []struct {
name string
eventKey EventKey
expected string
}{
{
name: "basic event key",
eventKey: EventKey{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1000,
},
expected: "1000~default~apps~resource~test-resource",
},
{
name: "empty namespace",
eventKey: EventKey{
Namespace: "",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 2000,
},
expected: "2000~~apps~resource~test-resource",
},
{
name: "special characters in name",
eventKey: EventKey{
Namespace: "test-ns",
Group: "apps",
Resource: "resource",
Name: "test-resource-with-dashes",
ResourceVersion: 3000,
},
expected: "3000~test-ns~apps~resource~test-resource-with-dashes",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := tt.eventKey.String()
assert.Equal(t, tt.expected, result)
})
}
}
func TestEventKey_Validate(t *testing.T) {
tests := []struct {
name string
key string
expected EventKey
expectError bool
}{
{
name: "valid key",
key: "1000~default~apps~resource~test-resource",
expected: EventKey{
ResourceVersion: 1000,
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
},
},
{
name: "empty namespace",
key: "2000~~apps~resource~test-resource",
expected: EventKey{
ResourceVersion: 2000,
Namespace: "",
Group: "apps",
Resource: "resource",
Name: "test-resource",
},
},
{
name: "special characters in name",
key: "3000~test-ns~apps~resource~test-resource-with-dashes",
expected: EventKey{
ResourceVersion: 3000,
Namespace: "test-ns",
Group: "apps",
Resource: "resource",
Name: "test-resource-with-dashes",
},
},
{
name: "invalid key - too few parts",
key: "1000~default~apps~resource",
expectError: true,
},
{
name: "invalid key - too many parts",
key: "1000~default~apps~resource~test~extra",
expectError: true,
},
{
name: "invalid resource version",
key: "invalid~default~apps~resource~test",
expectError: true,
},
{
name: "empty key",
key: "",
expectError: true,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result, err := ParseEventKey(tt.key)
if tt.expectError {
assert.Error(t, err)
assert.Equal(t, EventKey{}, result)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expected, result)
}
})
}
}
func TestEventStore_ParseEventKey(t *testing.T) {
originalKey := EventKey{
ResourceVersion: 1234567890,
Namespace: "test-namespace",
Group: "apps",
Resource: "resource",
Name: "test-resource",
}
// Convert to string and back
keyString := originalKey.String()
parsedKey, err := ParseEventKey(keyString)
require.NoError(t, err)
assert.Equal(t, originalKey, parsedKey)
}
func TestEventStore_Save_Get(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
event := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 999,
}
// Save the event
err := store.Save(ctx, event)
require.NoError(t, err)
// Get the event back
eventKey := EventKey{
Namespace: event.Namespace,
Group: event.Group,
Resource: event.Resource,
Name: event.Name,
ResourceVersion: event.ResourceVersion,
}
retrievedEvent, err := store.Get(ctx, eventKey)
require.NoError(t, err)
assert.Equal(t, event, retrievedEvent)
}
func TestEventStore_Get_NotFound(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
nonExistentKey := EventKey{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "non-existent",
ResourceVersion: 9999,
}
_, err := store.Get(ctx, nonExistentKey)
assert.Error(t, err)
}
func TestEventStore_LastEventKey(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
// Test when no events exist
_, err := store.LastEventKey(ctx)
assert.Error(t, err)
assert.Equal(t, ErrNotFound, err)
// Add some events with different resource versions
events := []Event{
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-1",
ResourceVersion: 1000,
Action: DataActionCreated,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-2",
ResourceVersion: 3000, // highest
Action: DataActionCreated,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-3",
ResourceVersion: 2000,
Action: DataActionCreated,
},
}
// Save all events
for _, event := range events {
err := store.Save(ctx, event)
require.NoError(t, err)
}
// Get the last event key (should be the one with highest RV)
lastKey, err := store.LastEventKey(ctx)
require.NoError(t, err)
expectedKey := EventKey{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-2",
ResourceVersion: 3000,
}
assert.Equal(t, expectedKey, lastKey)
}
func TestEventStore_ListSince(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
// Add events with different resource versions
events := []Event{
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-1",
ResourceVersion: 1000,
Action: DataActionCreated,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-2",
ResourceVersion: 2000,
Action: DataActionUpdated,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-3",
ResourceVersion: 3000,
Action: DataActionDeleted,
},
}
// Save all events
for _, event := range events {
err := store.Save(ctx, event)
require.NoError(t, err)
}
// List events since RV 1500 (should get events with RV 2000 and 3000)
retrievedEvents := make([]Event, 0, 2)
for event, err := range store.ListSince(ctx, 1500) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
// Should return events in descending order of resource version
require.Len(t, retrievedEvents, 2)
assert.Equal(t, int64(2000), retrievedEvents[0].ResourceVersion)
assert.Equal(t, int64(3000), retrievedEvents[1].ResourceVersion)
}
func TestEventStore_ListSince_Empty(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
// List events when store is empty
retrievedEvents := make([]Event, 0)
for event, err := range store.ListSince(ctx, 0) {
require.NoError(t, err)
retrievedEvents = append(retrievedEvents, event)
}
assert.Empty(t, retrievedEvents)
}
func TestEvent_JSONSerialization(t *testing.T) {
event := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 999,
}
// Serialize to JSON
data, err := json.Marshal(event)
require.NoError(t, err)
// Deserialize from JSON
var deserializedEvent Event
err = json.Unmarshal(data, &deserializedEvent)
require.NoError(t, err)
assert.Equal(t, event, deserializedEvent)
}
func TestEventKey_Struct(t *testing.T) {
key := EventKey{
Namespace: "test-namespace",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1234567890,
}
assert.Equal(t, "test-namespace", key.Namespace)
assert.Equal(t, "apps", key.Group)
assert.Equal(t, "resource", key.Resource)
assert.Equal(t, "test-resource", key.Name)
assert.Equal(t, int64(1234567890), key.ResourceVersion)
}
func TestEventStore_Save_InvalidJSON(t *testing.T) {
ctx := context.Background()
store := setupTestEventStore(t)
// This should work fine as the Event struct should be serializable
event := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test",
ResourceVersion: 1000,
Action: DataActionCreated,
}
err := store.Save(ctx, event)
assert.NoError(t, err)
}

@ -0,0 +1,120 @@
package resource
import (
"context"
"errors"
"fmt"
"github.com/grafana/grafana-app-sdk/logging"
gocache "github.com/patrickmn/go-cache"
"time"
)
const (
defaultLookbackPeriod = 30 * time.Second
defaultPollInterval = 100 * time.Millisecond
defaultEventCacheSize = 10000
defaultBufferSize = 10000
)
type notifier struct {
eventStore *eventStore
log logging.Logger
}
type notifierOptions struct {
log logging.Logger
}
type watchOptions struct {
LookbackPeriod time.Duration // How far back to look for events
PollInterval time.Duration // How often to poll for new events
BufferSize int // How many events to buffer
}
func defaultWatchOptions() watchOptions {
return watchOptions{
LookbackPeriod: defaultLookbackPeriod,
PollInterval: defaultPollInterval,
BufferSize: defaultBufferSize,
}
}
func newNotifier(eventStore *eventStore, opts notifierOptions) *notifier {
if opts.log == nil {
opts.log = &logging.NoOpLogger{}
}
return &notifier{eventStore: eventStore, log: opts.log}
}
// Return the last resource version from the event store
func (n *notifier) lastEventResourceVersion(ctx context.Context) (int64, error) {
e, err := n.eventStore.LastEventKey(ctx)
if err != nil {
return 0, err
}
return e.ResourceVersion, nil
}
func (n *notifier) cacheKey(evt Event) string {
return fmt.Sprintf("%s~%s~%s~%s~%d", evt.Namespace, evt.Group, evt.Resource, evt.Name, evt.ResourceVersion)
}
func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
if opts.PollInterval <= 0 {
opts.PollInterval = defaultPollInterval
}
cacheTTL := opts.LookbackPeriod
cacheCleanupInterval := 2 * opts.LookbackPeriod
cache := gocache.New(cacheTTL, cacheCleanupInterval)
events := make(chan Event, opts.BufferSize)
initialRV, err := n.lastEventResourceVersion(ctx)
if errors.Is(err, ErrNotFound) {
initialRV = 0 // No events yet, start from the beginning
} else if err != nil {
n.log.Error("Failed to get last event resource version", "error", err)
}
lastRV := initialRV + 1 // We want to start watching from the next event
go func() {
defer close(events)
for {
select {
case <-ctx.Done():
return
case <-time.After(opts.PollInterval):
for evt, err := range n.eventStore.ListSince(ctx, lastRV-opts.LookbackPeriod.Nanoseconds()) {
if err != nil {
n.log.Error("Failed to list events since", "error", err)
continue
}
// Skip old events lower than the requested resource version
if evt.ResourceVersion <= initialRV {
continue
}
// Skip if the event is already sent
if _, found := cache.Get(n.cacheKey(evt)); found {
continue
}
if evt.ResourceVersion > lastRV {
lastRV = evt.ResourceVersion + 1
}
// Send the event
select {
case events <- evt:
cache.Set(n.cacheKey(evt), true, opts.LookbackPeriod)
case <-ctx.Done():
return
}
}
}
}
}()
return events
}

@ -0,0 +1,442 @@
package resource
import (
"context"
"testing"
"time"
"github.com/grafana/grafana-app-sdk/logging"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
func setupTestNotifier(t *testing.T) (*notifier, *eventStore) {
db := setupTestBadgerDB(t)
t.Cleanup(func() {
err := db.Close()
require.NoError(t, err)
})
kv := NewBadgerKV(db)
eventStore := newEventStore(kv)
notifier := newNotifier(eventStore, notifierOptions{log: &logging.NoOpLogger{}})
return notifier, eventStore
}
func TestNewNotifier(t *testing.T) {
notifier, _ := setupTestNotifier(t)
assert.NotNil(t, notifier.eventStore)
}
func TestDefaultWatchOptions(t *testing.T) {
opts := defaultWatchOptions()
assert.Equal(t, defaultLookbackPeriod, opts.LookbackPeriod)
assert.Equal(t, defaultPollInterval, opts.PollInterval)
assert.Equal(t, defaultBufferSize, opts.BufferSize)
}
func TestNotifier_lastEventResourceVersion(t *testing.T) {
ctx := context.Background()
notifier, eventStore := setupTestNotifier(t)
// Test with no events
rv, err := notifier.lastEventResourceVersion(ctx)
assert.Error(t, err)
assert.ErrorIs(t, ErrNotFound, err)
assert.Equal(t, int64(0), rv)
// Save an event
event := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 999,
}
err = eventStore.Save(ctx, event)
require.NoError(t, err)
// Test with events
rv, err = notifier.lastEventResourceVersion(ctx)
require.NoError(t, err)
assert.Equal(t, int64(1000), rv)
// Save another event with higher RV
event2 := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-2",
ResourceVersion: 2000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 1000,
}
err = eventStore.Save(ctx, event2)
require.NoError(t, err)
// Should return the higher RV
rv, err = notifier.lastEventResourceVersion(ctx)
require.NoError(t, err)
assert.Equal(t, int64(2000), rv)
}
func TestNotifier_cachekey(t *testing.T) {
notifier, _ := setupTestNotifier(t)
tests := []struct {
name string
event Event
expected string
}{
{
name: "basic event",
event: Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 1000,
},
expected: "default~apps~resource~test-resource~1000",
},
{
name: "empty namespace",
event: Event{
Namespace: "",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: 2000,
},
expected: "~apps~resource~test-resource~2000",
},
{
name: "special characters in name",
event: Event{
Namespace: "test-ns",
Group: "apps",
Resource: "resource",
Name: "test-resource-with-dashes",
ResourceVersion: 3000,
},
expected: "test-ns~apps~resource~test-resource-with-dashes~3000",
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
result := notifier.cacheKey(tt.event)
assert.Equal(t, tt.expected, result)
})
}
}
func TestNotifier_Watch_NoEvents(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 500*time.Millisecond)
defer cancel()
notifier, eventStore := setupTestNotifier(t)
// Add at least one event so that lastEventResourceVersion doesn't return ErrNotFound
initialEvent := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "initial-resource",
ResourceVersion: 100,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
}
err := eventStore.Save(ctx, initialEvent)
require.NoError(t, err)
opts := watchOptions{
LookbackPeriod: 100 * time.Millisecond,
PollInterval: 50 * time.Millisecond,
BufferSize: 10,
}
events := notifier.Watch(ctx, opts)
// Should receive no new events (only events after initial RV should be sent)
select {
case event := <-events:
t.Fatalf("Expected no events, but got: %+v", event)
case <-ctx.Done():
// Expected - context timeout
}
}
func TestNotifier_Watch_WithExistingEvents(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
notifier, eventStore := setupTestNotifier(t)
// Save some initial events
initialEvents := []Event{
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-1",
ResourceVersion: 1000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-2",
ResourceVersion: 2000,
Action: DataActionUpdated,
Folder: "test-folder",
PreviousRV: 1000,
},
}
for _, event := range initialEvents {
err := eventStore.Save(ctx, event)
require.NoError(t, err)
}
opts := watchOptions{
LookbackPeriod: 100 * time.Millisecond,
PollInterval: 50 * time.Millisecond,
BufferSize: 10,
}
// Start watching
events := notifier.Watch(ctx, opts)
// Save a new event after starting to watch
newEvent := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-3",
ResourceVersion: 3000,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 2000,
}
err := eventStore.Save(ctx, newEvent)
require.NoError(t, err)
// Should receive the new event
select {
case receivedEvent := <-events:
assert.Equal(t, newEvent.Name, receivedEvent.Name)
assert.Equal(t, newEvent.ResourceVersion, receivedEvent.ResourceVersion)
assert.Equal(t, newEvent.Action, receivedEvent.Action)
case <-time.After(500 * time.Millisecond):
t.Fatal("Expected to receive an event, but timed out")
}
}
func TestNotifier_Watch_EventDeduplication(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
defer cancel()
notifier, eventStore := setupTestNotifier(t)
// Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound
initialEvent := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "initial-resource",
ResourceVersion: time.Now().UnixNano(),
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
}
err := eventStore.Save(ctx, initialEvent)
require.NoError(t, err)
opts := watchOptions{
LookbackPeriod: time.Second,
PollInterval: 20 * time.Millisecond,
BufferSize: 10,
}
// Start watching
events := notifier.Watch(ctx, opts)
// Save an event
event := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource",
ResourceVersion: time.Now().UnixNano(),
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
}
err = eventStore.Save(ctx, event)
require.NoError(t, err)
// Should receive the event once
select {
case receivedEvent := <-events:
assert.Equal(t, event.Name, receivedEvent.Name)
assert.Equal(t, event.ResourceVersion, receivedEvent.ResourceVersion)
case <-time.After(200 * time.Millisecond):
t.Fatal("Expected to receive an event, but timed out")
}
// Should not receive the same event again (due to caching)
select {
case duplicateEvent := <-events:
t.Fatalf("Expected no duplicate events, but got: %+v", duplicateEvent)
case <-time.After(100 * time.Millisecond):
// Expected - no duplicate events
}
}
func TestNotifier_Watch_ContextCancellation(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
notifier, eventStore := setupTestNotifier(t)
// Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound
initialEvent := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "initial-resource",
ResourceVersion: time.Now().UnixNano(),
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
}
err := eventStore.Save(ctx, initialEvent)
require.NoError(t, err)
opts := watchOptions{
LookbackPeriod: 100 * time.Millisecond,
PollInterval: 20 * time.Millisecond,
BufferSize: 10,
}
events := notifier.Watch(ctx, opts)
// Cancel the context
cancel()
// Channel should be closed
select {
case event, ok := <-events:
if ok {
t.Fatalf("Expected channel to be closed, but got event: %+v", event)
}
// Channel is closed as expected
case <-time.After(100 * time.Millisecond):
t.Fatal("Expected channel to be closed quickly after context cancellation")
}
}
func TestNotifier_Watch_MultipleEvents(t *testing.T) {
ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
defer cancel()
notifier, eventStore := setupTestNotifier(t)
rv := time.Now().UnixNano()
// Add an initial event so that lastEventResourceVersion doesn't return ErrNotFound
initialEvent := Event{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "initial-resource",
ResourceVersion: rv,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
}
err := eventStore.Save(ctx, initialEvent)
require.NoError(t, err)
opts := watchOptions{
LookbackPeriod: time.Second,
PollInterval: 20 * time.Millisecond,
BufferSize: 10,
}
// Start watching
events := notifier.Watch(ctx, opts)
// Save multiple events
testEvents := []Event{
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-1",
ResourceVersion: rv + 1,
Action: DataActionCreated,
Folder: "test-folder",
PreviousRV: 0,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-2",
ResourceVersion: rv + 3,
Action: DataActionUpdated,
Folder: "test-folder",
PreviousRV: 1000,
},
{
Namespace: "default",
Group: "apps",
Resource: "resource",
Name: "test-resource-3",
ResourceVersion: rv + 2, // Out of order on purpose
Action: DataActionDeleted,
Folder: "test-folder",
PreviousRV: 2000,
},
}
go func() {
for _, event := range testEvents {
err := eventStore.Save(ctx, event)
require.NoError(t, err)
}
}()
// Receive events
receivedEvents := make([]Event, 0, len(testEvents))
for i := 0; i < len(testEvents); i++ {
select {
case event := <-events:
receivedEvents = append(receivedEvents, event)
case <-time.After(1 * time.Second):
t.Fatalf("Timed out waiting for event %d", i+1)
}
}
// Verify all events were received
assert.Len(t, receivedEvents, len(testEvents))
// Verify the events match and ordered by resource version
receivedNames := make([]string, len(receivedEvents))
for i, event := range receivedEvents {
receivedNames[i] = event.Name
}
expectedNames := []string{"test-resource-1", "test-resource-2", "test-resource-3"}
assert.ElementsMatch(t, expectedNames, receivedNames)
}
Loading…
Cancel
Save