|
|
|
@ -69,8 +69,9 @@ type cdkBackend struct { |
|
|
|
|
nextRV NextResourceVersion |
|
|
|
|
mutex sync.Mutex |
|
|
|
|
|
|
|
|
|
// Typically one... the server wrapper
|
|
|
|
|
subscribers []chan *WrittenEvent |
|
|
|
|
// Simple watch stream -- NOTE, this only works for single tenant!
|
|
|
|
|
broadcaster Broadcaster[*WrittenEvent] |
|
|
|
|
stream chan<- *WrittenEvent |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string { |
|
|
|
@ -123,24 +124,19 @@ func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64 |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Async notify all subscribers
|
|
|
|
|
if s.subscribers != nil { |
|
|
|
|
if s.stream != nil { |
|
|
|
|
go func() { |
|
|
|
|
write := &WrittenEvent{ |
|
|
|
|
WriteEvent: event, |
|
|
|
|
|
|
|
|
|
WriteEvent: event, |
|
|
|
|
Timestamp: time.Now().UnixMilli(), |
|
|
|
|
ResourceVersion: rv, |
|
|
|
|
} |
|
|
|
|
for _, sub := range s.subscribers { |
|
|
|
|
sub <- write |
|
|
|
|
} |
|
|
|
|
s.stream <- write |
|
|
|
|
}() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return rv, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Read implements ResourceStoreServer.
|
|
|
|
|
func (s *cdkBackend) Read(ctx context.Context, req *ReadRequest) (*ReadResponse, error) { |
|
|
|
|
rv := req.ResourceVersion |
|
|
|
|
|
|
|
|
@ -191,7 +187,6 @@ func isDeletedMarker(raw []byte) bool { |
|
|
|
|
return false |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// List implements AppendingStore.
|
|
|
|
|
func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) (*ListResponse, error) { |
|
|
|
|
resources, err := buildTree(ctx, s, req.Options.Key) |
|
|
|
|
if err != nil { |
|
|
|
@ -215,36 +210,21 @@ func (s *cdkBackend) PrepareList(ctx context.Context, req *ListRequest) (*ListRe |
|
|
|
|
return rsp, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Watch implements AppendingStore.
|
|
|
|
|
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) { |
|
|
|
|
stream := make(chan *WrittenEvent, 10) |
|
|
|
|
{ |
|
|
|
|
s.mutex.Lock() |
|
|
|
|
defer s.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
// Add the event stream
|
|
|
|
|
s.subscribers = append(s.subscribers, stream) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Wait for context done
|
|
|
|
|
go func() { |
|
|
|
|
// Wait till the context is done
|
|
|
|
|
<-ctx.Done() |
|
|
|
|
|
|
|
|
|
// Then remove the subscription
|
|
|
|
|
s.mutex.Lock() |
|
|
|
|
defer s.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
// Copy all streams without our listener
|
|
|
|
|
subs := []chan *WrittenEvent{} |
|
|
|
|
for _, sub := range s.subscribers { |
|
|
|
|
if sub != stream { |
|
|
|
|
subs = append(subs, sub) |
|
|
|
|
} |
|
|
|
|
s.mutex.Lock() |
|
|
|
|
defer s.mutex.Unlock() |
|
|
|
|
|
|
|
|
|
if s.broadcaster == nil { |
|
|
|
|
var err error |
|
|
|
|
s.broadcaster, err = NewBroadcaster(context.Background(), func(c chan<- *WrittenEvent) error { |
|
|
|
|
s.stream = c |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
s.subscribers = subs |
|
|
|
|
}() |
|
|
|
|
return stream, nil |
|
|
|
|
} |
|
|
|
|
return s.broadcaster.Subscribe(ctx) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// group > resource > namespace > name > versions
|
|
|
|
|