diff --git a/pkg/util/mempool/metrics.go b/pkg/util/mempool/metrics.go
index af712ce737..90d48cef24 100644
--- a/pkg/util/mempool/metrics.go
+++ b/pkg/util/mempool/metrics.go
@@ -11,6 +11,7 @@ type metrics struct {
 	availableBuffersPerSlab *prometheus.GaugeVec
 	errorsCounter           *prometheus.CounterVec
 	accesses                *prometheus.CounterVec
+	waitDuration            *prometheus.HistogramVec
 }
 
 const (
@@ -41,5 +42,12 @@ func newMetrics(r prometheus.Registerer, name string) *metrics {
 			Help:        "The total amount of accesses to the pool.",
 			ConstLabels: prometheus.Labels{"pool": name},
 		}, []string{"slab", "op"}),
+		waitDuration: promauto.With(r).NewHistogramVec(prometheus.HistogramOpts{
+			Namespace:   constants.Loki,
+			Subsystem:   "mempool",
+			Name:        "wait_duration_seconds",
+			Help:        "Time spent waiting to obtain a buffer from a slab.",
+			ConstLabels: prometheus.Labels{"pool": name},
+		}, []string{"slab"}),
 	}
 }
diff --git a/pkg/util/mempool/pool.go b/pkg/util/mempool/pool.go
index 4ac5b96062..e02bc62888 100644
--- a/pkg/util/mempool/pool.go
+++ b/pkg/util/mempool/pool.go
@@ -4,6 +4,7 @@ import (
 	"errors"
 	"fmt"
 	"sync"
+	"time"
 	"unsafe"
 
 	"github.com/dustin/go-humanize"
@@ -51,15 +52,11 @@ func (s *slab) get(size int) ([]byte, error) {
 	s.metrics.accesses.WithLabelValues(s.name, opTypeGet).Inc()
 	s.once.Do(s.init)
 
+	waitStart := time.Now()
 	// wait for available buffer on channel
-	var buf []byte
-	select {
-	case ptr := <-s.buffer:
-		buf = unsafe.Slice((*byte)(ptr), s.size)
-	default:
-		s.metrics.errorsCounter.WithLabelValues(s.name, reasonSlabExhausted).Inc()
-		return nil, errSlabExhausted
-	}
+	ptr := <-s.buffer
+	buf := unsafe.Slice((*byte)(ptr), s.size)
+	s.metrics.waitDuration.WithLabelValues(s.name).Observe(time.Since(waitStart).Seconds())
 
 	return buf[:size], nil
 }
diff --git a/pkg/util/mempool/pool_test.go b/pkg/util/mempool/pool_test.go
index 289479078d..955d5e5dfd 100644
--- a/pkg/util/mempool/pool_test.go
+++ b/pkg/util/mempool/pool_test.go
@@ -45,15 +45,26 @@ func TestMemPool(t *testing.T) {
 		require.Equal(t, 512, cap(res))
 	})
 
-	t.Run("pool returns error when no buffer is available", func(t *testing.T) {
+	t.Run("pool blocks when no buffer is available", func(t *testing.T) {
 		pool := New("test", []Bucket{
 			{Size: 1, Capacity: 64},
 		}, nil)
 		buf1, _ := pool.Get(32)
 		require.Equal(t, 32, len(buf1))
 
+		delay := 20 * time.Millisecond
+		start := time.Now()
+
+		go func(p *MemPool) {
+			time.Sleep(delay)
+			p.Put(make([]byte, 16))
+		}(pool)
+
 		_, err := pool.Get(16)
-		require.ErrorContains(t, err, errSlabExhausted.Error())
+		duration := time.Since(start)
+
+		require.NoError(t, err)
+		require.Greater(t, duration, delay)
 	})
 
 	t.Run("test ring buffer returns same backing array", func(t *testing.T) {
@@ -80,16 +91,17 @@ func TestMemPool(t *testing.T) {
 	})
 
 	t.Run("concurrent access", func(t *testing.T) {
+		numWorkers := 256
+
		pool := New("test", []Bucket{
-			{Size: 32, Capacity: 2 << 10},
-			{Size: 16, Capacity: 4 << 10},
-			{Size: 8, Capacity: 8 << 10},
-			{Size: 4, Capacity: 16 << 10},
-			{Size: 2, Capacity: 32 << 10},
+			{Size: numWorkers, Capacity: 2 << 10},
+			{Size: numWorkers, Capacity: 4 << 10},
+			{Size: numWorkers, Capacity: 8 << 10},
+			{Size: numWorkers, Capacity: 16 << 10},
+			{Size: numWorkers, Capacity: 32 << 10},
 		}, nil)
 
 		var wg sync.WaitGroup
-		numWorkers := 256
 		n := 10
 		for i := 0; i < numWorkers; i++ {
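Note for reviewers: the core behavioral change is that `slab.get` no longer fails fast with `errSlabExhausted`; it parks on the channel receive until a `Put` returns a buffer, and records the wait in the new `wait_duration_seconds` histogram. Below is a minimal, self-contained sketch of that pattern, not the Loki implementation; `toySlab`, `getWithTimeout`, and the constants in `main` are hypothetical illustrations.

```go
package main

import (
	"fmt"
	"time"
)

// toySlab is a hypothetical, simplified stand-in for the patched slab: a
// fixed set of buffers handed out over a buffered channel. After this diff,
// get blocks on the channel receive instead of failing fast with
// errSlabExhausted when the slab is empty.
type toySlab struct {
	size    int
	buffers chan []byte
}

func newToySlab(size, count int) *toySlab {
	s := &toySlab{size: size, buffers: make(chan []byte, count)}
	for i := 0; i < count; i++ {
		s.buffers <- make([]byte, size)
	}
	return s
}

// get blocks until a buffer is available and reports how long it waited,
// mirroring what the new wait_duration_seconds histogram observes.
func (s *toySlab) get(n int) ([]byte, time.Duration) {
	start := time.Now()
	buf := <-s.buffers // blocks while the slab is exhausted
	return buf[:n], time.Since(start)
}

// put returns a buffer to the slab, unblocking one pending get.
func (s *toySlab) put(buf []byte) {
	s.buffers <- buf[:s.size]
}

// getWithTimeout is a hypothetical caller-side guard: since the error path
// is gone, a caller that cannot tolerate unbounded waits can race the
// blocking get against a timer. The worker goroutine is not cancelled on
// timeout; it parks until a buffer eventually arrives.
func getWithTimeout(s *toySlab, n int, timeout time.Duration) ([]byte, error) {
	res := make(chan []byte, 1)
	go func() {
		buf, _ := s.get(n)
		res <- buf
	}()
	select {
	case buf := <-res:
		return buf, nil
	case <-time.After(timeout):
		return nil, fmt.Errorf("no buffer within %v", timeout)
	}
}

func main() {
	slab := newToySlab(64, 1)
	buf1, _ := slab.get(32) // takes the only buffer

	// Return the buffer after 20ms; the blocked get below should wait
	// roughly that long, just like the updated pool_test.go case.
	go func() {
		time.Sleep(20 * time.Millisecond)
		slab.put(buf1)
	}()

	_, waited := slab.get(16)
	fmt.Printf("waited %v for a buffer\n", waited)

	if _, err := getWithTimeout(slab, 16, 10*time.Millisecond); err != nil {
		fmt.Println(err) // slab is empty again, so this times out
	}
}
```

The trade-off sketched here is that exhaustion now surfaces as latency in the histogram rather than as errors in `errorsCounter`, so any upper bound on waiting has to be enforced by the caller, as `getWithTimeout` illustrates.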