Storage memory improvement (#713)

* add benchmark for storage queries
* improve iterator to load only on next
* fix memory retained by lazy chunks
* reverse backward lazy iterator
pull/768/head
Cyril Tovena 7 years ago committed by GitHub
parent 8b06eb66bf
commit 3346ce1b40
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      Gopkg.lock
  2. 4
      Makefile
  3. 3
      cmd/loki/loki-local-config.yaml
  4. 3
      pkg/chunkenc/dumb_chunk.go
  5. 2
      pkg/chunkenc/facade.go
  6. 165
      pkg/chunkenc/gzip.go
  7. 108
      pkg/chunkenc/gzip_test.go
  8. 3
      pkg/chunkenc/interface.go
  9. 43
      pkg/chunkenc/lazy_chunk.go
  10. 114
      pkg/chunkenc/pool.go
  11. 4
      pkg/ingester/chunk_test.go
  12. 11
      pkg/ingester/flush_test.go
  13. 8
      pkg/ingester/instance.go
  14. 5
      pkg/ingester/stream.go
  15. 4
      pkg/ingester/stream_test.go
  16. 19
      pkg/ingester/tailer.go
  17. 126
      pkg/iter/iterator.go
  18. 55
      pkg/logql/ast.go
  19. 3
      pkg/loki/loki.go
  20. 3
      pkg/loki/modules.go
  21. 11
      pkg/querier/querier.go
  22. 142
      pkg/storage/hack/main.go
  23. 100
      pkg/storage/store.go
  24. 163
      pkg/storage/store_test.go
  25. 25
      pkg/util/conv.go

2
Gopkg.lock generated

@ -1575,6 +1575,7 @@
"github.com/coreos/go-systemd/sdjournal",
"github.com/cortexproject/cortex/pkg/chunk",
"github.com/cortexproject/cortex/pkg/chunk/encoding",
"github.com/cortexproject/cortex/pkg/chunk/local",
"github.com/cortexproject/cortex/pkg/chunk/storage",
"github.com/cortexproject/cortex/pkg/ingester/client",
"github.com/cortexproject/cortex/pkg/ingester/index",
@ -1637,6 +1638,7 @@
"golang.org/x/net/context",
"google.golang.org/grpc",
"google.golang.org/grpc/health/grpc_health_v1",
"google.golang.org/grpc/metadata",
"gopkg.in/alecthomas/kingpin.v2",
"gopkg.in/fsnotify.v1",
"gopkg.in/yaml.v2",

@ -312,3 +312,7 @@ push-plugin: build-plugin
enable-plugin:
docker plugin enable grafana/loki-docker-driver:$(PLUGIN_TAG)
benchmark-store:
go run ./pkg/storage/hack/main.go
go test ./pkg/storage/ -bench=. -benchmem -memprofile memprofile.out -cpuprofile cpuprofile.out

@ -10,7 +10,8 @@ ingester:
kvstore:
store: inmemory
replication_factor: 1
chunk_idle_period: 15m
chunk_idle_period: 5m
chunk_retain_period: 30s
schema_config:
configs:

@ -6,6 +6,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)
const (
@ -51,7 +52,7 @@ func (c *dumbChunk) Size() int {
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction, _ logql.Filter) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})

