diff --git a/pkg/chunkenc/README.md b/pkg/chunkenc/README.md
new file mode 100644
index 0000000000..c7b92afec7
--- /dev/null
+++ b/pkg/chunkenc/README.md
@@ -0,0 +1,28 @@
+# Chunk format
+
+```
+  |                 |             |
+  | MagicNumber(4b) | version(1b) |
+  |                 |             |
+  -------------------------------------------------------------------
+  |  block-1 bytes  |  checksum (4b)  |
+  -------------------------------------------------------------------
+  |  block-2 bytes  |  checksum (4b)  |
+  -------------------------------------------------------------------
+  |  block-n bytes  |  checksum (4b)  |
+  -------------------------------------------------------------------
+  |  #blocks (uvarint)  |
+  -------------------------------------------------------------------
+  | #entries (uvarint) | mint, maxt (varint) | offset, len (uvarint) |
+  -------------------------------------------------------------------
+  | #entries (uvarint) | mint, maxt (varint) | offset, len (uvarint) |
+  -------------------------------------------------------------------
+  | #entries (uvarint) | mint, maxt (varint) | offset, len (uvarint) |
+  -------------------------------------------------------------------
+  | #entries (uvarint) | mint, maxt (varint) | offset, len (uvarint) |
+  -------------------------------------------------------------------
+  |  checksum (4b, over the metas, starting at #blocks)  |
+  -------------------------------------------------------------------
+  |  metasOffset (8b) - offset to the start of the metas (#blocks)  |
+  -------------------------------------------------------------------
+```
diff --git a/pkg/chunkenc/encoding_helpers.go b/pkg/chunkenc/encoding_helpers.go
new file mode 100644
index 0000000000..1f3c594029
--- /dev/null
+++ b/pkg/chunkenc/encoding_helpers.go
@@ -0,0 +1,177 @@
+package chunkenc
+
+import (
+	"encoding/binary"
+	"hash"
+	"hash/crc32"
+)
+
+// encbuf is a helper type to populate a byte slice with various types.
+type encbuf struct {
+	b []byte
+	c [binary.MaxVarintLen64]byte
+}
+
+func (e *encbuf) reset()      { e.b = e.b[:0] }
+func (e *encbuf) get() []byte { return e.b }
+func (e *encbuf) len() int    { return len(e.b) }
+
+func (e *encbuf) putString(s string) { e.b = append(e.b, s...) }
+func (e *encbuf) putBytes(b []byte)  { e.b = append(e.b, b...) }
+func (e *encbuf) putByte(c byte)     { e.b = append(e.b, c) }
+
+func (e *encbuf) putBE32int(x int)      { e.putBE32(uint32(x)) }
+func (e *encbuf) putBE64int(x int)      { e.putBE64(uint64(x)) }
+func (e *encbuf) putBE64int64(x int64)  { e.putBE64(uint64(x)) }
+func (e *encbuf) putUvarint32(x uint32) { e.putUvarint64(uint64(x)) }
+func (e *encbuf) putUvarint(x int)      { e.putUvarint64(uint64(x)) }
+
+func (e *encbuf) putBE32(x uint32) {
+	binary.BigEndian.PutUint32(e.c[:], x)
+	e.b = append(e.b, e.c[:4]...)
+}
+
+func (e *encbuf) putBE64(x uint64) {
+	binary.BigEndian.PutUint64(e.c[:], x)
+	e.b = append(e.b, e.c[:8]...)
+}
+
+func (e *encbuf) putUvarint64(x uint64) {
+	n := binary.PutUvarint(e.c[:], x)
+	e.b = append(e.b, e.c[:n]...)
+}
+
+func (e *encbuf) putVarint64(x int64) {
+	n := binary.PutVarint(e.c[:], x)
+	e.b = append(e.b, e.c[:n]...)
+}
+
+// putUvarintStr writes a string to the buffer prefixed by its uvarint length (in bytes).
+func (e *encbuf) putUvarintStr(s string) {
+	e.putUvarint(len(s))
+	e.putString(s)
+}
+
+// putHash appends a hash over the buffer's current contents to the buffer.
+func (e *encbuf) putHash(h hash.Hash) {
+	h.Reset()
+	_, err := h.Write(e.b)
+	if err != nil {
+		panic(err) // The CRC32 implementation does not error
+	}
+	e.b = h.Sum(e.b)
+}
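The `encbuf` helpers mirror the write path of the chunk format: values are appended through a small scratch buffer, and `putHash` seals a section by appending the CRC32 of everything written so far. A minimal, self-contained sketch of the same pattern, using the standard library directly rather than the unexported `encbuf` type:

```go
package main

import (
	"encoding/binary"
	"fmt"
	"hash/crc32"
)

func main() {
	table := crc32.MakeTable(crc32.Castagnoli)

	// Append varints via a scratch buffer, the way encbuf does.
	var scratch [binary.MaxVarintLen64]byte
	b := make([]byte, 0, 64)

	n := binary.PutUvarint(scratch[:], 3) // e.g. #entries
	b = append(b, scratch[:n]...)
	n = binary.PutVarint(scratch[:], 1541152388) // e.g. a varint timestamp
	b = append(b, scratch[:n]...)

	// putHash: hash the current contents and append the sum in place.
	// hash.Hash.Sum appends the big-endian CRC to its argument.
	h := crc32.New(table)
	_, _ = h.Write(b)
	b = h.Sum(b)

	fmt.Printf("payload+crc = % x\n", b)
	fmt.Printf("crc = %x\n", binary.BigEndian.Uint32(b[len(b)-4:]))
}
```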
+// decbuf provides safe methods to extract data from a byte slice. It does all
+// necessary bounds checking and advances the byte slice.
+//
+// Several values can be extracted without checking for errors; however, before
+// using any extracted value, the err() method must be checked.
+type decbuf struct {
+	b []byte
+	e error
+}
+
+func (d *decbuf) uvarint() int      { return int(d.uvarint64()) }
+func (d *decbuf) uvarint32() uint32 { return uint32(d.uvarint64()) }
+func (d *decbuf) be32int() int      { return int(d.be32()) }
+func (d *decbuf) be64int64() int64  { return int64(d.be64()) }
+
+// crc32 returns a CRC32 checksum over the remaining bytes.
+func (d *decbuf) crc32() uint32 {
+	return crc32.Checksum(d.b, castagnoliTable)
+}
+
+func (d *decbuf) uvarintStr() string {
+	l := d.uvarint64()
+	if d.e != nil {
+		return ""
+	}
+	if len(d.b) < int(l) {
+		d.e = ErrInvalidSize
+		return ""
+	}
+	s := string(d.b[:l])
+	d.b = d.b[l:]
+	return s
+}
+
+func (d *decbuf) varint64() int64 {
+	if d.e != nil {
+		return 0
+	}
+	x, n := binary.Varint(d.b)
+	if n < 1 {
+		d.e = ErrInvalidSize
+		return 0
+	}
+	d.b = d.b[n:]
+	return x
+}
+
+func (d *decbuf) uvarint64() uint64 {
+	if d.e != nil {
+		return 0
+	}
+	x, n := binary.Uvarint(d.b)
+	if n < 1 {
+		d.e = ErrInvalidSize
+		return 0
+	}
+	d.b = d.b[n:]
+	return x
+}
+
+func (d *decbuf) be64() uint64 {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 8 {
+		d.e = ErrInvalidSize
+		return 0
+	}
+	x := binary.BigEndian.Uint64(d.b)
+	d.b = d.b[8:]
+	return x
+}
+
+func (d *decbuf) be32() uint32 {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 4 {
+		d.e = ErrInvalidSize
+		return 0
+	}
+	x := binary.BigEndian.Uint32(d.b)
+	d.b = d.b[4:]
+	return x
+}
+
+func (d *decbuf) byte() byte {
+	if d.e != nil {
+		return 0
+	}
+	if len(d.b) < 1 {
+		d.e = ErrInvalidSize
+		return 0
+	}
+	x := d.b[0]
+	d.b = d.b[1:]
+	return x
+}
+
+func (d *decbuf) decbuf(l int) decbuf {
+	if d.e != nil {
+		return decbuf{e: d.e}
+	}
+	if l > len(d.b) {
+		return decbuf{e: ErrInvalidSize}
+	}
+	r := decbuf{b: d.b[:l]}
+	d.b = d.b[l:]
+	return r
+}
+
+func (d *decbuf) err() error { return d.e }
+func (d *decbuf) len() int   { return len(d.b) }
+func (d *decbuf) get() []byte { return d.b }
diff --git a/pkg/chunkenc/gzip.go b/pkg/chunkenc/gzip.go
new file mode 100644
index 0000000000..b76425ac51
--- /dev/null
+++ b/pkg/chunkenc/gzip.go
@@ -0,0 +1,547 @@
+package chunkenc
+
+import (
+	"bufio"
+	"bytes"
+	"compress/gzip"
+	"encoding/binary"
+	"hash"
+	"hash/crc32"
+	"io"
+	"math"
+	"time"
+
+	"github.com/grafana/logish/pkg/logproto"
+
+	"github.com/grafana/logish/pkg/iter"
+
+	"github.com/pkg/errors"
+)
+
+var (
+	magicNumber = uint32(0x12EE56A)
+
+	chunkFormatV1 = byte(1)
+
+	// The errors returned on read.
+	ErrInvalidSize     = errors.New("invalid size")
+	ErrInvalidFlag     = errors.New("invalid flag")
+	ErrInvalidChecksum = errors.New("invalid checksum")
+	ErrOutOfOrder      = errors.New("out of order sample")
+)
+
+// The table gets initialized lazily with sync.Once, which may still race with
+// any other use of the crc32 package elsewhere. Thus we initialize it eagerly
+// up front.
+var castagnoliTable *crc32.Table
+
+func init() {
+	castagnoliTable = crc32.MakeTable(crc32.Castagnoli)
+}
+
+// newCRC32 initializes a CRC32 hash with a preconfigured polynomial, so the
+// polynomial may be easily changed in one location at a later time, if necessary.
+func newCRC32() hash.Hash32 {
+	return crc32.New(castagnoliTable)
+}
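`decbuf`'s sticky error is what lets `NewByteChunk` below chain several reads and check `err()` once per logical section. A standalone sketch of the contract (the `dec` type here is a hypothetical stand-in for the unexported `decbuf`):

```go
package main

import (
	"encoding/binary"
	"errors"
	"fmt"
)

var errInvalidSize = errors.New("invalid size")

// dec mirrors decbuf: every read checks the sticky error first and
// records a failure instead of returning it, so callers can chain
// several reads and inspect the error once at the end.
type dec struct {
	b []byte
	e error
}

func (d *dec) uvarint() uint64 {
	if d.e != nil {
		return 0
	}
	x, n := binary.Uvarint(d.b)
	if n < 1 {
		d.e = errInvalidSize
		return 0
	}
	d.b = d.b[n:]
	return x
}

func main() {
	d := dec{b: []byte{0x96, 0x01, 0x05}} // uvarints 150 and 5
	a, b := d.uvarint(), d.uvarint()
	c := d.uvarint() // runs past the buffer; only sets d.e
	if d.e != nil {
		fmt.Println(a, b, c, "->", d.e) // 150 5 0 -> invalid size
	}
}
```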
+// MemChunk implements compressed log chunks.
+type MemChunk struct {
+	// The number of uncompressed bytes per block.
+	blockSize int
+	// The max number of blocks in a chunk.
+	maxBlocks int
+
+	// The finished blocks.
+	blocks []block
+
+	// Current in-mem block being appended to.
+	head *headBlock
+
+	encoding Encoding
+	cw       func(w io.Writer) CompressionWriter
+	cr       func(r io.Reader) (CompressionReader, error)
+}
+
+type block struct {
+	// This is compressed bytes.
+	b          []byte
+	numEntries int
+
+	mint, maxt int64
+
+	offset int // The offset of the block in the chunk.
+}
+
+// headBlock holds the uncompressed entries. Once it has accumulated enough
+// data, it is flushed into a compressed block.
+type headBlock struct {
+	// This is the list of raw entries.
+	entries []entry
+	size    int // size of uncompressed bytes.
+
+	mint, maxt int64
+}
+
+func (hb *headBlock) isEmpty() bool {
+	return len(hb.entries) == 0
+}
+
+func (hb *headBlock) append(ts int64, line string) error {
+	if !hb.isEmpty() && hb.maxt >= ts {
+		return ErrOutOfOrder
+	}
+
+	hb.entries = append(hb.entries, entry{ts, line})
+	if hb.mint == 0 || hb.mint > ts {
+		hb.mint = ts
+	}
+	hb.maxt = ts
+	hb.size += len(line)
+
+	return nil
+}
+
+func (hb *headBlock) serialise(cw func(w io.Writer) CompressionWriter) ([]byte, error) {
+	buf := bytes.NewBuffer(make([]byte, 0, 1<<15)) // 32K. Pool it later.
+	encBuf := make([]byte, binary.MaxVarintLen64)
+	compressedWriter := cw(buf)
+	for _, logEntry := range hb.entries {
+		n := binary.PutVarint(encBuf, logEntry.t)
+		_, err := compressedWriter.Write(encBuf[:n])
+		if err != nil {
+			return nil, errors.Wrap(err, "appending entry")
+		}
+
+		n = binary.PutUvarint(encBuf, uint64(len(logEntry.s)))
+		_, err = compressedWriter.Write(encBuf[:n])
+		if err != nil {
+			return nil, errors.Wrap(err, "appending entry")
+		}
+		_, err = compressedWriter.Write([]byte(logEntry.s))
+		if err != nil {
+			return nil, errors.Wrap(err, "appending entry")
+		}
+	}
+	if err := compressedWriter.Close(); err != nil {
+		return nil, errors.Wrap(err, "flushing pending compress buffer")
+	}
+
+	return buf.Bytes(), nil
+}
+
+type entry struct {
+	t int64
+	s string
+}
+
+// NewMemChunk returns a new in-mem chunk.
+func NewMemChunk(enc Encoding) *MemChunk {
+	c := &MemChunk{
+		blockSize: 256 * 1024, // The blockSize in bytes.
+		blocks:    []block{},
+
+		head: &headBlock{
+			mint: math.MaxInt64,
+			maxt: math.MinInt64,
+		},
+
+		encoding: enc,
+	}
+
+	switch enc {
+	case EncGZIP:
+		c.cw = func(w io.Writer) CompressionWriter { return gzip.NewWriter(w) }
+		c.cr = func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) }
+	default:
+		panic("unknown encoding")
+	}
+
+	return c
+}
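Putting the pieces above together, the intended write/read round trip looks like the following sketch. It only uses the exported API of this package as introduced in this diff (error handling is shortened to panics for brevity):

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/logish/pkg/chunkenc"
	"github.com/grafana/logish/pkg/logproto"
)

func main() {
	chk := chunkenc.NewMemChunk(chunkenc.EncGZIP)

	// Append requires strictly increasing timestamps; otherwise the
	// head block rejects the entry with ErrOutOfOrder.
	for i := int64(1); i <= 3; i++ {
		if err := chk.Append(&logproto.Entry{
			Timestamp: time.Unix(0, i),
			Line:      fmt.Sprintf("line %d", i),
		}); err != nil {
			panic(err)
		}
	}

	// Bytes cuts the in-memory head block and serialises the chunk;
	// NewByteChunk re-opens the resulting bytes for reading.
	b, err := chk.Bytes()
	if err != nil {
		panic(err)
	}
	reopened, err := chunkenc.NewByteChunk(b)
	if err != nil {
		panic(err)
	}
	fmt.Println(reopened.Size(), "entries in", len(b), "bytes")
}
```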
+// NewByteChunk returns a MemChunk on the passed bytes.
+func NewByteChunk(b []byte) (*MemChunk, error) {
+	bc := &MemChunk{
+		cr: func(r io.Reader) (CompressionReader, error) { return gzip.NewReader(r) },
+	}
+
+	db := decbuf{b: b}
+
+	// Verify the header.
+	m, version := db.be32(), db.byte()
+	if db.err() != nil {
+		return nil, errors.Wrap(db.err(), "verifying header")
+	}
+	if m != magicNumber {
+		return nil, errors.Errorf("invalid magic number %x", m)
+	}
+	if version != chunkFormatV1 {
+		return nil, errors.Errorf("invalid version %d", version)
+	}
+
+	// The trailer stores the metasOffset and the checksum of the metas.
+	metasOffset := binary.BigEndian.Uint64(b[len(b)-8:])
+	mb := b[metasOffset : len(b)-(8+4)]
+	db = decbuf{b: mb}
+
+	expCRC := binary.BigEndian.Uint32(b[len(b)-(8+4):])
+	if expCRC != db.crc32() {
+		return nil, ErrInvalidChecksum
+	}
+
+	// Read the number of blocks.
+	num := db.uvarint()
+
+	for i := 0; i < num; i++ {
+		blk := block{}
+		// Read #entries.
+		blk.numEntries = db.uvarint()
+
+		// Read mint, maxt.
+		blk.mint = db.varint64()
+		blk.maxt = db.varint64()
+
+		// Read offset and length.
+		blk.offset = db.uvarint()
+		l := db.uvarint()
+		if db.err() != nil {
+			return nil, errors.Wrap(db.err(), "decoding block meta")
+		}
+
+		blk.b = b[blk.offset : blk.offset+l]
+
+		// Verify checksums.
+		expCRC := binary.BigEndian.Uint32(b[blk.offset+l:])
+		if expCRC != crc32.Checksum(blk.b, castagnoliTable) {
+			return bc, ErrInvalidChecksum
+		}
+
+		bc.blocks = append(bc.blocks, blk)
+	}
+
+	return bc, nil
+}
+
+// Bytes implements Chunk.
+func (c *MemChunk) Bytes() ([]byte, error) {
+	if c.head != nil {
+		// When generating the bytes, we need to flush the data held in-buffer.
+		if err := c.cut(); err != nil {
+			return nil, err
+		}
+	}
+	crc32Hash := newCRC32()
+
+	buf := bytes.NewBuffer(nil)
+	offset := 0
+
+	eb := encbuf{b: make([]byte, 0, 1<<10)}
+
+	// Write the header (magicNum + version).
+	eb.putBE32(magicNumber)
+	eb.putByte(chunkFormatV1)
+
+	n, err := buf.Write(eb.get())
+	if err != nil {
+		return buf.Bytes(), errors.Wrap(err, "write header")
+	}
+	offset += n
+
+	// Write Blocks.
+	for i, b := range c.blocks {
+		c.blocks[i].offset = offset
+
+		eb.reset()
+		eb.putBytes(b.b)
+		eb.putHash(crc32Hash)
+
+		n, err := buf.Write(eb.get())
+		if err != nil {
+			return buf.Bytes(), errors.Wrap(err, "write block")
+		}
+		offset += n
+	}
+
+	metasOffset := offset
+	// Write the number of blocks.
+	eb.reset()
+	eb.putUvarint(len(c.blocks))
+
+	// Write BlockMetas.
+	for _, b := range c.blocks {
+		eb.putUvarint(b.numEntries)
+		eb.putVarint64(b.mint)
+		eb.putVarint64(b.maxt)
+		eb.putUvarint(b.offset)
+		eb.putUvarint(len(b.b))
+	}
+	eb.putHash(crc32Hash)
+
+	_, err = buf.Write(eb.get())
+	if err != nil {
+		return buf.Bytes(), errors.Wrap(err, "write block metas")
+	}
+
+	// Write the metasOffset.
+	eb.reset()
+	eb.putBE64int(metasOffset)
+	_, err = buf.Write(eb.get())
+	if err != nil {
+		return buf.Bytes(), errors.Wrap(err, "write metasOffset")
+	}
+
+	return buf.Bytes(), nil
+}
+
+// Encoding implements Chunk.
+func (c *MemChunk) Encoding() Encoding {
+	return c.encoding
+}
+
+// Size implements Chunk.
+func (c *MemChunk) Size() int {
+	ne := 0
+	for _, blk := range c.blocks {
+		ne += blk.numEntries
+	}
+
+	if !c.head.isEmpty() {
+		ne += len(c.head.entries)
+	}
+
+	return ne
+}
+
+// SpaceFor implements Chunk.
+func (c *MemChunk) SpaceFor(*logproto.Entry) bool {
+	return len(c.blocks) < 10
+}
+
+// Append implements Chunk.
+func (c *MemChunk) Append(entry *logproto.Entry) error {
+	return c.head.append(entry.Timestamp.UnixNano(), entry.Line)
+}
+
+// Close implements Chunk.
+// TODO: Fix this to check edge cases.
+func (c *MemChunk) Close() error {
+	return c.cut()
+}
+
+// cut closes the current head block and adds it to the finished blocks.
+func (c *MemChunk) cut() error {
+	if c.head.isEmpty() {
+		return nil
+	}
+
+	b, err := c.head.serialise(c.cw)
+	if err != nil {
+		return err
+	}
+
+	c.blocks = append(c.blocks, block{
+		b:          b,
+		numEntries: len(c.head.entries),
+		mint:       c.head.mint,
+		maxt:       c.head.maxt,
+	})
+
+	c.head.entries = c.head.entries[:0]
+	c.head.size = 0
+	c.head.mint = 0 // Will be set on the first append.
+
+	return nil
+}
+
+// Bounds implements Chunk.
+func (c *MemChunk) Bounds() (fromT, toT time.Time) {
+	var from, to int64
+	if len(c.blocks) > 0 {
+		from = c.blocks[0].mint
+		to = c.blocks[len(c.blocks)-1].maxt
+	}
+
+	if !c.head.isEmpty() {
+		if from == 0 || from > c.head.mint {
+			from = c.head.mint
+		}
+
+		if to < c.head.maxt {
+			to = c.head.maxt
+		}
+	}
+
+	return time.Unix(0, from), time.Unix(0, to)
+}
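For reference, the fixed-size trailer that `Bytes` writes and `NewByteChunk` consumes can be inspected without decoding any blocks. A standalone sketch (`peekMetas` is a hypothetical helper and the input buffer is fabricated for illustration):

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// peekMetas mirrors the first steps of NewByteChunk: the last 8 bytes
// of a chunk hold the big-endian offset of the metas section, and the
// 4 bytes before them hold the CRC32 of that section.
func peekMetas(b []byte) (metasOffset uint64, metasCRC uint32, err error) {
	if len(b) < 12 {
		return 0, 0, fmt.Errorf("chunk too short: %d bytes", len(b))
	}
	metasOffset = binary.BigEndian.Uint64(b[len(b)-8:])
	metasCRC = binary.BigEndian.Uint32(b[len(b)-12 : len(b)-8])
	if metasOffset > uint64(len(b)-12) {
		return 0, 0, fmt.Errorf("metasOffset %d out of range", metasOffset)
	}
	return metasOffset, metasCRC, nil
}

func main() {
	// A fabricated 13-byte chunk tail: one meta byte, CRC, offset.
	b := []byte{0x01, 0xde, 0xad, 0xbe, 0xef, 0, 0, 0, 0, 0, 0, 0, 0}
	fmt.Println(peekMetas(b)) // 0 3735928559 <nil>
}
```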
+// Iterator implements Chunk.
+func (c *MemChunk) Iterator(mintT, maxtT time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
+	mint, maxt := mintT.UnixNano(), maxtT.UnixNano()
+	its := make([]iter.EntryIterator, 0, len(c.blocks))
+
+	for _, b := range c.blocks {
+		if maxt > b.mint && b.maxt > mint {
+			it, err := b.iterator(c.cr)
+			if err != nil {
+				return nil, err
+			}
+
+			its = append(its, it)
+		}
+	}
+
+	its = append(its, c.head.iterator(mint, maxt))
+
+	iterForward := iter.NewTimeRangedIterator(
+		iter.NewNonOverlappingIterator(its, ""),
+		time.Unix(0, mint),
+		time.Unix(0, maxt),
+	)
+
+	if direction == logproto.FORWARD {
+		return iterForward, nil
+	}
+
+	return iter.NewEntryIteratorBackward(iterForward)
+}
+
+func (b block) iterator(cr func(io.Reader) (CompressionReader, error)) (iter.EntryIterator, error) {
+	if len(b.b) == 0 {
+		return emptyIterator, nil
+	}
+
+	r, err := cr(bytes.NewBuffer(b.b))
+	if err != nil {
+		return nil, err
+	}
+
+	s := bufio.NewReader(r)
+	return newBufferedIterator(s), nil
+}
+
+func (hb *headBlock) iterator(mint, maxt int64) iter.EntryIterator {
+	if hb.isEmpty() || (maxt < hb.mint || hb.maxt < mint) {
+		return emptyIterator
+	}
+
+	// We copy the entries on every call, because hb.entries can keep changing
+	// underneath the returned iterator. The alternative would be to allocate
+	// a fresh entries slice every time we cut a block, but blocks are cut far
+	// more often than near-realtime data is queried, so we pay the copy on
+	// the query path instead.
+	entries := make([]entry, len(hb.entries))
+	copy(entries, hb.entries)
+
+	return &listIterator{
+		entries: entries,
+	}
+}
+
+var emptyIterator = &listIterator{}
+
+type listIterator struct {
+	entries []entry
+
+	cur entry
+}
+
+func (li *listIterator) Seek(int64) bool {
+	return false
+}
+
+func (li *listIterator) Next() bool {
+	if len(li.entries) > 0 {
+		li.cur = li.entries[0]
+		li.entries = li.entries[1:]
+
+		return true
+	}
+
+	return false
+}
+
+func (li *listIterator) Entry() logproto.Entry {
+	return logproto.Entry{Timestamp: time.Unix(0, li.cur.t), Line: li.cur.s}
+}
+
+func (li *listIterator) Error() error   { return nil }
+func (li *listIterator) Close() error   { return nil }
+func (li *listIterator) Labels() string { return "" }
+
+type bufferedIterator struct {
+	s *bufio.Reader
+
+	curT   int64
+	curLog string
+
+	err error
+
+	buf    []byte // The buffer for a single entry.
+	decBuf []byte // The buffer for decoding the lengths.
+}
+
+func newBufferedIterator(s *bufio.Reader) *bufferedIterator {
+	return &bufferedIterator{
+		s:      s,
+		buf:    make([]byte, 1024),
+		decBuf: make([]byte, binary.MaxVarintLen64),
+	}
+}
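Entries inside a block are framed exactly as `serialise` writes them: a varint timestamp, a uvarint line length, then the raw line bytes. This self-contained sketch encodes two entries and reads them back the way `bufferedIterator` does; note `io.ReadFull`, which retries until the whole line is read and so avoids the short-read pitfall of a single `Read` call:

```go
package main

import (
	"bufio"
	"bytes"
	"encoding/binary"
	"fmt"
	"io"
)

func main() {
	// Encode: varint ts, uvarint length, line bytes.
	var out bytes.Buffer
	scratch := make([]byte, binary.MaxVarintLen64)
	for i, line := range []string{"hello", "world"} {
		n := binary.PutVarint(scratch, int64(i+1))
		out.Write(scratch[:n])
		n = binary.PutUvarint(scratch, uint64(len(line)))
		out.Write(scratch[:n])
		out.WriteString(line)
	}

	// Decode until EOF on the timestamp read.
	r := bufio.NewReader(&out)
	for {
		ts, err := binary.ReadVarint(r)
		if err == io.EOF {
			break
		} else if err != nil {
			panic(err)
		}
		l, err := binary.ReadUvarint(r)
		if err != nil {
			panic(err)
		}
		buf := make([]byte, l)
		if _, err := io.ReadFull(r, buf); err != nil {
			panic(err)
		}
		fmt.Println(ts, string(buf))
	}
}
```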
+func (si *bufferedIterator) Seek(int64) bool {
+	return false
+}
+
+func (si *bufferedIterator) Next() bool {
+	ts, err := binary.ReadVarint(si.s)
+	if err != nil {
+		if err != io.EOF {
+			si.err = err
+		}
+		return false
+	}
+
+	l, err := binary.ReadUvarint(si.s)
+	if err != nil {
+		// An EOF here means the stream ended mid-entry, which is as fatal
+		// as any other error.
+		si.err = err
+		return false
+	}
+
+	// Grow the entry buffer if needed.
+	for len(si.buf) < int(l) {
+		si.buf = append(si.buf, make([]byte, 1024)...)
+	}
+
+	// A single Read may return fewer than l bytes; ReadFull retries until
+	// the whole line is in the buffer.
+	if _, err := io.ReadFull(si.s, si.buf[:l]); err != nil {
+		si.err = err
+		return false
+	}
+
+	si.curT = ts
+	si.curLog = string(si.buf[:l])
+
+	return true
+}
+
+func (si *bufferedIterator) Entry() logproto.Entry {
+	return logproto.Entry{Timestamp: time.Unix(0, si.curT), Line: si.curLog}
+}
+
+func (si *bufferedIterator) Error() error   { return si.err }
+func (si *bufferedIterator) Close() error   { return si.err }
+func (si *bufferedIterator) Labels() string { return "" }
+
+type noopFlushingWriter struct {
+	io.WriteCloser
+}
+
+func (noopFlushingWriter) Flush() error {
+	return nil
+}
diff --git a/pkg/chunkenc/gzip_test.go b/pkg/chunkenc/gzip_test.go
new file mode 100644
index 0000000000..44b67957f3
--- /dev/null
+++ b/pkg/chunkenc/gzip_test.go
@@ -0,0 +1,180 @@
+package chunkenc
+
+import (
+	"bytes"
+	"fmt"
+	"io/ioutil"
+	"math"
+	"testing"
+	"time"
+
+	"github.com/grafana/logish/pkg/logproto"
+
+	"github.com/stretchr/testify/require"
+)
+
+func TestGZIPBlock(t *testing.T) {
+	chk := NewMemChunk(EncGZIP)
+
+	cases := []struct {
+		ts  int64
+		str string
+		cut bool
+	}{
+		{
+			ts:  1,
+			str: "hello, world!",
+		},
+		{
+			ts:  2,
+			str: "hello, world2!",
+		},
+		{
+			ts:  3,
+			str: "hello, world3!",
+		},
+		{
+			ts:  4,
+			str: "hello, world4!",
+		},
+		{
+			ts:  5,
+			str: "hello, world5!",
+		},
+		{
+			ts:  6,
+			str: "hello, world6!",
+			cut: true,
+		},
+		{
+			ts:  7,
+			str: "hello, world7!",
+		},
+		{
+			ts:  8,
+			str: "hello, worl\nd8!",
+		},
+		{
+			ts:  9,
+			str: "",
+		},
+	}
+
+	for _, c := range cases {
+		require.NoError(t, chk.Append(logprotoEntry(c.ts, c.str)))
+		if c.cut {
+			require.NoError(t, chk.cut())
+		}
+	}
+
+	it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
+	require.NoError(t, err)
+
+	idx := 0
+	for it.Next() {
+		e := it.Entry()
+		require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano())
+		require.Equal(t, cases[idx].str, e.Line)
+		idx++
+	}
+
+	require.NoError(t, it.Error())
+	require.Equal(t, len(cases), idx)
+
+	t.Run("bounded-iteration", func(t *testing.T) {
+		it, err := chk.Iterator(time.Unix(0, 3), time.Unix(0, 7), logproto.FORWARD)
+		require.NoError(t, err)
+
+		idx := 2
+		for it.Next() {
+			e := it.Entry()
+			require.Equal(t, cases[idx].ts, e.Timestamp.UnixNano())
+			require.Equal(t, cases[idx].str, e.Line)
+			idx++
+		}
+		require.NoError(t, it.Error())
+		require.Equal(t, 6, idx)
+	})
+}
+func TestGZIPCompression(t *testing.T) {
+	if testing.Short() {
+		t.SkipNow()
+	}
+
+	b, err := ioutil.ReadFile("NASA_access_log_Aug95")
+	if err != nil {
+		t.SkipNow()
+	}
+
+	lines := bytes.Split(b, []byte("\n"))
+	fmt.Println(len(lines))
+
+	for _, blockSize := range []int{4 * 1024, 8 * 1024, 16 * 1024, 32 * 1024, 64 * 1024, 128 * 1024, 256 * 1024, 512 * 1024} {
+		testName := fmt.Sprintf("%d", blockSize/1024)
+		t.Run(testName, func(t *testing.T) {
+			chk := NewMemChunk(EncGZIP)
+			chk.blockSize = blockSize
+
+			for i, l := range lines {
+				require.NoError(t, chk.Append(logprotoEntry(int64(i), string(l))))
+			}
+
+			b2, err := chk.Bytes()
+			require.NoError(t, err)
+			fmt.Println(float64(len(b))/(1024*1024), float64(len(b2))/(1024*1024), float64(len(b2))/float64(len(chk.blocks)))
+
+			it, err := chk.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
+			require.NoError(t, err)
+
+			for i, l := range lines {
+				require.True(t, it.Next())
+
+				e := it.Entry()
+				require.Equal(t, int64(i), e.Timestamp.UnixNano())
+				require.Equal(t, string(l), e.Line)
+			}
+
+			require.NoError(t, it.Error())
+		})
+	}
+}
+
+func TestGZIPSerialisation(t *testing.T) {
+	chk := NewMemChunk(EncGZIP)
+
+	numSamples := 500000
+
+	for i := 0; i < numSamples; i++ {
+		require.NoError(t, chk.Append(logprotoEntry(int64(i), fmt.Sprint(i))))
+	}
+
+	byt, err := chk.Bytes()
+	require.NoError(t, err)
+
+	bc, err := NewByteChunk(byt)
+	require.NoError(t, err)
+
+	it, err := bc.Iterator(time.Unix(0, 0), time.Unix(0, math.MaxInt64), logproto.FORWARD)
+	require.NoError(t, err)
+	for i := 0; i < numSamples; i++ {
+		require.True(t, it.Next())
+
+		e := it.Entry()
+		require.Equal(t, int64(i), e.Timestamp.UnixNano())
+		require.Equal(t, fmt.Sprint(i), e.Line)
+	}
+
+	require.NoError(t, it.Error())
+
+	byt2, err := chk.Bytes()
+	require.NoError(t, err)
+
+	require.True(t, bytes.Equal(byt, byt2))
+}
+
+func logprotoEntry(ts int64, line string) *logproto.Entry {
+	return &logproto.Entry{
+		Timestamp: time.Unix(0, ts),
+		Line:      line,
+	}
+}
diff --git a/pkg/chunkenc/interface.go b/pkg/chunkenc/interface.go
new file mode 100644
index 0000000000..d3565824d6
--- /dev/null
+++ b/pkg/chunkenc/interface.go
@@ -0,0 +1,39 @@
+package chunkenc
+
+import (
+	"io"
+)
+
+// Encoding is the identifier for a chunk encoding.
+type Encoding uint8
+
+// The different available encodings.
+const (
+	EncNone Encoding = iota
+	EncGZIP
+)
+
+func (e Encoding) String() string {
+	switch e {
+	case EncGZIP:
+		return "gzip"
+	case EncNone:
+		return "none"
+	default:
+		return "unknown"
+	}
+}
+
+// CompressionWriter is the writer that compresses the data passed to it.
+type CompressionWriter interface {
+	Write(p []byte) (int, error)
+	Close() error
+	Flush() error
+	Reset(w io.Writer)
+}
+
+// CompressionReader reads the compressed data.
+type CompressionReader interface {
+	Read(p []byte) (int, error)
+	Reset(r io.Reader) error
+}
diff --git a/pkg/ingester/chunk.go b/pkg/ingester/chunk.go
index ca4aa00c81..7b19d03531 100644
--- a/pkg/ingester/chunk.go
+++ b/pkg/ingester/chunk.go
@@ -15,16 +15,18 @@ const (
 	tmpNumEntries = 1024
 )
 
+// Errors returned by the chunk interface.
 var (
 	ErrChunkFull  = errors.New("Chunk full")
 	ErrOutOfOrder = errors.New("Entry out of order")
 )
 
+// Chunk is the interface for the compressed log chunk format.
 type Chunk interface {
 	Bounds() (time.Time, time.Time)
 	SpaceFor(*logproto.Entry) bool
-	Push(*logproto.Entry) error
-	Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator
+	Append(*logproto.Entry) error
+	Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error)
 	Size() int
 }
 
@@ -47,7 +49,7 @@ func (c *dumbChunk) SpaceFor(_ *logproto.Entry) bool {
 	return len(c.entries) < tmpNumEntries
 }
 
-func (c *dumbChunk) Push(entry *logproto.Entry) error {
+func (c *dumbChunk) Append(entry *logproto.Entry) error {
 	if len(c.entries) == tmpNumEntries {
 		return ErrChunkFull
 	}
@@ -66,7 +68,7 @@ func (c *dumbChunk) Size() int {
 
 // Returns an iterator that goes from _most_ recent to _least_ recent (ie,
 // backwards).
-func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator { +func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) { i := sort.Search(len(c.entries), func(i int) bool { return !from.After(c.entries[i].Timestamp) }) @@ -76,7 +78,7 @@ func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Directi log.Println("from", from, "through", through, "i", i, "j", j, "entries", len(c.entries)) if from == through { - return nil + return nil, nil } start := -1 @@ -89,7 +91,7 @@ func (c *dumbChunk) Iterator(from, through time.Time, direction logproto.Directi direction: direction, i: start, entries: c.entries[i:j], - } + }, nil } type dumbChunkIterator struct { diff --git a/pkg/ingester/chunk_test.go b/pkg/ingester/chunk_test.go index e3b7edf46c..194dc8fc8a 100644 --- a/pkg/ingester/chunk_test.go +++ b/pkg/ingester/chunk_test.go @@ -6,6 +6,7 @@ import ( "testing" "time" + "github.com/grafana/logish/pkg/chunkenc" "github.com/grafana/logish/pkg/iter" "github.com/grafana/logish/pkg/logproto" "github.com/stretchr/testify/assert" @@ -37,32 +38,42 @@ func testIteratorBackward(t *testing.T, iter iter.EntryIterator, from, through i } func TestIterator(t *testing.T) { - chunk := newChunk() const entries = 100 - for i := int64(0); i < entries; i++ { - err := chunk.Push(&logproto.Entry{ - Timestamp: time.Unix(i, 0), - Line: fmt.Sprintf("line %d", i), - }) - require.NoError(t, err) - } + for _, chk := range []struct { + name string + new func() Chunk + }{ + {"dumbChunk", newChunk}, + {"gzipChunk", func() Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }}, + } { + t.Run(chk.name, func(t *testing.T) { + chunk := chk.new() + for i := int64(0); i < entries; i++ { + err := chunk.Append(&logproto.Entry{ + Timestamp: time.Unix(i, 0), + Line: fmt.Sprintf("line %d", i), + }) + require.NoError(t, err) + } - for i := 0; i < entries; i++ { - from := rand.Intn(entries - 1) - len := rand.Intn(entries-from) + 1 - iter := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD) - require.NotNil(t, iter) - testIteratorForward(t, iter, int64(from), int64(from+len)) - iter.Close() - } + for i := 0; i < entries; i++ { + from := rand.Intn(entries - 1) + len := rand.Intn(entries-from) + 1 + iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD) + require.NoError(t, err) + testIteratorForward(t, iter, int64(from), int64(from+len)) + iter.Close() + } - for i := 0; i < entries; i++ { - from := rand.Intn(entries - 1) - len := rand.Intn(entries-from) + 1 - iter := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD) - require.NotNil(t, iter) - testIteratorBackward(t, iter, int64(from), int64(from+len)) - iter.Close() + for i := 0; i < entries; i++ { + from := rand.Intn(entries - 1) + len := rand.Intn(entries-from) + 1 + iter, err := chunk.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD) + require.NoError(t, err) + testIteratorBackward(t, iter, int64(from), int64(from+len)) + iter.Close() + } + }) } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 17f3f7f7c5..80a2186606 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -84,7 +84,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance() + inst = 
newInstance(instanceID) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 5ae6bfd515..fda5e5b26a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -7,6 +7,7 @@ import ( "sync" "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "github.com/grafana/logish/pkg/iter" "github.com/grafana/logish/pkg/logproto" @@ -21,16 +22,38 @@ var ( ErrStreamMissing = errors.New("Stream missing") ) +var ( + streamsCreatedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "logish", + Name: "ingester_streams_created_total", + Help: "The total number of streams created in the ingester.", + }, []string{"org"}) + streamsRemovedTotal = prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "logish", + Name: "ingester_streams_removed_total", + Help: "The total number of streams removed by the ingester.", + }, []string{"org"}) +) + +func init() { + prometheus.MustRegister(streamsCreatedTotal) + prometheus.MustRegister(streamsRemovedTotal) +} + type instance struct { streamsMtx sync.Mutex streams map[string]*stream index *invertedIndex + + instanceID string } -func newInstance() *instance { +func newInstance(instanceID string) *instance { + streamsCreatedTotal.WithLabelValues(instanceID).Inc() return &instance{ - streams: map[string]*stream{}, - index: newInvertedIndex(), + streams: map[string]*stream{}, + index: newInvertedIndex(), + instanceID: instanceID, } } @@ -75,7 +98,10 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie i.streamsMtx.Unlock() return ErrStreamMissing } - iterators[j] = stream.Iterator(req.Start, req.End, req.Direction) + iterators[j], err = stream.Iterator(req.Start, req.End, req.Direction) + if err != nil { + return err + } } i.streamsMtx.Unlock() diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index cca0c54317..c305a4dab4 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -4,12 +4,42 @@ import ( "context" "time" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" + "github.com/grafana/logish/pkg/chunkenc" "github.com/grafana/logish/pkg/iter" "github.com/grafana/logish/pkg/logproto" ) +var ( + chunksCreatedTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "logish", + Name: "ingester_chunks_created_total", + Help: "The total number of chunks created in the ingester.", + }) + chunksFlushedTotal = prometheus.NewCounter(prometheus.CounterOpts{ + Namespace: "logish", + Name: "ingester_chunks_flushed_total", + Help: "The total number of chunks flushed by the ingester.", + }) + + samplesPerChunk = prometheus.NewHistogram(prometheus.HistogramOpts{ + Namespace: "logish", + Subsystem: "ingester", + Name: "samples_per_chunk", + Help: "The number of samples in a chunk.", + + Buckets: prometheus.LinearBuckets(4096, 2048, 6), + }) +) + +func init() { + prometheus.MustRegister(chunksCreatedTotal) + prometheus.MustRegister(chunksFlushedTotal) + prometheus.MustRegister(samplesPerChunk) +} + const tmpMaxChunks = 3 type stream struct { @@ -27,30 +57,37 @@ func newStream(labels labels.Labels) *stream { func (s *stream) Push(ctx context.Context, entries []logproto.Entry) error { if len(s.chunks) == 0 { - s.chunks = append(s.chunks, newChunk()) + s.chunks = append(s.chunks, chunkenc.NewMemChunk(chunkenc.EncGZIP)) + chunksCreatedTotal.Inc() } for i := range entries { if !s.chunks[0].SpaceFor(&entries[i]) { - s.chunks = append([]Chunk{newChunk()}, s.chunks...) 
+ samplesPerChunk.Observe(float64(s.chunks[0].Size())) + s.chunks = append([]Chunk{chunkenc.NewMemChunk(chunkenc.EncGZIP)}, s.chunks...) + chunksCreatedTotal.Inc() } - if err := s.chunks[0].Push(&entries[i]); err != nil { + if err := s.chunks[0].Append(&entries[i]); err != nil { return err } } // Temp; until we implement flushing, only keep N chunks in memory. if len(s.chunks) > tmpMaxChunks { + chunksFlushedTotal.Add(float64(len(s.chunks) - tmpMaxChunks)) s.chunks = s.chunks[:tmpMaxChunks] } return nil } // Returns an iterator. -func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) iter.EntryIterator { +func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) { iterators := make([]iter.EntryIterator, 0, len(s.chunks)) for _, c := range s.chunks { - iter := c.Iterator(from, through, direction) + iter, err := c.Iterator(from, through, direction) + if err != nil { + return nil, err + } if iter != nil { iterators = append(iterators, iter) } @@ -62,44 +99,5 @@ func (s *stream) Iterator(from, through time.Time, direction logproto.Direction) } } - return &nonOverlappingIterator{ - labels: s.labels.String(), - iterators: iterators, - } -} - -type nonOverlappingIterator struct { - labels string - i int - iterators []iter.EntryIterator - curr iter.EntryIterator -} - -func (i *nonOverlappingIterator) Next() bool { - for i.curr == nil || !i.curr.Next() { - if i.i >= len(i.iterators) { - return false - } - - i.curr = i.iterators[i.i] - i.i++ - } - - return true -} - -func (i *nonOverlappingIterator) Entry() logproto.Entry { - return i.curr.Entry() -} - -func (i *nonOverlappingIterator) Labels() string { - return i.labels -} - -func (i *nonOverlappingIterator) Error() error { - return nil -} - -func (i *nonOverlappingIterator) Close() error { - return nil + return iter.NewNonOverlappingIterator(iterators, s.labels.String()), nil } diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index b5352c26c1..aca8548050 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -6,43 +6,57 @@ import ( "testing" "time" + "github.com/grafana/logish/pkg/chunkenc" "github.com/grafana/logish/pkg/logproto" "github.com/stretchr/testify/require" ) func TestStreamIterator(t *testing.T) { - var s stream const chunks = 3 const entries = 100 - for i := int64(0); i < chunks; i++ { - chunk := newChunk() - for j := int64(0); j < entries; j++ { - k := i*entries + j - err := chunk.Push(&logproto.Entry{ - Timestamp: time.Unix(k, 0), - Line: fmt.Sprintf("line %d", k), - }) - require.NoError(t, err) - } - s.chunks = append([]Chunk{chunk}, s.chunks...) - } + for _, chk := range []struct { + name string + new func() Chunk + }{ + {"dumbChunk", newChunk}, + {"gzipChunk", func() Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP) }}, + } { + t.Run(chk.name, func(t *testing.T) { + var s stream + for i := int64(0); i < chunks; i++ { + chunk := chk.new() + for j := int64(0); j < entries; j++ { + k := i*entries + j + err := chunk.Append(&logproto.Entry{ + Timestamp: time.Unix(k, 0), + Line: fmt.Sprintf("line %d", k), + }) + require.NoError(t, err) + } + s.chunks = append([]Chunk{chunk}, s.chunks...) 
+			}
 
-	for i := 0; i < 100; i++ {
-		from := rand.Intn(chunks*entries - 1)
-		len := rand.Intn(chunks*entries-from) + 1
-		iter := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
-		require.NotNil(t, iter)
-		testIteratorForward(t, iter, int64(from), int64(from+len))
-		iter.Close()
-	}
+			for i := 0; i < 100; i++ {
+				from := rand.Intn(chunks*entries - 1)
+				len := rand.Intn(chunks*entries-from) + 1
+				iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.FORWARD)
+				require.NotNil(t, iter)
+				require.NoError(t, err)
+				testIteratorForward(t, iter, int64(from), int64(from+len))
+				iter.Close()
+			}
 
-	for i := 0; i < 100; i++ {
-		from := rand.Intn(entries - 1)
-		len := rand.Intn(chunks*entries-from) + 1
-		iter := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
-		require.NotNil(t, iter)
-		testIteratorBackward(t, iter, int64(from), int64(from+len))
-		iter.Close()
+			for i := 0; i < 100; i++ {
+				from := rand.Intn(entries - 1)
+				len := rand.Intn(chunks*entries-from) + 1
+				iter, err := s.Iterator(time.Unix(int64(from), 0), time.Unix(int64(from+len), 0), logproto.BACKWARD)
+				require.NotNil(t, iter)
+				require.NoError(t, err)
+				testIteratorBackward(t, iter, int64(from), int64(from+len))
+				iter.Close()
+			}
+		})
 	}
+
 }
diff --git a/pkg/iter/iterator.go b/pkg/iter/iterator.go
index 4a03cc8290..04e4326a3f 100644
--- a/pkg/iter/iterator.go
+++ b/pkg/iter/iterator.go
@@ -5,6 +5,7 @@ import (
 	"fmt"
 	"io"
 	"regexp"
+	"time"
 
 	"github.com/grafana/logish/pkg/logproto"
 )
@@ -261,3 +262,113 @@ func (i *regexpFilter) Next() bool {
 	}
 	return false
 }
+
+type nonOverlappingIterator struct {
+	labels    string
+	i         int
+	iterators []EntryIterator
+	curr      EntryIterator
+}
+
+// NewNonOverlappingIterator returns an iterator that reads the given
+// iterators one after another; it assumes they are time-ordered and do not
+// overlap.
+func NewNonOverlappingIterator(iterators []EntryIterator, labels string) EntryIterator {
+	return &nonOverlappingIterator{
+		labels:    labels,
+		iterators: iterators,
+	}
+}
+
+func (i *nonOverlappingIterator) Next() bool {
+	for i.curr == nil || !i.curr.Next() {
+		if i.i >= len(i.iterators) {
+			return false
+		}
+
+		i.curr = i.iterators[i.i]
+		i.i++
+	}
+
+	return true
+}
+
+func (i *nonOverlappingIterator) Entry() logproto.Entry {
+	return i.curr.Entry()
+}
+
+func (i *nonOverlappingIterator) Labels() string {
+	return i.labels
+}
+
+func (i *nonOverlappingIterator) Error() error {
+	return nil
+}
+
+func (i *nonOverlappingIterator) Close() error {
+	return nil
+}
+
+type timeRangedIterator struct {
+	EntryIterator
+	mint, maxt time.Time
+}
+
+// NewTimeRangedIterator returns an iterator that only yields entries within
+// the given time range; mint is inclusive, maxt is exclusive.
+func NewTimeRangedIterator(it EntryIterator, mint, maxt time.Time) EntryIterator {
+	return &timeRangedIterator{
+		EntryIterator: it,
+		mint:          mint,
+		maxt:          maxt,
+	}
+}
+
+func (i *timeRangedIterator) Next() bool {
+	ok := i.EntryIterator.Next()
+	if !ok {
+		return false
+	}
+
+	// Skip entries before mint; only consult Entry after a successful Next.
+	ts := i.EntryIterator.Entry().Timestamp
+	for i.mint.After(ts) {
+		if ok = i.EntryIterator.Next(); !ok {
+			return false
+		}
+		ts = i.EntryIterator.Entry().Timestamp
+	}
+
+	if i.maxt.Before(ts) || i.maxt.Equal(ts) { // maxt is exclusive.
+		return false
+	}
+
+	return true
+}
+
+type entryIteratorBackward struct {
+	cur     logproto.Entry
+	entries []logproto.Entry
+}
+
+// NewEntryIteratorBackward exhausts the given iterator and replays its
+// entries from the most recent to the least recent.
+func NewEntryIteratorBackward(it EntryIterator) (EntryIterator, error) {
+	entries := make([]logproto.Entry, 0, 128)
+	for it.Next() {
+		entries = append(entries, it.Entry())
+	}
+
+	return &entryIteratorBackward{entries: entries}, it.Error()
+}
+
+func (i *entryIteratorBackward) Next() bool {
+	if len(i.entries) == 0 {
+		return false
+	}
+
+	i.cur = i.entries[len(i.entries)-1]
+	i.entries = i.entries[:len(i.entries)-1]
+
+	return true
+}
+
+func (i *entryIteratorBackward) Entry() logproto.Entry {
+	return i.cur
+}
+
+func (i *entryIteratorBackward) Close() error { return nil }
+
+func (i *entryIteratorBackward) Error() error { return nil }
+
+func (i *entryIteratorBackward) Labels() string {
+	return ""
+}
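Taken together, these combinators are exactly what `MemChunk.Iterator` composes: chain the per-block iterators, clamp them to the half-open time range, and optionally reverse the result. The sketch below wires them up against the exported `iter` API; the `sliceIterator` stub is hypothetical, and it assumes `EntryIterator` requires only the Next/Entry/Labels/Error/Close methods that the iterators above implement:

```go
package main

import (
	"fmt"
	"time"

	"github.com/grafana/logish/pkg/iter"
	"github.com/grafana/logish/pkg/logproto"
)

// sliceIterator is a minimal EntryIterator over an in-memory slice,
// similar in spirit to the unexported listIterator in pkg/chunkenc.
type sliceIterator struct {
	entries []logproto.Entry
	cur     logproto.Entry
}

func (s *sliceIterator) Next() bool {
	if len(s.entries) == 0 {
		return false
	}
	s.cur, s.entries = s.entries[0], s.entries[1:]
	return true
}

func (s *sliceIterator) Entry() logproto.Entry { return s.cur }
func (s *sliceIterator) Labels() string        { return "" }
func (s *sliceIterator) Error() error          { return nil }
func (s *sliceIterator) Close() error          { return nil }

// entriesAt builds an iterator over entries at the given nanosecond timestamps.
func entriesAt(ts ...int64) iter.EntryIterator {
	es := make([]logproto.Entry, 0, len(ts))
	for _, t := range ts {
		es = append(es, logproto.Entry{Timestamp: time.Unix(0, t), Line: fmt.Sprint(t)})
	}
	return &sliceIterator{entries: es}
}

func main() {
	// Chain two time-ordered iterators, clamp to [2, 5), then replay
	// the result backwards: prints 4, 3, 2.
	forward := iter.NewTimeRangedIterator(
		iter.NewNonOverlappingIterator([]iter.EntryIterator{
			entriesAt(1, 2, 3),
			entriesAt(4, 5, 6),
		}, "{}"),
		time.Unix(0, 2),
		time.Unix(0, 5),
	)

	backward, err := iter.NewEntryIteratorBackward(forward)
	if err != nil {
		panic(err)
	}
	for backward.Next() {
		fmt.Println(backward.Entry().Timestamp.UnixNano())
	}
}
```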