Reverts flush buffer pooling. (#3210)

Pooling currently requires more work: the cache is still using the buffer asynchronously while chunks are being flushed, so the buffer cannot safely be returned to the pool.

We still benefit from other improvements, such as computing the right size for a chunk.
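For context, a minimal sketch of the hazard, with hypothetical names (bufPool, flushAndCache, cacheStore) standing in for the actual ingester and cache code:

package main

import (
	"bytes"
	"sync"
)

var bufPool = sync.Pool{New: func() interface{} { return new(bytes.Buffer) }}

// flushAndCache sketches the unsafe pattern being reverted: the encoded
// bytes are handed to an asynchronous cache write, so returning the
// backing buffer to the pool lets a later Get/Reset/Write overwrite
// memory the cache goroutine is still reading.
func flushAndCache(payload []byte, cacheStore func([]byte)) {
	buf := bufPool.Get().(*bytes.Buffer)
	buf.Reset()
	buf.Write(payload)         // stand-in for EncodeTo
	go cacheStore(buf.Bytes()) // cache keeps using the buffer asynchronously
	bufPool.Put(buf)           // premature: the goroutine above may still read it
}

func main() {
	done := make(chan struct{})
	flushAndCache([]byte("chunk"), func(b []byte) {
		_ = b // reads b after Put has already recycled the buffer
		close(done)
	})
	<-done
}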

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
Branch: pull/3217/head
Author: Cyril Tovena, committed via GitHub
Parent: ba0df61531
Commit: e74eb549ba
Changed files (lines changed):
1. pkg/ingester/flush.go (10)
2. pkg/ingester/flush_test.go (24)
3. pkg/util/pool/bytesbuffer.go (4)
4. pkg/util/pool/bytesbuffer_test.go (17)

@@ -323,12 +323,6 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
 	metric := labelsBuilder.Labels()
 	wireChunks := make([]chunk.Chunk, len(cs))
-	buffers := make([]*bytes.Buffer, len(cs))
-	defer func() {
-		for j := range buffers {
-			chunksBufferPool.Put(buffers[j])
-		}
-	}()
 	// use anonymous function to make lock releasing simpler.
 	err = func() error {
@@ -349,13 +343,11 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
 			)
 			chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
-			buffer := chunksBufferPool.Get(chunkSize)
 			start := time.Now()
-			if err := ch.EncodeTo(buffer); err != nil {
+			if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil {
 				return err
 			}
 			chunkEncodeTime.Observe(time.Since(start).Seconds())
-			buffers[j] = buffer
 			wireChunks[j] = ch
 		}
 		return nil
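What the revert keeps is the right-sizing of the encode buffer: each chunk now gets a fresh buffer with the correct capacity up front, so EncodeTo never reallocates mid-encode and no other goroutine can hold a reference to the memory. A small self-contained illustration (the chunkSize value here is made up; the real code derives it from c.chunk.BytesSize() plus 4kB of header room):

package main

import (
	"bytes"
	"fmt"
)

func main() {
	chunkSize := 1<<10 + 4*1024 // hypothetical chunk size + 4kB header room
	buf := bytes.NewBuffer(make([]byte, 0, chunkSize))
	fmt.Println(buf.Len(), buf.Cap()) // 0 5120: empty but preallocated
	buf.WriteString("encoded chunk bytes")
	fmt.Println(buf.Len(), buf.Cap()) // 19 5120: grew without reallocating
}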

@@ -97,6 +97,25 @@ func Benchmark_FlushLoop(b *testing.B) {
 	}
 }
+func Test_Flush(t *testing.T) {
+	var (
+		store, ing = newTestStore(t, defaultIngesterTestConfig(t), nil)
+		lbs        = makeRandomLabels()
+		ctx        = user.InjectOrgID(context.Background(), "foo")
+	)
+	store.onPut = func(ctx context.Context, chunks []chunk.Chunk) error {
+		for _, c := range chunks {
+			buf, err := c.Encoded()
+			require.Nil(t, err)
+			if err := c.Decode(chunk.NewDecodeContext(), buf); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+	require.NoError(t, ing.flushChunks(ctx, 0, lbs, buildChunkDecs(t), &sync.RWMutex{}))
+}
 func buildChunkDecs(t testing.TB) []*chunkDesc {
 	res := make([]*chunkDesc, 10)
 	for i := range res {
@@ -227,6 +246,7 @@ type testStore struct {
 	mtx sync.Mutex
 	// Chunks keyed by userID.
 	chunks map[string][]chunk.Chunk
+	onPut  func(ctx context.Context, chunks []chunk.Chunk) error
 }
 // Note: the ingester New() function creates it's own WAL first which we then override if specified.
@@ -275,7 +295,9 @@ func defaultIngesterTestConfig(t testing.TB) Config {
 func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
 	s.mtx.Lock()
 	defer s.mtx.Unlock()
+	if s.onPut != nil {
+		return s.onPut(ctx, chunks)
+	}
 	userID, err := user.ExtractOrgID(ctx)
 	if err != nil {
 		return err
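A note on intent: the onPut hook added here is what lets Test_Flush assert on flushed chunks. Each chunk handed to the store is round-tripped through Encoded and Decode, so encoded bytes corrupted by buffer reuse (the bug class behind this revert) fail the test directly rather than surfacing later as bad data in a live cache.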

@@ -44,13 +44,13 @@ func (p *BufferPool) Get(sz int) *bytes.Buffer {
 		}
 		b := p.buckets[i].Get()
 		if b == nil {
-			b = bytes.NewBuffer(make([]byte, bktSize))
+			b = bytes.NewBuffer(make([]byte, 0, bktSize))
 		}
 		buf := b.(*bytes.Buffer)
 		buf.Reset()
 		return b.(*bytes.Buffer)
 	}
-	return bytes.NewBuffer(make([]byte, sz))
+	return bytes.NewBuffer(make([]byte, 0, sz))
 }
 // Put adds a byte buffer to the right bucket in the pool.
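The one-character nature of this fix is easy to miss: bytes.NewBuffer treats the *length* of the slice it is given as existing content, so make([]byte, bktSize) produced buffers that already "contained" bktSize zero bytes, while make([]byte, 0, bktSize) sets only the capacity. A standalone illustration:

package main

import (
	"bytes"
	"fmt"
)

func main() {
	// bytes.NewBuffer treats the slice's length as existing content.
	full := bytes.NewBuffer(make([]byte, 1024))     // Len()=1024: 1kB of zeros
	empty := bytes.NewBuffer(make([]byte, 0, 1024)) // Len()=0: capacity only
	fmt.Println(full.Len(), empty.Len()) // 1024 0

	// Writes append after existing content, so the buggy form would
	// prefix every encoded chunk with a run of zero bytes.
	full.WriteString("data")
	fmt.Println(full.Len()) // 1028, not 4
}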

@@ -0,0 +1,17 @@
+package pool
+
+import (
+	"testing"
+
+	"github.com/stretchr/testify/require"
+)
+
+func Test_ZeroBuffer(t *testing.T) {
+	p := NewBuffer(2, 10, 2)
+	require.Equal(t, 0, p.Get(1).Len())
+	require.Equal(t, 0, p.Get(1).Len())
+	require.Equal(t, 0, p.Get(2).Len())
+	require.Equal(t, 0, p.Get(2).Len())
+	require.Equal(t, 0, p.Get(20).Len())
+	require.Equal(t, 0, p.Get(20).Len())
+}
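Test_ZeroBuffer exercises both paths of Get shown above: repeated requests served from a pool bucket (including recycled buffers, which Reset must leave empty) and a request of 20 bytes that exceeds every bucket and falls through to the direct bytes.NewBuffer allocation. In all cases the returned buffer must start with Len() == 0. (Reading NewBuffer(2, 10, 2) as min size, max size, and growth factor is inferred from the test, not from the pool's documentation.)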