@ -17,7 +17,7 @@ func init() {
})
}
// Facade for compatibility with cortex chunk type, so we can use it's chunk store.
// Facade for compatibility with cortex chunk type, so we can use its chunk store.
type Facade struct {
c Chunk
encoding.Chunk

@ -3,18 +3,15 @@ package chunkenc
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"hash"
"hash/crc32"
"io"
"math"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/pkg/errors"
)
@ -53,8 +50,7 @@ type MemChunk struct {
head *headBlock
encoding Encoding
cw func(w io.Writer) CompressionWriter
cr func(r io.Reader) (CompressionReader, error)
cPool CompressionPool
}
type block struct {
@ -96,10 +92,10 @@ func (hb *headBlock) append(ts int64, line string) error {
return nil
}
func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) {
func (hb *headBlock) serialise(pool CompressionPool) ([]byte, error) {
buf := &bytes.Buffer{}
encBuf := make([]byte, binary.MaxVarintLen64)
compressedWriter := cw(buf)
compressedWriter := pool.GetWriter(buf)
for _, logEntry := range hb.entries {
n := binary.PutVarint(encBuf, logEntry.t)
_, err := compressedWriter.Write(encBuf[:n])
@ -120,7 +116,7 @@ func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte,
if err := compressedWriter.Close(); err != nil {
return nil, errors.Wrap(err, "flushing pending compress buffer")
}
pool.PutWriter(compressedWriter)
return buf.Bytes(), nil
}
@ -136,18 +132,14 @@ func NewMemChunkSize(enc Encoding, blockSize int) *MemChunk {
blockSize: blockSize, // The blockSize in bytes.
blocks: []block{},
head: &headBlock{
mint: math.MaxInt64,
maxt: math.MinInt64,
},
head: &headBlock{},
encoding: enc,
}
switch enc {
case EncGZIP:
c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) }
c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }
c.cPool = &Gzip
default:
panic("unknown encoding")
}
@ -163,8 +155,8 @@ func NewMemChunk(enc Encoding) *MemChunk {
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
cr: func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) },
head: &headBlock{}, // Dummy, empty headblock.
cPool: &Gzip,
head: &headBlock{}, // Dummy, empty headblock.
}
db := decbuf{b: b}
@ -192,6 +184,7 @@ func NewByteChunk(b []byte) (*MemChunk, error) {
// Read the number of blocks.
num := db.uvarint()
bc.blocks = make([]block, 0, num)
for i := 0; i < num; i++ {
blk := block{}
@ -343,7 +336,7 @@ func (c *MemChunk) cut() error {
return nil
}
b, err := c.head.serialise(c.cw)
b, err := c.head.serialise(c.cPool)
if err != nil {
return err
}
@ -384,22 +377,19 @@ func (c *MemChunk) Bounds() (fromT, toT time.Time) {
}
// Iterator implements Chunk.
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks))
its := make([]iter.EntryIterator, 0, len(c.blocks)+1)
for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
it, err := b.iterator(c.cr)
if err != nil {
return nil, err
}
its = append(its, it)
its = append(its, b.iterator(c.cPool, filter))
}
}
its = append(its, c.head.iterator(mint, maxt))
if !c.head.isEmpty() {
its = append(its, c.head.iterator(mint, maxt, filter))
}
iterForward := iter.NewTimeRangedIterator(
iter.NewNonOverlappingIterator(its, ""),
@ -414,21 +404,14 @@ func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction
return iter.NewEntryIteratorBackward(iterForward)
}
func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) {
func (b block) iterator(pool CompressionPool, filter logql.Filter) iter.EntryIterator {
if len(b.b) == 0 {
return emptyIterator, nil
}
r, err := cr(bytes.NewBuffer(b.b))
if err != nil {
return nil, err
return emptyIterator
}
s := bufio.NewReader(r)
return newBufferedIterator(s), nil
return newBufferedIterator(pool, b.b, filter)
}
func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
func (hb *headBlock) iterator(mint, maxt int64, filter logql.Filter) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}
@ -438,8 +421,16 @@ func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
// but the tradeoff is that queries to near-realtime data would be much lower than
// cutting of blocks.
entries := make([]entry, len(hb.entries))
copy(entries, hb.entries)
entries := make([]entry, 0, len(hb.entries))
for _, e := range hb.entries {
if filter == nil || filter([]byte(e.s)) {
entries = append(entries, e)
}
}
if len(entries) == 0 {
return emptyIterator
}
return &listIterator{
entries: entries,
@ -477,73 +468,107 @@ func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }
type bufferedIterator struct {
s *bufio.Reader
s *bufio.Reader
reader CompressionReader
pool CompressionPool
curT int64
curLog string
cur logproto.Entry
err error
buf []byte // The buffer a single entry.
decBuf []byte // The buffer for decoding the lengths.
buf *bytes.Buffer // The buffer for a single entry.
decBuf []byte // The buffer for decoding the lengths.
closed bool
filter logql.Filter
}
func newBufferedIterator(s *bufio.Reader) *bufferedIterator {
func newBufferedIterator(pool CompressionPool, b []byte, filter logql.Filter) *bufferedIterator {
r := pool.GetReader(bytes.NewBuffer(b))
return &bufferedIterator{
s: s,
buf: make([]byte, 1024),
s: BufReaderPool.Get(r),
reader: r,
pool: pool,
filter: filter,
buf: BytesBufferPool.Get(),
decBuf: make([]byte, binary.MaxVarintLen64),
}
}
// Next advances to the next entry accepted by the filter (every entry when
// the filter is nil) and stores it so Entry can return it.
//
// It returns false once the underlying block is exhausted or a read fails; in
// that case the iterator closes itself so its pooled resources are released.
// Close sets the buffered reader to nil, so any further call must not reach
// moveNext: the closed guard makes Next safe to call again after it has
// returned false or after an explicit Close.
func (si *bufferedIterator) Next() bool {
	if si.closed {
		return false
	}
	for {
		ts, line, ok := si.moveNext()
		if !ok {
			// Close only reports si.err, which stays available through Error.
			_ = si.Close()
			return false
		}
		if si.filter != nil && !si.filter(line) {
			continue
		}
		// line aliases the reusable buffer, so copy it into a string now.
		si.cur.Line = string(line)
		si.cur.Timestamp = time.Unix(0, ts)
		return true
	}
}
// moveNext moves the buffer to the next entry
func (si *bufferedIterator) moveNext() (int64, []byte, bool) {
ts, err := binary.ReadVarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err
}
return false
return 0, nil, false
}
l, err := binary.ReadUvarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err
return false
return 0, nil, false
}
}
for len(si.buf) < int(l) {
si.buf = append(si.buf, make([]byte, 1024)...)
if si.buf.Cap() < int(l) {
si.buf.Grow(int(l) - si.buf.Cap())
}
n, err := si.s.Read(si.buf[:l])
n, err := si.s.Read(si.buf.Bytes()[:l])
if err != nil && err != io.EOF {
si.err = err
return false
return 0, nil, false
}
if n < int(l) {
_, err = si.s.Read(si.buf[n:l])
for n < int(l) {
r, err := si.s.Read(si.buf.Bytes()[n:l])
if err != nil {
si.err = err
return false
return 0, nil, false
}
n += r
}
si.curT = ts
si.curLog = string(si.buf[:l])
return true
return ts, si.buf.Bytes()[:l], true
}
func (si *bufferedIterator) Entry() logproto.Entry {
return logproto.Entry{
Timestamp: time.Unix(0, si.curT),
Line: si.curLog,
}
return si.cur
}
func (si *bufferedIterator) Error() error { return si.err }
// Close hands the compression reader, the bufio reader and the line buffer
// back to their pools and drops the iterator's references to them. Only the
// first call releases anything; every call returns the iterator's recorded
// error.
func (si *bufferedIterator) Close() error {
	if si.closed {
		return si.err
	}
	si.closed = true

	// Give the pooled resources back before forgetting them.
	si.pool.PutReader(si.reader)
	BufReaderPool.Put(si.s)
	BytesBufferPool.Put(si.buf)

	si.s, si.buf, si.decBuf, si.reader = nil, nil, nil, nil
	return si.err
}
func (si *bufferedIterator) Error() error { return si.err }
func (si *bufferedIterator) Close() error { return si.err }
func (si *bufferedIterator) Labels() string { return "" }

@ -5,11 +5,12 @@ import (
"fmt"
"io/ioutil"
"math"
"math/rand"
"sync"
"testing"
"time"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
)
@ -75,7 +76,7 @@ func TestGZIPBlock(t *testing.T) {
}
}
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
require.NoError(t, err)
idx := 0
@ -90,7 +91,7 @@ func TestGZIPBlock(t *testing.T) {
require.Equal(t, len(cases), idx)
t.Run("bounded-iteration", func(t *testing.T) {
it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD)
it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD, nil)
require.NoError(t, err)
idx := 2
@ -132,7 +133,7 @@ func TestGZIPCompression(t *testing.T) {
require.NoError(t, err)
fmt.Println(float64(len(b))/(1024*1024), float64(len(b2))/(1024*1024), float64(len(b2))/float64(len(chk.blocks)))
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
require.NoError(t, err)
for i, l := range lines {
@ -162,7 +163,7 @@ func TestGZIPSerialisation(t *testing.T) {
bc, err := NewByteChunk(byt)
require.NoError(t, err)
it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD, nil)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
@ -203,7 +204,7 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, 100), logproto.FORWARD, nil)
require.NoError(t, err)
i = 0
for it.Next() {
@ -215,6 +216,101 @@ func TestGZIPChunkFilling(t *testing.T) {
require.Equal(t, int64(lines), i)
}
var result []Chunk
// BenchmarkWriteGZIP measures filling gzip-encoded chunks: on each benchmark
// iteration a fresh chunk is appended to until it reports no more space, which
// triggers block cuts that serialise through gzip. The filled chunks are
// stored in the package-level result variable (presumably so the work is not
// optimised away).
func BenchmarkWriteGZIP(b *testing.B) {
	var (
		filled = []Chunk{}
		seq    int64
	)
	sample := &logproto.Entry{Timestamp: time.Unix(0, 0), Line: RandString(512)}

	for run := 0; run < b.N; run++ {
		chk := NewMemChunk(EncGZIP)
		// Keep appending until the chunk is full so that cut (and therefore
		// gzip serialisation) is exercised.
		for chk.SpaceFor(sample) {
			_ = chk.Append(sample)
			sample.Timestamp = time.Unix(0, seq)
			seq++
		}
		filled = append(filled, chk)
	}
	result = filled
}
// BenchmarkReadGZIP measures reading gzip-encoded chunks backward. It first
// builds 50 full chunks of variable-size entries (outside the timed region),
// then on every benchmark iteration iterates all chunks concurrently, one
// goroutine per chunk.
//
// Each goroutine gathers its entries in a local slice and merges them into the
// shared slice under a mutex; appending to the shared slice directly from
// several goroutines is a data race.
func BenchmarkReadGZIP(b *testing.B) {
	chunks := []Chunk{}
	i := int64(0)
	for n := 0; n < 50; n++ {
		entry := randSizeEntry(0)
		c := NewMemChunk(EncGZIP)
		// Append until full so that cut (gzip serialisation) is triggered.
		for c.SpaceFor(entry) {
			_ = c.Append(entry)
			i++
			entry = randSizeEntry(i)
		}
		c.Close()
		chunks = append(chunks, c)
	}

	var (
		mu      sync.Mutex
		entries []logproto.Entry
	)
	b.ResetTimer()
	for n := 0; n < b.N; n++ {
		var wg sync.WaitGroup
		for _, c := range chunks {
			wg.Add(1)
			go func(c Chunk) {
				defer wg.Done()
				iterator, err := c.Iterator(time.Unix(0, 0), time.Now(), logproto.BACKWARD, nil)
				if err != nil {
					panic(err)
				}
				var read []logproto.Entry
				for iterator.Next() {
					read = append(read, iterator.Entry())
				}
				// The benchmark only cares about read throughput; a close
				// error is deliberately ignored, as before.
				_ = iterator.Close()

				mu.Lock()
				entries = append(entries, read...)
				mu.Unlock()
			}(c)
		}
		wg.Wait()
	}
}
// randSizeEntry builds an entry stamped with ts nanoseconds whose random line
// length depends on ts modulo 10: 27000 bytes for 0, 10000 for 1, 2048 for
// 2 through 5, and 4096 for every other remainder.
func randSizeEntry(ts int64) *logproto.Entry {
	size := 4096
	switch ts % 10 {
	case 0:
		size = 27000
	case 1:
		size = 10000
	case 2, 3, 4, 5:
		size = 2048
	}
	return &logproto.Entry{
		Timestamp: time.Unix(0, ts),
		Line:      RandString(size),
	}
}
// charset is the default alphabet used by RandString: ASCII letters followed
// by the decimal digits.
const charset = "abcdefghijklmnopqrstuvwxyz" +
	"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"

// RandStringWithCharset returns a string of length bytes, each picked at
// random from the bytes of charset using math/rand's default source.
//
// Every byte of charset is a candidate, including the last one. The index is
// drawn from [0, len(charset)); drawing from [0, len(charset)-1) would never
// produce the final character and would panic on a one-character charset.
// A non-zero length with an empty charset still panics, since there is
// nothing to pick from.
func RandStringWithCharset(length int, charset string) string {
	b := make([]byte, length)
	for i := range b {
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}

// RandString returns a random string of length bytes drawn from the default
// alphanumeric charset.
func RandString(length int) string {
	return RandStringWithCharset(length, charset)
}
func logprotoEntry(ts int64, line string) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, ts),

@ -7,6 +7,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)
// Errors returned by the chunk interface.
@ -46,7 +47,7 @@ type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Append(*logproto.Entry) error
Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error)
Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error)
Size() int
Bytes() ([]byte, error)
}

