* Add chunkenc without serialisation for now.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Simplify interface and add serialisation

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Move away from \xFF magic to something simple

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Add serialisation and Deserialisation

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Modify interface to be closer to the logish interface.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* chunkenc: Fix race b/w append and iteration.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* chunkenc: Make iterators honour bounds

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* chunkenc: Remove locks as safety is assured externally

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* chunkenc: Add checksums

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Add code quotes around block design.

* Split headBlock into its own type.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Simplify encoding and decoding.

Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in>

* Expose flags.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Use the already existing EntryIterator interface

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Use existing Chunk interface.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Review feedback

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Integrate the compressed chunk and add metrics

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
parent c0b153e4a4
commit 8f4e12a5ec
12 changed files:

 pkg/chunkenc/README.md           |  28
 pkg/chunkenc/encoding_helpers.go | 177
 pkg/chunkenc/gzip.go             | 547
 pkg/chunkenc/gzip_test.go        | 180
 pkg/chunkenc/interface.go        |  39
 pkg/ingester/chunk.go            |  14
 pkg/ingester/chunk_test.go       |  57
 pkg/ingester/ingester.go         |   2
 pkg/ingester/instance.go         |  34
 pkg/ingester/stream.go           |  88
 pkg/ingester/stream_test.go      |  70
 pkg/iter/iterator.go             | 111

pkg/chunkenc/README.md
@@ -0,0 +1,28 @@
# Chunk format
```
-------------------------------------------------------------------
| MagicNumber(4b) | version(1b)                                   |
-------------------------------------------------------------------
| block-1 bytes                                   | checksum (4b) |
-------------------------------------------------------------------
| block-2 bytes                                   | checksum (4b) |
-------------------------------------------------------------------
| block-n bytes                                   | checksum (4b) |
-------------------------------------------------------------------
| #blocks (uvarint)                                               |
-------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
-------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
-------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
-------------------------------------------------------------------
| #entries(uvarint) | mint, maxt (varint) | offset, len (uvarint) |
-------------------------------------------------------------------
| checksum (4b, over the meta section starting at #blocks)        |
-------------------------------------------------------------------
| metasOffset - offset to the point with #blocks (be64)           |
-------------------------------------------------------------------
```
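
For orientation, a minimal sketch (illustrative names, not part of the package) of how a reader uses the two trailing fields to find the block metadata:

```go
package chunkfmt

import (
	"encoding/binary"
	"errors"
)

// metaSection returns the bytes from "#blocks" up to (but excluding) the
// trailing meta checksum (4b) and metasOffset (8b).
func metaSection(chunk []byte) ([]byte, error) {
	// 4b magic + 1b version up front; 4b checksum + 8b metasOffset at the back.
	if len(chunk) < 4+1+4+8 {
		return nil, errors.New("chunk too small")
	}
	metasOffset := binary.BigEndian.Uint64(chunk[len(chunk)-8:])
	end := len(chunk) - 8 - 4 // strip the trailing metasOffset and checksum
	if int(metasOffset) > end {
		return nil, errors.New("corrupt metasOffset")
	}
	return chunk[metasOffset:end], nil
}
```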

pkg/chunkenc/encoding_helpers.go
@@ -0,0 +1,177 @@
package chunkenc
import (
"encoding/binary"
"hash"
"hash/crc32"
)
// encbuf is a helper type to populate a byte slice with various types.
type encbuf struct {
b []byte
c [binary.MaxVarintLen64]byte
}
func (e *encbuf) reset() { e.b = e.b[:0] }
func (e *encbuf) get() []byte { return e.b }
func (e *encbuf) len() int { return len(e.b) }
func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
func (e *encbuf) putBytes(b []byte) { e.b = append(e.b, b...) }
func (e *encbuf) putByte(c byte) { e.b = append(e.b, c) }
func (e *encbuf) putBE32int(x int) { e.putBE32(uint32(x)) }
func (e *encbuf) putBE64int(x int) { e.putBE64(uint64(x)) }
func (e *encbuf) putBE64int64(x int64) { e.putBE64(uint64(x)) }
func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putUvarint(x int) { e.putUvarint64(uint64(x)) }
func (e *encbuf) putBE32(x uint32) {
binary.BigEndian.PutUint32(e.c[:], x)
e.b = append(e.b, e.c[:4]...)
}
func (e *encbuf) putBE64(x uint64) {
binary.BigEndian.PutUint64(e.c[:], x)
e.b = append(e.b, e.c[:8]...)
}
func (e *encbuf) putUvarint64(x uint64) {
n := binary.PutUvarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
func (e *encbuf) putVarint64(x int64) {
n := binary.PutVarint(e.c[:], x)
e.b = append(e.b, e.c[:n]...)
}
// putUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes!).
func (e *encbuf) putUvarintStr(s string) {
e.putUvarint(len(s))
e.putString(s)
}
// putHash appends a hash over the buffer's current contents to the buffer.
func (e *encbuf) putHash(h hash.Hash) {
h.Reset()
_, err := h.Write(e.b)
if err != nil {
panic(err) // The CRC32 implementation does not error
}
e.b = h.Sum(e.b)
}
// decbuf provides safe methods to extract data from a byte slice. It does all
// the necessary bounds checking and advances the byte slice as it goes.
// Several values can be extracted without intermediate error checks; however,
// err() must be consulted before any extracted value is used.
type decbuf struct {
b []byte
e error
}
func (d *decbuf) uvarint() int { return int(d.uvarint64()) }
func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
func (d *decbuf) be32int() int { return int(d.be32()) }
func (d *decbuf) be64int64() int64 { return int64(d.be64()) }
// crc32 returns a CRC32 checksum over the remaining bytes.
func (d *decbuf) crc32() uint32 {
return crc32.Checksum(d.b, castagnoliTable)
}
func (d *decbuf) uvarintStr() string {
l := d.uvarint64()
if d.e != nil {
return ""
}
if len(d.b) < int(l) {
d.e = ErrInvalidSize
return ""
}
s := string(d.b[:l])
d.b = d.b[l:]
return s
}
func (d *decbuf) varint64() int64 {
if d.e != nil {
return 0
}
x, n := binary.Varint(d.b)
if n < 1 {
d.e = ErrInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) uvarint64() uint64 {
if d.e != nil {
return 0
}
x, n := binary.Uvarint(d.b)
if n < 1 {
d.e = ErrInvalidSize
return 0
}
d.b = d.b[n:]
return x
}
func (d *decbuf) be64() uint64 {
if d.e != nil {
return 0
}
if len(d.b) < 8 {
d.e = ErrInvalidSize
return 0
}
x := binary.BigEndian.Uint64(d.b)
d.b = d.b[8:]
return x
}
func (d *decbuf) be32() uint32 {
if d.e != nil {
return 0
}
if len(d.b) < 4 {
d.e = ErrInvalidSize
return 0
}
x := binary.BigEndian.Uint32(d.b)
d.b = d.b[4:]
return x
}
func (d *decbuf) byte() byte {
if d.e != nil {
return 0
}
if len(d.b) < 1 {
d.e = ErrInvalidSize
return 0
}
x := d.b[0]
d.b = d.b[1:]
return x
}
func (d *decbuf) decbuf(l int) decbuf {
if d.e != nil {
return decbuf{e: d.e}
}
if l > len(d.b) {
return decbuf{e: ErrInvalidSize}
}
r := decbuf{b: d.b[:l]}
d.b = d.b[l:]
return r
}
func (d *decbuf) err() error { return d.e }
func (d *decbuf) len() int { return len(d.b) }
func (d *decbuf) get() []byte { return d.b }
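
A round-trip sketch (illustrative, assumed to sit in package chunkenc with a "fmt" import, so the unexported types are visible) showing the intended pairing: encode freely, then check err() once after all reads:

```go
func exampleRoundTrip() (string, error) {
	var e encbuf
	e.putBE32(0x12EE56A) // magic
	e.putByte(1)         // version
	e.putUvarint(3)      // e.g. #blocks
	e.putVarint64(-42)   // a signed field, e.g. mint
	e.putUvarintStr("hello")

	d := decbuf{b: e.get()}
	magic, version := d.be32(), d.byte()
	nBlocks, mint := d.uvarint(), d.varint64()
	s := d.uvarintStr()
	if err := d.err(); err != nil { // a single check covers every read above
		return "", err
	}
	fmt.Printf("magic=%x version=%d blocks=%d mint=%d\n", magic, version, nBlocks, mint)
	return s, nil
}
```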

pkg/chunkenc/gzip.go
@@ -0,0 +1,547 @@
package chunkenc
import (
"bufio"
"bytes"
"compress/gzip"
"encoding/binary"
"hash"
"hash/crc32"
"io"
"math"
"time"
"github.com/grafana/logish/pkg/logproto"
"github.com/grafana/logish/pkg/iter"
"github.com/pkg/errors"
)
var (
magicNumber = uint32(0x12EE56A)
chunkFormatV1 = byte(1)
// Errors that can be returned when reading a chunk.
ErrInvalidSize = errors.New("invalid size")
ErrInvalidFlag = errors.New("invalid flag")
ErrInvalidChecksum = errors.New("invalid checksum")
ErrOutOfOrder = errors.New("out of order sample")
)
// The table is normally initialised lazily behind a sync.Once inside the
// crc32 package, which can race with any other use of that package. We
// therefore initialise it eagerly ourselves.
var castagnoliTable *crc32.Table
func init() {
castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
}
// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
// polynomial may be easily changed in one location at a later time, if necessary.
func newCRC32() hash.Hash32 {
return crc32.New(castagnoliTable)
}
// MemChunk implements compressed log chunks.
type MemChunk struct {
// The number of uncompressed bytes per block.
blockSize int
// The max number of blocks in a chunk.
maxBlocks int
// The finished blocks.
blocks []block
// Current in-mem block being appended to.
head *headBlock
encoding Encoding
cw func(w io.Writer) CompressionWriter
cr func(r io.Reader) (CompressionReader, error)
}
type block struct {
// This is compressed bytes.
b []byte
numEntries int
mint, maxt int64
offset int // The offset of the block in the chunk.
}
// headBlock holds the uncompressed entries. Once it has enough data, it is
// cut into a block holding only compressed entries.
type headBlock struct {
// This is the list of raw entries.
entries []entry
size int // size of uncompressed bytes.
mint, maxt int64
}
func (hb *headBlock) isEmpty() bool {
return len(hb.entries) == 0
}
func (hb *headBlock) append(ts int64, line string) error {
if !hb.isEmpty() && hb.maxt >= ts {
return ErrOutOfOrder
}
hb.entries = append(hb.entries, entry{ts, line})
if hb.mint == 0 || hb.mint > ts {
hb.mint = ts
}
hb.maxt = ts
hb.size += len(line)
return nil
}
func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) {
buf := bytes.NewBuffer(make([]byte, 0, 1<<15)) // 32K. Pool it later.
encBuf := make([]byte, binary.MaxVarintLen64)
compressedWriter := cw(buf)
for _, logEntry := range hb.entries {
n := binary.PutVarint(encBuf, logEntry.t)
_, err := compressedWriter.Write(encBuf[:n])
if err != nil {
return nil, errors.Wrap(err, "appending entry")
}
n = binary.PutUvarint(encBuf, uint64(len(logEntry.s)))
_, err = compressedWriter.Write(encBuf[:n])
if err != nil {
return nil, errors.Wrap(err, "appending entry")
}
_, err = compressedWriter.Write([]byte(logEntry.s))
if err != nil {
return nil, errors.Wrap(err, "appending entry")
}
}
if err := compressedWriter.Close(); err != nil {
return nil, errors.Wrap(err, "flushing pending compress buffer")
}
return buf.Bytes(), nil
}
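// An entry is serialised as varint(timestamp ns) | uvarint(line length) | raw
// line bytes, and the concatenation is what gets compressed. For example,
// entry{t: 1, s: "hi"} encodes to 0x02 0x02 'h' 'i' (varint zig-zags 1 to
// 0x02; uvarint(2) is 0x02).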
type entry struct {
t int64
s string
}
// NewMemChunk returns a new in-mem chunk.
func NewMemChunk(enc Encoding) *MemChunk {
c := &MemChunk{
blockSize: 256 * 1024, // Target uncompressed bytes per block.
blocks: []block{},
head: &headBlock{
mint: math.MaxInt64,
maxt: math.MinInt64,
},
encoding: enc,
}
switch enc {
case EncGZIP:
c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) }
c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }
default:
panic("unknown encoding")
}
return c
}
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
cr: func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) },
head: &headBlock{}, // Empty head; Iterator dereferences it unconditionally.
}
db := decbuf{b: b}
// Verify the header.
m, version := db.be32(), db.byte()
if db.err() != nil {
return nil, errors.Wrap(db.err(), "verifying header")
}
if m != magicNumber {
return nil, errors.Errorf("invalid magic number %x", m)
}
if version != chunkFormatV1 {
return nil, errors.Errorf("invalid version %d", version)
}
metasOffset := binary.BigEndian.Uint64(b[len(b)-8:])
mb := b[metasOffset : len(b)-(8+4)] // Exclude the trailing metasOffset (8b) and meta checksum (4b).
db = decbuf{b: mb}
expCRC := binary.BigEndian.Uint32(b[len(b)-(8+4):])
if expCRC != db.crc32() {
return nil, ErrInvalidChecksum
}
// Read the number of blocks.
num := db.uvarint()
for i := 0; i < num; i++ {
blk := block{}
// Read #entries.
blk.numEntries = db.uvarint()
// Read mint, maxt.
blk.mint = db.varint64()
blk.maxt = db.varint64()
// Read offset and length.
blk.offset = db.uvarint()
l := db.uvarint()
blk.b = b[blk.offset : blk.offset+l]
// Verify checksums.
expCRC := binary.BigEndian.Uint32(b[blk.offset+int(l):])
if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
return bc, ErrInvalidChecksum
}
bc.blocks = append(bc.blocks, blk)
if db.err() != nil {
return nil, errors.Wrap(db.err(), "decoding block meta")
}
}
return bc, nil
}
// Bytes implements Chunk.
func (c *MemChunk) Bytes() ([]byte, error) {
if c.head != nil {
// When generating the bytes, we need to flush the data held in-buffer.
if err := c.cut(); err != nil {
return nil, err
}
}
crc32Hash := newCRC32()
buf := bytes.NewBuffer(nil)
offset := 0
eb := encbuf{b: make([]byte, 0, 1<<10)}
// Write the header (magicNum + version).
eb.putBE32(magicNumber)
eb.putByte(chunkFormatV1)
n, err := buf.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write blockMeta #entries")
}
offset += n
// Write Blocks.
for i, b := range c.blocks {
c.blocks[i].offset = offset
eb.reset()
eb.putBytes(b.b)
eb.putHash(crc32Hash)
n, err := buf.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write block")
}
offset += n
}
metasOffset := offset
// Write the number of blocks.
eb.reset()
eb.putUvarint(len(c.blocks))
// Write BlockMetas.
for _, b := range c.blocks {
eb.putUvarint(b.numEntries)
eb.putVarint64(b.mint)
eb.putVarint64(b.maxt)
eb.putUvarint(b.offset)
eb.putUvarint(len(b.b))
}
eb.putHash(crc32Hash)
n, err = buf.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write block metas")
}
// Write the metasOffset.
eb.reset()
eb.putBE64int(metasOffset)
n, err = buf.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write metasOffset")
}
return buf.Bytes(), nil
}
// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
return c.encoding
}
// Size implements Chunk.
func (c *MemChunk) Size() int {
ne := 0
for _, blk := range c.blocks {
ne += blk.numEntries
}
if !c.head.isEmpty() {
ne += len(c.head.entries)
}
return ne
}
// SpaceFor implements Chunk.
func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
return len(c.blocks) < 10
}
// Append implements Chunk.
func (c *MemChunk) Append(entry *logproto.Entry) error {
return c.head.append(entry.Timestamp.UnixNano(), entry.Line)
}
// Close implements Chunk.
// TODO: Fix this to check edge cases.
func (c *MemChunk) Close() error {
return c.cut()
}
// cut a new block and add it to finished blocks.
func (c *MemChunk) cut() error {
if c.head.isEmpty() {
return nil
}
b, err := c.head.serialise(c.cw)
if err != nil {
return err
}
c.blocks = append(c.blocks, block{
b: b,
numEntries: len(c.head.entries),
mint: c.head.mint,
maxt: c.head.maxt,
})
c.head.entries = c.head.entries[:0]
c.head.size = 0
c.head.mint = 0 // Will be set on the first append.
return nil
}
// Bounds implements Chunk.
func (c *MemChunk) Bounds() (fromT, toT time.Time) {
var from, to int64
if len(c.blocks) > 0 {
from = c.blocks[0].mint
to = c.blocks[len(c.blocks)-1].maxt
}
if !c.head.isEmpty() {
if from == 0 || from > c.head.mint {
from = c.head.mint
}
if to < c.head.maxt {
to = c.head.maxt
}
}
return time.Unix(0, from), time.Unix(0, to)
}
// Iterator implements Chunk.
func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
its := make([]iter.EntryIterator, 0, len(c.blocks))
for _, b := range c.blocks {
if maxt > b.mint && b.maxt > mint {
it, err := b.iterator(c.cr)
if err != nil {
return nil, err
}
its = append(its, it)
}
}
its = append(its, c.head.iterator(mint, maxt))
iterForward := iter.NewTimeRangedIterator(
iter.NewNonOverlappingIterator(its, ""),
time.Unix(0, mint),
time.Unix(0, maxt),
)
if direction == logproto.FORWARD {
return iterForward, nil
}
return iter.NewEntryIteratorBackward(iterForward)
}
func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) {
if len(b.b) == 0 {
return emptyIterator, nil
}
r, err := cr(bytes.NewBuffer(b.b))
if err != nil {
return nil, err
}
s := bufio.NewReader(r)
return newBufferedIterator(s), nil
}
func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
return emptyIterator
}
// We copy the entries every time because hb.entries can change underneath
// the iterator. The alternative would be to allocate a fresh hb.entries
// each time we cut a block, but that would trade a copy per query for an
// allocation per cut.
entries := make([]entry, len(hb.entries))
copy(entries, hb.entries)
return &listIterator{
entries: entries,
}
}
var emptyIterator = &listIterator{}
type listIterator struct {
entries []entry
cur entry
}
func (li *listIterator) Seek(int64) bool {
return false
}
func (li *listIterator) Next() bool {
if len(li.entries) > 0 {
li.cur = li.entries[0]
li.entries = li.entries[1:]
return true
}
return false
}
func (li *listIterator) Entry() logproto.Entry {
return logproto.Entry{time.Unix(0, li.cur.t), li.cur.s}
}
func (li *listIterator) Error() error { return nil }
func (li *listIterator) Close() error { return nil }
func (li *listIterator) Labels() string { return "" }
type bufferedIterator struct {
s *bufio.Reader
curT int64
curLog string
err error
buf []byte // The buffer for a single entry.
decBuf []byte // The buffer for decoding the lengths.
}
func newBufferedIterator(s *bufio.Reader) *bufferedIterator {
return &bufferedIterator{
s: s,
buf: make([]byte, 1024),
decBuf: make([]byte, binary.MaxVarintLen64),
}
}
func (si *bufferedIterator) Seek(int64) bool {
return false
}
func (si *bufferedIterator) Next() bool {
ts, err := binary.ReadVarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err
}
return false
}
l, err := binary.ReadUvarint(si.s)
if err != nil {
if err != io.EOF {
si.err = err
}
return false
}
// Grow the line buffer if this entry is longer than its current capacity.
for len(si.buf) < int(l) {
si.buf = append(si.buf, make([]byte, 1024)...)
}
// A single Read may return fewer than l bytes; ReadFull loops until the
// whole line has been read.
if _, err := io.ReadFull(si.s, si.buf[:l]); err != nil {
si.err = err
return false
}
si.curT = ts
si.curLog = string(si.buf[:l])
return true
}
func (si *bufferedIterator) Entry() logproto.Entry {
return logproto.Entry{time.Unix(0, si.curT), si.curLog}
}
func (si *bufferedIterator) Error() error { return si.err }
func (si *bufferedIterator) Close() error { return si.err }
func (si *bufferedIterator) Labels() string { return "" }
type noopFlushingWriter struct {
io.WriteCloser
}
func (noopFlushingWriter) Flush() error {
return nil
}
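
Pulling the pieces together, a sketch of the full MemChunk lifecycle — append, serialise, reload, iterate — using only APIs from this diff (the program itself is illustrative, not part of the change):

```go
package main

import (
	"fmt"
	"math"
	"time"

	"github.com/grafana/logish/pkg/chunkenc"
	"github.com/grafana/logish/pkg/logproto"
)

func main() {
	c := chunkenc.NewMemChunk(chunkenc.EncGZIP)
	for i := int64(0); i < 3; i++ {
		e := &logproto.Entry{Timestamp: time.Unix(0, i), Line: fmt.Sprintf("line %d", i)}
		if err := c.Append(e); err != nil {
			panic(err)
		}
	}

	b, err := c.Bytes() // cuts the head block, then writes header, blocks, metas.
	if err != nil {
		panic(err)
	}

	c2, err := chunkenc.NewByteChunk(b) // verifies magic, version and checksums.
	if err != nil {
		panic(err)
	}

	it, err := c2.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
	if err != nil {
		panic(err)
	}
	defer it.Close()
	for it.Next() {
		fmt.Println(it.Entry().Timestamp.UnixNano(), it.Entry().Line)
	}
	if err := it.Error(); err != nil {
		panic(err)
	}
}
```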

pkg/chunkenc/gzip_test.go
@@ -0,0 +1,180 @@
package chunkenc
import (
"bytes"
"fmt"
"io/ioutil"
"math"
"testing"
"time"
"github.com/grafana/logish/pkg/logproto"
"github.com/stretchr/testify/require"
)
func TestGZIPBlock(t *testing.T) {
chk := NewMemChunk(EncGZIP)
cases := []struct {
ts int64
str string
cut bool
}{
{
ts: 1,
str: "hello, world!",
},
{
ts: 2,
str: "hello, world2!",
},
{
ts: 3,
str: "hello, world3!",
},
{
ts: 4,
str: "hello, world4!",
},
{
ts: 5,
str: "hello, world5!",
},
{
ts: 6,
str: "hello, world6!",
cut: true,
},
{
ts: 7,
str: "hello, world7!",
},
{
ts: 8,
str: "hello, worl\nd8!",
},
{
ts: 9,
str: "",
},
}
for _, c := range cases {
require.NoError(t, chk.Append(logprotoEntry(c.ts, c.str)))
if c.cut {
require.NoError(t, chk.cut())
}
}
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
require.NoError(t, err)
idx := 0
for it.Next() {
e := it.Entry()
require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano())
require.Equal(t, cases[idx].str, e.Line)
idx++
}
require.NoError(t, it.Error())
require.Equal(t, len(cases), idx)
t.Run("bounded-iteration", func(t *testing.T) {
it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD)
require.NoError(t, err)
idx := 2
for it.Next() {
e := it.Entry()
require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano())
require.Equal(t, cases[idx].str, e.Line)
idx++
}
require.NoError(t, it.Error())
require.Equal(t, 6, idx)
})
}
func TestGZIPCompression(t *testing.T) {
if testing.Short() {
t.SkipNow()
}
b, err := ioutil.ReadFile("NASA_access_log_Aug95")
if err != nil {
t.SkipNow()
}
lines := bytes.Split(b, []byte("\n"))
fmt.Println(len(lines))
for _, blockSize := range []int{4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, 512 * 1024} {
testName := fmt.Sprintf("%d", blockSize/1024)
t.Run(testName, func(t *testing.T) {
chk := NewMemChunk(EncGZIP)
chk.blockSize = blockSize
for i, l := range lines {
require.NoError(t, chk.Append(logprotoEntry(int64(i), string(l))))
}
b2, err := chk.Bytes()
require.NoError(t, err)
fmt.Println(float64(len(b))/(1024*1024), float64(len(b2))/(1024*1024), float64(len(b2))/float64(len(chk.blocks)))
it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
require.NoError(t, err)
for i, l := range lines {
require.True(t, it.Next())
e := it.Entry()
require.Equal(t, int64(i), e.Timestamp.UnixNano())
require.Equal(t, string(l), e.Line)
}
require.NoError(t, it.Error())
})
}
}
func TestGZIPSerialisation(t *testing.T) {
chk := NewMemChunk(EncGZIP)
numSamples := 500000
for i := 0; i < numSamples; i++ {
require.NoError(t, chk.Append(logprotoEntry(int64(i), fmt.Sprint(i))))
}
byt, err := chk.Bytes()
require.NoError(t, err)
bc, err := NewByteChunk(byt)
require.NoError(t, err)
it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
require.NoError(t, err)
for i := 0; i < numSamples; i++ {
require.True(t, it.Next())
e := it.Entry()
require.Equal(t, int64(i), e.Timestamp.UnixNano())
require.Equal(t, fmt.Sprint(i), e.Line)
}
require.NoError(t, it.Error())
byt2, err := chk.Bytes()
require.NoError(t, err)
require.True(t, bytes.Equal(byt, byt2))
}
func logprotoEntry(ts int64, line string) *logproto.Entry {
return &logproto.Entry{
Timestamp: time.Unix(0, ts),
Line: line,
}
}
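
The tests above exercise correctness; a hypothetical benchmark (not in this diff, assumed to live in the same test file) along the same lines would measure raw append cost:

```go
func BenchmarkGZIPAppend(b *testing.B) {
	chk := NewMemChunk(EncGZIP)
	e := logprotoEntry(0, "hello, world!")
	b.ResetTimer()
	for i := 0; i < b.N; i++ {
		e.Timestamp = time.Unix(0, int64(i+1)) // keep timestamps strictly increasing
		if err := chk.Append(e); err != nil {
			b.Fatal(err)
		}
	}
}
```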

pkg/chunkenc/interface.go
@@ -0,0 +1,39 @@
package chunkenc
import (
"io"
)
// Encoding is the identifier for a chunk encoding.
type Encoding uint8
// The different available encodings.
const (
EncNone Encoding = iota
EncGZIP
)
func (e Encoding) String() string {
switch e {
case EncGZIP:
return "gzip"
case EncNone:
return "none"
default:
return "unknown"
}
}
// CompressionWriter is the writer that compresses the data passed to it.
type CompressionWriter interface {
Write(p []byte) (int, error)
Close() error
Flush() error
Reset(w io.Writer)
}
// CompressionReader reads the compressed data.
type CompressionReader interface {
Read(p []byte) (int, error)
Reset(r io.Reader) error
}
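
These signatures are modelled on the standard library: *gzip.Writer and *gzip.Reader satisfy them as-is, which is what NewMemChunk relies on. A compile-time check, assuming it lives in package chunkenc:

```go
package chunkenc

import "compress/gzip"

// Compile-time assertions: *gzip.Writer provides Write/Close/Flush/Reset(io.Writer),
// and *gzip.Reader provides Read and Reset(io.Reader) error.
var (
	_ CompressionWriter = (*gzip.Writer)(nil)
	_ CompressionReader = (*gzip.Reader)(nil)
)
```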

pkg/ingester/chunk.go
@@ -15,16 +15,18 @@ const (
tmpNumEntries = 1024
)
// Errors returned by the chunk interface.
var (
ErrChunkFull = errors.New("Chunk full")
ErrOutOfOrder = errors.New("Entry out of order")
)
// Chunk is the interface for the compressed logs chunk format.
type Chunk interface {
Bounds() (time.Time, time.Time)
SpaceFor(*logproto.Entry) bool
Push(*logproto.Entry) error
Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator
Append(*logproto.Entry) error
Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error)
Size() int
}
@@ -47,7 +49,7 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
return len(c.entries) < tmpNumEntries
}
func (c *dumbChunk) Push(entry *logproto.Entry) error {
func (c *dumbChunk) Append(entry *logproto.Entry) error {
if len(c.entries) == tmpNumEntries {
return ErrChunkFull
}
@@ -66,7 +68,7 @@ func (c *dumbChunk) Size() int {
// Returns an iterator that goes from _most_ recent to _least_ recent (ie,
// backwards).
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator {
func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
i := sort.Search(len(c.entries), func(i int) bool {
return !from.After(c.entries[i].Timestamp)
})
@@ -76,7 +78,7 @@ func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction)
log.Println("from", from, "through", through, "i", i, "j", j, "entries", len(c.entries))
if from == through {
return nil
return nil, nil
}
start := -1
@@ -89,7 +91,7 @@ func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction)
direction: direction,
i: start,
entries: c.entries[i:j],
}
}, nil
}
type dumbChunkIterator struct {

pkg/ingester/chunk_test.go
@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/grafana/logish/pkg/chunkenc"
"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
"github.com/stretchr/testify/assert"
@@ -37,32 +38,42 @@ func testIteratorBackward(t *testing.T, iter iter.EntryIterator, from, through int64) {
}
func TestIterator(t *testing.T) {
chunk := newChunk()
const entries = 100
for i := int64(0); i < entries; i++ {
err := chunk.Push(&logproto.Entry{
Timestamp: time.Unix(i, 0),
Line: fmt.Sprintf("line %d", i),
})
require.NoError(t, err)
}
for _, chk := range []struct {
name string
new func() Chunk
}{
{"dumbChunk", newChunk},
{"gzipChunk", func() Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
} {
t.Run(chk.name, func(t *testing.T) {
chunk := chk.new()
for i := int64(0); i < entries; i++ {
err := chunk.Append(&logproto.Entry{
Timestamp: time.Unix(i, 0),
Line: fmt.Sprintf("line %d", i),
})
require.NoError(t, err)
}
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
require.NotNil(t, iter)
testIteratorForward(t, iter, int64(from), int64(from+len))
iter.Close()
}
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
iter.Close()
}
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
require.NotNil(t, iter)
testIteratorBackward(t, iter, int64(from), int64(from+len))
iter.Close()
for i := 0; i < entries; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(entries-from) + 1
iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))
iter.Close()
}
})
}
}

pkg/ingester/ingester.go
@@ -84,7 +84,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance()
inst = newInstance(instanceID)
i.instances[instanceID] = inst
}
return inst

pkg/ingester/instance.go
@@ -7,6 +7,7 @@ import (
"sync"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
@@ -21,16 +22,38 @@ var (
ErrStreamMissing = errors.New("Stream missing")
)
var (
streamsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "logish",
Name: "ingester_streams_created_total",
Help: "The total number of streams created in the ingester.",
}, []string{"org"})
streamsRemovedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "logish",
Name: "ingester_streams_removed_total",
Help: "The total number of streams removed by the ingester.",
}, []string{"org"})
)
func init() {
prometheus.MustRegister(streamsCreatedTotal)
prometheus.MustRegister(streamsRemovedTotal)
}
type instance struct {
streamsMtx sync.Mutex
streams map[string]*stream
index *invertedIndex
instanceID string
}
func newInstance() *instance {
func newInstance(instanceID string) *instance {
streamsCreatedTotal.WithLabelValues(instanceID).Inc()
return &instance{
streams: map[string]*stream{},
index: newInvertedIndex(),
streams: map[string]*stream{},
index: newInvertedIndex(),
instanceID: instanceID,
}
}
@@ -75,7 +98,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querier_QueryServer) error {
i.streamsMtx.Unlock()
return ErrStreamMissing
}
iterators[j] = stream.Iterator(req.Start, req.End, req.Direction)
iterators[j], err = stream.Iterator(req.Start, req.End, req.Direction)
if err != nil {
return err
}
}
i.streamsMtx.Unlock()

pkg/ingester/stream.go
@@ -4,12 +4,42 @@ import (
"context"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/logish/pkg/chunkenc"
"github.com/grafana/logish/pkg/iter"
"github.com/grafana/logish/pkg/logproto"
)
var (
chunksCreatedTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "logish",
Name: "ingester_chunks_created_total",
Help: "The total number of chunks created in the ingester.",
})
chunksFlushedTotal = prometheus.NewCounter(prometheus.CounterOpts{
Namespace: "logish",
Name: "ingester_chunks_flushed_total",
Help: "The total number of chunks flushed by the ingester.",
})
samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{
Namespace: "logish",
Subsystem: "ingester",
Name: "samples_per_chunk",
Help: "The number of samples in a chunk.",
Buckets: prometheus.LinearBuckets(4096, 2048, 6),
})
)
func init() {
prometheus.MustRegister(chunksCreatedTotal)
prometheus.MustRegister(chunksFlushedTotal)
prometheus.MustRegister(samplesPerChunk)
}
const tmpMaxChunks = 3
type stream struct {
@@ -27,30 +57,37 @@ func newStream(labels labels.Labels) *stream {
func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error {
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, newChunk())
s.chunks = append(s.chunks, chunkenc.NewMemChunk(chunkenc.EncGZIP))
chunksCreatedTotal.Inc()
}
for i := range entries {
if !s.chunks[0].SpaceFor(&entries[i]) {
s.chunks = append([]Chunk{newChunk()}, s.chunks...)
samplesPerChunk.Observe(float64(s.chunks[0].Size()))
s.chunks = append([]Chunk{chunkenc.NewMemChunk(chunkenc.EncGZIP)}, s.chunks...)
chunksCreatedTotal.Inc()
}
if err := s.chunks[0].Push(&entries[i]); err != nil {
if err := s.chunks[0].Append(&entries[i]); err != nil {
return err
}
}
// Temp; until we implement flushing, only keep N chunks in memory.
if len(s.chunks) > tmpMaxChunks {
chunksFlushedTotal.Add(float64(len(s.chunks) - tmpMaxChunks))
s.chunks = s.chunks[:tmpMaxChunks]
}
return nil
}
// Returns an iterator.
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator {
func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
for _, c := range s.chunks {
iter := c.Iterator(from, through, direction)
iter, err := c.Iterator(from, through, direction)
if err != nil {
return nil, err
}
if iter != nil {
iterators = append(iterators, iter)
}
@@ -62,44 +99,5 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction)
}
}
return &nonOverlappingIterator{
labels: s.labels.String(),
iterators: iterators,
}
}
type nonOverlappingIterator struct {
labels string
i int
iterators []iter.EntryIterator
curr iter.EntryIterator
}
func (i *nonOverlappingIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
if i.i >= len(i.iterators) {
return false
}
i.curr = i.iterators[i.i]
i.i++
}
return true
}
func (i *nonOverlappingIterator) Entry() logproto.Entry {
return i.curr.Entry()
}
func (i *nonOverlappingIterator) Labels() string {
return i.labels
}
func (i *nonOverlappingIterator) Error() error {
return nil
}
func (i *nonOverlappingIterator) Close() error {
return nil
return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil
}

pkg/ingester/stream_test.go
@@ -6,43 +6,57 @@ import (
"testing"
"time"
"github.com/grafana/logish/pkg/chunkenc"
"github.com/grafana/logish/pkg/logproto"
"github.com/stretchr/testify/require"
)
func TestStreamIterator(t *testing.T) {
var s stream
const chunks = 3
const entries = 100
for i := int64(0); i < chunks; i++ {
chunk := newChunk()
for j := int64(0); j < entries; j++ {
k := i*entries + j
err := chunk.Push(&logproto.Entry{
Timestamp: time.Unix(k, 0),
Line: fmt.Sprintf("line %d", k),
})
require.NoError(t, err)
}
s.chunks = append([]Chunk{chunk}, s.chunks...)
}
for _, chk := range []struct {
name string
new func() Chunk
}{
{"dumbChunk", newChunk},
{"gzipChunk", func() Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }},
} {
t.Run(chk.name, func(t *testing.T) {
var s stream
for i := int64(0); i < chunks; i++ {
chunk := chk.new()
for j := int64(0); j < entries; j++ {
k := i*entries + j
err := chunk.Append(&logproto.Entry{
Timestamp: time.Unix(k, 0),
Line: fmt.Sprintf("line %d", k),
})
require.NoError(t, err)
}
s.chunks = append([]Chunk{chunk}, s.chunks...)
}
for i := 0; i < 100; i++ {
from := rand.Intn(chunks*entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
require.NotNil(t, iter)
testIteratorForward(t, iter, int64(from), int64(from+len))
iter.Close()
}
for i := 0; i < 100; i++ {
from := rand.Intn(chunks*entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorForward(t, iter, int64(from), int64(from+len))
iter.Close()
}
for i := 0; i < 100; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
require.NotNil(t, iter)
testIteratorBackward(t, iter, int64(from), int64(from+len))
iter.Close()
for i := 0; i < 100; i++ {
from := rand.Intn(entries - 1)
len := rand.Intn(chunks*entries-from) + 1
iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
require.NotNil(t, iter)
require.NoError(t, err)
testIteratorBackward(t, iter, int64(from), int64(from+len))
iter.Close()
}
})
}
}

pkg/iter/iterator.go
@@ -5,6 +5,7 @@ import (
"fmt"
"io"
"regexp"
"time"
"github.com/grafana/logish/pkg/logproto"
)
@@ -261,3 +262,113 @@ func (i *regexpFilter) Next() bool {
}
return false
}
type nonOverlappingIterator struct {
labels string
i int
iterators []EntryIterator
curr EntryIterator
}
// NewNonOverlappingIterator returns an iterator that drains the given iterators in order, as one chained iterator.
func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIterator {
return &nonOverlappingIterator{
labels: labels,
iterators: iterators,
}
}
func (i *nonOverlappingIterator) Next() bool {
for i.curr == nil || !i.curr.Next() {
if i.i >= len(i.iterators) {
return false
}
i.curr = i.iterators[i.i]
i.i++
}
return true
}
func (i *nonOverlappingIterator) Entry() logproto.Entry {
return i.curr.Entry()
}
func (i *nonOverlappingIterator) Labels() string {
return i.labels
}
func (i *nonOverlappingIterator) Error() error {
return nil
}
func (i *nonOverlappingIterator) Close() error {
return nil
}
type timeRangedIterator struct {
EntryIterator
mint, maxt time.Time
}
// NewTimeRangedIterator returns an iterator bounded to the time range
// [mint, maxt); maxt is exclusive.
func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator {
return &timeRangedIterator{
EntryIterator: it,
mint: mint,
maxt: maxt,
}
}
func (i *timeRangedIterator) Next() bool {
ok := i.EntryIterator.Next()
// Skip entries before mint; only call Entry() after a successful Next().
for ok && i.mint.After(i.EntryIterator.Entry().Timestamp) {
ok = i.EntryIterator.Next()
}
if !ok {
return false
}
return i.maxt.After(i.EntryIterator.Entry().Timestamp) // The maxt is exclusive.
}
type entryIteratorBackward struct {
cur logproto.Entry
entries []logproto.Entry
}
// NewEntryIteratorBackward consumes the given (forward) iterator and returns
// an iterator over the same entries in reverse order.
func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) {
entries := make([]logproto.Entry, 0, 128)
for it.Next() {
entries = append(entries, it.Entry())
}
return &entryIteratorBackward{entries: entries}, it.Error()
}
func (i *entryIteratorBackward) Next() bool {
if len(i.entries) == 0 {
return false
}
i.cur = i.entries[len(i.entries)-1]
i.entries = i.entries[:len(i.entries)-1]
return true
}
func (i *entryIteratorBackward) Entry() logproto.Entry {
return i.cur
}
func (i *entryIteratorBackward) Close() error { return nil }
func (i *entryIteratorBackward) Error() error { return nil }
func (i *entryIteratorBackward) Labels() string {
return ""
}
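
A usage sketch (assuming it sits in package iter; the label string is a made-up example) showing how the three iterators compose, mirroring MemChunk.Iterator:

```go
// rangedBackward chains per-block iterators, clamps them to [mint, maxt),
// then reverses the result for BACKWARD queries.
func rangedBackward(its []EntryIterator, mint, maxt time.Time) (EntryIterator, error) {
	forward := NewTimeRangedIterator(
		NewNonOverlappingIterator(its, `{job="example"}`),
		mint, maxt,
	)
	return NewEntryIteratorBackward(forward) // buffers, then yields newest-first
}
```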
