The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/util/debouncer/debouncer_test.go

217 lines
5.3 KiB

package debouncer
import (
"context"
"errors"
"sync"
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/stretchr/testify/require"
)
// TestDebouncer exercises a debouncer Group built with NewGroup: processing
// after the minimum and maximum wait, the buffer-full error, metric counters,
// error handler invocation, and context cancellation on Stop.
func TestDebouncer(t *testing.T) {
	t.Run("should process values after min wait", func(t *testing.T) {
		// processedMu guards processedValues, which is written by the process
		// handler and read by the polling assertion below.
		var processedMu sync.Mutex
		processedValues := make(map[string]int)

		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize: 10,
			ProcessHandler: func(ctx context.Context, value string) error {
				processedMu.Lock()
				defer processedMu.Unlock()
				processedValues[value]++
				return nil
			},
			MinWait: 10 * time.Millisecond,
			MaxWait: 500 * time.Millisecond,
		})
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		group.Start(ctx)

		require.NoError(t, group.Add("key1"))
		require.NoError(t, group.Add("key2"))
		// Should be deduplicated.
		require.NoError(t, group.Add("key1"))

		require.Eventually(t, func() bool {
			// We should have processed key1 and key2 exactly once.
			// The deferred unlock releases the mutex on every return path;
			// returning while still holding it would block any later handler
			// call forever.
			processedMu.Lock()
			defer processedMu.Unlock()
			return processedValues["key1"] == 1 && processedValues["key2"] == 1
		}, time.Millisecond*200, time.Millisecond*20)
	})

	t.Run("should process values after max wait", func(t *testing.T) {
		// The handler and the loop below touch the map from different
		// goroutines, so every access goes through processedMu.
		var processedMu sync.Mutex
		processed := make(map[string]int, 1)
		processedCount := func(key string) int {
			processedMu.Lock()
			defer processedMu.Unlock()
			return processed[key]
		}

		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize: 10,
			ProcessHandler: func(ctx context.Context, value string) error {
				processedMu.Lock()
				defer processedMu.Unlock()
				processed[value]++
				return nil
			},
			MinWait: 50 * time.Millisecond,
			MaxWait: 500 * time.Millisecond,
		})
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		group.Start(ctx)

		// Re-add the same key every 40ms, i.e. more often than MinWait (50ms).
		ticker := time.NewTicker(time.Millisecond * 40)
		defer ticker.Stop()

		start := time.Now()
		for counter := 0; counter < 25; counter++ {
			<-ticker.C
			// The result of Add is deliberately ignored: this loop only keeps
			// the key busy, and the timing assertion below is what is checked.
			_ = group.Add("key1")
			if processedCount("key1") == 1 {
				break
			}
		}

		// The test expects the first processing roughly MaxWait (500ms) after
		// the start, with 100ms of tolerance.
		require.WithinDuration(t, start.Add(time.Millisecond*500), time.Now(), time.Millisecond*100)
	})

	t.Run("should handle buffer full", func(t *testing.T) {
		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize:     1,
			ProcessHandler: func(ctx context.Context, value string) error { return nil },
			MinWait:        10 * time.Millisecond,
			MaxWait:        100 * time.Millisecond,
		})
		require.NoError(t, err)

		// The group is intentionally never started, so nothing drains the buffer.
		require.NoError(t, group.Add("key1"))
		// Buffer should be full by now as we are not reading from it yet.
		require.ErrorIs(t, group.Add("key2"), ErrBufferFull)
	})

	t.Run("should track metrics", func(t *testing.T) {
		var wg sync.WaitGroup
		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize: 10,
			ProcessHandler: func(ctx context.Context, value string) error {
				wg.Done()
				return nil
			},
			MinWait: 10 * time.Millisecond,
			MaxWait: 100 * time.Millisecond,
			Name:    "test",
		})
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		group.Start(ctx)

		// Two adds of the same key are expected to produce a single handler
		// call, hence one wg.Add(1).
		wg.Add(1)
		require.NoError(t, group.Add("key1"))
		require.NoError(t, group.Add("key1"))
		wg.Wait()

		// NOTE(review): wg.Done runs inside the handler, so the processed
		// counter assertion assumes the counter is updated before the handler
		// is invoked - confirm against the implementation.
		require.Equal(t, float64(2), testutil.ToFloat64(group.metrics.itemsAddedCounter))
		require.Equal(t, float64(1), testutil.ToFloat64(group.metrics.itemsProcessedCounter))
	})

	t.Run("should handle errors", func(t *testing.T) {
		var (
			wg          sync.WaitGroup
			errs        = make(chan error, 10)
			expectedErr = errors.New("test error")
		)
		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize: 10,
			ProcessHandler: func(ctx context.Context, value string) error {
				wg.Done()
				return expectedErr
			},
			MinWait:      10 * time.Millisecond,
			MaxWait:      100 * time.Millisecond,
			Reg:          prometheus.NewPedanticRegistry(),
			Name:         "test_errors",
			ErrorHandler: func(_ string, err error) { errs <- err },
		})
		require.NoError(t, err)

		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		group.Start(ctx)

		wg.Add(1)
		require.NoError(t, group.Add("key1"))
		wg.Wait()

		// wg.Done fires inside the handler, before it returns the error, so
		// the error handler may not have run yet when Wait returns. Wait for
		// the error with a bounded timeout instead of a non-blocking receive.
		select {
		case err := <-errs:
			require.Equal(t, expectedErr, err)
		case <-time.After(time.Second):
			t.Fatal("expected error")
		}

		// NOTE(review): assumes the error counter is updated no later than the
		// ErrorHandler call - confirm against the implementation.
		require.Equal(t, float64(1), testutil.ToFloat64(group.metrics.processingErrorsCounter))
	})

	t.Run("should gracefully handle stops", func(t *testing.T) {
		// Create a channel to signal when processing is done.
		done := make(chan struct{})

		group, err := NewGroup(DebouncerOpts[string]{
			BufferSize: 10,
			ProcessHandler: func(ctx context.Context, item string) error {
				// Start a goroutine to wait for context cancellation.
				// Only one item is added in this test, so done is closed at
				// most once.
				go func() {
					<-ctx.Done()
					close(done)
				}()
				return nil
			},
			MinWait: 50 * time.Millisecond,
			MaxWait: 500 * time.Millisecond,
		})
		require.NoError(t, err)

		// Start the group with a context.
		ctx, cancel := context.WithCancel(context.Background())
		defer cancel()
		group.Start(ctx)

		// Send an item to trigger processing.
		require.NoError(t, group.Add("key-1"))

		// Give the group a moment to process the item (longer than MinWait).
		time.Sleep(100 * time.Millisecond)

		// Stop the group, which should cancel the context.
		group.Stop()

		// Wait for the done signal or timeout.
		select {
		case <-done:
			// Success - the group was stopped and the context was canceled.
		case <-time.After(time.Second):
			t.Fatal("Timed out waiting for group to stop")
		}
	})
}