@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"github.com/grafana/dskit/backoff"
"github.com/grafana/grafana-app-sdk/logging"
gocache "github.com/patrickmn/go-cache"
@ -13,8 +14,8 @@ import (
const (
defaultLookbackPeriod = 30 * time . Second
defaultPollInterval = 100 * time . Millisecond
defaultEventCacheSize = 10000
defaultMinBackoff = 100 * time . Millisecond
defaultMaxBackoff = 5 * time . Second
defaultBufferSize = 10000
)
@ -29,15 +30,17 @@ type notifierOptions struct {
type watchOptions struct {
LookbackPeriod time . Duration // How far back to look for events
PollInterval time . Duration // How often to poll for new events
BufferSize int // How many events to buffer
MinBackoff time . Duration // Minimum interval between polling requests
MaxBackoff time . Duration // Maximum interval between polling requests
}
func defaultWatchOptions ( ) watchOptions {
return watchOptions {
LookbackPeriod : defaultLookbackPeriod ,
PollInterval : defaultPollInterval ,
BufferSize : defaultBufferSize ,
MinBackoff : defaultMinBackoff ,
MaxBackoff : defaultMaxBackoff ,
}
}
@ -62,9 +65,13 @@ func (n *notifier) cacheKey(evt Event) string {
}
func ( n * notifier ) Watch ( ctx context . Context , opts watchOptions ) <- chan Event {
if opts . PollInterval <= 0 {
opts . PollInterval = defaultPollInterval
if opts . MinBackoff <= 0 {
opts . MinBackoff = defaultMinBackoff
}
if opts . MaxBackoff <= 0 || opts . MaxBackoff <= opts . MinBackoff {
opts . MaxBackoff = defaultMaxBackoff
}
cacheTTL := opts . LookbackPeriod
cacheCleanupInterval := 2 * opts . LookbackPeriod
@ -81,11 +88,21 @@ func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
go func ( ) {
defer close ( events )
// Initialize backoff with minimum backoff interval
currentInterval := opts . MinBackoff
backoffConfig := backoff . Config {
MinBackoff : opts . MinBackoff ,
MaxBackoff : opts . MaxBackoff ,
MaxRetries : 0 , // infinite retries
}
bo := backoff . New ( ctx , backoffConfig )
for {
select {
case <- ctx . Done ( ) :
return
case <- time . After ( opts . PollInterval ) :
case <- time . After ( currentInterval ) :
foundEvents := false
for evt , err := range n . eventStore . ListSince ( ctx , subtractDurationFromSnowflake ( lastRV , opts . LookbackPeriod ) ) {
if err != nil {
n . log . Error ( "Failed to list events since" , "error" , err )
@ -102,6 +119,7 @@ func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
continue
}
foundEvents = true
if evt . ResourceVersion > lastRV {
lastRV = evt . ResourceVersion + 1
}
@ -113,6 +131,14 @@ func (n *notifier) Watch(ctx context.Context, opts watchOptions) <-chan Event {
return
}
}
// Apply backoff logic: reset to min when events are found, increase when no events
if foundEvents {
bo . Reset ( )
currentInterval = opts . MinBackoff
} else {
currentInterval = bo . NextDelay ( )
}
}
}
} ( )