@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)
// LazyChunk loads the chunk when it is accessed.
@ -20,21 +21,20 @@ func (c *LazyChunk) getChunk(ctx context.Context) (Chunk, error) {
if err != nil {
return nil, err
}
c.Chunk = chunks[0]
return chunks[0].Data.(*Facade).LokiChunk(), nil
}
// Iterator returns an entry iterator.
func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (c *LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
// If the chunk is already loaded, then use that.
if c.Chunk.Data != nil {
lokiChunk := c.Chunk.Data.(*Facade).LokiChunk()
return lokiChunk.Iterator(from, through, direction)
return lokiChunk.Iterator(from, through, direction, filter)
}
return &lazyIterator{
chunk: c,
chunk: c,
filter: filter,
from: from,
through: through,
@ -46,12 +46,15 @@ func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direct
type lazyIterator struct {
iter.EntryIterator
chunk LazyChunk
chunk *LazyChunk
err error
from, through time.Time
direction logproto.Direction
context context.Context
filter logql.Filter
closed bool
}
func (it *lazyIterator) Next() bool {
@ -59,8 +62,16 @@ func (it *lazyIterator) Next() bool {
return false
}
if it.closed {
return false
}
if it.EntryIterator != nil {
return it.EntryIterator.Next()
next := it.EntryIterator.Next()
if !next {
it.Close()
}
return next
}
chk, err := it.chunk.getChunk(it.context)
@ -68,9 +79,7 @@ func (it *lazyIterator) Next() bool {
it.err = err
return false
}
it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction)
it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction, it.filter)
return it.Next()
}
@ -82,6 +91,18 @@ func (it *lazyIterator) Error() error {
if it.err != nil {
return it.err
}
if it.EntryIterator != nil {
return it.EntryIterator.Error()
}
return nil
}
return it.EntryIterator.Error()
// Close closes the wrapped iterator, if one has been created, marks the lazy
// iterator as closed and drops the reference to the wrapped iterator. When no
// wrapped iterator exists it does nothing and returns nil.
func (it *lazyIterator) Close() error {
	inner := it.EntryIterator
	if inner == nil {
		return nil
	}
	it.closed = true
	closeErr := inner.Close()
	it.EntryIterator = nil
	return closeErr
}

@ -0,0 +1,114 @@
package chunkenc
import (
"bufio"
"bytes"
"compress/gzip"
"io"
"sync"
)
// CompressionPool is a pool of CompressionWriter and CompressionReader values.
// It is used by every chunk to avoid unnecessary allocations.
type CompressionPool interface {
// GetWriter returns a CompressionWriter that writes to the given io.Writer.
GetWriter(io.Writer) CompressionWriter
// PutWriter hands a CompressionWriter back to the pool for reuse.
PutWriter(CompressionWriter)
// GetReader returns a CompressionReader that reads from the given io.Reader.
GetReader(io.Reader) CompressionReader
// PutReader hands a CompressionReader back to the pool for reuse.
PutReader(CompressionReader)
}
var (
// Gzip is the shared gzip (GNU zip) compression pool.
Gzip GzipPool
// BufReaderPool is a pool of bufio.Reader values. New readers are created
// with bufio.NewReader(nil); BufioReaderPool.Get resets them onto a source.
BufReaderPool = &BufioReaderPool{
pool: sync.Pool{
New: func() interface{} { return bufio.NewReader(nil) },
},
}
// BytesBufferPool is a pool of bytes.Buffer values used for decompressed
// lines; each new buffer is created over a 4096-byte slice.
BytesBufferPool = newBufferPoolWithSize(4096)
)
// GzipPool is a gzip (GNU zip) compression pool. It keeps separate sync.Pools
// for readers and writers.
type GzipPool struct {
	readers, writers sync.Pool
}

// GetReader returns a CompressionReader reading from src. A pooled reader is
// reset onto src when one is available; otherwise a new gzip reader is
// created. It panics if the reset or the creation fails.
func (pool *GzipPool) GetReader(src io.Reader) (reader CompressionReader) {
	pooled := pool.readers.Get()
	if pooled == nil {
		fresh, err := gzip.NewReader(src)
		if err != nil {
			panic(err)
		}
		return fresh
	}

	reader = pooled.(CompressionReader)
	if err := reader.Reset(src); err != nil {
		panic(err)
	}
	return reader
}

// PutReader places a CompressionReader back in the pool.
func (pool *GzipPool) PutReader(reader CompressionReader) {
	pool.readers.Put(reader)
}

// GetWriter returns a CompressionWriter writing to dst. A pooled writer is
// reset onto dst when one is available; otherwise a new gzip writer is created.
func (pool *GzipPool) GetWriter(dst io.Writer) (writer CompressionWriter) {
	pooled := pool.writers.Get()
	if pooled == nil {
		return gzip.NewWriter(dst)
	}

	writer = pooled.(CompressionWriter)
	writer.Reset(dst)
	return writer
}

