Use prometheus pool for line buffer. (#790)

* Use prometheus pool for line buffer.
Cyril Tovena 6 years ago committed by GitHub
parent f289384a1c
commit cb7c8f6233
  1. Gopkg.lock (9 changes)
  2. pkg/chunkenc/gzip.go (24 changes)
  3. pkg/chunkenc/pool.go (27 changes)
  4. vendor/github.com/prometheus/prometheus/pkg/pool/pool.go (87 changes)

Gopkg.lock (generated, 9 changes)

@@ -904,7 +904,7 @@
[[projects]]
branch = "master"
digest = "1:5fc7e79142258f1ce47429c00fedb9ee04d8562e7fd477f2d9ae28a692e80969"
digest = "1:43edb14ccbe0d18f38bd55511250c39ccc3917c64a514a0e791408ac19a71558"
name = "github.com/prometheus/prometheus"
packages = [
"discovery",
@@ -925,6 +925,7 @@
"pkg/gate",
"pkg/labels",
"pkg/modtimevfs",
"pkg/pool",
"pkg/relabel",
"pkg/textparse",
"pkg/timestamp",
@@ -1608,7 +1609,9 @@
"github.com/pkg/errors",
"github.com/prometheus/client_golang/prometheus",
"github.com/prometheus/client_golang/prometheus/promauto",
"github.com/prometheus/client_golang/prometheus/promhttp",
"github.com/prometheus/client_golang/prometheus/testutil",
"github.com/prometheus/client_model/go",
"github.com/prometheus/common/config",
"github.com/prometheus/common/expfmt",
"github.com/prometheus/common/model",
@@ -1618,6 +1621,7 @@
"github.com/prometheus/prometheus/discovery/targetgroup",
"github.com/prometheus/prometheus/pkg/labels",
"github.com/prometheus/prometheus/pkg/modtimevfs",
"github.com/prometheus/prometheus/pkg/pool",
"github.com/prometheus/prometheus/pkg/relabel",
"github.com/prometheus/prometheus/pkg/textparse",
"github.com/prometheus/prometheus/promql",
@@ -1637,8 +1641,9 @@
"github.com/weaveworks/common/user",
"golang.org/x/net/context",
"google.golang.org/grpc",
"google.golang.org/grpc/codes",
"google.golang.org/grpc/health/grpc_health_v1",
"google.golang.org/grpc/metadata",
"google.golang.org/grpc/status",
"gopkg.in/alecthomas/kingpin.v2",
"gopkg.in/fsnotify.v1",
"gopkg.in/yaml.v2",

pkg/chunkenc/gzip.go

@@ -476,8 +476,8 @@ type bufferedIterator struct
err error
buf *bytes.Buffer // The buffer for a single entry.
decBuf []byte // The buffer for decoding the lengths.
buf []byte // The buffer for a single entry.
decBuf []byte // The buffer for decoding the lengths.
closed bool
@@ -491,7 +491,6 @@ func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *b
reader: r,
pool: pool,
filter: filter,
buf: BytesBufferPool.Get(),
decBuf: make([]byte, binary.MaxVarintLen64),
}
}
@@ -529,25 +528,32 @@ func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
return 0, nil, false
}
}
lineSize := int(l)
if si.buf.Cap() < int(l) {
si.buf.Grow(int(l) - si.buf.Cap())
// If the buffer is not yet initialized or is too small, get a new one from the pool.
if si.buf == nil || lineSize > cap(si.buf) {
// When replacing an undersized buffer, return the old one to the pool first.
if si.buf != nil {
BytesBufferPool.Put(si.buf)
}
si.buf = BytesBufferPool.Get(lineSize).([]byte)
}
n, err := si.s.Read(si.buf.Bytes()[:l])
// Then read the line into the buffer.
n, err := si.s.Read(si.buf[:lineSize])
if err != nil && err != io.EOF {
si.err = err
return 0, nil, false
}
for n < int(l) {
r, err := si.s.Read(si.buf.Bytes()[n:l])
for n < lineSize {
r, err := si.s.Read(si.buf[n:lineSize])
if err != nil {
si.err = err
return 0, nil, false
}
n += r
}
return ts, si.buf.Bytes()[:l], true
return ts, si.buf[:lineSize], true
}
func (si *bufferedIterator) Entry() logproto.Entry {

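For context, the iterator now borrows its line buffer from the bucketed BytesBufferPool and returns it when a bigger buffer is needed, instead of growing a per-iterator bytes.Buffer. Below is a minimal standalone sketch of that get/put pattern; readLine and lineBufPool are illustrative names, not part of the change, and only the pool configuration mirrors the diff.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/pool"
)

// lineBufPool mirrors the BytesBufferPool configuration from the diff:
// []byte buffers bucketed from 512B to 8KB, doubling each step.
var lineBufPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} {
	return make([]byte, 0, size)
})

// readLine copies src into buf, swapping buf for a larger pooled slice
// when it cannot hold the line (hypothetical helper for illustration).
func readLine(src []byte, buf []byte) []byte {
	if buf == nil || len(src) > cap(buf) {
		if buf != nil {
			lineBufPool.Put(buf) // return the undersized buffer to the pool
		}
		buf = lineBufPool.Get(len(src)).([]byte)
	}
	n := copy(buf[:len(src)], src)
	return buf[:n]
}

func main() {
	buf := readLine([]byte("hello world"), nil)
	fmt.Println(string(buf)) // hello world
	lineBufPool.Put(buf)     // hand the buffer back once done with it
}
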
pkg/chunkenc/pool.go

@@ -2,11 +2,11 @@ package chunkenc
import (
"bufio"
"bytes"
"compress/gzip"
"io"
"sync"
"github.com/prometheus/prometheus/pkg/pool"
)
// CompressionPool is a pool of CompressionWriter and CompressionReader
@@ -28,7 +28,8 @@ var (
},
}
// BytesBufferPool is a pool of byte buffers used for decompressed lines.
BytesBufferPool = newBufferPoolWithSize(4096)
// Buckets [0.5KB,1KB,2KB,4KB,8KB]
BytesBufferPool = pool.New(1<<9, 1<<13, 2, func(size int) interface{} { return make([]byte, 0, size) })
)
// GzipPool is a gzip compression pool
@@ -92,23 +93,3 @@ func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
bufPool.pool.Put(b)
}
type bufferPool struct {
pool sync.Pool
}
func newBufferPoolWithSize(size int) *bufferPool {
return &bufferPool{
pool: sync.Pool{
New: func() interface{} { return bytes.NewBuffer(make([]byte, size)) },
},
}
}
func (bp *bufferPool) Get() *bytes.Buffer {
return bp.pool.Get().(*bytes.Buffer)
}
func (bp *bufferPool) Put(b *bytes.Buffer) {
bp.pool.Put(b)
}

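The "Buckets [0.5KB,1KB,2KB,4KB,8KB]" comment follows from pool.New(1<<9, 1<<13, 2, ...): sizes start at 512 bytes and double up to 8192. A quick standalone check of that arithmetic (not part of the diff), mirroring the size loop in pool.New shown below:

package main

import "fmt"

func main() {
	// Same computation as pool.New(minSize=1<<9, maxSize=1<<13, factor=2).
	factor := 2.0
	for s := 1 << 9; s <= 1<<13; s = int(float64(s) * factor) {
		fmt.Printf("%dB ", s) // 512B 1024B 2048B 4096B 8192B
	}
	fmt.Println()
}
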
vendor/github.com/prometheus/prometheus/pkg/pool/pool.go (new file)

@@ -0,0 +1,87 @@
// Copyright 2017 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package pool
import (
"fmt"
"reflect"
"sync"
)
// Pool is a bucketed pool for variably sized byte slices.
type Pool struct {
buckets []sync.Pool
sizes []int
// make is the function used to create an empty slice when none exist yet.
make func(int) interface{}
}
// New returns a new Pool with size buckets for minSize to maxSize
// increasing by the given factor.
func New(minSize, maxSize int, factor float64, makeFunc func(int) interface{}) *Pool {
if minSize < 1 {
panic("invalid minimum pool size")
}
if maxSize < 1 {
panic("invalid maximum pool size")
}
if factor < 1 {
panic("invalid factor")
}
var sizes []int
for s := minSize; s <= maxSize; s = int(float64(s) * factor) {
sizes = append(sizes, s)
}
p := &Pool{
buckets: make([]sync.Pool, len(sizes)),
sizes: sizes,
make: makeFunc,
}
return p
}
// Get returns a byte slice that fits the given size.
func (p *Pool) Get(sz int) interface{} {
for i, bktSize := range p.sizes {
if sz > bktSize {
continue
}
b := p.buckets[i].Get()
if b == nil {
b = p.make(bktSize)
}
return b
}
return p.make(sz)
}
// Put adds a slice to the right bucket in the pool.
func (p *Pool) Put(s interface{}) {
slice := reflect.ValueOf(s)
if slice.Kind() != reflect.Slice {
panic(fmt.Sprintf("%+v is not a slice", slice))
}
for i, size := range p.sizes {
if slice.Cap() > size {
continue
}
p.buckets[i].Put(slice.Slice(0, 0).Interface())
return
}
}
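
Taken together: Get hands out a slice from the first bucket large enough for the request, Put files a slice back into the first bucket whose size is at least the slice's capacity, and requests larger than the biggest bucket are allocated ad hoc and silently dropped on Put. A small standalone example of that behavior, assuming the vendored package above; the concrete sizes are illustrative.

package main

import (
	"fmt"

	"github.com/prometheus/prometheus/pkg/pool"
)

func main() {
	p := pool.New(1<<9, 1<<13, 2, func(sz int) interface{} {
		return make([]byte, 0, sz)
	})

	small := p.Get(600).([]byte)
	fmt.Println(cap(small)) // 1024: first bucket that fits 600 bytes

	huge := p.Get(20000).([]byte)
	fmt.Println(cap(huge)) // 20000: larger than every bucket, allocated ad hoc

	p.Put(small) // returned to the 1024-byte bucket for reuse
	p.Put(huge)  // no bucket can hold it, so it is discarded
}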