Adds WAL support (experimental) (#2981)

* marshalable chunks

* wal record types custom serialization

* proto types for wal checkpoints

* byteswith output unaffected by buffer

* wal & record pool ifcs

* wal record can hold entries from multiple series

* entry pool

* ingester uses noopWal

* removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding.

* segment writing

* [WIP] wal recovery from segments

* replay uses sync.Maps & preserves WAL fingerprints

* in memory wal recovery

* wal segment recovery

* ingester metrics struct

* wal replay locks streamsMtx in instances, adds checkpoint codec

* ingester metrics

* checkpointer

* WAL checkpoint writer

* checkpointwriter can write multiple checkpoints

* reorgs checkpointing

* wires up checkpointwriter to wal

* ingester SeriesIter impl

* wires up ingesterRecoverer to consume checkpoints

* generic recovery fn

* generic recovery fn

* recover from both wal types

* cleans up old tmp checkpoints & allows aborting in flight checkpoints

* wires up wal checkpointing

* more granular wal logging

* fixes off by 1 wal truncation & removes double logging

* adds userID to wal records correctly

* wire chunk encoding tests

* more granular wal metrics

* checkpoint encoding test

* ignores debug bins

* segment replay ignores out of orders

* fixes bug between WAL reading []byte validity and proto unmarshalling refs

* conf validations, removes comments

* flush on shutdown config

* POST /ingester/shutdown

* renames flush on shutdown

* wal & checkpoint use same segment size

* writes entries to wal regardless of tailers

* makes wal checkpoint duration default to 5m

* recovery metrics

* encodes headchunks separately for wal purposes

* merge upstream

* linting

* addresses pr feedback

uses entry pool in stream push/tailer

removes unnecessary pool interaction

checkpointbytes comment

fillchunk helper, record resetting in tests via pool

redundant comment

defers wg done in recovery

s/num/count/

checkpoint wal uses a logger

encodeWithTypeHeader now creates its own []byte

removes pool from decodeEntries

wal stop can error

* prevent shared access bug with tailers and entry pool

* removes stream push entry pool optimization
Owen Diehl, commit 4d9865acd4 (parent ae9c4b82ec)
26 changed files (changed line counts in parentheses):

1. .gitignore (1)
2. cmd/loki/loki-local-config.yaml (4)
3. docs/sources/api/_index.md (10)
4. pkg/chunkenc/encoding_helpers.go (13)
5. pkg/chunkenc/memchunk.go (130)
6. pkg/chunkenc/memchunk_test.go (63)
7. pkg/chunkenc/util_test.go (1)
8. pkg/ingester/checkpoint.go (464)
9. pkg/ingester/checkpoint.pb.go (241)
10. pkg/ingester/checkpoint.proto (10)
11. pkg/ingester/encoding.go (148)
12. pkg/ingester/encoding_test.go (198)
13. pkg/ingester/flush.go (4)
14. pkg/ingester/ingester.go (148)
15. pkg/ingester/ingester_test.go (47)
16. pkg/ingester/instance.go (79)
17. pkg/ingester/instance_test.go (40)
18. pkg/ingester/metrics.go (87)
19. pkg/ingester/recovery.go (395)
20. pkg/ingester/recovery_test.go (192)
21. pkg/ingester/stream.go (102)
22. pkg/ingester/stream_test.go (97)
23. pkg/ingester/transfer.go (6)
24. pkg/ingester/wal.go (210)
25. pkg/loki/loki.go (3)
26. pkg/loki/modules.go (2)

.gitignore

@ -5,6 +5,7 @@
*.output
*.tgz
*.exe
__debug_bin
requirements.lock
mixin/vendor/
cmd/loki/loki

@ -4,6 +4,10 @@ server:
  http_listen_port: 3100
ingester:
  wal:
    enabled: true
    dir: /tmp/wal
    recover: true
  lifecycler:
    address: 127.0.0.1
    ring:

@ -37,6 +37,7 @@ The HTTP API includes the following endpoints:
- [Examples](#examples-8)
- [`GET /ready`](#get-ready)
- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)
- [`GET /metrics`](#get-metrics)
- [Series](#series)
- [Examples](#examples-9)
@ -107,6 +108,7 @@ While these endpoints are exposed by just the distributor:
And these endpoints are exposed by just the ingester:
- [`POST /flush`](#post-flush)
- [`POST /ingester/shutdown`](#post-shutdown)
The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.
@ -844,6 +846,14 @@ backing store. Mainly used for local testing.
In microservices mode, the `/flush` endpoint is exposed by the ingester.
## `POST /ingester/shutdown`
`/ingester/shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in-memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned,
but instead flushed to our chunk backend.
In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.
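For illustration, here is a minimal Go sketch of calling the endpoint (not part of the documented API; the address assumes the default `http_listen_port: 3100` from the sample config):

package main

import (
	"fmt"
	"net/http"
)

func main() {
	// Empty POST body; the call may take a while since in-memory chunks are flushed first.
	resp, err := http.Post("http://localhost:3100/ingester/shutdown", "", nil)
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()
	fmt.Println(resp.Status) // the handler responds with 204 No Content
}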
## `GET /metrics`
`/metrics` exposes Prometheus metrics. See

@ -119,4 +119,17 @@ func (d *decbuf) byte() byte {
return x
}
func (d *decbuf) bytes(n int) []byte {
if d.e != nil {
return nil
}
if len(d.b) < n {
d.e = ErrInvalidSize
return nil
}
x := d.b[:n]
d.b = d.b[n:]
return x
}
func (d *decbuf) err() error { return d.e }

@ -152,6 +152,94 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) {
return outBuf.Bytes(), nil
}
// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing,
// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but
// needs to serialize/deserialize the data to disk to ensure data durability.
func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) {
encB := BytesBufferPool.Get(1 << 10).([]byte)
defer func() {
BytesBufferPool.Put(encB[:0])
}()
buf := bytes.NewBuffer(make([]byte, 0, 1<<10))
eb := encbuf{b: encB}
eb.putByte(version)
_, err := buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock version")
}
eb.reset()
eb.putUvarint(len(hb.entries))
eb.putUvarint(hb.size)
eb.putVarint64(hb.mint)
eb.putVarint64(hb.maxt)
_, err = buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock metas")
}
eb.reset()
for _, entry := range hb.entries {
eb.putVarint64(entry.t)
eb.putUvarint(len(entry.s))
_, err = buf.Write(eb.get())
if err != nil {
return nil, errors.Wrap(err, "write headBlock entry ts")
}
eb.reset()
_, err := buf.WriteString(entry.s)
if err != nil {
return nil, errors.Wrap(err, "write headblock entry line")
}
}
return buf.Bytes(), nil
}
func (hb *headBlock) FromCheckpoint(b []byte) error {
if len(b) < 1 {
return nil
}
db := decbuf{b: b}
version := db.byte()
if db.err() != nil {
return errors.Wrap(db.err(), "verifying headblock header")
}
if version != chunkFormatV3 {
return errors.New("incompatible headBlock version, only V3 is currently supported")
}
ln := db.uvarint()
hb.size = db.uvarint()
hb.mint = db.varint64()
hb.maxt = db.varint64()
if err := db.err(); err != nil {
return errors.Wrap(err, "verifying headblock metadata")
}
hb.entries = make([]entry, ln)
for i := 0; i < ln && db.err() == nil; i++ {
var entry entry
entry.t = db.varint64()
lineLn := db.uvarint()
entry.s = string(db.bytes(lineLn))
hb.entries[i] = entry
}
if err := db.err(); err != nil {
return errors.Wrap(err, "decoding entries")
}
return nil
}
type entry struct {
t int64
s string
@ -256,6 +344,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) {
}
// BytesWith uses a provided []byte for buffer instantiation
// NOTE: This does not cut the head block nor include any head block data.
func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
buf := bytes.NewBuffer(b[:0])
if _, err := c.WriteTo(buf); err != nil {
@ -265,18 +354,18 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) {
}
// Bytes implements Chunk.
// NOTE: Does not cut head block or include any head block data.
func (c *MemChunk) Bytes() ([]byte, error) {
return c.BytesWith(nil)
}
// WriteTo Implements io.WriterTo
// NOTE: Does not cut head block or include any head block data.
// For this to be the case you must call Close() first.
// This decision notably enables WAL checkpointing, which would otherwise
// result in different content addressable chunks in storage based on the timing of when
// they were checkpointed (which would cause new blocks to be cut early).
func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
if c.head != nil {
// When generating the bytes, we need to flush the data held in-buffer.
if err := c.cut(); err != nil {
return 0, err
}
}
crc32Hash := newCRC32()
offset := int64(0)
@ -348,6 +437,35 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) {
return offset, nil
}
// SerializeForCheckpoint returns []bytes representing the chunk & head. This is to ensure eventually
// flushed chunks don't have different substructures depending on when they were checkpointed.
// In turn this allows us to maintain a more effective dedupe ratio in storage.
func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) {
chk, err = c.BytesWith(b)
if err != nil {
return nil, nil, err
}
if c.head.isEmpty() {
return chk, nil, nil
}
head, err = c.head.CheckpointBytes(c.format)
if err != nil {
return nil, nil, err
}
return chk, head, nil
}
func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) {
mc, err := NewByteChunk(chk, blockSize, targetSize)
if err != nil {
return nil, err
}
return mc, mc.head.FromCheckpoint(head)
}
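// Illustrative sketch (not part of this diff; the function name is hypothetical and it is
// assumed to live in package chunkenc with the usual fmt/time/logproto imports available).
// It mirrors TestCheckpointEncoding in memchunk_test.go further down: blocks and the uncut
// head block are serialized separately, so checkpointing never forces a cut and the
// eventually flushed chunk bytes stay identical regardless of when a checkpoint happened.
func exampleCheckpointRoundTrip() (*MemChunk, error) {
	c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)
	for i := 0; i < 5; i++ {
		if err := c.Append(&logproto.Entry{Timestamp: time.Unix(int64(i), 0), Line: fmt.Sprintf("line %d", i)}); err != nil {
			return nil, err
		}
	}
	// Cut one block; subsequent entries stay in the head block.
	if err := c.cut(); err != nil {
		return nil, err
	}
	if err := c.Append(&logproto.Entry{Timestamp: time.Unix(5, 0), Line: "still in the head block"}); err != nil {
		return nil, err
	}
	chk, head, err := c.SerializeForCheckpoint(nil)
	if err != nil {
		return nil, err
	}
	// Recovery rebuilds the chunk, including its head block, from the two payloads.
	return MemchunkFromCheckpoint(chk, head, 256*1024, 1500*1024)
}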
// Encoding implements Chunk.
func (c *MemChunk) Encoding() Encoding {
return c.encoding

@ -294,6 +294,7 @@ func TestSerialization(t *testing.T) {
for i := 0; i < numSamples; i++ {
require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i))))
}
require.NoError(t, chk.Close())
byt, err := chk.Bytes()
require.NoError(t, err)
@ -840,6 +841,68 @@ func TestBytesWith(t *testing.T) {
require.Equal(t, exp, out)
}
func TestHeadBlockCheckpointing(t *testing.T) {
c := NewMemChunk(EncSnappy, 256*1024, 1500*1024)
// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// ensure blocks are not cut
require.Equal(t, 0, len(c.blocks))
b, err := c.head.CheckpointBytes(c.format)
require.Nil(t, err)
hb := &headBlock{}
require.Nil(t, hb.FromCheckpoint(b))
require.Equal(t, c.head, hb)
}
func TestCheckpointEncoding(t *testing.T) {
blockSize, targetSize := 256*1024, 1500*1024
c := NewMemChunk(EncSnappy, blockSize, targetSize)
// add a few entries
for i := 0; i < 5; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// cut it
require.Nil(t, c.cut())
// add a few more to head
for i := 5; i < 10; i++ {
entry := &logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("hi there - %d", i),
}
require.Equal(t, true, c.SpaceFor(entry))
require.Nil(t, c.Append(entry))
}
// ensure new blocks are not cut
require.Equal(t, 1, len(c.blocks))
chk, head, err := c.SerializeForCheckpoint(nil)
require.Nil(t, err)
cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize)
require.Nil(t, err)
require.Equal(t, c, cpy)
}
var streams = []logproto.Stream{}
var series = []logproto.Series{}

@ -52,5 +52,6 @@ func fillChunk(c Chunk) int64 {
entry.Line = testdata.LogString(i)
}
_ = c.Close()
return inserted
}

@ -1,14 +1,29 @@
package ingester
import (
fmt "fmt"
"io/ioutil"
"os"
"path/filepath"
"regexp"
"strconv"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/pkg/errors"
tsdb_errors "github.com/prometheus/prometheus/tsdb/errors"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/grafana/loki/pkg/chunkenc"
)
// The passed wireChunks slice is for re-use.
// nolint(deadcode)
func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
func toWireChunks(descs []chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
if cap(wireChunks) < len(descs) {
wireChunks = make([]Chunk, len(descs))
} else {
@ -17,10 +32,12 @@ func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
for i, d := range descs {
from, to := d.chunk.Bounds()
wireChunk := Chunk{
From: from,
To: to,
Closed: d.closed,
FlushedAt: d.flushed,
From: from,
To: to,
Closed: d.closed,
FlushedAt: d.flushed,
LastUpdated: d.lastUpdated,
Synced: d.synced,
}
slice := wireChunks[i].Data[:0] // try to re-use the memory from last time
@ -28,28 +45,29 @@ func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) {
slice = make([]byte, 0, d.chunk.CompressedSize())
}
out, err := d.chunk.BytesWith(slice)
chk, head, err := d.chunk.SerializeForCheckpoint(slice)
if err != nil {
return nil, err
}
wireChunk.Data = out
wireChunk.Data = chk
wireChunk.Head = head
wireChunks[i] = wireChunk
}
return wireChunks, nil
}
// nolint(deadcode)
func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) {
descs := make([]*chunkDesc, 0, len(wireChunks))
func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) {
descs := make([]chunkDesc, 0, len(wireChunks))
for _, c := range wireChunks {
desc := &chunkDesc{
desc := chunkDesc{
closed: c.Closed,
synced: c.Synced,
flushed: c.FlushedAt,
lastUpdated: time.Now(),
lastUpdated: c.LastUpdated,
}
mc, err := chunkenc.NewByteChunk(c.Data, conf.BlockSize, conf.TargetChunkSize)
mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, conf.BlockSize, conf.TargetChunkSize)
if err != nil {
return nil, err
}
@ -59,3 +77,421 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) {
}
return descs, nil
}
// nolint:interfacer
func decodeCheckpointRecord(rec []byte, s *Series) error {
//TODO(owen-d): reduce allocs
// The proto unmarshaling code will retain references to the underlying []byte it's passed
// in order to reduce allocs. This is harmful to us because when reading from a WAL, the []byte
// is only guaranteed to be valid between calls to Next().
// Therefore, we copy it to avoid this problem.
cpy := make([]byte, len(rec))
copy(cpy, rec)
switch RecordType(cpy[0]) {
case CheckpointRecord:
return proto.Unmarshal(cpy[1:], s)
default:
return errors.Errorf("unexpected record type: %d", rec[0])
}
}
func encodeWithTypeHeader(m proto.Message, typ RecordType) ([]byte, error) {
buf, err := proto.Marshal(m)
if err != nil {
return nil, err
}
b := make([]byte, 0, len(buf)+1)
b = append(b, byte(typ))
b = append(b, buf...)
return b, nil
}
type SeriesWithErr struct {
Err error
Series *Series
}
type SeriesIter interface {
Count() int
Iter() <-chan *SeriesWithErr
Stop()
}
type ingesterSeriesIter struct {
ing *Ingester
done chan struct{}
}
func newIngesterSeriesIter(ing *Ingester) *ingesterSeriesIter {
return &ingesterSeriesIter{
ing: ing,
done: make(chan struct{}),
}
}
func (i *ingesterSeriesIter) Count() (ct int) {
for _, inst := range i.ing.getInstances() {
ct += inst.numStreams()
}
return ct
}
func (i *ingesterSeriesIter) Stop() {
close(i.done)
}
func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr {
ch := make(chan *SeriesWithErr)
go func() {
for _, inst := range i.ing.getInstances() {
inst.streamsMtx.RLock()
// Need to buffer streams internally so the read lock isn't held trying to write to a blocked channel.
streams := make([]*stream, 0, len(inst.streams))
inst.streamsMtx.RUnlock()
_ = inst.forAllStreams(func(stream *stream) error {
streams = append(streams, stream)
return nil
})
for _, stream := range streams {
// TODO(owen-d): use a pool
chunks, err := toWireChunks(stream.chunks, nil)
var s *Series
if err == nil {
s = &Series{
UserID: inst.instanceID,
Fingerprint: uint64(stream.fp),
Labels: client.FromLabelsToLabelAdapters(stream.labels),
Chunks: chunks,
}
}
select {
case ch <- &SeriesWithErr{
Err: err,
Series: s,
}:
case <-i.done:
return
}
}
}
close(ch)
}()
return ch
}
type CheckpointWriter interface {
// Advances current checkpoint, can also signal a no-op.
Advance() (noop bool, err error)
Write(*Series) error
// Closes current checkpoint.
Close(abort bool) error
}
type WALCheckpointWriter struct {
metrics *ingesterMetrics
segmentWAL *wal.WAL
checkpointWAL *wal.WAL
lastSegment int // name of the last segment guaranteed to be covered by the checkpoint
final string // filename to atomically rotate upon completion
bufSize int
recs [][]byte
}
func (w *WALCheckpointWriter) Advance() (bool, error) {
_, lastSegment, err := wal.Segments(w.segmentWAL.Dir())
if err != nil {
return false, err
}
if lastSegment < 0 {
// There are no WAL segments. No checkpoint is needed yet.
return true, nil
}
// First we advance the wal segment internally to ensure we don't overlap a previous checkpoint in
// low throughput scenarios and to minimize segment replays on top of checkpoints.
if err := w.segmentWAL.NextSegment(); err != nil {
return false, err
}
// Checkpoint is named after the last WAL segment present so that when replaying the WAL
// we can start from that particular WAL segment.
checkpointDir := filepath.Join(w.segmentWAL.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", lastSegment))
level.Info(util.Logger).Log("msg", "attempting checkpoint for", "dir", checkpointDir)
checkpointDirTemp := checkpointDir + ".tmp"
// cleanup any old partial checkpoints
if _, err := os.Stat(checkpointDirTemp); err == nil {
if err := os.RemoveAll(checkpointDirTemp); err != nil {
level.Error(util.Logger).Log("msg", "unable to cleanup old tmp checkpoint", "dir", checkpointDirTemp)
return false, err
}
}
if err := os.MkdirAll(checkpointDirTemp, 0777); err != nil {
return false, errors.Wrap(err, "create checkpoint dir")
}
checkpoint, err := wal.NewSize(log.With(util.Logger, "component", "checkpoint_wal"), nil, checkpointDirTemp, walSegmentSize, false)
if err != nil {
return false, errors.Wrap(err, "open checkpoint")
}
w.checkpointWAL = checkpoint
w.lastSegment = lastSegment
w.final = checkpointDir
return false, nil
}
func (w *WALCheckpointWriter) Write(s *Series) error {
b, err := encodeWithTypeHeader(s, CheckpointRecord)
if err != nil {
return err
}
w.recs = append(w.recs, b)
w.bufSize += len(b)
// 1MB
if w.bufSize > 1<<20 {
if err := w.flush(); err != nil {
return err
}
}
return nil
}
func (w *WALCheckpointWriter) flush() error {
if err := w.checkpointWAL.Log(w.recs...); err != nil {
return err
}
w.metrics.checkpointLoggedBytesTotal.Add(float64(w.bufSize))
w.recs = w.recs[:0]
w.bufSize = 0
return nil
}
const checkpointPrefix = "checkpoint."
var checkpointRe = regexp.MustCompile("^" + regexp.QuoteMeta(checkpointPrefix) + "(\\d+)(\\.tmp)?$")
// checkpointIndex returns the index of a given checkpoint file. It handles
// both regular and temporary checkpoints according to the includeTmp flag. If
// the file is not a checkpoint it returns an error.
func checkpointIndex(filename string, includeTmp bool) (int, error) {
result := checkpointRe.FindStringSubmatch(filename)
if len(result) < 2 {
return 0, errors.New("file is not a checkpoint")
}
// Filter out temporary checkpoints if desired.
if !includeTmp && len(result) == 3 && result[2] != "" {
return 0, errors.New("temporary checkpoint")
}
return strconv.Atoi(result[1])
}
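// Illustrative behaviour (not part of this diff):
//   checkpointIndex("checkpoint.000123", false)     => 123, nil
//   checkpointIndex("checkpoint.000123.tmp", false) => error ("temporary checkpoint")
//   checkpointIndex("checkpoint.000123.tmp", true)  => 123, nil
//   checkpointIndex("00000123", false)              => error ("file is not a checkpoint")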
// lastCheckpoint returns the directory name and index of the most recent checkpoint.
// If dir does not contain any checkpoints, -1 is returned as index.
func lastCheckpoint(dir string) (string, int, error) {
dirs, err := ioutil.ReadDir(dir)
if err != nil {
return "", -1, err
}
var (
maxIdx = -1
checkpointDir string
)
// There may be multiple checkpoints left, so select the one with max index.
for i := 0; i < len(dirs); i++ {
di := dirs[i]
idx, err := checkpointIndex(di.Name(), false)
if err != nil {
continue
}
if !di.IsDir() {
return "", -1, fmt.Errorf("checkpoint %s is not a directory", di.Name())
}
if idx > maxIdx {
checkpointDir = di.Name()
maxIdx = idx
}
}
if maxIdx >= 0 {
return filepath.Join(dir, checkpointDir), maxIdx, nil
}
return "", -1, nil
}
// deleteCheckpoints deletes all checkpoints in the WAL directory whose index is < maxIndex.
func (w *WALCheckpointWriter) deleteCheckpoints(maxIndex int) (err error) {
w.metrics.checkpointDeleteTotal.Inc()
defer func() {
if err != nil {
w.metrics.checkpointDeleteFail.Inc()
}
}()
var errs tsdb_errors.MultiError
files, err := ioutil.ReadDir(w.segmentWAL.Dir())
if err != nil {
return err
}
for _, fi := range files {
index, err := checkpointIndex(fi.Name(), true)
if err != nil || index >= maxIndex {
continue
}
if err := os.RemoveAll(filepath.Join(w.segmentWAL.Dir(), fi.Name())); err != nil {
errs.Add(err)
}
}
return errs.Err()
}
func (w *WALCheckpointWriter) Close(abort bool) error {
if len(w.recs) > 0 {
if err := w.flush(); err != nil {
return err
}
}
if err := w.checkpointWAL.Close(); err != nil {
return err
}
if abort {
return os.RemoveAll(w.checkpointWAL.Dir())
}
if err := fileutil.Replace(w.checkpointWAL.Dir(), w.final); err != nil {
return errors.Wrap(err, "rename checkpoint directory")
}
level.Info(util.Logger).Log("msg", "atomic checkpoint finished", "old", w.checkpointWAL.Dir(), "new", w.final)
// We delete the WAL segments which are before the previous checkpoint and not before the
// current checkpoint created. This is because if the latest checkpoint is corrupted for any reason, we
// should be able to recover from the older checkpoint which would need the older WAL segments.
if err := w.segmentWAL.Truncate(w.lastSegment + 1); err != nil {
// It is fine to have old WAL segments hanging around if deletion failed.
// We can try again next time.
level.Error(util.Logger).Log("msg", "error deleting old WAL segments", "err", err, "lastSegment", w.lastSegment)
}
if w.lastSegment >= 0 {
if err := w.deleteCheckpoints(w.lastSegment); err != nil {
// It is fine to have old checkpoints hanging around if deletion failed.
// We can try again next time.
level.Error(util.Logger).Log("msg", "error deleting old checkpoint", "err", err)
}
}
return nil
}
type Checkpointer struct {
dur time.Duration
iter SeriesIter
writer CheckpointWriter
metrics *ingesterMetrics
quit <-chan struct{}
}
func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer {
return &Checkpointer{
dur: dur,
iter: iter,
writer: writer,
metrics: metrics,
quit: quit,
}
}
func (c *Checkpointer) PerformCheckpoint() (err error) {
noop, err := c.writer.Advance()
if err != nil {
return err
}
if noop {
return nil
}
c.metrics.checkpointCreationTotal.Inc()
defer func() {
if err != nil {
c.metrics.checkpointCreationFail.Inc()
}
}()
// signal whether checkpoint writes should be amortized or burst
var immediate bool
n := c.iter.Count()
if n < 1 {
return c.writer.Close(false)
}
// Give a 10% buffer to the checkpoint duration in order to account for
// new series, slow writes, etc.
perSeriesDuration := (90 * c.dur) / (100 * time.Duration(n))
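// For example (illustrative numbers only): with the default 5m checkpoint duration and
// 30,000 in-memory series, perSeriesDuration = (90 * 5m) / (100 * 30000) = 9ms, i.e. one
// series is written roughly every 9ms so the pass completes within ~90% of the window.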
ticker := time.NewTicker(perSeriesDuration)
defer ticker.Stop()
start := time.Now()
defer func() {
elapsed := time.Since(start)
level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String())
c.metrics.checkpointDuration.Observe(elapsed.Seconds())
}()
for s := range c.iter.Iter() {
if s.Err != nil {
return s.Err
}
if err := c.writer.Write(s.Series); err != nil {
return err
}
if !immediate {
if time.Since(start) > c.dur {
// This indicates the checkpoint is taking too long; stop waiting
// and flush the remaining series as fast as possible.
immediate = true
continue
}
}
select {
case <-c.quit:
return c.writer.Close(true)
case <-ticker.C:
}
}
return c.writer.Close(false)
}
func (c *Checkpointer) Run() {
ticker := time.NewTicker(c.dur)
defer ticker.Stop()
defer c.iter.Stop()
for {
select {
case <-ticker.C:
level.Info(util.Logger).Log("msg", "starting checkpoint")
if err := c.PerformCheckpoint(); err != nil {
level.Error(util.Logger).Log("msg", "error checkpointing series", "err", err)
continue
}
case <-c.quit:
return
}
}
}

@ -34,11 +34,16 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package
// Chunk is a {de,}serializable intermediate type for chunkDesc which allows
// efficient loading/unloading to disk during WAL checkpoint recovery.
type Chunk struct {
From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"`
To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"`
FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"`
Closed bool `protobuf:"varint,4,opt,name=closed,proto3" json:"closed,omitempty"`
Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"`
From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"`
To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"`
FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"`
LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"`
Closed bool `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"`
Synced bool `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"`
// data to be unmarshaled into a MemChunk
Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"`
// data to be unmarshaled into a MemChunk's headBlock
Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"`
}
func (m *Chunk) Reset() { *m = Chunk{} }
@ -94,6 +99,13 @@ func (m *Chunk) GetFlushedAt() time.Time {
return time.Time{}
}
func (m *Chunk) GetLastUpdated() time.Time {
if m != nil {
return m.LastUpdated
}
return time.Time{}
}
func (m *Chunk) GetClosed() bool {
if m != nil {
return m.Closed
@ -101,6 +113,13 @@ func (m *Chunk) GetClosed() bool {
return false
}
func (m *Chunk) GetSynced() bool {
if m != nil {
return m.Synced
}
return false
}
func (m *Chunk) GetData() []byte {
if m != nil {
return m.Data
@ -108,9 +127,17 @@ func (m *Chunk) GetData() []byte {
return nil
}
func (m *Chunk) GetHead() []byte {
if m != nil {
return m.Head
}
return nil
}
// Series is a {de,}serializable intermediate type for Series.
type Series struct {
UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"`
UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"`
// post mapped fingerprint is necessary because subsequent wal writes will reference it.
Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"`
Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"`
Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"`
@ -177,34 +204,37 @@ func init() {
func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) }
var fileDescriptor_00f4b7152db9bdb5 = []byte{
// 427 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xbd, 0x8e, 0xd4, 0x30,
0x10, 0x8e, 0x77, 0x73, 0xd1, 0x9e, 0x17, 0x09, 0xe1, 0x02, 0x45, 0x2b, 0xe1, 0x44, 0x57, 0xa5,
0x39, 0x47, 0x3a, 0x28, 0xa8, 0x10, 0x17, 0x28, 0x40, 0xa2, 0x40, 0x81, 0x8a, 0x2e, 0x3f, 0x4e,
0x62, 0x36, 0x89, 0x23, 0xdb, 0x91, 0x28, 0x79, 0x84, 0x7b, 0x0c, 0x1e, 0xe5, 0xca, 0x2d, 0x4f,
0x14, 0x07, 0x9b, 0x95, 0x80, 0xf2, 0x1e, 0x01, 0xc5, 0x49, 0xd8, 0xa5, 0xdc, 0xeb, 0xfc, 0xcd,
0x7c, 0xdf, 0xcc, 0x37, 0x33, 0x86, 0x4f, 0x9a, 0x75, 0xee, 0xb3, 0x3a, 0xa7, 0x52, 0x51, 0xe1,
0x27, 0x05, 0x4d, 0xd6, 0x0d, 0x67, 0xb5, 0x22, 0x8d, 0xe0, 0x8a, 0xa3, 0xc5, 0x94, 0x5a, 0x39,
0x39, 0xe7, 0x79, 0x49, 0x7d, 0x1d, 0x8f, 0xdb, 0xcc, 0x57, 0xac, 0xa2, 0x52, 0x45, 0x55, 0x33,
0x50, 0x57, 0xe7, 0x39, 0x53, 0x45, 0x1b, 0x93, 0x84, 0x57, 0x7e, 0xce, 0x73, 0xbe, 0x67, 0xf6,
0x48, 0x03, 0xfd, 0x1a, 0xe9, 0x2f, 0x0f, 0xe8, 0x09, 0x17, 0x8a, 0x7e, 0x69, 0x04, 0xff, 0x4c,
0x13, 0x35, 0x22, 0xff, 0x7f, 0x63, 0x25, 0xa3, 0xf5, 0x94, 0x1a, 0x2a, 0x9c, 0xfd, 0x06, 0xf0,
0xe4, 0x55, 0xd1, 0xd6, 0x6b, 0xf4, 0x1c, 0x9a, 0x99, 0xe0, 0x95, 0x0d, 0x5c, 0xe0, 0x2d, 0x2f,
0x56, 0x64, 0xb0, 0x4a, 0x26, 0x03, 0xe4, 0xe3, 0x64, 0x35, 0x58, 0x5c, 0xdf, 0x3a, 0xc6, 0xd5,
0x0f, 0x07, 0x84, 0x5a, 0x81, 0x9e, 0xc1, 0x99, 0xe2, 0xf6, 0xec, 0x08, 0xdd, 0x4c, 0x71, 0x14,
0xc0, 0xd3, 0xac, 0x6c, 0x65, 0x41, 0xd3, 0x4b, 0x65, 0xcf, 0x8f, 0x10, 0xef, 0x65, 0xe8, 0x31,
0xb4, 0x92, 0x92, 0x4b, 0x9a, 0xda, 0xa6, 0x0b, 0xbc, 0x45, 0x38, 0x22, 0x84, 0xa0, 0x99, 0x46,
0x2a, 0xb2, 0x4f, 0x5c, 0xe0, 0x3d, 0x08, 0xf5, 0xfb, 0xec, 0x17, 0x80, 0xd6, 0x07, 0x2a, 0x18,
0x95, 0xbd, 0xac, 0x95, 0x54, 0xbc, 0x7d, 0xad, 0x87, 0x3d, 0x0d, 0x47, 0x84, 0x5c, 0xb8, 0xcc,
0xfa, 0x6d, 0x89, 0x46, 0xb0, 0x5a, 0xe9, 0x89, 0xcc, 0xf0, 0x30, 0x84, 0x24, 0xb4, 0xca, 0x28,
0xa6, 0xa5, 0xb4, 0xe7, 0xee, 0xdc, 0x5b, 0x5e, 0x3c, 0x22, 0xe3, 0x36, 0xdf, 0xf5, 0xd1, 0xf7,
0x11, 0x13, 0xc1, 0x9b, 0xde, 0xe8, 0xf7, 0x5b, 0xe7, 0x3e, 0xb7, 0x19, 0xca, 0x5c, 0xa6, 0x51,
0xa3, 0xa8, 0x08, 0xc7, 0x56, 0xe8, 0x1c, 0x5a, 0x49, 0x7f, 0x22, 0x69, 0x9b, 0xba, 0xe9, 0x43,
0x32, 0xc9, 0x88, 0x3e, 0x5d, 0x60, 0xf6, 0x2d, 0xc3, 0x91, 0x14, 0xbc, 0xd8, 0x6c, 0xb1, 0x71,
0xb3, 0xc5, 0xc6, 0xdd, 0x16, 0x83, 0xaf, 0x1d, 0x06, 0xdf, 0x3a, 0x0c, 0xae, 0x3b, 0x0c, 0x36,
0x1d, 0x06, 0x3f, 0x3b, 0x0c, 0xfe, 0x74, 0xd8, 0xb8, 0xeb, 0x30, 0xb8, 0xda, 0x61, 0x63, 0xb3,
0xc3, 0xc6, 0xcd, 0x0e, 0x1b, 0x9f, 0xfe, 0x7d, 0xd2, 0xd8, 0xd2, 0xdb, 0x7f, 0xfa, 0x37, 0x00,
0x00, 0xff, 0xff, 0xaf, 0xc1, 0x8c, 0xad, 0xd6, 0x02, 0x00, 0x00,
// 467 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xb1, 0x8e, 0xd3, 0x40,
0x10, 0xf5, 0x26, 0x3e, 0x93, 0xdb, 0x20, 0x21, 0xb6, 0x40, 0xab, 0x48, 0x6c, 0xac, 0xab, 0xd2,
0x9c, 0x2d, 0x1d, 0x14, 0x54, 0x88, 0x33, 0x08, 0x81, 0x44, 0x81, 0x0c, 0x34, 0x74, 0x8e, 0x3d,
0xb1, 0x4d, 0x1c, 0xaf, 0xb5, 0xbb, 0x96, 0xa0, 0xe3, 0x13, 0xee, 0x33, 0xf8, 0x94, 0x2b, 0x53,
0x9e, 0x40, 0x3a, 0x88, 0x23, 0x21, 0xca, 0xfb, 0x04, 0xb4, 0x6b, 0x9b, 0x84, 0x32, 0xd7, 0xcd,
0x7b, 0xfb, 0xde, 0xcc, 0x78, 0x66, 0x8c, 0x1f, 0x56, 0xcb, 0xd4, 0xcf, 0xcb, 0x14, 0xa4, 0x02,
0xe1, 0xc7, 0x19, 0xc4, 0xcb, 0x8a, 0xe7, 0xa5, 0xf2, 0x2a, 0xc1, 0x15, 0x27, 0xa3, 0xfe, 0x69,
0x32, 0x4d, 0x39, 0x4f, 0x0b, 0xf0, 0x0d, 0x3f, 0xaf, 0x17, 0xbe, 0xca, 0x57, 0x20, 0x55, 0xb4,
0xaa, 0x5a, 0xe9, 0xe4, 0x34, 0xcd, 0x55, 0x56, 0xcf, 0xbd, 0x98, 0xaf, 0xfc, 0x94, 0xa7, 0x7c,
0xa7, 0xd4, 0xc8, 0x00, 0x13, 0x75, 0xf2, 0x67, 0x7b, 0xf2, 0x98, 0x0b, 0x05, 0x9f, 0x2b, 0xc1,
0x3f, 0x41, 0xac, 0x3a, 0xe4, 0xff, 0xdf, 0x58, 0x91, 0x43, 0xd9, 0x3f, 0xb5, 0x19, 0x4e, 0x7e,
0x0c, 0xf0, 0xd1, 0xf3, 0xac, 0x2e, 0x97, 0xe4, 0x09, 0xb6, 0x17, 0x82, 0xaf, 0x28, 0x72, 0xd1,
0x6c, 0x7c, 0x36, 0xf1, 0xda, 0x56, 0xbd, 0xbe, 0x01, 0xef, 0x7d, 0xdf, 0x6a, 0x30, 0xba, 0xbc,
0x9e, 0x5a, 0x17, 0x3f, 0xa7, 0x28, 0x34, 0x0e, 0xf2, 0x18, 0x0f, 0x14, 0xa7, 0x83, 0x03, 0x7c,
0x03, 0xc5, 0x49, 0x80, 0x8f, 0x17, 0x45, 0x2d, 0x33, 0x48, 0xce, 0x15, 0x1d, 0x1e, 0x60, 0xde,
0xd9, 0xc8, 0x4b, 0x3c, 0x2e, 0x22, 0xa9, 0x3e, 0x54, 0x49, 0xa4, 0x20, 0xa1, 0xf6, 0x01, 0x59,
0xf6, 0x8d, 0xe4, 0x01, 0x76, 0xe2, 0x82, 0x4b, 0x48, 0xe8, 0x91, 0x8b, 0x66, 0xa3, 0xb0, 0x43,
0x9a, 0x97, 0x5f, 0xca, 0x18, 0x12, 0xea, 0xb4, 0x7c, 0x8b, 0x08, 0xc1, 0x76, 0x12, 0xa9, 0x88,
0xde, 0x71, 0xd1, 0xec, 0x6e, 0x68, 0x62, 0xcd, 0x65, 0x10, 0x25, 0x74, 0xd4, 0x72, 0x3a, 0x3e,
0xf9, 0x8d, 0xb0, 0xf3, 0x0e, 0x44, 0x0e, 0x52, 0xa7, 0xaa, 0x25, 0x88, 0xd7, 0x2f, 0xcc, 0x80,
0x8f, 0xc3, 0x0e, 0x11, 0x17, 0x8f, 0x17, 0x7a, 0x43, 0xa2, 0x12, 0x79, 0xa9, 0xcc, 0x14, 0xed,
0x70, 0x9f, 0x22, 0x12, 0x3b, 0x45, 0x34, 0x87, 0x42, 0xd2, 0xa1, 0x3b, 0x9c, 0x8d, 0xcf, 0xee,
0x7b, 0xdd, 0x06, 0xdf, 0x68, 0xf6, 0x6d, 0x94, 0x8b, 0xe0, 0x95, 0xfe, 0xac, 0xef, 0xd7, 0xd3,
0xdb, 0xdc, 0x43, 0x9b, 0xe6, 0x3c, 0x89, 0x2a, 0x05, 0x22, 0xec, 0x4a, 0x91, 0x53, 0xec, 0xc4,
0xfa, 0x2c, 0x24, 0xb5, 0x4d, 0xd1, 0x7b, 0x5e, 0x6f, 0xf3, 0xcc, 0xb9, 0x04, 0xb6, 0x2e, 0x19,
0x76, 0xa2, 0xe0, 0xe9, 0x7a, 0xc3, 0xac, 0xab, 0x0d, 0xb3, 0x6e, 0x36, 0x0c, 0x7d, 0x6d, 0x18,
0xfa, 0xd6, 0x30, 0x74, 0xd9, 0x30, 0xb4, 0x6e, 0x18, 0xfa, 0xd5, 0x30, 0xf4, 0xa7, 0x61, 0xd6,
0x4d, 0xc3, 0xd0, 0xc5, 0x96, 0x59, 0xeb, 0x2d, 0xb3, 0xae, 0xb6, 0xcc, 0xfa, 0xf8, 0xef, 0xc7,
0x98, 0x3b, 0x66, 0x57, 0x8f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0x69, 0x98, 0x4e, 0xed, 0x4a,
0x03, 0x00, 0x00,
}
func (this *Chunk) Equal(that interface{}) bool {
@ -235,12 +265,21 @@ func (this *Chunk) Equal(that interface{}) bool {
if !this.FlushedAt.Equal(that1.FlushedAt) {
return false
}
if !this.LastUpdated.Equal(that1.LastUpdated) {
return false
}
if this.Closed != that1.Closed {
return false
}
if this.Synced != that1.Synced {
return false
}
if !bytes.Equal(this.Data, that1.Data) {
return false
}
if !bytes.Equal(this.Head, that1.Head) {
return false
}
return true
}
func (this *Series) Equal(that interface{}) bool {
@ -290,13 +329,16 @@ func (this *Chunk) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 9)
s := make([]string, 0, 12)
s = append(s, "&ingester.Chunk{")
s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n")
s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n")
s = append(s, "FlushedAt: "+fmt.Sprintf("%#v", this.FlushedAt)+",\n")
s = append(s, "LastUpdated: "+fmt.Sprintf("%#v", this.LastUpdated)+",\n")
s = append(s, "Closed: "+fmt.Sprintf("%#v", this.Closed)+",\n")
s = append(s, "Synced: "+fmt.Sprintf("%#v", this.Synced)+",\n")
s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n")
s = append(s, "Head: "+fmt.Sprintf("%#v", this.Head)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
@ -366,8 +408,16 @@ func (m *Chunk) MarshalTo(dAtA []byte) (int, error) {
return 0, err
}
i += n3
dAtA[i] = 0x22
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.LastUpdated)))
n4, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.LastUpdated, dAtA[i:])
if err != nil {
return 0, err
}
i += n4
if m.Closed {
dAtA[i] = 0x20
dAtA[i] = 0x28
i++
if m.Closed {
dAtA[i] = 1
@ -376,12 +426,28 @@ func (m *Chunk) MarshalTo(dAtA []byte) (int, error) {
}
i++
}
if m.Synced {
dAtA[i] = 0x30
i++
if m.Synced {
dAtA[i] = 1
} else {
dAtA[i] = 0
}
i++
}
if len(m.Data) > 0 {
dAtA[i] = 0x2a
dAtA[i] = 0x3a
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.Data)))
i += copy(dAtA[i:], m.Data)
}
if len(m.Head) > 0 {
dAtA[i] = 0x42
i++
i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.Head)))
i += copy(dAtA[i:], m.Head)
}
return i, nil
}
@ -459,13 +525,22 @@ func (m *Chunk) Size() (n int) {
n += 1 + l + sovCheckpoint(uint64(l))
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.FlushedAt)
n += 1 + l + sovCheckpoint(uint64(l))
l = github_com_gogo_protobuf_types.SizeOfStdTime(m.LastUpdated)
n += 1 + l + sovCheckpoint(uint64(l))
if m.Closed {
n += 2
}
if m.Synced {
n += 2
}
l = len(m.Data)
if l > 0 {
n += 1 + l + sovCheckpoint(uint64(l))
}
l = len(m.Head)
if l > 0 {
n += 1 + l + sovCheckpoint(uint64(l))
}
return n
}
@ -518,8 +593,11 @@ func (this *Chunk) String() string {
`From:` + strings.Replace(strings.Replace(this.From.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`FlushedAt:` + strings.Replace(strings.Replace(this.FlushedAt.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`LastUpdated:` + strings.Replace(strings.Replace(this.LastUpdated.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`,
`Closed:` + fmt.Sprintf("%v", this.Closed) + `,`,
`Synced:` + fmt.Sprintf("%v", this.Synced) + `,`,
`Data:` + fmt.Sprintf("%v", this.Data) + `,`,
`Head:` + fmt.Sprintf("%v", this.Head) + `,`,
`}`,
}, "")
return s
@ -674,6 +752,39 @@ func (m *Chunk) Unmarshal(dAtA []byte) error {
}
iNdEx = postIndex
case 4:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field LastUpdated", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthCheckpoint
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthCheckpoint
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.LastUpdated, dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
case 5:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Closed", wireType)
}
@ -693,7 +804,27 @@ func (m *Chunk) Unmarshal(dAtA []byte) error {
}
}
m.Closed = bool(v != 0)
case 5:
case 6:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field Synced", wireType)
}
var v int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
v |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
m.Synced = bool(v != 0)
case 7:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType)
}
@ -727,6 +858,40 @@ func (m *Chunk) Unmarshal(dAtA []byte) error {
m.Data = []byte{}
}
iNdEx = postIndex
case 8:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Head", wireType)
}
var byteLen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowCheckpoint
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
byteLen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if byteLen < 0 {
return ErrInvalidLengthCheckpoint
}
postIndex := iNdEx + byteLen
if postIndex < 0 {
return ErrInvalidLengthCheckpoint
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Head = append(m.Head[:0], dAtA[iNdEx:postIndex]...)
if m.Head == nil {
m.Head = []byte{}
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipCheckpoint(dAtA[iNdEx:])

@ -14,13 +14,19 @@ message Chunk {
google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
google.protobuf.Timestamp flushedAt = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bool closed = 4;
bytes data = 5;
google.protobuf.Timestamp lastUpdated = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
bool closed = 5;
bool synced = 6;
// data to be unmarshaled into a MemChunk
bytes data = 7;
// data to be unmarshaled into a MemChunk's headBlock
bytes head = 8;
}
// Series is a {de,}serializable intermediate type for Series.
message Series {
string userID = 1;
// post mapped fingerprint is necessary because subsequent wal writes will reference it.
uint64 fingerprint = 2;
repeated cortex.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"];
repeated Chunk chunks = 4 [(gogoproto.nullable) = false];

@ -5,7 +5,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/encoding"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/grafana/loki/pkg/logproto"
)
@ -25,9 +25,44 @@ const (
// WALRecord is a struct combining the series and samples record.
type WALRecord struct {
UserID string
Series []tsdb_record.RefSeries
RefEntries RefEntries
UserID string
Series []record.RefSeries
// entryIndexMap coordinates the RefEntries index associated with a particular fingerprint.
// This is helpful for constant time lookups during ingestion and is ignored when restoring
// from the WAL.
entryIndexMap map[uint64]int
RefEntries []RefEntries
}
func (r *WALRecord) IsEmpty() bool {
return len(r.Series) == 0 && len(r.RefEntries) == 0
}
func (r *WALRecord) Reset() {
r.UserID = ""
if len(r.Series) > 0 {
r.Series = r.Series[:0]
}
for _, ref := range r.RefEntries {
recordPool.PutEntries(ref.Entries)
}
r.RefEntries = r.RefEntries[:0]
r.entryIndexMap = make(map[uint64]int)
}
func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) {
if idx, ok := r.entryIndexMap[fp]; ok {
r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...)
return
}
r.entryIndexMap[fp] = len(r.RefEntries)
r.RefEntries = append(r.RefEntries, RefEntries{
Ref: fp,
Entries: entries,
})
}
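// Illustrative sketch (not part of this diff): how one WALRecord coalesces entries from
// multiple series before being logged as a single segment record. The function name is
// hypothetical; it is assumed to sit in package ingester with the existing time/logproto imports.
func exampleMultiSeriesRecord() []byte {
	rec := &WALRecord{}
	rec.Reset() // initializes the internal fingerprint -> RefEntries index map
	rec.UserID = "fake"

	rec.AddEntries(123, logproto.Entry{Timestamp: time.Unix(1, 0), Line: "first"})
	rec.AddEntries(456, logproto.Entry{Timestamp: time.Unix(2, 0), Line: "second"})
	// Same fingerprint again: appended to the existing RefEntries slot for 123.
	rec.AddEntries(123, logproto.Entry{Timestamp: time.Unix(3, 0), Line: "third"})

	// One encoded record therefore covers entries for both series.
	return rec.encodeEntries(nil)
}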
type RefEntries struct {
@ -35,74 +70,96 @@ type RefEntries struct {
Entries []logproto.Entry
}
func (record *WALRecord) encodeSeries(b []byte) []byte {
func (r *WALRecord) encodeSeries(b []byte) []byte {
buf := EncWith(b)
buf.PutByte(byte(WALRecordSeries))
buf.PutUvarintStr(record.UserID)
buf.PutUvarintStr(r.UserID)
var enc tsdb_record.Encoder
var enc record.Encoder
// The 'encoded' already has the type header and userID here, hence re-using
// the remaining part of the slice (i.e. encoded[len(encoded):]) to encode the series.
encoded := buf.Get()
encoded = append(encoded, enc.Series(record.Series, encoded[len(encoded):])...)
encoded = append(encoded, enc.Series(r.Series, encoded[len(encoded):])...)
return encoded
}
func (record *WALRecord) encodeEntries(b []byte) []byte {
func (r *WALRecord) encodeEntries(b []byte) []byte {
buf := EncWith(b)
buf.PutByte(byte(WALRecordEntries))
buf.PutUvarintStr(record.UserID)
entries := record.RefEntries.Entries
if len(entries) == 0 {
return buf.Get()
buf.PutUvarintStr(r.UserID)
// Placeholder for the first timestamp of any sample encountered.
// All others in this record will store their timestamps as diffs relative to this
// as a space optimization.
var first int64
outer:
for _, ref := range r.RefEntries {
for _, entry := range ref.Entries {
first = entry.Timestamp.UnixNano()
buf.PutBE64int64(first)
break outer
}
}
// Only encode the series fingerprint if there are >0 entries.
buf.PutBE64(record.RefEntries.Ref)
// Store base timestamp and base reference number of first sample.
// All samples encode their timestamp and ref as delta to those.
first := entries[0].Timestamp.UnixNano()
buf.PutBE64int64(first)
for _, s := range entries {
buf.PutVarint64(s.Timestamp.UnixNano() - first)
// denote line length
byteLine := []byte(s.Line)
buf.PutUvarint(len(byteLine))
buf.PutBytes(byteLine)
for _, ref := range r.RefEntries {
// ignore refs with 0 entries
if len(ref.Entries) < 1 {
continue
}
buf.PutBE64(ref.Ref) // write fingerprint
buf.PutUvarint(len(ref.Entries)) // write number of entries
for _, s := range ref.Entries {
buf.PutVarint64(s.Timestamp.UnixNano() - first)
// denote line length
byteLine := []byte(s.Line)
buf.PutUvarint(len(byteLine))
buf.PutBytes(byteLine)
}
}
return buf.Get()
}
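// Reading aid (derived from encodeEntries above, not part of this diff). The layout of a
// WALRecordEntries record is roughly:
//   <1B type = WALRecordEntries> <uvarint length + userID bytes> <8B big-endian first timestamp (ns)>
//   then, for each series with >0 entries:
//     <8B big-endian fingerprint> <uvarint entry count>
//     and per entry: <varint timestamp delta vs first> <uvarint line length> <line bytes>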
func decodeEntries(b []byte, entries *RefEntries) error {
func decodeEntries(b []byte, rec *WALRecord) error {
if len(b) == 0 {
return nil
}
dec := DecWith(b)
entries.Ref = dec.Be64()
baseTime := dec.Be64int64()
for len(dec.B) > 0 && dec.Err() == nil {
dRef := dec.Varint64()
ln := dec.Uvarint()
line := dec.Bytes(ln)
entries.Entries = append(entries.Entries, logproto.Entry{
Timestamp: time.Unix(0, baseTime+dRef),
Line: string(line),
})
refEntries := RefEntries{
Ref: dec.Be64(),
}
nEntries := dec.Uvarint()
rem := nEntries
for ; dec.Err() == nil && rem > 0; rem-- {
timeOffset := dec.Varint64()
lineLength := dec.Uvarint()
line := dec.Bytes(lineLength)
refEntries.Entries = append(refEntries.Entries, logproto.Entry{
Timestamp: time.Unix(0, baseTime+timeOffset),
Line: string(line),
})
}
if dec.Err() != nil {
return errors.Wrapf(dec.Err(), "entry decode error after %d RefEntries", nEntries-rem)
}
rec.RefEntries = append(rec.RefEntries, refEntries)
}
if dec.Err() != nil {
return errors.Wrapf(dec.Err(), "decode error after %d entries", len(entries.Entries))
return errors.Wrap(dec.Err(), "refEntry decode error")
}
if len(dec.B) > 0 {
return errors.Errorf("unexpected %d bytes left in entry", len(dec.B))
}
@ -113,23 +170,20 @@ func decodeEntries(b []byte, entries *RefEntries) error {
func decodeWALRecord(b []byte, walRec *WALRecord) (err error) {
var (
userID string
dec tsdb_record.Decoder
rSeries []tsdb_record.RefSeries
dec record.Decoder
rSeries []record.RefSeries
decbuf = DecWith(b)
t = RecordType(decbuf.Byte())
)
walRec.Series = walRec.Series[:0]
walRec.RefEntries.Entries = walRec.RefEntries.Entries[:0]
switch t {
case WALRecordSeries:
userID = decbuf.UvarintStr()
rSeries, err = dec.Series(decbuf.B, walRec.Series)
case WALRecordEntries:
userID = decbuf.UvarintStr()
err = decodeEntries(decbuf.B, &walRec.RefEntries)
err = decodeEntries(decbuf.B, walRec)
default:
return errors.New("unknown record type")
}

@ -1,19 +1,23 @@
package ingester
import (
"fmt"
"testing"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
)
func Test_Encoding_Series(t *testing.T) {
record := &WALRecord{
UserID: "123",
entryIndexMap: make(map[uint64]int),
UserID: "123",
Series: []record.RefSeries{
{
Ref: 456,
@ -34,26 +38,42 @@ func Test_Encoding_Series(t *testing.T) {
buf := record.encodeSeries(nil)
var decoded WALRecord
decoded := recordPool.GetRecord()
err := decodeWALRecord(buf, &decoded)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, &decoded)
require.Equal(t, record, decoded)
}
func Test_Encoding_Entries(t *testing.T) {
record := &WALRecord{
UserID: "123",
RefEntries: RefEntries{
Ref: 456,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
entryIndexMap: make(map[uint64]int),
UserID: "123",
RefEntries: []RefEntries{
{
Ref: 456,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(1000, 0),
Line: "first",
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
},
{
Timestamp: time.Unix(2000, 0),
Line: "second",
},
{
Ref: 789,
Entries: []logproto.Entry{
{
Timestamp: time.Unix(3000, 0),
Line: "third",
},
{
Timestamp: time.Unix(4000, 0),
Line: "fourth",
},
},
},
},
@ -61,9 +81,153 @@ func Test_Encoding_Entries(t *testing.T) {
buf := record.encodeEntries(nil)
var decoded WALRecord
decoded := recordPool.GetRecord()
err := decodeWALRecord(buf, &decoded)
err := decodeWALRecord(buf, decoded)
require.Nil(t, err)
require.Equal(t, record, &decoded)
require.Equal(t, record, decoded)
}
func fillChunk(t *testing.T, c chunkenc.Chunk) int64 {
t.Helper()
var i, inserted int64
entry := &logproto.Entry{
Timestamp: time.Unix(0, 0),
Line: "entry for line 0",
}
for c.SpaceFor(entry) {
require.NoError(t, c.Append(entry))
i++
inserted += int64(len(entry.Line))
entry.Timestamp = time.Unix(0, i)
entry.Line = fmt.Sprintf("entry for line %d", i)
}
return inserted
}
func dummyConf() *Config {
var conf Config
conf.BlockSize = 256 * 1024
conf.TargetChunkSize = 1500 * 1024
return &conf
}
func Test_EncodingChunks(t *testing.T) {
conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize)
fillChunk(t, c)
from := []chunkDesc{
{
chunk: c,
},
// test non zero values
{
chunk: c,
closed: true,
synced: true,
flushed: time.Unix(1, 0),
lastUpdated: time.Unix(0, 1),
},
}
there, err := toWireChunks(from, nil)
require.Nil(t, err)
backAgain, err := fromWireChunks(conf, there)
require.Nil(t, err)
for i, to := range backAgain {
// test the encoding directly as the substructure may change.
// for instance the uncompressed size for each block is not included in the encoded version.
enc, err := to.chunk.Bytes()
require.Nil(t, err)
to.chunk = nil
matched := from[i]
exp, err := matched.chunk.Bytes()
require.Nil(t, err)
matched.chunk = nil
require.Equal(t, exp, enc)
require.Equal(t, matched, to)
}
}
func Test_EncodingCheckpoint(t *testing.T) {
conf := dummyConf()
c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize)
require.Nil(t, c.Append(&logproto.Entry{
Timestamp: time.Unix(1, 0),
Line: "hi there",
}))
data, err := c.Bytes()
require.Nil(t, err)
from, to := c.Bounds()
ls := labels.FromMap(map[string]string{"foo": "bar"})
s := &Series{
UserID: "fake",
Fingerprint: 123,
Labels: client.FromLabelsToLabelAdapters(ls),
Chunks: []Chunk{
{
From: from,
To: to,
Closed: true,
Synced: true,
FlushedAt: time.Unix(1, 0),
LastUpdated: time.Unix(0, 1),
Data: data,
},
},
}
b, err := encodeWithTypeHeader(s, CheckpointRecord)
require.Nil(t, err)
out := &Series{}
err = decodeCheckpointRecord(b, out)
require.Nil(t, err)
// override the passed []byte to ensure that the resulting *Series doesn't
// contain any trailing refs to it.
for i := range b {
b[i] = 0
}
// test chunk bytes separately
sChunks := s.Chunks
s.Chunks = nil
outChunks := out.Chunks
out.Chunks = nil
require.Equal(t, s, out)
require.Equal(t, len(sChunks), len(outChunks))
for i, exp := range sChunks {
got := outChunks[i]
zero := time.Unix(0, 0)
// Issues diffing zero-value time.Locations against nil ones.
// Check/override them individually so that other fields get tested in an extensible manner.
require.Equal(t, true, exp.From.Equal(got.From))
exp.From = zero
got.From = zero
require.Equal(t, true, exp.To.Equal(got.To))
exp.To = zero
got.To = zero
require.Equal(t, true, exp.FlushedAt.Equal(got.FlushedAt))
exp.FlushedAt = zero
got.FlushedAt = zero
require.Equal(t, true, exp.LastUpdated.Equal(got.LastUpdated))
exp.LastUpdated = zero
got.LastUpdated = zero
require.Equal(t, exp, got)
}
}

@ -320,6 +320,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
wireChunks := make([]chunk.Chunk, 0, len(cs))
for _, c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err = c.chunk.Close(); err != nil {
return err
}
firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
c := chunk.NewChunk(
userID, fp, metric,

@ -2,14 +2,19 @@ package ingester
import (
"context"
"errors"
"flag"
"fmt"
"net/http"
"os"
"sync"
"time"
"github.com/grafana/loki/pkg/storage"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
@ -17,11 +22,6 @@ import (
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/ingester/client"
@ -29,7 +29,9 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/stores/shipper"
errUtil "github.com/grafana/loki/pkg/util"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/validation"
)
@ -50,15 +52,16 @@ type Config struct {
// Config for transferring chunks.
MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
TargetChunkSize int `yaml:"chunk_target_size"`
ChunkEncoding string `yaml:"chunk_encoding"`
parsedEncoding chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding
MaxChunkAge time.Duration `yaml:"max_chunk_age"`
// Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments.
SyncPeriod time.Duration `yaml:"sync_period"`
@ -71,11 +74,14 @@ type Config struct {
QueryStore bool `yaml:"-"`
QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"`
WAL WALConfig `yaml:"wal,omitempty"`
}
// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)
cfg.WAL.RegisterFlags(f)
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 16, "")
@ -93,6 +99,24 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
}
func (cfg *Config) Validate() error {
enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
if err != nil {
return err
}
cfg.parsedEncoding = enc
if err = cfg.WAL.Validate(); err != nil {
return err
}
if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled {
return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers")
}
return nil
}
// Ingester builds chunks for incoming log streams.
type Ingester struct {
services.Service
@ -121,7 +145,10 @@ type Ingester struct {
flushQueuesDone sync.WaitGroup
limiter *Limiter
factory func() chunkenc.Chunk
metrics *ingesterMetrics
wal WAL
}
// ChunkStore is the interface we need to store chunks.
@ -138,10 +165,8 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding)
if err != nil {
return nil, err
}
metrics := newIngesterMetrics(registerer)
i := &Ingester{
cfg: cfg,
@ -152,12 +177,22 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
loopQuit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
tailersQuit: make(chan struct{}),
factory: func() chunkenc.Chunk {
return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize)
},
metrics: metrics,
}
if cfg.WAL.Enabled {
if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil {
return nil, err
}
}
wal, err := newWAL(cfg.WAL, registerer, metrics, newIngesterSeriesIter(i))
if err != nil {
return nil, err
}
i.wal = wal
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer)
i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WAL.Enabled || cfg.WAL.FlushOnShutdown, registerer)
if err != nil {
return nil, err
}
@ -174,6 +209,46 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
}
func (i *Ingester) starting(ctx context.Context) error {
if i.cfg.WAL.Recover {
recoverer := newIngesterRecoverer(i)
defer recoverer.Close()
start := time.Now()
level.Info(util.Logger).Log("msg", "recovering from checkpoint")
checkpointReader, checkpointCloser, err := newCheckpointReader(i.cfg.WAL.Dir)
if err != nil {
return err
}
defer checkpointCloser.Close()
if err = RecoverCheckpoint(checkpointReader, recoverer); err != nil {
i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc()
level.Error(util.Logger).Log("msg", "failed to recover from checkpoint", "elapsed", time.Since(start).String())
return err
}
level.Info(util.Logger).Log("msg", "recovered from checkpoint", "elapsed", time.Since(start).String())
level.Info(util.Logger).Log("msg", "recovering from WAL")
segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1)
if err != nil {
return err
}
defer segmentCloser.Close()
if err = RecoverWAL(segmentReader, recoverer); err != nil {
i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc()
level.Error(util.Logger).Log("msg", "failed to recover from WAL segments", "elapsed", time.Since(start).String())
return err
}
level.Info(util.Logger).Log("msg", "recovered from WAL segments", "elapsed", time.Since(start).String())
elapsed := time.Since(start)
i.metrics.walReplayDuration.Set(elapsed.Seconds())
level.Info(util.Logger).Log("msg", "recovery completed", "time", elapsed.String())
}
i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes)
for j := 0; j < i.cfg.ConcurrentFlushes; j++ {
i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength)
@ -222,8 +297,9 @@ func (i *Ingester) running(ctx context.Context) error {
// At this point, loop no longer runs, but flushers are still running.
func (i *Ingester) stopping(_ error) error {
i.stopIncomingRequests()
err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler)
var errs errUtil.MultiError
errs.Add(i.wal.Stop())
errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler))
// Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails,
// we better stop them.
@ -232,7 +308,7 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()
return err
return errs.Err()
}
func (i *Ingester) loop() {
@ -252,6 +328,18 @@ func (i *Ingester) loop() {
}
}
// ShutdownHandler triggers the following set of operations in order:
// * Change the state of ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
_ = services.StopAndAwaitTerminated(context.Background(), i)
i.lifecycler.SetFlushOnShutdown(originalState)
w.WriteHeader(http.StatusNoContent)
}
// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
@ -276,7 +364,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics)
i.instances[instanceID] = inst
}
return inst

@ -20,6 +20,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/metadata"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -444,3 +445,49 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) {
})
}
}
func TestValidate(t *testing.T) {
for i, tc := range []struct {
in Config
err bool
expected Config
}{
{
in: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
},
expected: Config{
MaxChunkAge: time.Minute,
ChunkEncoding: chunkenc.EncGZIP.String(),
parsedEncoding: chunkenc.EncGZIP,
},
},
{
in: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
},
expected: Config{
ChunkEncoding: chunkenc.EncSnappy.String(),
parsedEncoding: chunkenc.EncSnappy,
},
},
{
in: Config{
ChunkEncoding: "bad-enc",
},
err: true,
},
} {
t.Run(fmt.Sprint(i), func(t *testing.T) {
err := tc.in.Validate()
if tc.err {
require.NotNil(t, err)
return
}
require.Nil(t, err)
require.Equal(t, tc.expected, tc.in)
})
}
}

@ -4,20 +4,19 @@ import (
"context"
"net/http"
"sync"
"time"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
tsdb_record "github.com/prometheus/prometheus/tsdb/record"
"github.com/weaveworks/common/httpgrpc"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ingester/index"
cutil "github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/helpers"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
@ -75,14 +74,19 @@ type instance struct {
tailerMtx sync.RWMutex
limiter *Limiter
factory func() chunkenc.Chunk
// sync
syncPeriod time.Duration
syncMinUtil float64
wal WAL
metrics *ingesterMetrics
}
func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance {
func newInstance(
cfg *Config,
instanceID string,
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
) *instance {
i := &instance{
cfg: cfg,
streams: map[string]*stream{},
@ -94,12 +98,11 @@ func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk,
streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID),
streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID),
factory: factory,
tailers: map[uint32]*tailer{},
limiter: limiter,
syncPeriod: syncPeriod,
syncMinUtil: syncMinUtil,
wal: wal,
metrics: metrics,
}
i.mapper = newFPMapper(i.getLabelsFromFingerprint)
return i
@ -115,8 +118,9 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
stream, ok := i.streamsByFP[fp]
if !ok {
sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, fp, sortedLabels, i.factory)
stream = newStream(i.cfg, fp, sortedLabels, i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
@ -133,20 +137,24 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
}
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
record := recordPool.GetRecord()
record.UserID = i.instanceID
defer recordPool.PutRecord(record)
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
var appendErr error
for _, s := range req.Streams {
stream, err := i.getOrCreateStream(s)
stream, err := i.getOrCreateStream(s, false, record)
if err != nil {
appendErr = err
continue
}
prevNumChunks := len(stream.chunks)
if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil {
if err := stream.Push(ctx, s.Entries, record); err != nil {
appendErr = err
continue
}
@ -154,16 +162,34 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks))
}
if !record.IsEmpty() {
if err := i.wal.Log(record); err != nil {
return err
}
}
return appendErr
}
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) {
// getOrCreateStream returns the stream or creates it. Must hold streams mutex if not asked to lock.
func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) {
if lock {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
}
stream, ok := i.streams[pushReqStream.Labels]
if ok {
return stream, nil
}
err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
// record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after
// reducing the stream limits, for instance.
var err error
if record != nil {
err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams))
}
if err != nil {
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
@ -179,11 +205,23 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
_ = i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, fp, labels, i.factory)
sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, fp, sortedLabels, i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
record.Series = append(record.Series, tsdb_record.RefSeries{
Ref: uint64(fp),
Labels: sortedLabels,
})
} else {
// If the record is nil, this is a WAL recovery.
i.metrics.recoveredStreamsTotal.Inc()
}
memoryStreams.WithLabelValues(i.instanceID).Inc()
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
@ -343,6 +381,13 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp
return &logproto.SeriesResponse{Series: series}, nil
}
func (i *instance) numStreams() int {
i.streamsMtx.RLock()
defer i.streamsMtx.RUnlock()
return len(i.streams)
}
// forAllStreams will execute a function for all streams in the instance.
// It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex.
func (i *instance) forAllStreams(fn func(*stream) error) error {

@ -9,26 +9,33 @@ import (
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/util/validation"
)
var defaultFactory = func() chunkenc.Chunk {
return chunkenc.NewMemChunk(chunkenc.EncGZIP, 512, 0)
func defaultConfig() *Config {
cfg := Config{
BlockSize: 512,
ChunkEncoding: "gzip",
}
if err := cfg.Validate(); err != nil {
panic(errors.Wrap(err, "error building default test config"))
}
return &cfg
}
var NilMetrics = newIngesterMetrics(nil)
func TestLabelsCollisions(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil)
// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@ -55,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
inst := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
const (
concurrent = 10
@ -113,7 +120,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics)
lbls := makeRandomLabels()
tt := time.Now()
@ -128,7 +135,7 @@ func TestSyncPeriod(t *testing.T) {
require.NoError(t, err)
// let's verify results
s, err := inst.getOrCreateStream(pr.Streams[0])
s, err := inst.getOrCreateStream(pr.Streams[0], false, recordPool.GetRecord())
require.NoError(t, err)
// make sure each chunk spans max 'sync period' time
@ -149,10 +156,11 @@ func Test_SeriesQuery(t *testing.T) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
// just some random values
syncPeriod := 1 * time.Minute
minUtil := 0.20
cfg := defaultConfig()
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20
instance := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil)
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics)
currentTime := time.Now()
@ -162,9 +170,9 @@ func Test_SeriesQuery(t *testing.T) {
}
for _, testStream := range testStreams {
stream, err := instance.getOrCreateStream(testStream)
stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord())
require.NoError(t, err)
chunk := defaultFactory()
chunk := newStream(cfg, 0, nil, NilMetrics).NewChunk()
for _, entry := range testStream.Entries {
err = chunk.Append(&entry)
require.NoError(t, err)
@ -263,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics)
ctx := context.Background()
for n := 0; n < b.N; n++ {

@ -0,0 +1,87 @@
package ingester
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
)
type ingesterMetrics struct {
checkpointDeleteFail prometheus.Counter
checkpointDeleteTotal prometheus.Counter
checkpointCreationFail prometheus.Counter
checkpointCreationTotal prometheus.Counter
checkpointDuration prometheus.Summary
checkpointLoggedBytesTotal prometheus.Counter
walReplayDuration prometheus.Gauge
walCorruptionsTotal *prometheus.CounterVec
walLoggedBytesTotal prometheus.Counter
walRecordsLogged prometheus.Counter
recoveredStreamsTotal prometheus.Counter
recoveredChunksTotal prometheus.Counter
recoveredEntriesTotal prometheus.Counter
}
const (
walTypeCheckpoint = "checkpoint"
walTypeSegment = "segment"
)
func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
return &ingesterMetrics{
walReplayDuration: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_wal_replay_duration_seconds",
Help: "Time taken to replay the checkpoint and the WAL.",
}),
walCorruptionsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Name: "loki_ingester_wal_corruptions_total",
Help: "Total number of WAL corruptions encountered.",
}, []string{"type"}),
checkpointDeleteFail: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_checkpoint_deletions_failed_total",
Help: "Total number of checkpoint deletions that failed.",
}),
checkpointDeleteTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_checkpoint_deletions_total",
Help: "Total number of checkpoint deletions attempted.",
}),
checkpointCreationFail: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_checkpoint_creations_failed_total",
Help: "Total number of checkpoint creations that failed.",
}),
checkpointCreationTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_checkpoint_creations_total",
Help: "Total number of checkpoint creations attempted.",
}),
checkpointDuration: promauto.With(r).NewSummary(prometheus.SummaryOpts{
Name: "loki_ingester_checkpoint_duration_seconds",
Help: "Time taken to create a checkpoint.",
Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001},
}),
walRecordsLogged: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_records_logged_total",
Help: "Total number of WAL records logged.",
}),
checkpointLoggedBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_checkpoint_logged_bytes_total",
Help: "Total number of bytes written to disk for checkpointing.",
}),
walLoggedBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_logged_bytes_total",
Help: "Total number of bytes written to disk for WAL records.",
}),
recoveredStreamsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_recovered_streams_total",
Help: "Total number of streams recovered from the WAL.",
}),
recoveredChunksTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_recovered_chunks_total",
Help: "Total number of chunks recovered from the WAL checkpoints.",
}),
recoveredEntriesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_wal_recovered_entries_total",
Help: "Total number of entries recovered from the WAL.",
}),
}
}

@ -0,0 +1,395 @@
package ingester
import (
"context"
io "io"
"runtime"
"sync"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/grafana/loki/pkg/logproto"
)
type WALReader interface {
Next() bool
Err() error
// Record should not be used across multiple calls to Next()
Record() []byte
}
type NoopWALReader struct{}
func (NoopWALReader) Next() bool { return false }
func (NoopWALReader) Err() error { return nil }
func (NoopWALReader) Record() []byte { return nil }
func (NoopWALReader) Close() error { return nil }
// If startSegment is <0, it means all the segments.
func newWalReader(dir string, startSegment int) (*wal.Reader, io.Closer, error) {
var (
segmentReader io.ReadCloser
err error
)
if startSegment < 0 {
segmentReader, err = wal.NewSegmentsReader(dir)
if err != nil {
return nil, nil, err
}
} else {
first, last, err := wal.Segments(dir)
if err != nil {
return nil, nil, err
}
if startSegment > last {
return nil, nil, errors.New("start segment is beyond the last WAL segment")
}
if first > startSegment {
startSegment = first
}
segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{
Dir: dir,
First: startSegment,
Last: -1, // Till the end.
})
if err != nil {
return nil, nil, err
}
}
return wal.NewReader(segmentReader), segmentReader, nil
}
func newCheckpointReader(dir string) (WALReader, io.Closer, error) {
lastCheckpointDir, idx, err := lastCheckpoint(dir)
if err != nil {
return nil, nil, err
}
if idx < 0 {
level.Info(util.Logger).Log("msg", "no checkpoint found, treating as no-op")
var reader NoopWALReader
return reader, reader, nil
}
r, err := wal.NewSegmentsReader(lastCheckpointDir)
if err != nil {
return nil, nil, err
}
return wal.NewReader(r), r, nil
}
type Recoverer interface {
NumWorkers() int
Series(series *Series) error
SetStream(userID string, series record.RefSeries) error
Push(userID string, entries RefEntries) error
Close()
Done() <-chan struct{}
}
type ingesterRecoverer struct {
// basically map[userID]map[fingerprint]*stream
users sync.Map
ing *Ingester
done chan struct{}
}
func newIngesterRecoverer(i *Ingester) *ingesterRecoverer {
return &ingesterRecoverer{
ing: i,
done: make(chan struct{}),
}
}
// Use all available cores
func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) }
func (r *ingesterRecoverer) Series(series *Series) error {
inst := r.ing.getOrCreateInstance(series.UserID)
// TODO(owen-d): create another fn to avoid unnecessary label type conversions.
stream, err := inst.getOrCreateStream(logproto.Stream{
Labels: client.FromLabelAdaptersToLabels(series.Labels).String(),
}, true, nil)
if err != nil {
return err
}
added, err := stream.setChunks(series.Chunks)
if err != nil {
return err
}
r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks)))
r.ing.metrics.recoveredEntriesTotal.Add(float64(added))
// now store the stream in the recovery map under the fingerprint originally recorded
// as it's possible the newly mapped fingerprint is different. This is because the WAL records
// will use this original reference.
got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{})
streamsMap := got.(*sync.Map)
streamsMap.Store(series.Fingerprint, stream)
return nil
}
// SetStream is responsible for setting the key path for userIDs -> fingerprints -> streams.
// Internally, this uses nested sync.Maps due to their performance benefits for sets that only grow.
// Using these also allows us to bypass the ingester -> instance -> stream hierarchy internally, which
// may yield some performance gains, but is essential for the following:
// Due to the use of the instance's fingerprint mapper, stream fingerprints are NOT necessarily
// deterministic. The WAL uses the post-mapped fingerprint on the ingester that originally
// created the stream and we ensure that said fingerprint maps correctly to the newly
// created stream during WAL replay, even if the new in memory stream was assigned a different
// fingerprint from the mapper. This is paramount because subsequent WAL records will use
// the fingerprint reported in the WAL record, not the potentially differing one assigned during
// stream creation.
func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) error {
inst := r.ing.getOrCreateInstance(userID)
stream, err := inst.getOrCreateStream(
logproto.Stream{
Labels: series.Labels.String(),
},
true,
nil,
)
if err != nil {
return err
}
// Now that we have the stream, ensure that the userID -> fingerprint -> stream
// path is set properly.
got, _ := r.users.LoadOrStore(userID, &sync.Map{})
streamsMap := got.(*sync.Map)
streamsMap.Store(series.Ref, stream)
return nil
}
func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
out, ok := r.users.Load(userID)
if !ok {
return errors.Errorf("user (%s) not set during WAL replay", userID)
}
s, ok := out.(*sync.Map).Load(entries.Ref)
if !ok {
return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID)
}
// ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments)
_ = s.(*stream).Push(context.Background(), entries.Entries, nil)
return nil
}
func (r *ingesterRecoverer) Close() {
close(r.done)
}
func (r *ingesterRecoverer) Done() <-chan struct{} {
return r.done
}
func RecoverWAL(reader WALReader, recoverer Recoverer) error {
dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error {
rec := recordPool.GetRecord()
if err := decodeWALRecord(b, rec); err != nil {
return err
}
// First process all series to ensure we don't write entries to nonexistent series.
for _, s := range rec.Series {
if err := recoverer.SetStream(rec.UserID, s); err != nil {
return err
}
}
for _, entries := range rec.RefEntries {
worker := int(entries.Ref % uint64(len(inputs)))
select {
case err := <-errCh:
return err
case inputs[worker] <- recoveryInput{
userID: rec.UserID,
data: entries,
}:
}
}
return nil
}
process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) {
for {
select {
case <-recoverer.Done():
case next, ok := <-input:
if !ok {
return
}
entries, ok := next.data.(RefEntries)
var err error
if !ok {
err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, entries)
}
if err == nil {
err = recoverer.Push(next.userID, entries)
}
// Pass the error back, but respect the quit signal.
if err != nil {
select {
case errCh <- err:
case <-recoverer.Done():
}
return
}
}
}
}
return recoverGeneric(
reader,
recoverer,
dispatch,
process,
)
}
func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error {
dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error {
s := &Series{}
if err := decodeCheckpointRecord(b, s); err != nil {
return err
}
worker := int(s.Fingerprint % uint64(len(inputs)))
select {
case err := <-errCh:
return err
case inputs[worker] <- recoveryInput{
userID: s.UserID,
data: s,
}:
}
return nil
}
process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) {
for {
select {
case <-recoverer.Done():
case next, ok := <-input:
if !ok {
return
}
series, ok := next.data.(*Series)
var err error
if !ok {
err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, series)
}
if err == nil {
err = recoverer.Series(series)
}
// Pass the error back, but respect the quit signal.
if err != nil {
select {
case errCh <- err:
case <-recoverer.Done():
}
return
}
}
}
}
return recoverGeneric(
reader,
recoverer,
dispatch,
process,
)
}
type recoveryInput struct {
userID string
data interface{}
}
// recoverGeneric enables reusing the ability to recover from WALs of different types
// by exposing the dispatch and process functions.
// Note: it explicitly does not call the Recoverer.Close function as it's possible to layer
// multiple recoveries on top of each other, as in the case of recovering from Checkpoints
// then the WAL.
func recoverGeneric(
reader WALReader,
recoverer Recoverer,
dispatch func(Recoverer, []byte, []chan recoveryInput, <-chan error) error,
process func(Recoverer, <-chan recoveryInput, chan<- error),
) error {
var wg sync.WaitGroup
var lastErr error
nWorkers := recoverer.NumWorkers()
if nWorkers < 1 {
return errors.New("cannot recover with no workers")
}
errCh := make(chan error)
inputs := make([]chan recoveryInput, 0, nWorkers)
wg.Add(nWorkers)
for i := 0; i < nWorkers; i++ {
inputs = append(inputs, make(chan recoveryInput))
go func(input <-chan recoveryInput) {
defer wg.Done()
process(recoverer, input, errCh)
}(inputs[i])
}
outer:
for reader.Next() {
b := reader.Record()
if lastErr = reader.Err(); lastErr != nil {
break outer
}
if lastErr = dispatch(recoverer, b, inputs, errCh); lastErr != nil {
break outer
}
}
for _, w := range inputs {
close(w)
}
// may have broken loop early
if lastErr != nil {
return lastErr
}
finished := make(chan struct{})
go func(finished chan<- struct{}) {
wg.Wait()
finished <- struct{}{}
}(finished)
select {
case <-finished:
case lastErr = <-errCh:
}
return lastErr
}
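As the comment above notes, recoveries can be layered; a condensed sketch (mirroring the ingester's starting sequence earlier in this diff, with walDir standing in for cfg.WAL.Dir) looks like:

recoverer := newIngesterRecoverer(i)
defer recoverer.Close()

// Replay the most recent checkpoint first.
checkpointReader, checkpointCloser, err := newCheckpointReader(walDir)
if err != nil {
	return err
}
defer checkpointCloser.Close()
if err := RecoverCheckpoint(checkpointReader, recoverer); err != nil {
	return err
}

// Then replay every WAL segment on top of it; out-of-order entries that the
// checkpoint already contains are ignored by the stream push path.
segmentReader, segmentCloser, err := newWalReader(walDir, -1)
if err != nil {
	return err
}
defer segmentCloser.Close()
return RecoverWAL(segmentReader, recoverer)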

@ -0,0 +1,192 @@
package ingester
import (
fmt "fmt"
"runtime"
"sync"
"testing"
"time"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/tsdb/record"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/logproto"
)
type MemoryWALReader struct {
xs [][]byte
initialized bool
}
func (m *MemoryWALReader) Next() bool {
if len(m.xs) < 1 {
return false
}
// don't advance on the first call
if !m.initialized {
m.initialized = true
return true
}
m.xs = m.xs[1:]
return len(m.xs) > 0
}
func (m *MemoryWALReader) Err() error { return nil }
func (m *MemoryWALReader) Record() []byte { return m.xs[0] }
func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALReader, []*WALRecord) {
var recs []*WALRecord
reader := &MemoryWALReader{}
for i := 0; i < totalStreams; i++ {
user := fmt.Sprintf("%d", i%users)
recs = append(recs, &WALRecord{
UserID: user,
Series: []record.RefSeries{
{
Ref: uint64(i),
Labels: labels.FromMap(
map[string]string{
"stream": fmt.Sprint(i),
"user": user,
},
),
},
},
})
var entries []logproto.Entry
for j := 0; j < entriesPerStream; j++ {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(j), 0),
Line: fmt.Sprintf("%d", j),
})
}
recs = append(recs, &WALRecord{
UserID: user,
RefEntries: []RefEntries{
{
Ref: uint64(i),
Entries: entries,
},
},
})
}
for _, rec := range recs {
if len(rec.Series) > 0 {
reader.xs = append(reader.xs, rec.encodeSeries(nil))
}
if len(rec.RefEntries) > 0 {
reader.xs = append(reader.xs, rec.encodeEntries(nil))
}
}
return reader, recs
}
type MemRecoverer struct {
users map[string]map[uint64][]logproto.Entry
done chan struct{}
sync.Mutex
usersCt, streamsCt, seriesCt int
}
func NewMemRecoverer() *MemRecoverer {
return &MemRecoverer{
users: make(map[string]map[uint64][]logproto.Entry),
done: make(chan struct{}),
}
}
func (r *MemRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) }
func (r *MemRecoverer) Series(_ *Series) error { return nil }
func (r *MemRecoverer) SetStream(userID string, series record.RefSeries) error {
r.Lock()
defer r.Unlock()
user, ok := r.users[userID]
if !ok {
user = make(map[uint64][]logproto.Entry)
r.users[userID] = user
r.usersCt++
}
if _, exists := user[series.Ref]; exists {
return errors.Errorf("stream (%d) already exists for user (%s)", series.Ref, userID)
}
user[series.Ref] = make([]logproto.Entry, 0)
r.streamsCt++
return nil
}
func (r *MemRecoverer) Push(userID string, entries RefEntries) error {
r.Lock()
defer r.Unlock()
user, ok := r.users[userID]
if !ok {
return errors.Errorf("unexpected user access (%s)", userID)
}
stream, ok := user[entries.Ref]
if !ok {
return errors.Errorf("unexpected stream access")
}
r.seriesCt += len(entries.Entries)
user[entries.Ref] = append(stream, entries.Entries...)
return nil
}
func (r *MemRecoverer) Close() { close(r.done) }
func (r *MemRecoverer) Done() <-chan struct{} { return r.done }
func Test_InMemorySegmentRecover(t *testing.T) {
var (
users = 10
streamsCt = 1000
entriesPerStream = 50
)
reader, recs := buildMemoryReader(users, streamsCt, entriesPerStream)
recoverer := NewMemRecoverer()
require.Nil(t, RecoverWAL(reader, recoverer))
recoverer.Close()
require.Equal(t, users, recoverer.usersCt)
require.Equal(t, streamsCt, recoverer.streamsCt)
require.Equal(t, streamsCt*entriesPerStream, recoverer.seriesCt)
for _, rec := range recs {
user, ok := recoverer.users[rec.UserID]
require.Equal(t, true, ok)
for _, s := range rec.Series {
_, ok := user[s.Ref]
require.Equal(t, true, ok)
}
for _, entries := range rec.RefEntries {
stream, ok := user[entries.Ref]
require.Equal(t, true, ok)
for i, entry := range entries.Entries {
require.Equal(t, entry, stream[i])
}
}
}
}

@ -64,15 +64,15 @@ type stream struct {
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
labels labels.Labels
labelsString string
factory func() chunkenc.Chunk
lastLine line
metrics *ingesterMetrics
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
}
type chunkDesc struct {
chunk chunkenc.Chunk
chunk *chunkenc.MemChunk
closed bool
synced bool
flushed time.Time
@ -85,14 +85,14 @@ type entryWithError struct {
e error
}
func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream {
func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics *ingesterMetrics) *stream {
return &stream{
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
factory: factory,
tailers: map[uint32]*tailer{},
metrics: metrics,
}
}
@ -111,26 +111,39 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
return nil
}
func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error {
// setChunks is used during checkpoint recovery
func (s *stream) setChunks(chunks []Chunk) (entriesAdded int, err error) {
chks, err := fromWireChunks(s.cfg, chunks)
if err != nil {
return 0, err
}
s.chunks = chks
for _, c := range s.chunks {
entriesAdded += c.chunk.Size()
}
return entriesAdded, nil
}
func (s *stream) NewChunk() *chunkenc.MemChunk {
return chunkenc.NewMemChunk(s.cfg.parsedEncoding, s.cfg.BlockSize, s.cfg.TargetChunkSize)
}
func (s *stream) Push(
ctx context.Context,
entries []logproto.Entry,
record *WALRecord,
) error {
var lastChunkTimestamp time.Time
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{
chunk: s.factory(),
chunk: s.NewChunk(),
})
chunksCreatedTotal.Inc()
} else {
_, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds()
}
s.tailerMtx.RLock()
hasTailers := len(s.tailers) != 0
s.tailerMtx.RUnlock()
var storedEntries []logproto.Entry
if hasTailers {
storedEntries = make([]logproto.Entry, 0, len(entries))
}
failedEntriesWithError := []entryWithError{}
// Don't fail on the first append error - if samples are sent out of order,
@ -149,7 +162,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
}
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, synchronizePeriod, minUtilization) {
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
err := chunk.chunk.Close()
if err != nil {
@ -164,7 +177,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
chunksCreatedTotal.Inc()
s.chunks = append(s.chunks, chunkDesc{
chunk: s.factory(),
chunk: s.NewChunk(),
})
chunk = &s.chunks[len(s.chunks)-1]
lastChunkTimestamp = time.Time{}
@ -172,10 +185,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
if err := chunk.chunk.Append(&entries[i]); err != nil {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err})
} else {
// send only stored entries to tailers
if hasTailers {
storedEntries = append(storedEntries, entries[i])
}
storedEntries = append(storedEntries, entries[i])
lastChunkTimestamp = entries[i].Timestamp
s.lastLine.ts = lastChunkTimestamp
s.lastLine.content = entries[i].Line
@ -184,30 +194,44 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize
}
if len(storedEntries) != 0 {
go func() {
stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}
closedTailers := []uint32{}
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
record.AddEntries(uint64(s.fp), storedEntries...)
} else {
// If record is nil, this is a WAL recovery.
s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries)))
}
s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
s.tailerMtx.RLock()
hasTailers := len(s.tailers) != 0
s.tailerMtx.RUnlock()
if hasTailers {
go func() {
stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries}
closedTailers := []uint32{}
s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
tailer.send(stream, s.labels)
}
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()
s.tailerMtx.RUnlock()
if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
}
}
}()
}()
}
}
if len(failedEntriesWithError) > 0 {

@ -4,7 +4,6 @@ import (
"bytes"
"context"
"fmt"
"io"
"math/rand"
"net/http"
"testing"
@ -16,7 +15,6 @@ import (
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
)
@ -35,18 +33,20 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
for _, tc := range tt {
t.Run(tc.name, func(t *testing.T) {
cfg := defaultConfig()
cfg.MaxReturnedErrors = tc.limit
s := newStream(
&Config{MaxReturnedErrors: tc.limit},
cfg,
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
defaultFactory,
NilMetrics,
)
err := s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(int64(numLogs), 0), Line: "log"},
}, 0, 0)
}, recordPool.GetRecord())
require.NoError(t, err)
newLines := make([]logproto.Entry, numLogs)
@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs)
expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String())
err = s.Push(context.Background(), newLines, 0, 0)
err = s.Push(context.Background(), newLines, recordPool.GetRecord())
require.Error(t, err)
require.Equal(t, expectErr.Error(), err.Error())
})
@ -74,19 +74,19 @@ func TestMaxReturnedStreamsErrors(t *testing.T) {
func TestPushDeduplication(t *testing.T) {
s := newStream(
&Config{},
defaultConfig(),
model.Fingerprint(0),
labels.Labels{
{Name: "foo", Value: "bar"},
},
defaultFactory,
NilMetrics,
)
err := s.Push(context.Background(), []logproto.Entry{
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "test"},
{Timestamp: time.Unix(1, 0), Line: "newer, better test"},
}, 0, 0)
}, recordPool.GetRecord())
require.NoError(t, err)
require.Len(t, s.chunks, 1)
require.Equal(t, s.chunks[0].chunk.Size(), 2,
@ -99,10 +99,9 @@ func TestStreamIterator(t *testing.T) {
for _, chk := range []struct {
name string
new func() chunkenc.Chunk
new func() *chunkenc.MemChunk
}{
{"dumbChunk", chunkenc.NewDumbChunk},
{"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
{"gzipChunk", func() *chunkenc.MemChunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }},
} {
t.Run(chk.name, func(t *testing.T) {
var s stream
@ -150,9 +149,7 @@ func Benchmark_PushStream(b *testing.B) {
labels.Label{Name: "job", Value: "loki-dev/ingester"},
labels.Label{Name: "container", Value: "ingester"},
}
s := newStream(&Config{}, model.Fingerprint(0), ls, func() chunkenc.Chunk {
return &noopChunk{}
})
s := newStream(&Config{}, model.Fingerprint(0), ls, NilMetrics)
t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{})
require.NoError(b, err)
@ -166,72 +163,8 @@ func Benchmark_PushStream(b *testing.B) {
b.ReportAllocs()
for n := 0; n < b.N; n++ {
require.NoError(b, s.Push(ctx, e, 0, 0))
rec := recordPool.GetRecord()
require.NoError(b, s.Push(ctx, e, rec))
recordPool.PutRecord(rec)
}
}
type noopChunk struct {
}
func (c *noopChunk) Bounds() (time.Time, time.Time) {
return time.Time{}, time.Time{}
}
func (c *noopChunk) SpaceFor(_ *logproto.Entry) bool {
return true
}
func (c *noopChunk) Append(entry *logproto.Entry) error {
return nil
}
func (c *noopChunk) Size() int {
return 0
}
// UncompressedSize implements Chunk.
func (c *noopChunk) UncompressedSize() int {
return c.Size()
}
// CompressedSize implements Chunk.
func (c *noopChunk) CompressedSize() int {
return 0
}
// Utilization implements Chunk
func (c *noopChunk) Utilization() float64 {
return 0
}
func (c *noopChunk) Encoding() chunkenc.Encoding { return chunkenc.EncNone }
func (c *noopChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) {
return nil, nil
}
func (c *noopChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator {
return nil
}
func (c *noopChunk) Bytes() ([]byte, error) {
return nil, nil
}
func (c *noopChunk) BytesWith(_ []byte) ([]byte, error) {
return nil, nil
}
func (c *noopChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil }
func (c *noopChunk) Blocks(_ time.Time, _ time.Time) []chunkenc.Block {
return nil
}
func (c *noopChunk) BlockCount() int {
return 0
}
func (c *noopChunk) Close() error {
return nil
}

@ -239,6 +239,12 @@ func (i *Ingester) transferOut(ctx context.Context) error {
// typically streams won't have many chunks in memory so sending one at a time
// shouldn't add too much overhead.
for _, c := range istream.chunks {
// Close the chunk first, writing any data in the headblock to a new block.
err := c.chunk.Close()
if err != nil {
return err
}
bb, err := c.chunk.Bytes()
if err != nil {
return err

@ -0,0 +1,210 @@
package ingester
import (
"flag"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb/wal"
"github.com/grafana/loki/pkg/logproto"
)
var (
// shared pool for WALRecords and []logproto.Entries
recordPool = newRecordPool()
)
const walSegmentSize = wal.DefaultSegmentSize * 4
type WALConfig struct {
Enabled bool `yaml:"enabled"`
Dir string `yaml:"dir"`
Recover bool `yaml:"recover"`
CheckpointDuration time.Duration `yaml:"checkpoint_duration"`
FlushOnShutdown bool `yaml:"flush_on_shutdown"`
}
func (cfg *WALConfig) Validate() error {
if cfg.Enabled && cfg.CheckpointDuration < 1 {
return errors.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration)
}
return nil
}
// RegisterFlags adds the flags required to config this to the given FlagSet
func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.")
f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.")
f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.")
f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.")
f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.")
}
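A small sketch (values chosen to match the flag defaults above; not part of the diff) of how Validate treats the checkpoint interval when the WAL is enabled:

// Hypothetical sketch: an enabled WAL with a zero checkpoint interval fails
// validation, while the flag defaults above pass.
bad := WALConfig{Enabled: true, CheckpointDuration: 0}
fmt.Println(bad.Validate()) // invalid checkpoint duration: 0s

good := WALConfig{Enabled: true, Dir: "wal", CheckpointDuration: 5 * time.Minute}
fmt.Println(good.Validate()) // <nil>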
// WAL interface allows us to have a no-op WAL when the WAL is disabled.
type WAL interface {
// Log marshals the record and writes it into the WAL.
Log(*WALRecord) error
// Stop stops all the WAL operations.
Stop() error
}
type noopWAL struct{}
func (noopWAL) Log(*WALRecord) error { return nil }
func (noopWAL) Stop() error { return nil }
type walWrapper struct {
cfg WALConfig
wal *wal.WAL
metrics *ingesterMetrics
seriesIter SeriesIter
wait sync.WaitGroup
quit chan struct{}
}
// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL.
func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMetrics, seriesIter SeriesIter) (WAL, error) {
if !cfg.Enabled {
return noopWAL{}, nil
}
tsdbWAL, err := wal.NewSize(util.Logger, registerer, cfg.Dir, walSegmentSize, false)
if err != nil {
return nil, err
}
w := &walWrapper{
cfg: cfg,
quit: make(chan struct{}),
wal: tsdbWAL,
metrics: metrics,
seriesIter: seriesIter,
}
w.wait.Add(1)
go w.run()
return w, nil
}
func (w *walWrapper) Log(record *WALRecord) error {
if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) {
return nil
}
select {
case <-w.quit:
return nil
default:
buf := recordPool.GetBytes()[:0]
defer func() {
recordPool.PutBytes(buf)
}()
// Always write series then entries.
if len(record.Series) > 0 {
buf = record.encodeSeries(buf)
if err := w.wal.Log(buf); err != nil {
return err
}
w.metrics.walRecordsLogged.Inc()
w.metrics.walLoggedBytesTotal.Add(float64(len(buf)))
buf = buf[:0]
}
if len(record.RefEntries) > 0 {
buf = record.encodeEntries(buf)
if err := w.wal.Log(buf); err != nil {
return err
}
w.metrics.walRecordsLogged.Inc()
w.metrics.walLoggedBytesTotal.Add(float64(len(buf)))
}
return nil
}
}
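For reference, a hypothetical caller-side sketch (record field names and the AddEntries helper are taken from their usage elsewhere in this diff) of building a record and handing it to any WAL implementation w:

// Build a record for one stream and log it; Log writes the series and the
// entries as two separate WAL records, series first.
rec := recordPool.GetRecord()
defer recordPool.PutRecord(rec)
rec.UserID = "tenant-1"
rec.Series = append(rec.Series, tsdb_record.RefSeries{
	Ref:    1,
	Labels: labels.FromStrings("job", "example"),
})
rec.AddEntries(1, logproto.Entry{Timestamp: time.Now(), Line: "hello"})
if err := w.Log(rec); err != nil {
	return err
}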
func (w *walWrapper) Stop() error {
close(w.quit)
w.wait.Wait()
err := w.wal.Close()
level.Info(util.Logger).Log("msg", "stopped", "component", "wal")
return err
}
func (w *walWrapper) checkpointWriter() *WALCheckpointWriter {
return &WALCheckpointWriter{
metrics: w.metrics,
segmentWAL: w.wal,
}
}
func (w *walWrapper) run() {
level.Info(util.Logger).Log("msg", "started", "component", "wal")
defer w.wait.Done()
checkpointer := NewCheckpointer(
w.cfg.CheckpointDuration,
w.seriesIter,
w.checkpointWriter(),
w.metrics,
w.quit,
)
checkpointer.Run()
}
type resettingPool struct {
rPool *sync.Pool // records
ePool *sync.Pool // entries
bPool *sync.Pool // bytes
}
func (p *resettingPool) GetRecord() *WALRecord {
rec := p.rPool.Get().(*WALRecord)
rec.Reset()
return rec
}
func (p *resettingPool) PutRecord(r *WALRecord) {
p.rPool.Put(r)
}
func (p *resettingPool) GetEntries() []logproto.Entry {
return p.ePool.Get().([]logproto.Entry)
}
func (p *resettingPool) PutEntries(es []logproto.Entry) {
p.ePool.Put(es[:0]) // nolint:staticcheck
}
func (p *resettingPool) GetBytes() []byte {
return p.bPool.Get().([]byte)
}
func (p *resettingPool) PutBytes(b []byte) {
p.bPool.Put(b[:0]) // nolint:staticcheck
}
func newRecordPool() *resettingPool {
return &resettingPool{
rPool: &sync.Pool{
New: func() interface{} {
return &WALRecord{}
},
},
ePool: &sync.Pool{
New: func() interface{} {
return make([]logproto.Entry, 0, 512)
},
},
bPool: &sync.Pool{
New: func() interface{} {
return make([]byte, 0, 1<<10) // 1kb
},
},
}
}
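A brief sketch of the intended pool lifecycle (mirroring its use in the WAL and push paths above): records come back already Reset, and byte buffers are returned truncated, so callers re-slice before encoding.

rec := recordPool.GetRecord() // Reset() has already been called
rec.UserID = "tenant-1"
defer recordPool.PutRecord(rec)

buf := recordPool.GetBytes()[:0] // re-slice before reuse
buf = rec.encodeSeries(buf)
recordPool.PutBytes(buf)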

@ -124,6 +124,9 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.Ruler.Validate(); err != nil {
return errors.Wrap(err, "invalid ruler config")
}
if err := c.Ingester.Validate(); err != nil {
return errors.Wrap(err, "invalid ingester config")
}
return nil
}

@ -211,6 +211,8 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
logproto.RegisterIngesterServer(t.server.GRPC, t.ingester)
grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester)
t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler))
// TODO(owen-d): should this use cortex style path (/ingester/shutdown), legacy style (/shutdown), or API-prefixed (/loki/api/v1/ingester/shutdown)?
t.server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler))
return t.ingester, nil
}
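Operationally, the new endpoint can be hit with a plain POST; a hypothetical client-side sketch (assuming Loki's HTTP server listens on localhost:3100) would be:

// Trigger a flush-and-shutdown of the ingester; a 204 No Content response
// indicates the handler completed.
resp, err := http.Post("http://localhost:3100/ingester/shutdown", "", nil)
if err != nil {
	return err
}
defer resp.Body.Close()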