// PutWriter places a CompressionWriter back in the pool.
func (pool *GzipPool) PutWriter(writer CompressionWriter) {
	pool.writers.Put(writer)
}
// BufioReaderPool is a pool of bufio.Reader values backed by a sync.Pool.
// The zero value is usable: when the pool has nothing to offer, Get creates a
// new reader.
type BufioReaderPool struct {
	pool sync.Pool
}

// Get returns a bufio.Reader which reads from r. A pooled reader is reset onto
// r when one is available; otherwise (empty pool with no New function) a
// reader with bufio's default buffer size is created.
func (bufPool *BufioReaderPool) Get(r io.Reader) *bufio.Reader {
	if buf, ok := bufPool.pool.Get().(*bufio.Reader); ok && buf != nil {
		buf.Reset(r)
		return buf
	}
	return bufio.NewReader(r)
}

// Put puts the bufio.Reader back into the pool. The reader is first reset to
// a nil source so that an idle pooled reader does not keep its last source
// reachable. A nil reader is ignored. The caller must not use b after Put.
func (bufPool *BufioReaderPool) Put(b *bufio.Reader) {
	if b == nil {
		return
	}
	b.Reset(nil)
	bufPool.pool.Put(b)
}
// bufferPool is a pool of bytes.Buffer values backed by a sync.Pool.
type bufferPool struct {
	pool sync.Pool
}

// newBufferPoolWithSize returns a pool whose new buffers are created over a
// slice of size bytes, so a fresh buffer already holds size bytes of content.
func newBufferPoolWithSize(size int) *bufferPool {
	bp := &bufferPool{}
	bp.pool.New = func() interface{} {
		return bytes.NewBuffer(make([]byte, size))
	}
	return bp
}

// Get takes a buffer from the pool, creating one if none is available.
func (bp *bufferPool) Get() *bytes.Buffer {
	buf := bp.pool.Get().(*bytes.Buffer)
	return buf
}

// Put returns b to the pool.
func (bp *bufferPool) Put(b *bytes.Buffer) {
	bp.pool.Put(b)
}

@ -60,7 +60,7 @@ func TestIterator(t *testing.T) {
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
_ = iter.Close()
@ -69,7 +69,7 @@ func TestIterator(t *testing.T) {
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))
_ = iter.Close()

@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -102,6 +103,14 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
return nil
}
func (s *testStore) IsLocal() bool {
return false
}
func (s *testStore) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
return nil, nil
}
func (s *testStore) Stop() {}
func pushTestSamples(t *testing.T, ing logproto.PusherServer) ([]string, map[string][]*logproto.Stream) {
@ -174,7 +183,7 @@ func (s *testStore) checkData(t *testing.T, userIDs []string, testData map[strin
}
func buildStreamsFromChunk(t *testing.T, labels string, chk chunkenc.Chunk) *logproto.Stream {
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD)
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(1000, 0), logproto.FORWARD, nil)
require.NoError(t, err)
stream := &logproto.Stream{

@ -112,8 +112,8 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
expr = logql.NewFilterExpr(expr, labels.MatchRegexp, req.Regex)
}
querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
iters, err := i.lookupStreams(req, matchers)
querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) {
iters, err := i.lookupStreams(req, matchers, filter)
if err != nil {
return nil, err
}
@ -150,7 +150,7 @@ func (i *instance) Label(_ context.Context, req *logproto.LabelRequest) (*logpro
}, nil
}
func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher) ([]iter.EntryIterator, error) {
func (i *instance) lookupStreams(req *logproto.QueryRequest, matchers []*labels.Matcher, filter logql.Filter) ([]iter.EntryIterator, error) {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()
@ -170,7 +170,7 @@ outer:
continue outer
}
}
iter, err := stream.Iterator(req.Start, req.End, req.Direction)
iter, err := stream.Iterator(req.Start, req.End, req.Direction, filter)
if err != nil {
return nil, err
}

@ -14,6 +14,7 @@ import (
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
)
var (
@ -142,10 +143,10 @@ func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
}
// Returns an iterator.
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction, filter logql.Filter) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
itr, err := c.chunk.Iterator(from, through, direction)
itr, err := c.chunk.Iterator(from, through, direction, filter)
if err != nil {
return nil, err
}

@ -40,7 +40,7 @@ func TestStreamIterator(t *testing.T) {
for i := 0; i < 100; i++ {
from := rand.Intn(chunks*entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD, nil)
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
@ -50,7 +50,7 @@ func TestStreamIterator(t *testing.T) {
for i := 0; i < 100; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD, nil)
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))

@ -129,21 +129,22 @@ func (t *tailer) send(stream logproto.Stream) {
}
func (t *tailer) filterEntriesInStream(stream *logproto.Stream) error {
querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
return iter.NewStreamIterator(stream), nil
querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) {
var filteredEntries []logproto.Entry
for _, e := range stream.Entries {
if filter == nil || filter([]byte(e.Line)) {
filteredEntries = append(filteredEntries, e)
}
}
stream.Entries = filteredEntries
return nil, nil
})
itr, err := t.expr.Eval(querier)
_, err := t.expr.Eval(querier)
if err != nil {
return err
}
filteredEntries := new([]logproto.Entry)
for itr.Next() {
*filteredEntries = append(*filteredEntries, itr.Entry())
}
stream.Entries = *filteredEntries
return nil
}

