chore(memory): prevent concurrent use of Allocator (#20598)

pull/20570/head
Robert Fratto 3 months ago committed by GitHub
parent 9ccf55d0ff
commit 8a5e69648a
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 40
      pkg/memory/allocator.go
  2. 25
      pkg/memory/allocator_concurrent_test.go

@ -1,14 +1,21 @@
package memory
import (
"errors"
"unsafe"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/pkg/memory/internal/memalign"
)
var errConcurrentUse = errors.New("detected concurrent use of allocator")
// Allocator is an arena-style memory allocator that manages a set of memory
// regions.
//
// An Allocator must not be copied after first use.
//
// The Allocator can be reset to reclaim memory regions, marking them as free
// for future calls to [Allocator.Allocate].
//
@ -16,8 +23,12 @@ import (
// Callers must take care to ensure that the lifetime of a returned Memory does
// not exceed the lifetime of an allocator reuse cycle.
//
// The zero value of Allocator is ready for use.
// Allocators are not goroutine safe. If an Allocator methods are called
// concurrently, the method will panic.
type Allocator struct {
// locked is set to true when the allocator is in use.
locked atomic.Bool
parent *Allocator
regions []*Region
@ -40,6 +51,9 @@ func MakeAllocator(parent *Allocator) *Allocator {
// bytes. If there is no such free Memory region, a new memory region will be
// created.
func (alloc *Allocator) Allocate(size int) *Region {
alloc.lock()
defer alloc.unlock()
// Iterate over the set bits in the freelist. Each set bit indicates an
// available memory region.
for i := range alloc.avail.IterValues(true) {
@ -64,6 +78,18 @@ func (alloc *Allocator) Allocate(size int) *Region {
return region
}
func (alloc *Allocator) lock() {
if !alloc.locked.CompareAndSwap(false, true) {
panic(errConcurrentUse)
}
}
func (alloc *Allocator) unlock() {
if !alloc.locked.CompareAndSwap(true, false) {
panic(errConcurrentUse)
}
}
// addRegion inserts a new region into alloc's list of regions, taking
// ownership over it.
func (alloc *Allocator) addRegion(region *Region, free bool) {
@ -107,6 +133,9 @@ func (alloc *Allocator) Reset() {
//
// If Trim is called after Reclaim, all memory regions will be released.
func (alloc *Allocator) Trim() {
alloc.lock()
defer alloc.unlock()
for i := range alloc.used.IterValues(false) {
region := alloc.regions[i]
if region == nil {
@ -146,12 +175,18 @@ func (alloc *Allocator) Free() {
// Reclaim all memory regions back to the Allocator for reuse. After calling
// Reclaim, any [Region] returned by the Allocator or any child allocators must no longer be used.
func (alloc *Allocator) Reclaim() {
alloc.lock()
defer alloc.unlock()
alloc.avail.SetRange(0, len(alloc.regions), true)
alloc.used.SetRange(0, len(alloc.regions), false)
}
// AllocatedBytes returns the total amount of bytes owned by the Allocator.
func (alloc *Allocator) AllocatedBytes() int {
alloc.lock()
defer alloc.unlock()
var sum int
for _, region := range alloc.regions {
if region == nil {
@ -165,6 +200,9 @@ func (alloc *Allocator) AllocatedBytes() int {
// FreeBytes returns the total amount of bytes available for Memory to use
// without requiring additional allocations.
func (alloc *Allocator) FreeBytes() int {
alloc.lock()
defer alloc.unlock()
var sum int
for i := range alloc.avail.IterValues(true) {
region := alloc.regions[i]

@ -0,0 +1,25 @@
package memory
import (
"testing"
"github.com/stretchr/testify/require"
)
func TestAllocator_lock(t *testing.T) {
var alloc Allocator
t.Run("can lock and unlock without panic", func(t *testing.T) {
require.NotPanics(t, func() {
alloc.lock()
alloc.unlock()
})
})
t.Run("cannot double-lock", func(t *testing.T) {
require.PanicsWithValue(t, errConcurrentUse, func() {
alloc.lock()
alloc.lock()
})
})
}
Loading…
Cancel
Save