Implement io.WriteTo by chunks. (#2962)

This allows to totally defer the buffer creation to the user.
`Marshal(w io.Writer) error` Now don't even need to create a buffer.

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/2974/head
Cyril Tovena 5 years ago committed by GitHub
parent a7d4774859
commit 8ea5fd173f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 3
      pkg/chunkenc/dumb_chunk.go
  2. 6
      pkg/chunkenc/facade.go
  3. 2
      pkg/chunkenc/interface.go
  4. 52
      pkg/chunkenc/memchunk.go
  5. 3
      pkg/ingester/stream_test.go

@ -2,6 +2,7 @@ package chunkenc
import (
"context"
"io"
"sort"
"time"
@ -107,6 +108,8 @@ func (c *dumbChunk) BytesWith(_ []byte) ([]byte, error) {
return nil, nil
}
func (c *dumbChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil }
func (c *dumbChunk) Blocks(_ time.Time, _ time.Time) []Block {
return nil
}

@ -44,12 +44,10 @@ func (f Facade) Marshal(w io.Writer) error {
if f.c == nil {
return nil
}
buf, err := f.c.Bytes()
if err != nil {
if _, err := f.c.WriteTo(w); err != nil {
return err
}
_, err = w.Write(buf)
return err
return nil
}
// UnmarshalFromBuf implements encoding.Chunk.

@ -4,6 +4,7 @@ import (
"context"
"errors"
"fmt"
"io"
"strings"
"time"
@ -105,6 +106,7 @@ type Chunk interface {
Size() int
Bytes() ([]byte, error)
BytesWith([]byte) ([]byte, error) // uses provided []byte for buffer instantiation
io.WriterTo
BlockCount() int
Utilization() float64
UncompressedSize() int

@ -257,16 +257,29 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
// BytesWith uses a provided []byte for buffer instantiation
func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
if _, err := c.WriteTo(buf); err != nil {
return nil, err
}
return buf.Bytes(), nil
}
// Bytes implements Chunk.
func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
}
// WriteTo Implements io.WriterTo
func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
if c.head != nil {
// When generating the bytes, we need to flush the data held in-buffer.
if err := c.cut(); err != nil {
return nil, err
return 0, err
}
}
crc32Hash := newCRC32()
buf := bytes.NewBuffer(b[:0])
offset := 0
offset := int64(0)
eb := encbuf{b: make([]byte, 0, 1<<10)}
@ -278,25 +291,25 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
eb.putByte(byte(c.encoding))
}
n, err := buf.Write(eb.get())
n, err := w.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write blockMeta #entries")
return offset, errors.Wrap(err, "write blockMeta #entries")
}
offset += n
offset += int64(n)
// Write Blocks.
for i, b := range c.blocks {
c.blocks[i].offset = offset
c.blocks[i].offset = int(offset)
eb.reset()
eb.putBytes(b.b)
eb.putHash(crc32Hash)
n, err := buf.Write(eb.get())
n, err := w.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write block")
return offset, errors.Wrap(err, "write block")
}
offset += n
offset += int64(n)
}
metasOffset := offset
@ -317,25 +330,22 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
}
eb.putHash(crc32Hash)
_, err = buf.Write(eb.get())
n, err = w.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write block metas")
return offset, errors.Wrap(err, "write block metas")
}
offset += int64(n)
// Write the metasOffset.
eb.reset()
eb.putBE64int(metasOffset)
_, err = buf.Write(eb.get())
eb.putBE64int(int(metasOffset))
n, err = w.Write(eb.get())
if err != nil {
return buf.Bytes(), errors.Wrap(err, "write metasOffset")
return offset, errors.Wrap(err, "write metasOffset")
}
offset += int64(n)
return buf.Bytes(), nil
}
// Bytes implements Chunk.
func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
return offset, nil
}
// Encoding implements Chunk.

@ -4,6 +4,7 @@ import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"testing"
@ -221,6 +222,8 @@ func (c *noopChunk) BytesWith(_ []byte) ([]byte, error) {
return nil, nil
}
func (c *noopChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil }
func (c *noopChunk) Blocks(_ time.Time, _ time.Time) []chunkenc.Block {
return nil
}

Loading…
Cancel
Save