@ -113,6 +113,10 @@ type heapIterator struct {
heap.Interface
Peek() EntryIterator
}
is []EntryIterator
prenext bool
tuples tuples
currEntry logproto.Entry
currLabels string
errs []error
@ -121,7 +125,7 @@ type heapIterator struct {
// NewHeapIterator returns a new iterator which uses a heap to merge together
// entries for multiple iterators.
func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapIterator {
result := &heapIterator{}
result := &heapIterator{is: is}
switch direction {
case logproto.BACKWARD:
result.heap = &iteratorMaxHeap{}
@ -131,11 +135,7 @@ func NewHeapIterator(is []EntryIterator, direction logproto.Direction) HeapItera
panic("bad direction")
}
// pre-next each iterator, drop empty.
for _, i := range is {
result.requeue(i, false)
}
result.tuples = make([]tuple, 0, len(is))
return result
}
@ -160,7 +160,21 @@ type tuple struct {
EntryIterator
}
// tuples is a slice of tuple that can be passed to sort.Sort; elements are
// ordered by their Line field.
type tuples []tuple
// Len returns the number of tuples.
func (t tuples) Len() int { return len(t) }
// Swap exchanges the tuples at positions i and j.
func (t tuples) Swap(i, j int) { t[i], t[j] = t[j], t[i] }
// Less reports whether the line at position i sorts before the line at j.
func (t tuples) Less(i, j int) bool { return t[i].Line < t[j].Line }
func (i *heapIterator) Next() bool {
if !i.prenext {
i.prenext = true
// pre-next each iterator, drop empty.
for _, it := range i.is {
i.requeue(it, false)
}
i.is = nil
}
if i.heap.Len() == 0 {
return false
}
@ -170,16 +184,15 @@ func (i *heapIterator) Next() bool {
// heap with the same timestamp, and pop the ones whose common value
// occurs most often.
tuples := make([]tuple, 0, i.heap.Len())
for i.heap.Len() > 0 {
next := i.heap.Peek()
entry := next.Entry()
if len(tuples) > 0 && (tuples[0].Labels() != next.Labels() || !tuples[0].Timestamp.Equal(entry.Timestamp)) {
if len(i.tuples) > 0 && (i.tuples[0].Labels() != next.Labels() || !i.tuples[0].Timestamp.Equal(entry.Timestamp)) {
break
}
heap.Pop(i.heap)
tuples = append(tuples, tuple{
i.tuples = append(i.tuples, tuple{
Entry: entry,
EntryIterator: next,
})
@ -187,22 +200,20 @@ func (i *heapIterator) Next() bool {
// Find in entry which occurs most often which, due to quorum based
// replication, is guaranteed to be the correct next entry.
t := mostCommon(tuples)
t := mostCommon(i.tuples)
i.currEntry = t.Entry
i.currLabels = t.Labels()
// Requeue the iterators, advancing them if they were consumed.
for j := range tuples {
i.requeue(tuples[j].EntryIterator, tuples[j].Line != i.currEntry.Line)
for j := range i.tuples {
i.requeue(i.tuples[j].EntryIterator, i.tuples[j].Line != i.currEntry.Line)
}
i.tuples = i.tuples[:0]
return true
}
func mostCommon(tuples []tuple) tuple {
sort.Slice(tuples, func(i, j int) bool {
return tuples[i].Line < tuples[j].Line
})
func mostCommon(tuples tuples) tuple {
sort.Sort(tuples)
result := tuples[0]
count, max := 0, 0
for i := 0; i < len(tuples)-1; i++ {
@ -247,6 +258,7 @@ func (i *heapIterator) Close() error {
return err
}
}
i.tuples = nil
return nil
}
@ -314,28 +326,6 @@ func (i *queryClientIterator) Close() error {
return i.client.CloseSend()
}
type filter struct {
EntryIterator
f func(string) bool
}
// NewFilter builds a filtering iterator.
func NewFilter(f func(string) bool, i EntryIterator) EntryIterator {
return &filter{
f: f,
EntryIterator: i,
}
}
func (i *filter) Next() bool {
for i.EntryIterator.Next() {
if i.f(i.Entry().Line) {
return true
}
}
return false
}
type nonOverlappingIterator struct {
labels string
i int
@ -353,12 +343,17 @@ func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIt
func (i *nonOverlappingIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
if i.i >= len(i.iterators) {
if len(i.iterators) == 0 {
if i.curr != nil {
i.curr.Close()
}
return false
}
i.curr = i.iterators[i.i]
if i.curr != nil {
i.curr.Close()
}
i.i++
i.curr, i.iterators = i.iterators[0], i.iterators[1:]
}
return true
@ -377,10 +372,17 @@ func (i *nonOverlappingIterator) Labels() string {
}
func (i *nonOverlappingIterator) Error() error {
return i.curr.Error()
if i.curr != nil {
return i.curr.Error()
}
return nil
}
func (i *nonOverlappingIterator) Close() error {
for _, iter := range i.iterators {
iter.Close()
}
i.iterators = nil
return nil
}
@ -400,7 +402,10 @@ func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator
func (i *timeRangedIterator) Next() bool {
ok := i.EntryIterator.Next()
if !ok {
i.EntryIterator.Close()
return ok
}
ts := i.EntryIterator.Entry().Timestamp
for ok && i.mint.After(ts) {
ok = i.EntryIterator.Next()
@ -410,34 +415,43 @@ func (i *timeRangedIterator) Next() bool {
if ok && (i.maxt.Before(ts) || i.maxt.Equal(ts)) { // The maxt is exclusive.
ok = false
}
if !ok {
i.EntryIterator.Close()
}
return ok
}
type entryIteratorBackward struct {
cur logproto.Entry
entries []logproto.Entry
forwardIter EntryIterator
cur logproto.Entry
entries []logproto.Entry
loaded bool
}
// NewEntryIteratorBackward returns an iterator which loads all the entries
// of an existing iterator, and then iterates over them backward.
func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) {
entries := make([]logproto.Entry, 0, 128)
for it.Next() {
entries = append(entries, it.Entry())
}
return &entryIteratorBackward{entries: make([]logproto.Entry, 0, 1024), forwardIter: it}, it.Error()
}
return &entryIteratorBackward{entries: entries}, it.Error()
// load drains the forward iterator into i.entries and then closes it. It does
// its work only on the first call; later calls return immediately.
func (i *entryIteratorBackward) load() {
	if i.loaded {
		return
	}
	i.loaded = true

	for i.forwardIter.Next() {
		i.entries = append(i.entries, i.forwardIter.Entry())
	}
	i.forwardIter.Close()
}
func (i *entryIteratorBackward) Next() bool {
i.load()
if len(i.entries) == 0 {
i.entries = nil
return false
}
i.cur = i.entries[len(i.entries)-1]
i.entries = i.entries[:len(i.entries)-1]
i.cur, i.entries = i.entries[len(i.entries)-1], i.entries[:len(i.entries)-1]
return true
}

@ -1,26 +1,29 @@
package logql
import (
"bytes"
"fmt"
"regexp"
"strings"
"github.com/grafana/loki/pkg/iter"
"github.com/prometheus/prometheus/pkg/labels"
)
// Filter is a line filter sent to a querier to filter out log lines.
type Filter func([]byte) bool
// QuerierFunc implements Querier.
type QuerierFunc func([]*labels.Matcher) (iter.EntryIterator, error)
type QuerierFunc func([]*labels.Matcher, Filter) (iter.EntryIterator, error)
// Query implements Querier.
func (q QuerierFunc) Query(ms []*labels.Matcher) (iter.EntryIterator, error) {
return q(ms)
func (q QuerierFunc) Query(ms []*labels.Matcher, entryFilter Filter) (iter.EntryIterator, error) {
return q(ms, entryFilter)
}
// Querier allows a LogQL expression to fetch an EntryIterator for a
// set of matchers.
type Querier interface {
Query([]*labels.Matcher) (iter.EntryIterator, error)
Query([]*labels.Matcher, Filter) (iter.EntryIterator, error)
}
// Expr is a LogQL expression.
@ -34,7 +37,7 @@ type matchersExpr struct {
}
func (e *matchersExpr) Eval(q Querier) (iter.EntryIterator, error) {
return q.Query(e.matchers)
return q.Query(e.matchers, nil)
}
func (e *matchersExpr) Matchers() []*labels.Matcher {
@ -60,45 +63,61 @@ func NewFilterExpr(left Expr, ty labels.MatchType, match string) Expr {
}
}
func (e *filterExpr) Eval(q Querier) (iter.EntryIterator, error) {
var f func(string) bool
func (e *filterExpr) filter() (func([]byte) bool, error) {
var f func([]byte) bool
switch e.ty {
case labels.MatchRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = re.MatchString
f = re.Match
case labels.MatchNotRegexp:
re, err := regexp.Compile(e.match)
if err != nil {
return nil, err
}
f = func(line string) bool {
return !re.MatchString(line)
f = func(line []byte) bool {
return !re.Match(line)
}
case labels.MatchEqual:
f = func(line string) bool {
return strings.Contains(line, e.match)
f = func(line []byte) bool {
return bytes.Contains(line, []byte(e.match))
}
case labels.MatchNotEqual:
f = func(line string) bool {
return !strings.Contains(line, e.match)
f = func(line []byte) bool {
return !bytes.Contains(line, []byte(e.match))
}
default:
return nil, fmt.Errorf("unknow matcher: %v", e.match)
}
next, ok := e.left.(*filterExpr)
if ok {
nextFilter, err := next.filter()
if err != nil {
return nil, err
}
return func(line []byte) bool {
return nextFilter(line) && f(line)
}, nil
}
return f, nil
}
left, err := e.left.Eval(q)
func (e *filterExpr) Eval(q Querier) (iter.EntryIterator, error) {
f, err := e.filter()
if err != nil {
return nil, err
}
return iter.NewFilter(f, left), nil
next, err := q.Query(e.left.Matchers(), f)
if err != nil {
return nil, err
}
return next, nil
}
func mustNewMatcher(t labels.MatchType, n, v string) *labels.Matcher {

@ -20,6 +20,7 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/querier"
loki_storage "github.com/grafana/loki/pkg/storage"
)
// Config is the root config for Loki.
@ -69,7 +70,7 @@ type Loki struct {
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
store chunk.Store
store loki_storage.Store
tableManager *chunk.TableManager
httpAuthMiddleware middleware.Interface

@ -22,6 +22,7 @@ import (
"github.com/grafana/loki/pkg/ingester"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/querier"
loki_storage "github.com/grafana/loki/pkg/storage"
)
const maxChunkAgeForTableManager = 12 * time.Hour
@ -218,7 +219,7 @@ func (t *Loki) stopTableManager() error {
}
func (t *Loki) initStore() (err error) {
t.store, err = storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
t.store, err = loki_storage.NewStore(t.cfg.StorageConfig, t.cfg.ChunkStoreConfig, t.cfg.SchemaConfig, t.overrides)
return
}

@ -5,7 +5,6 @@ import (
"flag"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
cortex_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
@ -18,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage"
)
// Config for a querier.
@ -33,11 +33,11 @@ type Querier struct {
cfg Config
ring ring.ReadRing
pool *cortex_client.Pool
store chunk.Store
store storage.Store
}
// New makes a new Querier.
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store chunk.Store) (*Querier, error) {
func New(cfg Config, clientCfg client.Config, ring ring.ReadRing, store storage.Store) (*Querier, error) {
factory := func(addr string) (grpc_health_v1.HealthClient, error) {
return client.New(clientCfg, addr)
}
@ -107,12 +107,13 @@ func (q *Querier) forGivenIngesters(replicationSet ring.ReplicationSet, f func(l
// Query does the heavy lifting for an actual query.
func (q *Querier) Query(ctx context.Context, req *logproto.QueryRequest) (*logproto.QueryResponse, error) {
ingesterIterators, err := q.queryIngesters(ctx, req)
if err != nil {
return nil, err
}
chunkStoreIterators, err := q.queryStore(ctx, req)
chunkStoreIterators, err := q.store.LazyQuery(ctx, req)
if err != nil {
return nil, err
}
@ -298,7 +299,7 @@ func (q *Querier) queryDroppedStreams(ctx context.Context, req *logproto.TailReq
ingesterIterators[i] = iter.NewQueryClientIterator(clients[i].response.(logproto.Querier_QueryClient), query.Direction)
}
chunkStoreIterators, err := q.queryStore(ctx, &query)
chunkStoreIterators, err := q.store.LazyQuery(ctx, &query)
if err != nil {
return nil, err
}

@ -0,0 +1,142 @@
package main
import (
"context"
"fmt"
"log"
"math/rand"
"os"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
lstore "github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
)
// Shared settings for the benchmark data generator.
var (
// start is the schema start time and the timestamp of the first generated entry.
start = model.Time(1523750400000)
// ctx carries the "fake" org ID and is passed to store.Put.
ctx = user.InjectOrgID(context.Background(), "fake")
// maxChunks is the flush count at which a writer goroutine stops generating data.
maxChunks = 600 // 600 chunks is about 1.2 GiB of data, enough to run the benchmark
)
// main fills the local filesystem store under /tmp/benchmark with generated
// data for the storage benchmarks. It only does so when os.Stat reports that
// /tmp/benchmark/chunks does not exist; otherwise it exits without doing
// anything.
func main() {
	_, statErr := os.Stat("/tmp/benchmark/chunks")
	if !os.IsNotExist(statErr) {
		return
	}
	if err := fillStore(); err != nil {
		log.Fatal("error filling up storage:", err)
	}
}
// getStore opens the Loki store used to generate benchmark data: a BoltDB
// index in /tmp/benchmark/index and filesystem chunks in
// /tmp/benchmark/chunks, with a single "v9" schema period beginning at start.
func getStore() (lstore.Store, error) {
	storageCfg := storage.Config{
		BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
		FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
	}
	period := chunk.PeriodConfig{
		From:       chunk.DayTime{Time: start},
		IndexType:  "boltdb",
		ObjectType: "filesystem",
		Schema:     "v9",
		IndexTables: chunk.PeriodicTableConfig{
			Prefix: "index_",
			Period: time.Hour * 168,
		},
	}
	schemaCfg := chunk.SchemaConfig{Configs: []chunk.PeriodConfig{period}}

	s, err := lstore.NewStore(storageCfg, chunk.StoreConfig{}, schemaCfg, &validation.Overrides{})
	if err != nil {
		return nil, err
	}
	return s, nil
}
// fillStore opens the benchmark store and fills it with generated chunks.
//
// Five goroutines each write one stream labelled {foo="bar",level="<j>"}.
// Every stream gets one 250-character random line per millisecond, starting
// at start and covering at most one hour. Lines are random so that chunks are
// big (~2mb); building ~1gib of data takes about a minute. Whenever the
// current chunk has no space left it is encoded and written with store.Put.
// A goroutine stops as soon as the shared flush counter it observes reaches
// maxChunks.
//
// Failures inside the writer goroutines panic; only the error from opening
// the store is returned.
//
// NOTE(review): the entry that triggers a flush is not re-appended to the
// fresh chunk, and a partially filled final chunk is never flushed. That
// behaviour is unchanged here.
func fillStore() error {
	store, err := getStore()
	if err != nil {
		return err
	}
	defer store.Stop()

	var (
		wgPush     sync.WaitGroup
		flushMu    sync.Mutex // guards flushCount, which all writers share
		flushCount int
	)

	for i := 0; i < 5; i++ {
		wgPush.Add(1)
		go func(j int) {
			defer wgPush.Done()

			lbs, err := util.ToClientLabels(fmt.Sprintf("{foo=\"bar\",level=\"%d\"}", j))
			if err != nil {
				panic(err)
			}
			labelsBuilder := labels.NewBuilder(client.FromLabelAdaptersToLabels(lbs))
			labelsBuilder.Set(labels.MetricName, "logs")
			metric := labelsBuilder.Labels()
			fp := client.FastFingerprint(lbs)

			chunkEnc := chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144)
			end := start.UnixNano() + time.Hour.Nanoseconds()
			for ts := start.UnixNano(); ts < end; ts += time.Millisecond.Nanoseconds() {
				entry := &logproto.Entry{
					Timestamp: time.Unix(0, ts),
					Line:      randString(250),
				}
				if chunkEnc.SpaceFor(entry) {
					// Error deliberately ignored: this is throwaway benchmark data.
					_ = chunkEnc.Append(entry)
					continue
				}

				// The chunk is full: encode it and write it to the store.
				from, to := chunkEnc.Bounds()
				c := chunk.NewChunk("fake", fp, metric, chunkenc.NewFacade(chunkEnc), model.TimeFromUnixNano(from.UnixNano()), model.TimeFromUnixNano(to.UnixNano()))
				if err := c.Encode(); err != nil {
					panic(err)
				}
				if err := store.Put(ctx, []chunk.Chunk{c}); err != nil {
					panic(err)
				}

				// Update the shared counter under the lock and work on a
				// private snapshot afterwards, so the log line and the limit
				// check do not race with the other writers.
				flushMu.Lock()
				flushCount++
				flushed := flushCount
				flushMu.Unlock()

				log.Println("flushed ", flushed, from.UnixNano(), to.UnixNano(), metric)
				if flushed >= maxChunks {
					return
				}
				chunkEnc = chunkenc.NewMemChunkSize(chunkenc.EncGZIP, 262144)
			}
		}(i)
	}
	wgPush.Wait()
	return nil
}
const charset = "abcdefghijklmnopqrstuvwxyz" +
"ABCDEFGHIJKLMNOPQRSTUVWXYZ0123456789"
// randStringWithCharset returns a string of length bytes, each byte picked at
// random from charset using the math/rand package-level source.
//
// Every byte of charset can be selected. charset must be non-empty when
// length is greater than zero, otherwise rand.Intn panics.
func randStringWithCharset(length int, charset string) string {
	b := make([]byte, length)
	for i := range b {
		// rand.Intn(n) returns a value in [0, n), so the full length is the
		// correct bound; subtracting one would exclude the last character.
		b[i] = charset[rand.Intn(len(charset))]
	}
	return string(b)
}
// randString returns a random string of the given length built from the
// package-level charset.
func randString(length int) string {
	generated := randStringWithCharset(length, charset)
	return generated
}

@ -1,21 +1,45 @@
package querier
package storage
import (
"context"
"sort"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
// Store is the Loki chunk store to retrieve and save chunks.
// It embeds chunk.Store and adds LazyQuery on top of it.
type Store interface {
chunk.Store
// LazyQuery returns an entry iterator for req.
LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error)
}
// store wraps a chunk.Store; NewStore returns it as the Store implementation.
type store struct {
chunk.Store
}
// NewStore builds the underlying chunk store from the supplied configuration
// via storage.NewStore and wraps it as a Loki Store. If the underlying store
// cannot be created, the error is returned unchanged with a nil Store.
func NewStore(cfg storage.Config, storeCfg chunk.StoreConfig, schemaCfg chunk.SchemaConfig, limits *validation.Overrides) (Store, error) {
	inner, err := storage.NewStore(cfg, storeCfg, schemaCfg, limits)
	if err != nil {
		return nil, err
	}
	wrapped := &store{Store: inner}
	return wrapped, nil
}
// LazyQuery returns an iterator that will query the store for more chunks while iterating instead of fetching all chunks upfront
// for that request.
func (s *store) LazyQuery(ctx context.Context, req *logproto.QueryRequest) (iter.EntryIterator, error) {
expr, err := logql.ParseExpr(req.Query)
if err != nil {
return nil, err
@ -25,7 +49,7 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it
expr = logql.NewFilterExpr(expr, labels.MatchRegexp, req.Regex)
}
querier := logql.QuerierFunc(func(matchers []*labels.Matcher) (iter.EntryIterator, error) {
querier := logql.QuerierFunc(func(matchers []*labels.Matcher, filter logql.Filter) (iter.EntryIterator, error) {
nameLabelMatcher, err := labels.NewMatcher(labels.MatchEqual, labels.MetricName, "logs")
if err != nil {
return nil, err
@ -33,7 +57,7 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it
matchers = append(matchers, nameLabelMatcher)
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...)
chks, fetchers, err := s.GetChunkRefs(ctx, from, through, matchers...)
if err != nil {
return nil, err
}
@ -46,15 +70,15 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) (it
// Make sure the initial chunks are loaded. This is not one chunk
// per series, but rather a chunk per non-overlapping iterator.
if err := loadFirstChunks(ctx, chksBySeries); err != nil {
if err := loadFirstChunks(ctx, chksBySeries, req); err != nil {
return nil, err
}
// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers, req)
iters, err := buildIterators(ctx, req, chksBySeries)
iters, err := buildIterators(ctx, req, chksBySeries, filter)
if err != nil {
return nil, err
}
@ -76,60 +100,77 @@ func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.
return filtered
}
func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]chunkenc.LazyChunk {
func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher, req *logproto.QueryRequest) map[model.Fingerprint][][]chunkenc.LazyChunk {
outer:
for fp, chunks := range chks {
for _, matcher := range matchers {
if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {
delete(chks, fp)
continue outer
// checks matchers against the last chunk if we're doing BACKWARD
if req.Direction == logproto.BACKWARD {
if !matcher.Matches(chunks[0][len(chunks[0])-1].Chunk.Metric.Get(matcher.Name)) {
delete(chks, fp)
continue outer
}
} else {
if !matcher.Matches(chunks[0][0].Chunk.Metric.Get(matcher.Name)) {
delete(chks, fp)
continue outer
}
}
}
}
return chks
}
func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk) ([]iter.EntryIterator, error) {
func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk, filter logql.Filter) ([]iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
for _, chunks := range chks {
iterator, err := buildHeapIterator(ctx, req, chunks)
iterator, err := buildHeapIterator(ctx, req, chunks, filter)
if err != nil {
return nil, err
}
result = append(result, iterator)
}
return result, nil
}
func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk) (iter.EntryIterator, error) {
func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk, filter logql.Filter) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
if chks[0][0].Chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chks[0][0].Chunk.Metric)
var fetchedChunkIndex int
if req.Direction == logproto.BACKWARD {
fetchedChunkIndex = len(chks[0]) - 1
}
if chks[0][fetchedChunkIndex].Chunk.Metric.Has("__name__") {
labelsBuilder := labels.NewBuilder(chks[0][fetchedChunkIndex].Chunk.Metric)
labelsBuilder.Del("__name__")
chks[0][0].Chunk.Metric = labelsBuilder.Labels()
chks[0][fetchedChunkIndex].Chunk.Metric = labelsBuilder.Labels()
}
labels := chks[0][0].Chunk.Metric.String()
labels := chks[0][fetchedChunkIndex].Chunk.Metric.String()
for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
for j := range chks[i] {
iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction)
iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction, filter)
if err != nil {
return nil, err
}
iterators = append(iterators, iterator)
}
// reverse chunks to start with the last one.
if req.Direction == logproto.BACKWARD {
for i, j := 0, len(iterators)-1; i < j; i, j = i+1, j-1 {
iterators[i], iterators[j] = iterators[j], iterators[i]
}
}
result = append(result, iter.NewNonOverlappingIterator(iterators, labels))
}
return iter.NewHeapIterator(result, req.Direction), nil
}
func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk) error {
func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk, req *logproto.QueryRequest) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "loadFirstChunks")
defer sp.Finish()
@ -140,7 +181,12 @@ func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunken
if len(lchk) == 0 {
continue
}
chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0])
// load the last chunk if we're doing BACKWARD
if req.Direction == logproto.BACKWARD {
chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[len(lchk)-1])
} else {
chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0])
}
}
}

@ -0,0 +1,163 @@
package storage
import (
"context"
"log"
"runtime"
"testing"
"time"
"net/http"
_ "net/http/pprof"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/util/validation"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/user"
)
// Shared fixtures for the store benchmarks.
var (
// start is the lower bound of every benchmark query window and the schema start time.
start = model.Time(1523750400000)
// m is the buffer printHeap fills through runtime.ReadMemStats.
m runtime.MemStats
// ctx carries the "fake" org ID and is passed to LazyQuery.
ctx = user.InjectOrgID(context.Background(), "fake")
// chunkStore is opened once at package initialisation; getStore panics on failure.
chunkStore = getStore()
)
//go test -bench=. -benchmem -memprofile memprofile.out -cpuprofile profile.out
func Benchmark_store_LazyQueryRegexBackward(b *testing.B) {
benchmarkStoreQuery(b, &logproto.QueryRequest{
Query: "{foo=\"bar\"}",
Regex: "fuzz",
Limit: 1000,
Start: time.Unix(0, start.UnixNano()),
End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
Direction: logproto.BACKWARD,
})
}
func Benchmark_store_LazyQueryLogQLBackward(b *testing.B) {
benchmarkStoreQuery(b, &logproto.QueryRequest{
Query: "{foo=\"bar\"} |= \"test\" != \"toto\"",
Regex: "fuzz",
Limit: 1000,
Start: time.Unix(0, start.UnixNano()),
End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
Direction: logproto.BACKWARD,
})
}
func Benchmark_store_LazyQueryRegexForward(b *testing.B) {
benchmarkStoreQuery(b, &logproto.QueryRequest{
Query: "{foo=\"bar\"}",
Regex: "fuzz",
Limit: 1000,
Start: time.Unix(0, start.UnixNano()),
End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
Direction: logproto.FORWARD,
})
}
func Benchmark_store_LazyQueryForward(b *testing.B) {
benchmarkStoreQuery(b, &logproto.QueryRequest{
Query: "{foo=\"bar\"}",
Limit: 1000,
Start: time.Unix(0, start.UnixNano()),
End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
Direction: logproto.FORWARD,
})
}
func Benchmark_store_LazyQueryBackward(b *testing.B) {
benchmarkStoreQuery(b, &logproto.QueryRequest{
Query: "{foo=\"bar\"}",
Limit: 1000,
Start: time.Unix(0, start.UnixNano()),
End: time.Unix(0, (24*time.Hour.Nanoseconds())+start.UnixNano()),
Direction: logproto.BACKWARD,
})
}
// benchmarkStoreQuery runs query against the package-level chunkStore b.N
// times. For each run it reads at most query.Limit entries from the returned
// iterator, closes it, and logs the number of lines fetched together with
// the heap usage recorded by printHeap.
//
// While the benchmark runs, a background goroutine samples the heap every
// millisecond and a pprof HTTP server is started on :6060.
func benchmarkStoreQuery(b *testing.B, query *logproto.QueryRequest) {
	b.ReportAllocs()
	// force to run gc 10x more often this can be useful to detect fast allocation vs leak.
	//debug.SetGCPercent(10)

	stop := make(chan struct{})
	// Deferred so the sampler goroutine is also stopped when b.Fatal aborts
	// the benchmark; otherwise it would keep sampling during later benchmarks.
	defer close(stop)

	go func() {
		// Best effort: the error is ignored, e.g. a second call in the same
		// process cannot bind the port while the first server is still listening.
		_ = http.ListenAndServe(":6060", http.DefaultServeMux)
	}()
	go func() {
		ticker := time.NewTicker(time.Millisecond)
		defer ticker.Stop()
		for {
			select {
			case <-ticker.C:
				// print and capture the max in use heap size
				printHeap(b, false)
			case <-stop:
				return
			}
		}
	}()

	for i := 0; i < b.N; i++ {
		it, err := chunkStore.LazyQuery(ctx, query)
		if err != nil {
			b.Fatal(err)
		}
		res := []logproto.Entry{}
		printHeap(b, false)
		j := uint32(0)
		for it.Next() {
			j++
			printHeap(b, false)
			res = append(res, it.Entry())
			// limit result like the querier would do.
			if j == query.Limit {
				break
			}
		}
		it.Close()
		printHeap(b, true)
		log.Println("line fetched", len(res))
	}
}
var (
	// heapLock is a one-slot semaphore serialising printHeap. printHeap is
	// called from both the benchmark loop and the sampling goroutine, and it
	// writes the shared m and maxHeapInuse variables.
	heapLock = make(chan struct{}, 1)

	// maxHeapInuse is the largest HeapInuse value observed by printHeap so far.
	maxHeapInuse uint64
)

// printHeap refreshes the package-level memory statistics and records the
// highest HeapInuse seen so far. When show is true it also logs the recorded
// maximum and the current HeapInuse, both in megabytes, prefixed with b.N.
func printHeap(b *testing.B, show bool) {
	heapLock <- struct{}{}
	defer func() { <-heapLock }()

	runtime.ReadMemStats(&m)
	if m.HeapInuse > maxHeapInuse {
		maxHeapInuse = m.HeapInuse
	}
	if show {
		log.Printf("Benchmark %d maxHeapInuse: %d Mbytes\n", b.N, maxHeapInuse/1024/1024)
		log.Printf("Benchmark %d currentHeapInuse: %d Mbytes\n", b.N, m.HeapInuse/1024/1024)
	}
}
// getStore opens the benchmark store: a BoltDB index in /tmp/benchmark/index
// and filesystem chunks in /tmp/benchmark/chunks, with a single "v9" schema
// period beginning at start. It panics if the store cannot be created.
func getStore() Store {
	storageCfg := storage.Config{
		BoltDBConfig: local.BoltDBConfig{Directory: "/tmp/benchmark/index"},
		FSConfig:     local.FSConfig{Directory: "/tmp/benchmark/chunks"},
	}
	period := chunk.PeriodConfig{
		From:       chunk.DayTime{Time: start},
		IndexType:  "boltdb",
		ObjectType: "filesystem",
		Schema:     "v9",
		IndexTables: chunk.PeriodicTableConfig{
			Prefix: "index_",
			Period: time.Hour * 168,
		},
	}
	schemaCfg := chunk.SchemaConfig{Configs: []chunk.PeriodConfig{period}}

	s, err := NewStore(storageCfg, chunk.StoreConfig{}, schemaCfg, &validation.Overrides{})
	if err != nil {
		panic(err)
	}
	return s
}

@ -1,19 +1,36 @@
package util
import (
"sort"
"strings"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/grafana/loki/pkg/logql"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql"
)
// byLabel orders label adapters by label name so they can be passed to
// sort.Sort.
type byLabel []client.LabelAdapter

// Len reports the number of labels.
func (l byLabel) Len() int {
	return len(l)
}

// Less reports whether the name at index i sorts before the name at index j.
func (l byLabel) Less(i, j int) bool {
	return strings.Compare(l[i].Name, l[j].Name) < 0
}

// Swap exchanges the labels at indexes i and j.
func (l byLabel) Swap(i, j int) {
	l[i], l[j] = l[j], l[i]
}
// ToClientLabels parses the labels and converts them to the Cortex type.
func ToClientLabels(labels string) ([]client.LabelAdapter, error) {
ls, err := promql.ParseMetric(labels)
ls, err := logql.ParseExpr(labels)
if err != nil {
return nil, err
}
return client.FromLabelsToLabelAdapaters(ls), nil
matchers := ls.Matchers()
result := make([]client.LabelAdapter, 0, len(matchers))
for _, m := range matchers {
result = append(result, client.LabelAdapter{
Name: m.Name,
Value: m.Value,
})
}
sort.Sort(byLabel(result))
return result, nil
}
// ModelLabelSetToMap convert a model.LabelSet to a map[string]string

Loading…
Cancel
Save