From 4d9865acd41ca77931c13febe2f8f73cd4062653 Mon Sep 17 00:00:00 2001 From: Owen Diehl Date: Fri, 27 Nov 2020 09:42:58 -0500 Subject: [PATCH] Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization --- .gitignore | 1 + cmd/loki/loki-local-config.yaml | 4 + docs/sources/api/_index.md | 10 + pkg/chunkenc/encoding_helpers.go | 13 + pkg/chunkenc/memchunk.go | 130 ++++++++- pkg/chunkenc/memchunk_test.go | 63 +++++ pkg/chunkenc/util_test.go | 1 + pkg/ingester/checkpoint.go | 464 ++++++++++++++++++++++++++++++- pkg/ingester/checkpoint.pb.go | 241 +++++++++++++--- pkg/ingester/checkpoint.proto | 10 +- pkg/ingester/encoding.go | 148 ++++++---- pkg/ingester/encoding_test.go | 198 +++++++++++-- pkg/ingester/flush.go | 4 + pkg/ingester/ingester.go | 148 ++++++++-- pkg/ingester/ingester_test.go | 47 ++++ pkg/ingester/instance.go | 79 ++++-- pkg/ingester/instance_test.go | 40 +-- pkg/ingester/metrics.go | 87 ++++++ pkg/ingester/recovery.go | 395 ++++++++++++++++++++++++++ pkg/ingester/recovery_test.go | 192 +++++++++++++ pkg/ingester/stream.go | 102 ++++--- pkg/ingester/stream_test.go | 97 +------ pkg/ingester/transfer.go | 6 + pkg/ingester/wal.go | 210 ++++++++++++++ pkg/loki/loki.go | 3 + pkg/loki/modules.go | 2 + 26 files changed, 2387 insertions(+), 308 deletions(-) create mode 100644 pkg/ingester/metrics.go create mode 100644 pkg/ingester/recovery.go create mode 100644 pkg/ingester/recovery_test.go create mode 100644 pkg/ingester/wal.go diff --git a/.gitignore b/.gitignore index 419dce4e40..66550b4959 100644 --- a/.gitignore +++ b/.gitignore @@ -5,6 +5,7 @@ *.output *.tgz *.exe +__debug_bin requirements.lock mixin/vendor/ cmd/loki/loki diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 9fca924810..434e4a4a43 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -4,6 +4,10 @@ server: http_listen_port: 3100 ingester: + wal: + enabled: true + dir: /tmp/wal + recover: true lifecycler: address: 127.0.0.1 ring: diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index d9062ee142..1dc18483a2 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -37,6 +37,7 @@ The HTTP API includes the following endpoints: - [Examples](#examples-8) - [`GET /ready`](#get-ready) - [`POST /flush`](#post-flush) + - [`POST /ingester/shutdown`](#post-shutdown) - [`GET /metrics`](#get-metrics) - [Series](#series) - [Examples](#examples-9) @@ -107,6 +108,7 @@ While these endpoints are exposed by just the distributor: And these endpoints are exposed by just the ingester: - [`POST /flush`](#post-flush) +- [`POST /ingester/shutdown`](#post-shutdown) The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably. @@ -844,6 +846,14 @@ backing store. Mainly used for local testing. In microservices mode, the `/flush` endpoint is exposed by the ingester. +## `POST /ingester/shutdown` + +`/ingester/shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds. +This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned, +but instead flushed to our chunk backend. + +In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester. + ## `GET /metrics` `/metrics` exposes Prometheus metrics. See diff --git a/pkg/chunkenc/encoding_helpers.go b/pkg/chunkenc/encoding_helpers.go index d27e4839cb..2850c69f1d 100644 --- a/pkg/chunkenc/encoding_helpers.go +++ b/pkg/chunkenc/encoding_helpers.go @@ -119,4 +119,17 @@ func (d *decbuf) byte() byte { return x } +func (d *decbuf) bytes(n int) []byte { + if d.e != nil { + return nil + } + if len(d.b) < n { + d.e = ErrInvalidSize + return nil + } + x := d.b[:n] + d.b = d.b[n:] + return x +} + func (d *decbuf) err() error { return d.e } diff --git a/pkg/chunkenc/memchunk.go b/pkg/chunkenc/memchunk.go index 504893d20d..08579a5035 100644 --- a/pkg/chunkenc/memchunk.go +++ b/pkg/chunkenc/memchunk.go @@ -152,6 +152,94 @@ func (hb *headBlock) serialise(pool WriterPool) ([]byte, error) { return outBuf.Bytes(), nil } +// CheckpointBytes serializes a headblock to []byte. This is used by the WAL checkpointing, +// which does not want to mutate a chunk by cutting it (otherwise risking content address changes), but +// needs to serialize/deserialize the data to disk to ensure data durability. +func (hb *headBlock) CheckpointBytes(version byte) ([]byte, error) { + encB := BytesBufferPool.Get(1 << 10).([]byte) + + defer func() { + BytesBufferPool.Put(encB[:0]) + }() + + buf := bytes.NewBuffer(make([]byte, 0, 1<<10)) + eb := encbuf{b: encB} + + eb.putByte(version) + _, err := buf.Write(eb.get()) + if err != nil { + return nil, errors.Wrap(err, "write headBlock version") + } + eb.reset() + + eb.putUvarint(len(hb.entries)) + eb.putUvarint(hb.size) + eb.putVarint64(hb.mint) + eb.putVarint64(hb.maxt) + + _, err = buf.Write(eb.get()) + if err != nil { + return nil, errors.Wrap(err, "write headBlock metas") + } + eb.reset() + + for _, entry := range hb.entries { + eb.putVarint64(entry.t) + eb.putUvarint(len(entry.s)) + _, err = buf.Write(eb.get()) + if err != nil { + return nil, errors.Wrap(err, "write headBlock entry ts") + } + eb.reset() + + _, err := buf.WriteString(entry.s) + if err != nil { + return nil, errors.Wrap(err, "write headblock entry line") + } + } + return buf.Bytes(), nil +} + +func (hb *headBlock) FromCheckpoint(b []byte) error { + if len(b) < 1 { + return nil + } + + db := decbuf{b: b} + + version := db.byte() + if db.err() != nil { + return errors.Wrap(db.err(), "verifying headblock header") + } + if version != chunkFormatV3 { + return errors.New("incompatible headBlock version, only V3 is currently supported") + } + + ln := db.uvarint() + hb.size = db.uvarint() + hb.mint = db.varint64() + hb.maxt = db.varint64() + + if err := db.err(); err != nil { + return errors.Wrap(err, "verifying headblock metadata") + } + + hb.entries = make([]entry, ln) + for i := 0; i < ln && db.err() == nil; i++ { + var entry entry + entry.t = db.varint64() + lineLn := db.uvarint() + entry.s = string(db.bytes(lineLn)) + hb.entries[i] = entry + } + + if err := db.err(); err != nil { + return errors.Wrap(err, "decoding entries") + } + + return nil +} + type entry struct { t int64 s string @@ -256,6 +344,7 @@ func NewByteChunk(b []byte, blockSize, targetSize int) (*MemChunk, error) { } // BytesWith uses a provided []byte for buffer instantiation +// NOTE: This does not cut the head block nor include any head block data. func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { buf := bytes.NewBuffer(b[:0]) if _, err := c.WriteTo(buf); err != nil { @@ -265,18 +354,18 @@ func (c *MemChunk) BytesWith(b []byte) ([]byte, error) { } // Bytes implements Chunk. +// NOTE: Does not cut head block or include any head block data. func (c *MemChunk) Bytes() ([]byte, error) { return c.BytesWith(nil) } // WriteTo Implements io.WriterTo +// NOTE: Does not cut head block or include any head block data. +// For this to be the case you must call Close() first. +// This decision notably enables WAL checkpointing, which would otherwise +// result in different content addressable chunks in storage based on the timing of when +// they were checkpointed (which would cause new blocks to be cut early). func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { - if c.head != nil { - // When generating the bytes, we need to flush the data held in-buffer. - if err := c.cut(); err != nil { - return 0, err - } - } crc32Hash := newCRC32() offset := int64(0) @@ -348,6 +437,35 @@ func (c *MemChunk) WriteTo(w io.Writer) (int64, error) { return offset, nil } +// SerializeForCheckpoint returns []bytes representing the chunk & head. This is to ensure eventually +// flushed chunks don't have different substructures depending on when they were checkpointed. +// In turn this allows us to maintain a more effective dedupe ratio in storage. +func (c *MemChunk) SerializeForCheckpoint(b []byte) (chk, head []byte, err error) { + chk, err = c.BytesWith(b) + if err != nil { + return nil, nil, err + } + + if c.head.isEmpty() { + return chk, nil, nil + } + + head, err = c.head.CheckpointBytes(c.format) + if err != nil { + return nil, nil, err + } + + return chk, head, nil +} + +func MemchunkFromCheckpoint(chk, head []byte, blockSize int, targetSize int) (*MemChunk, error) { + mc, err := NewByteChunk(chk, blockSize, targetSize) + if err != nil { + return nil, err + } + return mc, mc.head.FromCheckpoint(head) +} + // Encoding implements Chunk. func (c *MemChunk) Encoding() Encoding { return c.encoding diff --git a/pkg/chunkenc/memchunk_test.go b/pkg/chunkenc/memchunk_test.go index b87113974e..c935a79e27 100644 --- a/pkg/chunkenc/memchunk_test.go +++ b/pkg/chunkenc/memchunk_test.go @@ -294,6 +294,7 @@ func TestSerialization(t *testing.T) { for i := 0; i < numSamples; i++ { require.NoError(t, chk.Append(logprotoEntry(int64(i), strconv.Itoa(i)))) } + require.NoError(t, chk.Close()) byt, err := chk.Bytes() require.NoError(t, err) @@ -840,6 +841,68 @@ func TestBytesWith(t *testing.T) { require.Equal(t, exp, out) } +func TestHeadBlockCheckpointing(t *testing.T) { + c := NewMemChunk(EncSnappy, 256*1024, 1500*1024) + + // add a few entries + for i := 0; i < 5; i++ { + entry := &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("hi there - %d", i), + } + require.Equal(t, true, c.SpaceFor(entry)) + require.Nil(t, c.Append(entry)) + } + + // ensure blocks are not cut + require.Equal(t, 0, len(c.blocks)) + + b, err := c.head.CheckpointBytes(c.format) + require.Nil(t, err) + + hb := &headBlock{} + require.Nil(t, hb.FromCheckpoint(b)) + require.Equal(t, c.head, hb) +} + +func TestCheckpointEncoding(t *testing.T) { + blockSize, targetSize := 256*1024, 1500*1024 + c := NewMemChunk(EncSnappy, blockSize, targetSize) + + // add a few entries + for i := 0; i < 5; i++ { + entry := &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("hi there - %d", i), + } + require.Equal(t, true, c.SpaceFor(entry)) + require.Nil(t, c.Append(entry)) + } + + // cut it + require.Nil(t, c.cut()) + + // add a few more to head + for i := 5; i < 10; i++ { + entry := &logproto.Entry{ + Timestamp: time.Unix(int64(i), 0), + Line: fmt.Sprintf("hi there - %d", i), + } + require.Equal(t, true, c.SpaceFor(entry)) + require.Nil(t, c.Append(entry)) + } + + // ensure new blocks are not cut + require.Equal(t, 1, len(c.blocks)) + + chk, head, err := c.SerializeForCheckpoint(nil) + require.Nil(t, err) + + cpy, err := MemchunkFromCheckpoint(chk, head, blockSize, targetSize) + require.Nil(t, err) + require.Equal(t, c, cpy) +} + var streams = []logproto.Stream{} var series = []logproto.Series{} diff --git a/pkg/chunkenc/util_test.go b/pkg/chunkenc/util_test.go index 15782180bb..25e5c45592 100644 --- a/pkg/chunkenc/util_test.go +++ b/pkg/chunkenc/util_test.go @@ -52,5 +52,6 @@ func fillChunk(c Chunk) int64 { entry.Line = testdata.LogString(i) } + _ = c.Close() return inserted } diff --git a/pkg/ingester/checkpoint.go b/pkg/ingester/checkpoint.go index d1c7039d2c..9108f08585 100644 --- a/pkg/ingester/checkpoint.go +++ b/pkg/ingester/checkpoint.go @@ -1,14 +1,29 @@ package ingester import ( + fmt "fmt" + "io/ioutil" + "os" + "path/filepath" + "regexp" + "strconv" "time" + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log" + "github.com/go-kit/kit/log/level" + "github.com/gogo/protobuf/proto" + "github.com/pkg/errors" + tsdb_errors "github.com/prometheus/prometheus/tsdb/errors" + "github.com/prometheus/prometheus/tsdb/fileutil" + "github.com/prometheus/prometheus/tsdb/wal" + "github.com/grafana/loki/pkg/chunkenc" ) // The passed wireChunks slice is for re-use. -// nolint(deadcode) -func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) { +func toWireChunks(descs []chunkDesc, wireChunks []Chunk) ([]Chunk, error) { if cap(wireChunks) < len(descs) { wireChunks = make([]Chunk, len(descs)) } else { @@ -17,10 +32,12 @@ func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) { for i, d := range descs { from, to := d.chunk.Bounds() wireChunk := Chunk{ - From: from, - To: to, - Closed: d.closed, - FlushedAt: d.flushed, + From: from, + To: to, + Closed: d.closed, + FlushedAt: d.flushed, + LastUpdated: d.lastUpdated, + Synced: d.synced, } slice := wireChunks[i].Data[:0] // try to re-use the memory from last time @@ -28,28 +45,29 @@ func toWireChunks(descs []*chunkDesc, wireChunks []Chunk) ([]Chunk, error) { slice = make([]byte, 0, d.chunk.CompressedSize()) } - out, err := d.chunk.BytesWith(slice) + chk, head, err := d.chunk.SerializeForCheckpoint(slice) if err != nil { return nil, err } - wireChunk.Data = out + wireChunk.Data = chk + wireChunk.Head = head wireChunks[i] = wireChunk } return wireChunks, nil } -// nolint(deadcode) -func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) { - descs := make([]*chunkDesc, 0, len(wireChunks)) +func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) { + descs := make([]chunkDesc, 0, len(wireChunks)) for _, c := range wireChunks { - desc := &chunkDesc{ + desc := chunkDesc{ closed: c.Closed, + synced: c.Synced, flushed: c.FlushedAt, - lastUpdated: time.Now(), + lastUpdated: c.LastUpdated, } - mc, err := chunkenc.NewByteChunk(c.Data, conf.BlockSize, conf.TargetChunkSize) + mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, conf.BlockSize, conf.TargetChunkSize) if err != nil { return nil, err } @@ -59,3 +77,421 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]*chunkDesc, error) { } return descs, nil } + +// nolint:interfacer +func decodeCheckpointRecord(rec []byte, s *Series) error { + //TODO(owen-d): reduce allocs + // The proto unmarshaling code will retain references to the underlying []byte it's passed + // in order to reduce allocs. This is harmful to us because when reading from a WAL, the []byte + // is only guaranteed to be valid between calls to Next(). + // Therefore, we copy it to avoid this problem. + cpy := make([]byte, len(rec)) + copy(cpy, rec) + + switch RecordType(cpy[0]) { + case CheckpointRecord: + return proto.Unmarshal(cpy[1:], s) + default: + return errors.Errorf("unexpected record type: %d", rec[0]) + } +} + +func encodeWithTypeHeader(m proto.Message, typ RecordType) ([]byte, error) { + buf, err := proto.Marshal(m) + if err != nil { + return nil, err + } + + b := make([]byte, 0, len(buf)+1) + b = append(b, byte(typ)) + b = append(b, buf...) + return b, nil +} + +type SeriesWithErr struct { + Err error + Series *Series +} + +type SeriesIter interface { + Count() int + Iter() <-chan *SeriesWithErr + Stop() +} + +type ingesterSeriesIter struct { + ing *Ingester + + done chan struct{} +} + +func newIngesterSeriesIter(ing *Ingester) *ingesterSeriesIter { + return &ingesterSeriesIter{ + ing: ing, + done: make(chan struct{}), + } +} + +func (i *ingesterSeriesIter) Count() (ct int) { + for _, inst := range i.ing.getInstances() { + ct += inst.numStreams() + } + return ct +} + +func (i *ingesterSeriesIter) Stop() { + close(i.done) +} + +func (i *ingesterSeriesIter) Iter() <-chan *SeriesWithErr { + ch := make(chan *SeriesWithErr) + go func() { + for _, inst := range i.ing.getInstances() { + inst.streamsMtx.RLock() + // Need to buffer streams internally so the read lock isn't held trying to write to a blocked channel. + streams := make([]*stream, 0, len(inst.streams)) + inst.streamsMtx.RUnlock() + _ = inst.forAllStreams(func(stream *stream) error { + streams = append(streams, stream) + return nil + }) + + for _, stream := range streams { + // TODO(owen-d): use a pool + chunks, err := toWireChunks(stream.chunks, nil) + var s *Series + if err == nil { + s = &Series{ + UserID: inst.instanceID, + Fingerprint: uint64(stream.fp), + Labels: client.FromLabelsToLabelAdapters(stream.labels), + Chunks: chunks, + } + } + select { + case ch <- &SeriesWithErr{ + Err: err, + Series: s, + }: + case <-i.done: + return + } + } + } + close(ch) + }() + return ch +} + +type CheckpointWriter interface { + // Advances current checkpoint, can also signal a no-op. + Advance() (noop bool, err error) + Write(*Series) error + // Closes current checkpoint. + Close(abort bool) error +} + +type WALCheckpointWriter struct { + metrics *ingesterMetrics + segmentWAL *wal.WAL + + checkpointWAL *wal.WAL + lastSegment int // name of the last segment guaranteed to be covered by the checkpoint + final string // filename to atomically rotate upon completion + bufSize int + recs [][]byte +} + +func (w *WALCheckpointWriter) Advance() (bool, error) { + _, lastSegment, err := wal.Segments(w.segmentWAL.Dir()) + if err != nil { + return false, err + } + + if lastSegment < 0 { + // There are no WAL segments. No need of checkpoint yet. + return true, nil + } + + // First we advance the wal segment internally to ensure we don't overlap a previous checkpoint in + // low throughput scenarios and to minimize segment replays on top of checkpoints. + if err := w.segmentWAL.NextSegment(); err != nil { + return false, err + } + + // Checkpoint is named after the last WAL segment present so that when replaying the WAL + // we can start from that particular WAL segment. + checkpointDir := filepath.Join(w.segmentWAL.Dir(), fmt.Sprintf(checkpointPrefix+"%06d", lastSegment)) + level.Info(util.Logger).Log("msg", "attempting checkpoint for", "dir", checkpointDir) + checkpointDirTemp := checkpointDir + ".tmp" + + // cleanup any old partial checkpoints + if _, err := os.Stat(checkpointDirTemp); err == nil { + if err := os.RemoveAll(checkpointDirTemp); err != nil { + level.Error(util.Logger).Log("msg", "unable to cleanup old tmp checkpoint", "dir", checkpointDirTemp) + return false, err + } + } + + if err := os.MkdirAll(checkpointDirTemp, 0777); err != nil { + return false, errors.Wrap(err, "create checkpoint dir") + } + + checkpoint, err := wal.NewSize(log.With(util.Logger, "component", "checkpoint_wal"), nil, checkpointDirTemp, walSegmentSize, false) + if err != nil { + return false, errors.Wrap(err, "open checkpoint") + } + + w.checkpointWAL = checkpoint + w.lastSegment = lastSegment + w.final = checkpointDir + + return false, nil +} + +func (w *WALCheckpointWriter) Write(s *Series) error { + b, err := encodeWithTypeHeader(s, CheckpointRecord) + if err != nil { + return err + } + + w.recs = append(w.recs, b) + w.bufSize += len(b) + + // 1MB + if w.bufSize > 1>>20 { + if err := w.flush(); err != nil { + return err + } + + } + return nil +} + +func (w *WALCheckpointWriter) flush() error { + if err := w.checkpointWAL.Log(w.recs...); err != nil { + return err + } + w.metrics.checkpointLoggedBytesTotal.Add(float64(w.bufSize)) + w.recs = w.recs[:0] + w.bufSize = 0 + return nil +} + +const checkpointPrefix = "checkpoint." + +var checkpointRe = regexp.MustCompile("^" + regexp.QuoteMeta(checkpointPrefix) + "(\\d+)(\\.tmp)?$") + +// checkpointIndex returns the index of a given checkpoint file. It handles +// both regular and temporary checkpoints according to the includeTmp flag. If +// the file is not a checkpoint it returns an error. +func checkpointIndex(filename string, includeTmp bool) (int, error) { + result := checkpointRe.FindStringSubmatch(filename) + if len(result) < 2 { + return 0, errors.New("file is not a checkpoint") + } + // Filter out temporary checkpoints if desired. + if !includeTmp && len(result) == 3 && result[2] != "" { + return 0, errors.New("temporary checkpoint") + } + return strconv.Atoi(result[1]) +} + +// lastCheckpoint returns the directory name and index of the most recent checkpoint. +// If dir does not contain any checkpoints, -1 is returned as index. +func lastCheckpoint(dir string) (string, int, error) { + dirs, err := ioutil.ReadDir(dir) + if err != nil { + return "", -1, err + } + var ( + maxIdx = -1 + checkpointDir string + ) + // There may be multiple checkpoints left, so select the one with max index. + for i := 0; i < len(dirs); i++ { + di := dirs[i] + + idx, err := checkpointIndex(di.Name(), false) + if err != nil { + continue + } + if !di.IsDir() { + return "", -1, fmt.Errorf("checkpoint %s is not a directory", di.Name()) + } + if idx > maxIdx { + checkpointDir = di.Name() + maxIdx = idx + } + } + if maxIdx >= 0 { + return filepath.Join(dir, checkpointDir), maxIdx, nil + } + return "", -1, nil +} + +// deleteCheckpoints deletes all checkpoints in a directory which is < maxIndex. +func (w *WALCheckpointWriter) deleteCheckpoints(maxIndex int) (err error) { + w.metrics.checkpointDeleteTotal.Inc() + defer func() { + if err != nil { + w.metrics.checkpointDeleteFail.Inc() + } + }() + + var errs tsdb_errors.MultiError + + files, err := ioutil.ReadDir(w.segmentWAL.Dir()) + if err != nil { + return err + } + for _, fi := range files { + index, err := checkpointIndex(fi.Name(), true) + if err != nil || index >= maxIndex { + continue + } + if err := os.RemoveAll(filepath.Join(w.segmentWAL.Dir(), fi.Name())); err != nil { + errs.Add(err) + } + } + return errs.Err() +} + +func (w *WALCheckpointWriter) Close(abort bool) error { + + if len(w.recs) > 0 { + if err := w.flush(); err != nil { + return err + } + } + if err := w.checkpointWAL.Close(); err != nil { + return err + } + + if abort { + return os.RemoveAll(w.checkpointWAL.Dir()) + } + + if err := fileutil.Replace(w.checkpointWAL.Dir(), w.final); err != nil { + return errors.Wrap(err, "rename checkpoint directory") + } + level.Info(util.Logger).Log("msg", "atomic checkpoint finished", "old", w.checkpointWAL.Dir(), "new", w.final) + // We delete the WAL segments which are before the previous checkpoint and not before the + // current checkpoint created. This is because if the latest checkpoint is corrupted for any reason, we + // should be able to recover from the older checkpoint which would need the older WAL segments. + if err := w.segmentWAL.Truncate(w.lastSegment + 1); err != nil { + // It is fine to have old WAL segments hanging around if deletion failed. + // We can try again next time. + level.Error(util.Logger).Log("msg", "error deleting old WAL segments", "err", err, "lastSegment", w.lastSegment) + } + + if w.lastSegment >= 0 { + if err := w.deleteCheckpoints(w.lastSegment); err != nil { + // It is fine to have old checkpoints hanging around if deletion failed. + // We can try again next time. + level.Error(util.Logger).Log("msg", "error deleting old checkpoint", "err", err) + } + } + + return nil +} + +type Checkpointer struct { + dur time.Duration + iter SeriesIter + writer CheckpointWriter + metrics *ingesterMetrics + + quit <-chan struct{} +} + +func NewCheckpointer(dur time.Duration, iter SeriesIter, writer CheckpointWriter, metrics *ingesterMetrics, quit <-chan struct{}) *Checkpointer { + return &Checkpointer{ + dur: dur, + iter: iter, + writer: writer, + metrics: metrics, + quit: quit, + } +} + +func (c *Checkpointer) PerformCheckpoint() (err error) { + noop, err := c.writer.Advance() + if err != nil { + return err + } + if noop { + return nil + } + + c.metrics.checkpointCreationTotal.Inc() + defer func() { + if err != nil { + c.metrics.checkpointCreationFail.Inc() + } + }() + // signal whether checkpoint writes should be amortized or burst + var immediate bool + n := c.iter.Count() + if n < 1 { + return c.writer.Close(false) + } + + // Give a 10% buffer to the checkpoint duration in order to account for + // new series, slow writes, etc. + perSeriesDuration := (90 * c.dur) / (100 * time.Duration(n)) + + ticker := time.NewTicker(perSeriesDuration) + defer ticker.Stop() + start := time.Now() + defer func() { + elapsed := time.Since(start) + level.Info(util.Logger).Log("msg", "checkpoint done", "time", elapsed.String()) + c.metrics.checkpointDuration.Observe(elapsed.Seconds()) + }() + for s := range c.iter.Iter() { + if s.Err != nil { + return s.Err + } + if err := c.writer.Write(s.Series); err != nil { + return err + } + + if !immediate { + if time.Since(start) > c.dur { + // This indicates the checkpoint is taking too long; stop waiting + // and flush the remaining series as fast as possible. + immediate = true + continue + } + } + + select { + case <-c.quit: + return c.writer.Close(true) + case <-ticker.C: + } + + } + + return c.writer.Close(false) +} + +func (c *Checkpointer) Run() { + ticker := time.NewTicker(c.dur) + defer ticker.Stop() + defer c.iter.Stop() + + for { + select { + case <-ticker.C: + level.Info(util.Logger).Log("msg", "starting checkpoint") + if err := c.PerformCheckpoint(); err != nil { + level.Error(util.Logger).Log("msg", "error checkpointing series", "err", err) + continue + } + case <-c.quit: + return + } + } +} diff --git a/pkg/ingester/checkpoint.pb.go b/pkg/ingester/checkpoint.pb.go index aa0b5356b0..08bb8e77da 100644 --- a/pkg/ingester/checkpoint.pb.go +++ b/pkg/ingester/checkpoint.pb.go @@ -34,11 +34,16 @@ const _ = proto.GoGoProtoPackageIsVersion2 // please upgrade the proto package // Chunk is a {de,}serializable intermediate type for chunkDesc which allows // efficient loading/unloading to disk during WAL checkpoint recovery. type Chunk struct { - From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` - To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` - FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"` - Closed bool `protobuf:"varint,4,opt,name=closed,proto3" json:"closed,omitempty"` - Data []byte `protobuf:"bytes,5,opt,name=data,proto3" json:"data,omitempty"` + From time.Time `protobuf:"bytes,1,opt,name=from,proto3,stdtime" json:"from"` + To time.Time `protobuf:"bytes,2,opt,name=to,proto3,stdtime" json:"to"` + FlushedAt time.Time `protobuf:"bytes,3,opt,name=flushedAt,proto3,stdtime" json:"flushedAt"` + LastUpdated time.Time `protobuf:"bytes,4,opt,name=lastUpdated,proto3,stdtime" json:"lastUpdated"` + Closed bool `protobuf:"varint,5,opt,name=closed,proto3" json:"closed,omitempty"` + Synced bool `protobuf:"varint,6,opt,name=synced,proto3" json:"synced,omitempty"` + // data to be unmarshaled into a MemChunk + Data []byte `protobuf:"bytes,7,opt,name=data,proto3" json:"data,omitempty"` + // data to be unmarshaled into a MemChunk's headBlock + Head []byte `protobuf:"bytes,8,opt,name=head,proto3" json:"head,omitempty"` } func (m *Chunk) Reset() { *m = Chunk{} } @@ -94,6 +99,13 @@ func (m *Chunk) GetFlushedAt() time.Time { return time.Time{} } +func (m *Chunk) GetLastUpdated() time.Time { + if m != nil { + return m.LastUpdated + } + return time.Time{} +} + func (m *Chunk) GetClosed() bool { if m != nil { return m.Closed @@ -101,6 +113,13 @@ func (m *Chunk) GetClosed() bool { return false } +func (m *Chunk) GetSynced() bool { + if m != nil { + return m.Synced + } + return false +} + func (m *Chunk) GetData() []byte { if m != nil { return m.Data @@ -108,9 +127,17 @@ func (m *Chunk) GetData() []byte { return nil } +func (m *Chunk) GetHead() []byte { + if m != nil { + return m.Head + } + return nil +} + // Series is a {de,}serializable intermediate type for Series. type Series struct { - UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` + UserID string `protobuf:"bytes,1,opt,name=userID,proto3" json:"userID,omitempty"` + // post mapped fingerprint is necessary because subsequent wal writes will reference it. Fingerprint uint64 `protobuf:"varint,2,opt,name=fingerprint,proto3" json:"fingerprint,omitempty"` Labels []github_com_cortexproject_cortex_pkg_ingester_client.LabelAdapter `protobuf:"bytes,3,rep,name=labels,proto3,customtype=github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter" json:"labels"` Chunks []Chunk `protobuf:"bytes,4,rep,name=chunks,proto3" json:"chunks"` @@ -177,34 +204,37 @@ func init() { func init() { proto.RegisterFile("pkg/ingester/checkpoint.proto", fileDescriptor_00f4b7152db9bdb5) } var fileDescriptor_00f4b7152db9bdb5 = []byte{ - // 427 bytes of a gzipped FileDescriptorProto - 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x52, 0xbd, 0x8e, 0xd4, 0x30, - 0x10, 0x8e, 0x77, 0x73, 0xd1, 0x9e, 0x17, 0x09, 0xe1, 0x02, 0x45, 0x2b, 0xe1, 0x44, 0x57, 0xa5, - 0x39, 0x47, 0x3a, 0x28, 0xa8, 0x10, 0x17, 0x28, 0x40, 0xa2, 0x40, 0x81, 0x8a, 0x2e, 0x3f, 0x4e, - 0x62, 0x36, 0x89, 0x23, 0xdb, 0x91, 0x28, 0x79, 0x84, 0x7b, 0x0c, 0x1e, 0xe5, 0xca, 0x2d, 0x4f, - 0x14, 0x07, 0x9b, 0x95, 0x80, 0xf2, 0x1e, 0x01, 0xc5, 0x49, 0xd8, 0xa5, 0xdc, 0xeb, 0xfc, 0xcd, - 0x7c, 0xdf, 0xcc, 0x37, 0x33, 0x86, 0x4f, 0x9a, 0x75, 0xee, 0xb3, 0x3a, 0xa7, 0x52, 0x51, 0xe1, - 0x27, 0x05, 0x4d, 0xd6, 0x0d, 0x67, 0xb5, 0x22, 0x8d, 0xe0, 0x8a, 0xa3, 0xc5, 0x94, 0x5a, 0x39, - 0x39, 0xe7, 0x79, 0x49, 0x7d, 0x1d, 0x8f, 0xdb, 0xcc, 0x57, 0xac, 0xa2, 0x52, 0x45, 0x55, 0x33, - 0x50, 0x57, 0xe7, 0x39, 0x53, 0x45, 0x1b, 0x93, 0x84, 0x57, 0x7e, 0xce, 0x73, 0xbe, 0x67, 0xf6, - 0x48, 0x03, 0xfd, 0x1a, 0xe9, 0x2f, 0x0f, 0xe8, 0x09, 0x17, 0x8a, 0x7e, 0x69, 0x04, 0xff, 0x4c, - 0x13, 0x35, 0x22, 0xff, 0x7f, 0x63, 0x25, 0xa3, 0xf5, 0x94, 0x1a, 0x2a, 0x9c, 0xfd, 0x06, 0xf0, - 0xe4, 0x55, 0xd1, 0xd6, 0x6b, 0xf4, 0x1c, 0x9a, 0x99, 0xe0, 0x95, 0x0d, 0x5c, 0xe0, 0x2d, 0x2f, - 0x56, 0x64, 0xb0, 0x4a, 0x26, 0x03, 0xe4, 0xe3, 0x64, 0x35, 0x58, 0x5c, 0xdf, 0x3a, 0xc6, 0xd5, - 0x0f, 0x07, 0x84, 0x5a, 0x81, 0x9e, 0xc1, 0x99, 0xe2, 0xf6, 0xec, 0x08, 0xdd, 0x4c, 0x71, 0x14, - 0xc0, 0xd3, 0xac, 0x6c, 0x65, 0x41, 0xd3, 0x4b, 0x65, 0xcf, 0x8f, 0x10, 0xef, 0x65, 0xe8, 0x31, - 0xb4, 0x92, 0x92, 0x4b, 0x9a, 0xda, 0xa6, 0x0b, 0xbc, 0x45, 0x38, 0x22, 0x84, 0xa0, 0x99, 0x46, - 0x2a, 0xb2, 0x4f, 0x5c, 0xe0, 0x3d, 0x08, 0xf5, 0xfb, 0xec, 0x17, 0x80, 0xd6, 0x07, 0x2a, 0x18, - 0x95, 0xbd, 0xac, 0x95, 0x54, 0xbc, 0x7d, 0xad, 0x87, 0x3d, 0x0d, 0x47, 0x84, 0x5c, 0xb8, 0xcc, - 0xfa, 0x6d, 0x89, 0x46, 0xb0, 0x5a, 0xe9, 0x89, 0xcc, 0xf0, 0x30, 0x84, 0x24, 0xb4, 0xca, 0x28, - 0xa6, 0xa5, 0xb4, 0xe7, 0xee, 0xdc, 0x5b, 0x5e, 0x3c, 0x22, 0xe3, 0x36, 0xdf, 0xf5, 0xd1, 0xf7, - 0x11, 0x13, 0xc1, 0x9b, 0xde, 0xe8, 0xf7, 0x5b, 0xe7, 0x3e, 0xb7, 0x19, 0xca, 0x5c, 0xa6, 0x51, - 0xa3, 0xa8, 0x08, 0xc7, 0x56, 0xe8, 0x1c, 0x5a, 0x49, 0x7f, 0x22, 0x69, 0x9b, 0xba, 0xe9, 0x43, - 0x32, 0xc9, 0x88, 0x3e, 0x5d, 0x60, 0xf6, 0x2d, 0xc3, 0x91, 0x14, 0xbc, 0xd8, 0x6c, 0xb1, 0x71, - 0xb3, 0xc5, 0xc6, 0xdd, 0x16, 0x83, 0xaf, 0x1d, 0x06, 0xdf, 0x3a, 0x0c, 0xae, 0x3b, 0x0c, 0x36, - 0x1d, 0x06, 0x3f, 0x3b, 0x0c, 0xfe, 0x74, 0xd8, 0xb8, 0xeb, 0x30, 0xb8, 0xda, 0x61, 0x63, 0xb3, - 0xc3, 0xc6, 0xcd, 0x0e, 0x1b, 0x9f, 0xfe, 0x7d, 0xd2, 0xd8, 0xd2, 0xdb, 0x7f, 0xfa, 0x37, 0x00, - 0x00, 0xff, 0xff, 0xaf, 0xc1, 0x8c, 0xad, 0xd6, 0x02, 0x00, 0x00, + // 467 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0x9c, 0x53, 0xb1, 0x8e, 0xd3, 0x40, + 0x10, 0xf5, 0x26, 0x3e, 0x93, 0xdb, 0x20, 0x21, 0xb6, 0x40, 0xab, 0x48, 0x6c, 0xac, 0xab, 0xd2, + 0x9c, 0x2d, 0x1d, 0x14, 0x54, 0x88, 0x33, 0x08, 0x81, 0x44, 0x81, 0x0c, 0x34, 0x74, 0x8e, 0x3d, + 0xb1, 0x4d, 0x1c, 0xaf, 0xb5, 0xbb, 0x96, 0xa0, 0xe3, 0x13, 0xee, 0x33, 0xf8, 0x94, 0x2b, 0x53, + 0x9e, 0x40, 0x3a, 0x88, 0x23, 0x21, 0xca, 0xfb, 0x04, 0xb4, 0x6b, 0x9b, 0x84, 0x32, 0xd7, 0xcd, + 0x7b, 0xfb, 0xde, 0xcc, 0x78, 0x66, 0x8c, 0x1f, 0x56, 0xcb, 0xd4, 0xcf, 0xcb, 0x14, 0xa4, 0x02, + 0xe1, 0xc7, 0x19, 0xc4, 0xcb, 0x8a, 0xe7, 0xa5, 0xf2, 0x2a, 0xc1, 0x15, 0x27, 0xa3, 0xfe, 0x69, + 0x32, 0x4d, 0x39, 0x4f, 0x0b, 0xf0, 0x0d, 0x3f, 0xaf, 0x17, 0xbe, 0xca, 0x57, 0x20, 0x55, 0xb4, + 0xaa, 0x5a, 0xe9, 0xe4, 0x34, 0xcd, 0x55, 0x56, 0xcf, 0xbd, 0x98, 0xaf, 0xfc, 0x94, 0xa7, 0x7c, + 0xa7, 0xd4, 0xc8, 0x00, 0x13, 0x75, 0xf2, 0x67, 0x7b, 0xf2, 0x98, 0x0b, 0x05, 0x9f, 0x2b, 0xc1, + 0x3f, 0x41, 0xac, 0x3a, 0xe4, 0xff, 0xdf, 0x58, 0x91, 0x43, 0xd9, 0x3f, 0xb5, 0x19, 0x4e, 0x7e, + 0x0c, 0xf0, 0xd1, 0xf3, 0xac, 0x2e, 0x97, 0xe4, 0x09, 0xb6, 0x17, 0x82, 0xaf, 0x28, 0x72, 0xd1, + 0x6c, 0x7c, 0x36, 0xf1, 0xda, 0x56, 0xbd, 0xbe, 0x01, 0xef, 0x7d, 0xdf, 0x6a, 0x30, 0xba, 0xbc, + 0x9e, 0x5a, 0x17, 0x3f, 0xa7, 0x28, 0x34, 0x0e, 0xf2, 0x18, 0x0f, 0x14, 0xa7, 0x83, 0x03, 0x7c, + 0x03, 0xc5, 0x49, 0x80, 0x8f, 0x17, 0x45, 0x2d, 0x33, 0x48, 0xce, 0x15, 0x1d, 0x1e, 0x60, 0xde, + 0xd9, 0xc8, 0x4b, 0x3c, 0x2e, 0x22, 0xa9, 0x3e, 0x54, 0x49, 0xa4, 0x20, 0xa1, 0xf6, 0x01, 0x59, + 0xf6, 0x8d, 0xe4, 0x01, 0x76, 0xe2, 0x82, 0x4b, 0x48, 0xe8, 0x91, 0x8b, 0x66, 0xa3, 0xb0, 0x43, + 0x9a, 0x97, 0x5f, 0xca, 0x18, 0x12, 0xea, 0xb4, 0x7c, 0x8b, 0x08, 0xc1, 0x76, 0x12, 0xa9, 0x88, + 0xde, 0x71, 0xd1, 0xec, 0x6e, 0x68, 0x62, 0xcd, 0x65, 0x10, 0x25, 0x74, 0xd4, 0x72, 0x3a, 0x3e, + 0xf9, 0x8d, 0xb0, 0xf3, 0x0e, 0x44, 0x0e, 0x52, 0xa7, 0xaa, 0x25, 0x88, 0xd7, 0x2f, 0xcc, 0x80, + 0x8f, 0xc3, 0x0e, 0x11, 0x17, 0x8f, 0x17, 0x7a, 0x43, 0xa2, 0x12, 0x79, 0xa9, 0xcc, 0x14, 0xed, + 0x70, 0x9f, 0x22, 0x12, 0x3b, 0x45, 0x34, 0x87, 0x42, 0xd2, 0xa1, 0x3b, 0x9c, 0x8d, 0xcf, 0xee, + 0x7b, 0xdd, 0x06, 0xdf, 0x68, 0xf6, 0x6d, 0x94, 0x8b, 0xe0, 0x95, 0xfe, 0xac, 0xef, 0xd7, 0xd3, + 0xdb, 0xdc, 0x43, 0x9b, 0xe6, 0x3c, 0x89, 0x2a, 0x05, 0x22, 0xec, 0x4a, 0x91, 0x53, 0xec, 0xc4, + 0xfa, 0x2c, 0x24, 0xb5, 0x4d, 0xd1, 0x7b, 0x5e, 0x6f, 0xf3, 0xcc, 0xb9, 0x04, 0xb6, 0x2e, 0x19, + 0x76, 0xa2, 0xe0, 0xe9, 0x7a, 0xc3, 0xac, 0xab, 0x0d, 0xb3, 0x6e, 0x36, 0x0c, 0x7d, 0x6d, 0x18, + 0xfa, 0xd6, 0x30, 0x74, 0xd9, 0x30, 0xb4, 0x6e, 0x18, 0xfa, 0xd5, 0x30, 0xf4, 0xa7, 0x61, 0xd6, + 0x4d, 0xc3, 0xd0, 0xc5, 0x96, 0x59, 0xeb, 0x2d, 0xb3, 0xae, 0xb6, 0xcc, 0xfa, 0xf8, 0xef, 0xc7, + 0x98, 0x3b, 0x66, 0x57, 0x8f, 0xfe, 0x06, 0x00, 0x00, 0xff, 0xff, 0x69, 0x98, 0x4e, 0xed, 0x4a, + 0x03, 0x00, 0x00, } func (this *Chunk) Equal(that interface{}) bool { @@ -235,12 +265,21 @@ func (this *Chunk) Equal(that interface{}) bool { if !this.FlushedAt.Equal(that1.FlushedAt) { return false } + if !this.LastUpdated.Equal(that1.LastUpdated) { + return false + } if this.Closed != that1.Closed { return false } + if this.Synced != that1.Synced { + return false + } if !bytes.Equal(this.Data, that1.Data) { return false } + if !bytes.Equal(this.Head, that1.Head) { + return false + } return true } func (this *Series) Equal(that interface{}) bool { @@ -290,13 +329,16 @@ func (this *Chunk) GoString() string { if this == nil { return "nil" } - s := make([]string, 0, 9) + s := make([]string, 0, 12) s = append(s, "&ingester.Chunk{") s = append(s, "From: "+fmt.Sprintf("%#v", this.From)+",\n") s = append(s, "To: "+fmt.Sprintf("%#v", this.To)+",\n") s = append(s, "FlushedAt: "+fmt.Sprintf("%#v", this.FlushedAt)+",\n") + s = append(s, "LastUpdated: "+fmt.Sprintf("%#v", this.LastUpdated)+",\n") s = append(s, "Closed: "+fmt.Sprintf("%#v", this.Closed)+",\n") + s = append(s, "Synced: "+fmt.Sprintf("%#v", this.Synced)+",\n") s = append(s, "Data: "+fmt.Sprintf("%#v", this.Data)+",\n") + s = append(s, "Head: "+fmt.Sprintf("%#v", this.Head)+",\n") s = append(s, "}") return strings.Join(s, "") } @@ -366,8 +408,16 @@ func (m *Chunk) MarshalTo(dAtA []byte) (int, error) { return 0, err } i += n3 + dAtA[i] = 0x22 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(github_com_gogo_protobuf_types.SizeOfStdTime(m.LastUpdated))) + n4, err := github_com_gogo_protobuf_types.StdTimeMarshalTo(m.LastUpdated, dAtA[i:]) + if err != nil { + return 0, err + } + i += n4 if m.Closed { - dAtA[i] = 0x20 + dAtA[i] = 0x28 i++ if m.Closed { dAtA[i] = 1 @@ -376,12 +426,28 @@ func (m *Chunk) MarshalTo(dAtA []byte) (int, error) { } i++ } + if m.Synced { + dAtA[i] = 0x30 + i++ + if m.Synced { + dAtA[i] = 1 + } else { + dAtA[i] = 0 + } + i++ + } if len(m.Data) > 0 { - dAtA[i] = 0x2a + dAtA[i] = 0x3a i++ i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.Data))) i += copy(dAtA[i:], m.Data) } + if len(m.Head) > 0 { + dAtA[i] = 0x42 + i++ + i = encodeVarintCheckpoint(dAtA, i, uint64(len(m.Head))) + i += copy(dAtA[i:], m.Head) + } return i, nil } @@ -459,13 +525,22 @@ func (m *Chunk) Size() (n int) { n += 1 + l + sovCheckpoint(uint64(l)) l = github_com_gogo_protobuf_types.SizeOfStdTime(m.FlushedAt) n += 1 + l + sovCheckpoint(uint64(l)) + l = github_com_gogo_protobuf_types.SizeOfStdTime(m.LastUpdated) + n += 1 + l + sovCheckpoint(uint64(l)) if m.Closed { n += 2 } + if m.Synced { + n += 2 + } l = len(m.Data) if l > 0 { n += 1 + l + sovCheckpoint(uint64(l)) } + l = len(m.Head) + if l > 0 { + n += 1 + l + sovCheckpoint(uint64(l)) + } return n } @@ -518,8 +593,11 @@ func (this *Chunk) String() string { `From:` + strings.Replace(strings.Replace(this.From.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `To:` + strings.Replace(strings.Replace(this.To.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `FlushedAt:` + strings.Replace(strings.Replace(this.FlushedAt.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, + `LastUpdated:` + strings.Replace(strings.Replace(this.LastUpdated.String(), "Timestamp", "types.Timestamp", 1), `&`, ``, 1) + `,`, `Closed:` + fmt.Sprintf("%v", this.Closed) + `,`, + `Synced:` + fmt.Sprintf("%v", this.Synced) + `,`, `Data:` + fmt.Sprintf("%v", this.Data) + `,`, + `Head:` + fmt.Sprintf("%v", this.Head) + `,`, `}`, }, "") return s @@ -674,6 +752,39 @@ func (m *Chunk) Unmarshal(dAtA []byte) error { } iNdEx = postIndex case 4: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field LastUpdated", wireType) + } + var msglen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + msglen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if msglen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + msglen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + if err := github_com_gogo_protobuf_types.StdTimeUnmarshal(&m.LastUpdated, dAtA[iNdEx:postIndex]); err != nil { + return err + } + iNdEx = postIndex + case 5: if wireType != 0 { return fmt.Errorf("proto: wrong wireType = %d for field Closed", wireType) } @@ -693,7 +804,27 @@ func (m *Chunk) Unmarshal(dAtA []byte) error { } } m.Closed = bool(v != 0) - case 5: + case 6: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Synced", wireType) + } + var v int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + v |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + m.Synced = bool(v != 0) + case 7: if wireType != 2 { return fmt.Errorf("proto: wrong wireType = %d for field Data", wireType) } @@ -727,6 +858,40 @@ func (m *Chunk) Unmarshal(dAtA []byte) error { m.Data = []byte{} } iNdEx = postIndex + case 8: + if wireType != 2 { + return fmt.Errorf("proto: wrong wireType = %d for field Head", wireType) + } + var byteLen int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowCheckpoint + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + byteLen |= int(b&0x7F) << shift + if b < 0x80 { + break + } + } + if byteLen < 0 { + return ErrInvalidLengthCheckpoint + } + postIndex := iNdEx + byteLen + if postIndex < 0 { + return ErrInvalidLengthCheckpoint + } + if postIndex > l { + return io.ErrUnexpectedEOF + } + m.Head = append(m.Head[:0], dAtA[iNdEx:postIndex]...) + if m.Head == nil { + m.Head = []byte{} + } + iNdEx = postIndex default: iNdEx = preIndex skippy, err := skipCheckpoint(dAtA[iNdEx:]) diff --git a/pkg/ingester/checkpoint.proto b/pkg/ingester/checkpoint.proto index 0437746c8e..692ec7c50a 100644 --- a/pkg/ingester/checkpoint.proto +++ b/pkg/ingester/checkpoint.proto @@ -14,13 +14,19 @@ message Chunk { google.protobuf.Timestamp from = 1 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; google.protobuf.Timestamp flushedAt = 3 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; - bool closed = 4; - bytes data = 5; + google.protobuf.Timestamp lastUpdated = 4 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false]; + bool closed = 5; + bool synced = 6; + // data to be unmarshaled into a MemChunk + bytes data = 7; + // data to be unmarshaled into a MemChunk's headBlock + bytes head = 8; } // Series is a {de,}serializable intermediate type for Series. message Series { string userID = 1; + // post mapped fingerprint is necessary because subsequent wal writes will reference it. uint64 fingerprint = 2; repeated cortex.LabelPair labels = 3 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/cortexproject/cortex/pkg/ingester/client.LabelAdapter"]; repeated Chunk chunks = 4 [(gogoproto.nullable) = false]; diff --git a/pkg/ingester/encoding.go b/pkg/ingester/encoding.go index a933caec5a..2fee94f116 100644 --- a/pkg/ingester/encoding.go +++ b/pkg/ingester/encoding.go @@ -5,7 +5,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/prometheus/tsdb/encoding" - tsdb_record "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/record" "github.com/grafana/loki/pkg/logproto" ) @@ -25,9 +25,44 @@ const ( // WALRecord is a struct combining the series and samples record. type WALRecord struct { - UserID string - Series []tsdb_record.RefSeries - RefEntries RefEntries + UserID string + Series []record.RefSeries + + // entryIndexMap coordinates the RefEntries index associated with a particular fingerprint. + // This is helpful for constant time lookups during ingestion and is ignored when restoring + // from the WAL. + entryIndexMap map[uint64]int + RefEntries []RefEntries +} + +func (r *WALRecord) IsEmpty() bool { + return len(r.Series) == 0 && len(r.RefEntries) == 0 +} + +func (r *WALRecord) Reset() { + r.UserID = "" + if len(r.Series) > 0 { + r.Series = r.Series[:0] + } + + for _, ref := range r.RefEntries { + recordPool.PutEntries(ref.Entries) + } + r.RefEntries = r.RefEntries[:0] + r.entryIndexMap = make(map[uint64]int) +} + +func (r *WALRecord) AddEntries(fp uint64, entries ...logproto.Entry) { + if idx, ok := r.entryIndexMap[fp]; ok { + r.RefEntries[idx].Entries = append(r.RefEntries[idx].Entries, entries...) + return + } + + r.entryIndexMap[fp] = len(r.RefEntries) + r.RefEntries = append(r.RefEntries, RefEntries{ + Ref: fp, + Entries: entries, + }) } type RefEntries struct { @@ -35,74 +70,96 @@ type RefEntries struct { Entries []logproto.Entry } -func (record *WALRecord) encodeSeries(b []byte) []byte { +func (r *WALRecord) encodeSeries(b []byte) []byte { buf := EncWith(b) buf.PutByte(byte(WALRecordSeries)) - buf.PutUvarintStr(record.UserID) + buf.PutUvarintStr(r.UserID) - var enc tsdb_record.Encoder + var enc record.Encoder // The 'encoded' already has the type header and userID here, hence re-using // the remaining part of the slice (i.e. encoded[len(encoded):])) to encode the series. encoded := buf.Get() - encoded = append(encoded, enc.Series(record.Series, encoded[len(encoded):])...) + encoded = append(encoded, enc.Series(r.Series, encoded[len(encoded):])...) return encoded } -func (record *WALRecord) encodeEntries(b []byte) []byte { +func (r *WALRecord) encodeEntries(b []byte) []byte { buf := EncWith(b) buf.PutByte(byte(WALRecordEntries)) - buf.PutUvarintStr(record.UserID) - - entries := record.RefEntries.Entries - if len(entries) == 0 { - return buf.Get() + buf.PutUvarintStr(r.UserID) + + // Placeholder for the first timestamp of any sample encountered. + // All others in this record will store their timestamps as diffs relative to this + // as a space optimization. + var first int64 + +outer: + for _, ref := range r.RefEntries { + for _, entry := range ref.Entries { + first = entry.Timestamp.UnixNano() + buf.PutBE64int64(first) + break outer + } } - // Only encode the series fingerprint if there are >0 entries. - buf.PutBE64(record.RefEntries.Ref) - - // Store base timestamp and base reference number of first sample. - // All samples encode their timestamp and ref as delta to those. - first := entries[0].Timestamp.UnixNano() - - buf.PutBE64int64(first) - - for _, s := range entries { - buf.PutVarint64(s.Timestamp.UnixNano() - first) - // denote line length - byteLine := []byte(s.Line) - buf.PutUvarint(len(byteLine)) - buf.PutBytes(byteLine) + for _, ref := range r.RefEntries { + // ignore refs with 0 entries + if len(ref.Entries) < 1 { + continue + } + buf.PutBE64(ref.Ref) // write fingerprint + buf.PutUvarint(len(ref.Entries)) // write number of entries + + for _, s := range ref.Entries { + buf.PutVarint64(s.Timestamp.UnixNano() - first) + // denote line length + byteLine := []byte(s.Line) + buf.PutUvarint(len(byteLine)) + buf.PutBytes(byteLine) + } } return buf.Get() } -func decodeEntries(b []byte, entries *RefEntries) error { +func decodeEntries(b []byte, rec *WALRecord) error { if len(b) == 0 { return nil } dec := DecWith(b) - - entries.Ref = dec.Be64() baseTime := dec.Be64int64() for len(dec.B) > 0 && dec.Err() == nil { - dRef := dec.Varint64() - ln := dec.Uvarint() - line := dec.Bytes(ln) - - entries.Entries = append(entries.Entries, logproto.Entry{ - Timestamp: time.Unix(0, baseTime+dRef), - Line: string(line), - }) + refEntries := RefEntries{ + Ref: dec.Be64(), + } + + nEntries := dec.Uvarint() + rem := nEntries + for ; dec.Err() == nil && rem > 0; rem-- { + timeOffset := dec.Varint64() + lineLength := dec.Uvarint() + line := dec.Bytes(lineLength) + + refEntries.Entries = append(refEntries.Entries, logproto.Entry{ + Timestamp: time.Unix(0, baseTime+timeOffset), + Line: string(line), + }) + } + + if dec.Err() != nil { + return errors.Wrapf(dec.Err(), "entry decode error after %d RefEntries", nEntries-rem) + } + + rec.RefEntries = append(rec.RefEntries, refEntries) } if dec.Err() != nil { - return errors.Wrapf(dec.Err(), "decode error after %d entries", len(entries.Entries)) + return errors.Wrap(dec.Err(), "refEntry decode error") } + if len(dec.B) > 0 { return errors.Errorf("unexpected %d bytes left in entry", len(dec.B)) } @@ -113,23 +170,20 @@ func decodeEntries(b []byte, entries *RefEntries) error { func decodeWALRecord(b []byte, walRec *WALRecord) (err error) { var ( userID string - dec tsdb_record.Decoder - rSeries []tsdb_record.RefSeries + dec record.Decoder + rSeries []record.RefSeries decbuf = DecWith(b) t = RecordType(decbuf.Byte()) ) - walRec.Series = walRec.Series[:0] - walRec.RefEntries.Entries = walRec.RefEntries.Entries[:0] - switch t { case WALRecordSeries: userID = decbuf.UvarintStr() rSeries, err = dec.Series(decbuf.B, walRec.Series) case WALRecordEntries: userID = decbuf.UvarintStr() - err = decodeEntries(decbuf.B, &walRec.RefEntries) + err = decodeEntries(decbuf.B, walRec) default: return errors.New("unknown record type") } diff --git a/pkg/ingester/encoding_test.go b/pkg/ingester/encoding_test.go index 793c92a0c4..2e023af780 100644 --- a/pkg/ingester/encoding_test.go +++ b/pkg/ingester/encoding_test.go @@ -1,19 +1,23 @@ package ingester import ( + "fmt" "testing" "time" + "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/tsdb/record" "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/logproto" ) func Test_Encoding_Series(t *testing.T) { record := &WALRecord{ - UserID: "123", + entryIndexMap: make(map[uint64]int), + UserID: "123", Series: []record.RefSeries{ { Ref: 456, @@ -34,26 +38,42 @@ func Test_Encoding_Series(t *testing.T) { buf := record.encodeSeries(nil) - var decoded WALRecord + decoded := recordPool.GetRecord() - err := decodeWALRecord(buf, &decoded) + err := decodeWALRecord(buf, decoded) require.Nil(t, err) - require.Equal(t, record, &decoded) + require.Equal(t, record, decoded) } func Test_Encoding_Entries(t *testing.T) { record := &WALRecord{ - UserID: "123", - RefEntries: RefEntries{ - Ref: 456, - Entries: []logproto.Entry{ - { - Timestamp: time.Unix(1000, 0), - Line: "first", + entryIndexMap: make(map[uint64]int), + UserID: "123", + RefEntries: []RefEntries{ + { + Ref: 456, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(1000, 0), + Line: "first", + }, + { + Timestamp: time.Unix(2000, 0), + Line: "second", + }, }, - { - Timestamp: time.Unix(2000, 0), - Line: "second", + }, + { + Ref: 789, + Entries: []logproto.Entry{ + { + Timestamp: time.Unix(3000, 0), + Line: "third", + }, + { + Timestamp: time.Unix(4000, 0), + Line: "fourth", + }, }, }, }, @@ -61,9 +81,153 @@ func Test_Encoding_Entries(t *testing.T) { buf := record.encodeEntries(nil) - var decoded WALRecord + decoded := recordPool.GetRecord() - err := decodeWALRecord(buf, &decoded) + err := decodeWALRecord(buf, decoded) require.Nil(t, err) - require.Equal(t, record, &decoded) + require.Equal(t, record, decoded) +} + +func fillChunk(t *testing.T, c chunkenc.Chunk) int64 { + t.Helper() + var i, inserted int64 + entry := &logproto.Entry{ + Timestamp: time.Unix(0, 0), + Line: "entry for line 0", + } + + for c.SpaceFor(entry) { + require.NoError(t, c.Append(entry)) + i++ + inserted += int64(len(entry.Line)) + entry.Timestamp = time.Unix(0, i) + entry.Line = fmt.Sprintf("entry for line %d", i) + } + return inserted +} + +func dummyConf() *Config { + var conf Config + conf.BlockSize = 256 * 1024 + conf.TargetChunkSize = 1500 * 1024 + + return &conf +} + +func Test_EncodingChunks(t *testing.T) { + + conf := dummyConf() + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize) + fillChunk(t, c) + + from := []chunkDesc{ + { + chunk: c, + }, + // test non zero values + { + chunk: c, + closed: true, + synced: true, + flushed: time.Unix(1, 0), + lastUpdated: time.Unix(0, 1), + }, + } + there, err := toWireChunks(from, nil) + require.Nil(t, err) + backAgain, err := fromWireChunks(conf, there) + require.Nil(t, err) + + for i, to := range backAgain { + // test the encoding directly as the substructure may change. + // for instance the uncompressed size for each block is not included in the encoded version. + enc, err := to.chunk.Bytes() + require.Nil(t, err) + to.chunk = nil + + matched := from[i] + exp, err := matched.chunk.Bytes() + require.Nil(t, err) + matched.chunk = nil + + require.Equal(t, exp, enc) + require.Equal(t, matched, to) + + } +} + +func Test_EncodingCheckpoint(t *testing.T) { + conf := dummyConf() + c := chunkenc.NewMemChunk(chunkenc.EncGZIP, conf.BlockSize, conf.TargetChunkSize) + require.Nil(t, c.Append(&logproto.Entry{ + Timestamp: time.Unix(1, 0), + Line: "hi there", + })) + data, err := c.Bytes() + require.Nil(t, err) + from, to := c.Bounds() + + ls := labels.FromMap(map[string]string{"foo": "bar"}) + s := &Series{ + UserID: "fake", + Fingerprint: 123, + Labels: client.FromLabelsToLabelAdapters(ls), + Chunks: []Chunk{ + { + From: from, + To: to, + Closed: true, + Synced: true, + FlushedAt: time.Unix(1, 0), + LastUpdated: time.Unix(0, 1), + Data: data, + }, + }, + } + + b, err := encodeWithTypeHeader(s, CheckpointRecord) + require.Nil(t, err) + + out := &Series{} + err = decodeCheckpointRecord(b, out) + require.Nil(t, err) + + // override the passed []byte to ensure that the resulting *Series doesn't + // contain any trailing refs to it. + for i := range b { + b[i] = 0 + } + + // test chunk bytes separately + sChunks := s.Chunks + s.Chunks = nil + outChunks := out.Chunks + out.Chunks = nil + + require.Equal(t, s, out) + require.Equal(t, len(sChunks), len(outChunks)) + for i, exp := range sChunks { + + got := outChunks[i] + zero := time.Unix(0, 0) + // Issues diffing zero-value time.Locations against nil ones. + // Check/override them individually so that other fields get tested in an extensible manner. + require.Equal(t, true, exp.From.Equal(got.From)) + exp.From = zero + got.From = zero + + require.Equal(t, true, exp.To.Equal(got.To)) + exp.To = zero + got.To = zero + + require.Equal(t, true, exp.FlushedAt.Equal(got.FlushedAt)) + exp.FlushedAt = zero + got.FlushedAt = zero + + require.Equal(t, true, exp.LastUpdated.Equal(got.LastUpdated)) + exp.LastUpdated = zero + got.LastUpdated = zero + + require.Equal(t, exp, got) + } } diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 1775f7381d..32428b78f0 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -320,6 +320,10 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP wireChunks := make([]chunk.Chunk, 0, len(cs)) for _, c := range cs { + // Ensure that new blocks are cut before flushing as data in the head block is not included otherwise. + if err = c.chunk.Close(); err != nil { + return err + } firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds()) c := chunk.NewChunk( userID, fp, metric, diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f03b4fae93..bd8b7cea1b 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -2,14 +2,19 @@ package ingester import ( "context" - "errors" "flag" "fmt" + "net/http" + "os" "sync" "time" - "github.com/grafana/loki/pkg/storage" - + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/util" + "github.com/cortexproject/cortex/pkg/util/services" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" @@ -17,11 +22,6 @@ import ( "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/ring" - "github.com/cortexproject/cortex/pkg/util" - "github.com/cortexproject/cortex/pkg/util/services" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/ingester/client" @@ -29,7 +29,9 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/storage" "github.com/grafana/loki/pkg/storage/stores/shipper" + errUtil "github.com/grafana/loki/pkg/util" listutil "github.com/grafana/loki/pkg/util" "github.com/grafana/loki/pkg/util/validation" ) @@ -50,15 +52,16 @@ type Config struct { // Config for transferring chunks. MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"` - ConcurrentFlushes int `yaml:"concurrent_flushes"` - FlushCheckPeriod time.Duration `yaml:"flush_check_period"` - FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` - RetainPeriod time.Duration `yaml:"chunk_retain_period"` - MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` - BlockSize int `yaml:"chunk_block_size"` - TargetChunkSize int `yaml:"chunk_target_size"` - ChunkEncoding string `yaml:"chunk_encoding"` - MaxChunkAge time.Duration `yaml:"max_chunk_age"` + ConcurrentFlushes int `yaml:"concurrent_flushes"` + FlushCheckPeriod time.Duration `yaml:"flush_check_period"` + FlushOpTimeout time.Duration `yaml:"flush_op_timeout"` + RetainPeriod time.Duration `yaml:"chunk_retain_period"` + MaxChunkIdle time.Duration `yaml:"chunk_idle_period"` + BlockSize int `yaml:"chunk_block_size"` + TargetChunkSize int `yaml:"chunk_target_size"` + ChunkEncoding string `yaml:"chunk_encoding"` + parsedEncoding chunkenc.Encoding `yaml:"-"` // placeholder for validated encoding + MaxChunkAge time.Duration `yaml:"max_chunk_age"` // Synchronization settings. Used to make sure that ingesters cut their chunks at the same moments. SyncPeriod time.Duration `yaml:"sync_period"` @@ -71,11 +74,14 @@ type Config struct { QueryStore bool `yaml:"-"` QueryStoreMaxLookBackPeriod time.Duration `yaml:"query_store_max_look_back_period"` + + WAL WALConfig `yaml:"wal,omitempty"` } // RegisterFlags registers the flags. func (cfg *Config) RegisterFlags(f *flag.FlagSet) { cfg.LifecyclerConfig.RegisterFlags(f) + cfg.WAL.RegisterFlags(f) f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing. If set to 0 or negative value, transfers are disabled.") f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushes", 16, "") @@ -93,6 +99,24 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.") } +func (cfg *Config) Validate() error { + enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding) + if err != nil { + return err + } + cfg.parsedEncoding = enc + + if err = cfg.WAL.Validate(); err != nil { + return err + } + + if cfg.MaxTransferRetries > 0 && cfg.WAL.Enabled { + return errors.New("the use of the write ahead log (WAL) is incompatible with chunk transfers. It's suggested to use the WAL. Please try setting ingester.max-transfer-retries to 0 to disable transfers") + } + return nil + +} + // Ingester builds chunks for incoming log streams. type Ingester struct { services.Service @@ -121,7 +145,10 @@ type Ingester struct { flushQueuesDone sync.WaitGroup limiter *Limiter - factory func() chunkenc.Chunk + + metrics *ingesterMetrics + + wal WAL } // ChunkStore is the interface we need to store chunks. @@ -138,10 +165,8 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } - enc, err := chunkenc.ParseEncoding(cfg.ChunkEncoding) - if err != nil { - return nil, err - } + + metrics := newIngesterMetrics(registerer) i := &Ingester{ cfg: cfg, @@ -152,12 +177,22 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid loopQuit: make(chan struct{}), flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes), tailersQuit: make(chan struct{}), - factory: func() chunkenc.Chunk { - return chunkenc.NewMemChunk(enc, cfg.BlockSize, cfg.TargetChunkSize) - }, + metrics: metrics, + } + + if cfg.WAL.Enabled { + if err := os.MkdirAll(cfg.WAL.Dir, os.ModePerm); err != nil { + return nil, err + } + } + + wal, err := newWAL(cfg.WAL, registerer, metrics, newIngesterSeriesIter(i)) + if err != nil { + return nil, err } + i.wal = wal - i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, true, registerer) + i.lifecycler, err = ring.NewLifecycler(cfg.LifecyclerConfig, i, "ingester", ring.IngesterRingKey, !cfg.WAL.Enabled || cfg.WAL.FlushOnShutdown, registerer) if err != nil { return nil, err } @@ -174,6 +209,46 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid } func (i *Ingester) starting(ctx context.Context) error { + if i.cfg.WAL.Recover { + recoverer := newIngesterRecoverer(i) + defer recoverer.Close() + + start := time.Now() + + level.Info(util.Logger).Log("msg", "recovering from checkpoint") + checkpointReader, checkpointCloser, err := newCheckpointReader(i.cfg.WAL.Dir) + if err != nil { + return err + } + defer checkpointCloser.Close() + + if err = RecoverCheckpoint(checkpointReader, recoverer); err != nil { + i.metrics.walCorruptionsTotal.WithLabelValues(walTypeCheckpoint).Inc() + level.Error(util.Logger).Log("msg", "failed to recover from checkpoint", "elapsed", time.Since(start).String()) + return err + } + level.Info(util.Logger).Log("msg", "recovered from checkpoint", "elapsed", time.Since(start).String()) + + level.Info(util.Logger).Log("msg", "recovering from WAL") + segmentReader, segmentCloser, err := newWalReader(i.cfg.WAL.Dir, -1) + if err != nil { + return err + } + defer segmentCloser.Close() + + if err = RecoverWAL(segmentReader, recoverer); err != nil { + i.metrics.walCorruptionsTotal.WithLabelValues(walTypeSegment).Inc() + level.Error(util.Logger).Log("msg", "failed to recover from WAL segments", "elapsed", time.Since(start).String()) + return err + } + level.Info(util.Logger).Log("msg", "recovered from WAL segments", "elapsed", time.Since(start).String()) + + elapsed := time.Since(start) + i.metrics.walReplayDuration.Set(elapsed.Seconds()) + level.Info(util.Logger).Log("msg", "recovery completed", "time", elapsed.String()) + + } + i.flushQueuesDone.Add(i.cfg.ConcurrentFlushes) for j := 0; j < i.cfg.ConcurrentFlushes; j++ { i.flushQueues[j] = util.NewPriorityQueue(flushQueueLength) @@ -222,8 +297,9 @@ func (i *Ingester) running(ctx context.Context) error { // At this point, loop no longer runs, but flushers are still running. func (i *Ingester) stopping(_ error) error { i.stopIncomingRequests() - - err := services.StopAndAwaitTerminated(context.Background(), i.lifecycler) + var errs errUtil.MultiError + errs.Add(i.wal.Stop()) + errs.Add(services.StopAndAwaitTerminated(context.Background(), i.lifecycler)) // Normally, flushers are stopped via lifecycler (in transferOut), but if lifecycler fails, // we better stop them. @@ -232,7 +308,7 @@ func (i *Ingester) stopping(_ error) error { } i.flushQueuesDone.Wait() - return err + return errs.Err() } func (i *Ingester) loop() { @@ -252,6 +328,18 @@ func (i *Ingester) loop() { } } +// ShutdownHandler triggers the following set of operations in order: +// * Change the state of ring to stop accepting writes. +// * Flush all the chunks. +func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { + originalState := i.lifecycler.FlushOnShutdown() + // We want to flush the chunks if transfer fails irrespective of original flag. + i.lifecycler.SetFlushOnShutdown(true) + _ = services.StopAndAwaitTerminated(context.Background(), i) + i.lifecycler.SetFlushOnShutdown(originalState) + w.WriteHeader(http.StatusNoContent) +} + // Push implements logproto.Pusher. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { instanceID, err := user.ExtractOrgID(ctx) @@ -276,7 +364,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(&i.cfg, instanceID, i.factory, i.limiter, i.cfg.SyncPeriod, i.cfg.SyncMinUtilization) + inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 02820d22c1..98b8bba0b6 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -20,6 +20,7 @@ import ( "google.golang.org/grpc" "google.golang.org/grpc/metadata" + "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -444,3 +445,49 @@ func TestIngester_boltdbShipperMaxLookBack(t *testing.T) { }) } } + +func TestValidate(t *testing.T) { + + for i, tc := range []struct { + in Config + err bool + expected Config + }{ + { + in: Config{ + MaxChunkAge: time.Minute, + ChunkEncoding: chunkenc.EncGZIP.String(), + }, + expected: Config{ + MaxChunkAge: time.Minute, + ChunkEncoding: chunkenc.EncGZIP.String(), + parsedEncoding: chunkenc.EncGZIP, + }, + }, + { + in: Config{ + ChunkEncoding: chunkenc.EncSnappy.String(), + }, + expected: Config{ + ChunkEncoding: chunkenc.EncSnappy.String(), + parsedEncoding: chunkenc.EncSnappy, + }, + }, + { + in: Config{ + ChunkEncoding: "bad-enc", + }, + err: true, + }, + } { + t.Run(fmt.Sprint(i), func(t *testing.T) { + err := tc.in.Validate() + if tc.err { + require.NotNil(t, err) + return + } + require.Nil(t, err) + require.Equal(t, tc.expected, tc.in) + }) + } +} diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 4714d066d9..c3b2718ec7 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -4,20 +4,19 @@ import ( "context" "net/http" "sync" - "time" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" + tsdb_record "github.com/prometheus/prometheus/tsdb/record" "github.com/weaveworks/common/httpgrpc" "github.com/cortexproject/cortex/pkg/ingester/client" "github.com/cortexproject/cortex/pkg/ingester/index" cutil "github.com/cortexproject/cortex/pkg/util" - "github.com/grafana/loki/pkg/chunkenc" "github.com/grafana/loki/pkg/helpers" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" @@ -75,14 +74,19 @@ type instance struct { tailerMtx sync.RWMutex limiter *Limiter - factory func() chunkenc.Chunk - // sync - syncPeriod time.Duration - syncMinUtil float64 + wal WAL + + metrics *ingesterMetrics } -func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, limiter *Limiter, syncPeriod time.Duration, syncMinUtil float64) *instance { +func newInstance( + cfg *Config, + instanceID string, + limiter *Limiter, + wal WAL, + metrics *ingesterMetrics, +) *instance { i := &instance{ cfg: cfg, streams: map[string]*stream{}, @@ -94,12 +98,11 @@ func newInstance(cfg *Config, instanceID string, factory func() chunkenc.Chunk, streamsCreatedTotal: streamsCreatedTotal.WithLabelValues(instanceID), streamsRemovedTotal: streamsRemovedTotal.WithLabelValues(instanceID), - factory: factory, tailers: map[uint32]*tailer{}, limiter: limiter, - syncPeriod: syncPeriod, - syncMinUtil: syncMinUtil, + wal: wal, + metrics: metrics, } i.mapper = newFPMapper(i.getLabelsFromFingerprint) return i @@ -115,8 +118,9 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo stream, ok := i.streamsByFP[fp] if !ok { + sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(ls), fp) - stream = newStream(i.cfg, fp, sortedLabels, i.factory) + stream = newStream(i.cfg, fp, sortedLabels, i.metrics) i.streamsByFP[fp] = stream i.streams[stream.labelsString] = stream i.streamsCreatedTotal.Inc() @@ -133,20 +137,24 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo } func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { + record := recordPool.GetRecord() + record.UserID = i.instanceID + defer recordPool.PutRecord(record) + i.streamsMtx.Lock() defer i.streamsMtx.Unlock() var appendErr error for _, s := range req.Streams { - stream, err := i.getOrCreateStream(s) + stream, err := i.getOrCreateStream(s, false, record) if err != nil { appendErr = err continue } prevNumChunks := len(stream.chunks) - if err := stream.Push(ctx, s.Entries, i.syncPeriod, i.syncMinUtil); err != nil { + if err := stream.Push(ctx, s.Entries, record); err != nil { appendErr = err continue } @@ -154,16 +162,34 @@ func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error { memoryChunks.Add(float64(len(stream.chunks) - prevNumChunks)) } + if !record.IsEmpty() { + if err := i.wal.Log(record); err != nil { + return err + } + } + return appendErr } -func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, error) { +// getOrCreateStream returns the stream or creates it. Must hold streams mutex if not asked to lock. +func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, record *WALRecord) (*stream, error) { + if lock { + i.streamsMtx.Lock() + defer i.streamsMtx.Unlock() + } stream, ok := i.streams[pushReqStream.Labels] + if ok { return stream, nil } - err := i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) + // record is only nil when replaying WAL. We don't want to drop data when replaying a WAL after + // reducing the stream limits, for instance. + var err error + if record != nil { + err = i.limiter.AssertMaxStreamsPerUser(i.instanceID, len(i.streams)) + } + if err != nil { validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) bytes := 0 @@ -179,11 +205,23 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream) (*stream, er return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } fp := i.getHashForLabels(labels) - _ = i.index.Add(client.FromLabelsToLabelAdapters(labels), fp) - stream = newStream(i.cfg, fp, labels, i.factory) + + sortedLabels := i.index.Add(client.FromLabelsToLabelAdapters(labels), fp) + stream = newStream(i.cfg, fp, sortedLabels, i.metrics) i.streams[pushReqStream.Labels] = stream i.streamsByFP[fp] = stream + // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). + if record != nil { + record.Series = append(record.Series, tsdb_record.RefSeries{ + Ref: uint64(fp), + Labels: sortedLabels, + }) + } else { + // If the record is nil, this is a WAL recovery. + i.metrics.recoveredStreamsTotal.Inc() + } + memoryStreams.WithLabelValues(i.instanceID).Inc() i.streamsCreatedTotal.Inc() i.addTailersToNewStream(stream) @@ -343,6 +381,13 @@ func (i *instance) Series(_ context.Context, req *logproto.SeriesRequest) (*logp return &logproto.SeriesResponse{Series: series}, nil } +func (i *instance) numStreams() int { + i.streamsMtx.RLock() + defer i.streamsMtx.RUnlock() + + return len(i.streams) +} + // forAllStreams will execute a function for all streams in the instance. // It uses a function in order to enable generic stream access without accidentally leaking streams under the mutex. func (i *instance) forAllStreams(fn func(*stream) error) error { diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index ee57361fad..82489cd9cd 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -9,26 +9,33 @@ import ( "testing" "time" + "github.com/pkg/errors" "github.com/prometheus/prometheus/pkg/labels" - - "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/logproto" - "github.com/stretchr/testify/require" + "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/util/validation" ) -var defaultFactory = func() chunkenc.Chunk { - return chunkenc.NewMemChunk(chunkenc.EncGZIP, 512, 0) +func defaultConfig() *Config { + cfg := Config{ + BlockSize: 512, + ChunkEncoding: "gzip", + } + if err := cfg.Validate(); err != nil { + panic(errors.Wrap(err, "error building default test config")) + } + return &cfg } +var NilMetrics = newIngesterMetrics(nil) + func TestLabelsCollisions(t *testing.T) { limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil) require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0) + i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil) // avoid entries from the future. tt := time.Now().Add(-5 * time.Minute) @@ -55,7 +62,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0) + inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics) const ( concurrent = 10 @@ -113,7 +120,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil) + inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics) lbls := makeRandomLabels() tt := time.Now() @@ -128,7 +135,7 @@ func TestSyncPeriod(t *testing.T) { require.NoError(t, err) // let's verify results - s, err := inst.getOrCreateStream(pr.Streams[0]) + s, err := inst.getOrCreateStream(pr.Streams[0], false, recordPool.GetRecord()) require.NoError(t, err) // make sure each chunk spans max 'sync period' time @@ -149,10 +156,11 @@ func Test_SeriesQuery(t *testing.T) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) // just some random values - syncPeriod := 1 * time.Minute - minUtil := 0.20 + cfg := defaultConfig() + cfg.SyncPeriod = 1 * time.Minute + cfg.SyncMinUtilization = 0.20 - instance := newInstance(&Config{}, "test", defaultFactory, limiter, syncPeriod, minUtil) + instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics) currentTime := time.Now() @@ -162,9 +170,9 @@ func Test_SeriesQuery(t *testing.T) { } for _, testStream := range testStreams { - stream, err := instance.getOrCreateStream(testStream) + stream, err := instance.getOrCreateStream(testStream, false, recordPool.GetRecord()) require.NoError(t, err) - chunk := defaultFactory() + chunk := newStream(cfg, 0, nil, NilMetrics).NewChunk() for _, entry := range testStream.Entries { err = chunk.Append(&entry) require.NoError(t, err) @@ -263,7 +271,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", defaultFactory, limiter, 0, 0) + i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics) ctx := context.Background() for n := 0; n < b.N; n++ { diff --git a/pkg/ingester/metrics.go b/pkg/ingester/metrics.go new file mode 100644 index 0000000000..e9bdefdb9c --- /dev/null +++ b/pkg/ingester/metrics.go @@ -0,0 +1,87 @@ +package ingester + +import ( + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" +) + +type ingesterMetrics struct { + checkpointDeleteFail prometheus.Counter + checkpointDeleteTotal prometheus.Counter + checkpointCreationFail prometheus.Counter + checkpointCreationTotal prometheus.Counter + checkpointDuration prometheus.Summary + checkpointLoggedBytesTotal prometheus.Counter + + walReplayDuration prometheus.Gauge + walCorruptionsTotal *prometheus.CounterVec + walLoggedBytesTotal prometheus.Counter + walRecordsLogged prometheus.Counter + + recoveredStreamsTotal prometheus.Counter + recoveredChunksTotal prometheus.Counter + recoveredEntriesTotal prometheus.Counter +} + +const ( + walTypeCheckpoint = "checkpoint" + walTypeSegment = "segment" +) + +func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics { + return &ingesterMetrics{ + walReplayDuration: promauto.With(r).NewGauge(prometheus.GaugeOpts{ + Name: "loki_ingester_wal_replay_duration_seconds", + Help: "Time taken to replay the checkpoint and the WAL.", + }), + walCorruptionsTotal: promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Name: "loki_ingester_wal_corruptions_total", + Help: "Total number of WAL corruptions encountered.", + }, []string{"type"}), + checkpointDeleteFail: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_checkpoint_deletions_failed_total", + Help: "Total number of checkpoint deletions that failed.", + }), + checkpointDeleteTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_checkpoint_deletions_total", + Help: "Total number of checkpoint deletions attempted.", + }), + checkpointCreationFail: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_checkpoint_creations_failed_total", + Help: "Total number of checkpoint creations that failed.", + }), + checkpointCreationTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_checkpoint_creations_total", + Help: "Total number of checkpoint creations attempted.", + }), + checkpointDuration: promauto.With(r).NewSummary(prometheus.SummaryOpts{ + Name: "loki_ingester_checkpoint_duration_seconds", + Help: "Time taken to create a checkpoint.", + Objectives: map[float64]float64{0.5: 0.05, 0.9: 0.01, 0.99: 0.001}, + }), + walRecordsLogged: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_records_logged_total", + Help: "Total number of WAL records logged.", + }), + checkpointLoggedBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_checkpoint_logged_bytes_total", + Help: "Total number of bytes written to disk for checkpointing.", + }), + walLoggedBytesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_logged_bytes_total", + Help: "Total number of bytes written to disk for WAL records.", + }), + recoveredStreamsTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_recovered_streams_total", + Help: "Total number of streams recovered from the WAL.", + }), + recoveredChunksTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_recovered_chunks_total", + Help: "Total number of chunks recovered from the WAL checkpoints.", + }), + recoveredEntriesTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{ + Name: "loki_ingester_wal_recovered_entries_total", + Help: "Total number of entries recovered from the WAL.", + }), + } +} diff --git a/pkg/ingester/recovery.go b/pkg/ingester/recovery.go new file mode 100644 index 0000000000..8f0c540951 --- /dev/null +++ b/pkg/ingester/recovery.go @@ -0,0 +1,395 @@ +package ingester + +import ( + "context" + io "io" + "runtime" + "sync" + + "github.com/cortexproject/cortex/pkg/ingester/client" + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/prometheus/prometheus/tsdb/wal" + + "github.com/grafana/loki/pkg/logproto" +) + +type WALReader interface { + Next() bool + Err() error + // Record should not be used across multiple calls to Next() + Record() []byte +} + +type NoopWALReader struct{} + +func (NoopWALReader) Next() bool { return false } +func (NoopWALReader) Err() error { return nil } +func (NoopWALReader) Record() []byte { return nil } +func (NoopWALReader) Close() error { return nil } + +// If startSegment is <0, it means all the segments. +func newWalReader(dir string, startSegment int) (*wal.Reader, io.Closer, error) { + var ( + segmentReader io.ReadCloser + err error + ) + if startSegment < 0 { + segmentReader, err = wal.NewSegmentsReader(dir) + if err != nil { + return nil, nil, err + } + } else { + first, last, err := wal.Segments(dir) + if err != nil { + return nil, nil, err + } + if startSegment > last { + return nil, nil, errors.New("start segment is beyond the last WAL segment") + } + if first > startSegment { + startSegment = first + } + segmentReader, err = wal.NewSegmentsRangeReader(wal.SegmentRange{ + Dir: dir, + First: startSegment, + Last: -1, // Till the end. + }) + if err != nil { + return nil, nil, err + } + } + return wal.NewReader(segmentReader), segmentReader, nil +} + +func newCheckpointReader(dir string) (WALReader, io.Closer, error) { + lastCheckpointDir, idx, err := lastCheckpoint(dir) + if err != nil { + return nil, nil, err + } + if idx < 0 { + level.Info(util.Logger).Log("msg", "no checkpoint found, treating as no-op") + var reader NoopWALReader + return reader, reader, nil + } + + r, err := wal.NewSegmentsReader(lastCheckpointDir) + if err != nil { + return nil, nil, err + } + return wal.NewReader(r), r, nil +} + +type Recoverer interface { + NumWorkers() int + Series(series *Series) error + SetStream(userID string, series record.RefSeries) error + Push(userID string, entries RefEntries) error + Close() + Done() <-chan struct{} +} + +type ingesterRecoverer struct { + // basically map[userID]map[fingerprint]*stream + users sync.Map + ing *Ingester + done chan struct{} +} + +func newIngesterRecoverer(i *Ingester) *ingesterRecoverer { + return &ingesterRecoverer{ + ing: i, + done: make(chan struct{}), + } +} + +// Use all available cores +func (r *ingesterRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } + +func (r *ingesterRecoverer) Series(series *Series) error { + inst := r.ing.getOrCreateInstance(series.UserID) + + // TODO(owen-d): create another fn to avoid unnecessary label type conversions. + stream, err := inst.getOrCreateStream(logproto.Stream{ + Labels: client.FromLabelAdaptersToLabels(series.Labels).String(), + }, true, nil) + + if err != nil { + return err + } + + added, err := stream.setChunks(series.Chunks) + if err != nil { + return err + } + r.ing.metrics.recoveredChunksTotal.Add(float64(len(series.Chunks))) + r.ing.metrics.recoveredEntriesTotal.Add(float64(added)) + + // now store the stream in the recovery map under the fingerprint originally recorded + // as it's possible the newly mapped fingerprint is different. This is because the WAL records + // will use this original reference. + got, _ := r.users.LoadOrStore(series.UserID, &sync.Map{}) + streamsMap := got.(*sync.Map) + streamsMap.Store(series.Fingerprint, stream) + return nil +} + +// SetStream is responsible for setting the key path for userIDs -> fingerprints -> streams. +// Internally, this uses nested sync.Maps due to their performance benefits for sets that only grow. +// Using these also allows us to bypass the ingester -> instance -> stream hierarchy internally, which +// may yield some performance gains, but is essential for the following: +// Due to the use of the instance's fingerprint mapper, stream fingerprints are NOT necessarily +// deterministic. The WAL uses the post-mapped fingerprint on the ingester that originally +// created the stream and we ensure that said fingerprint maps correctly to the newly +// created stream during WAL replay, even if the new in memory stream was assigned a different +// fingerprint from the mapper. This is paramount because subsequent WAL records will use +// the fingerprint reported in the WAL record, not the potentially differing one assigned during +// stream creation. +func (r *ingesterRecoverer) SetStream(userID string, series record.RefSeries) error { + inst := r.ing.getOrCreateInstance(userID) + + stream, err := inst.getOrCreateStream( + logproto.Stream{ + Labels: series.Labels.String(), + }, + true, + nil, + ) + if err != nil { + return err + } + + // Now that we have the stream, ensure that the userID -> fingerprint -> stream + // path is set properly. + got, _ := r.users.LoadOrStore(userID, &sync.Map{}) + streamsMap := got.(*sync.Map) + streamsMap.Store(series.Ref, stream) + return nil +} + +func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error { + out, ok := r.users.Load(userID) + if !ok { + return errors.Errorf("user (%s) not set during WAL replay", userID) + } + + s, ok := out.(*sync.Map).Load(entries.Ref) + if !ok { + return errors.Errorf("stream (%d) not set during WAL replay for user (%s)", entries.Ref, userID) + } + + // ignore out of order errors here (it's possible for a checkpoint to already have data from the wal segments) + _ = s.(*stream).Push(context.Background(), entries.Entries, nil) + return nil +} + +func (r *ingesterRecoverer) Close() { + close(r.done) +} + +func (r *ingesterRecoverer) Done() <-chan struct{} { + return r.done +} + +func RecoverWAL(reader WALReader, recoverer Recoverer) error { + dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error { + rec := recordPool.GetRecord() + if err := decodeWALRecord(b, rec); err != nil { + return err + } + + // First process all series to ensure we don't write entries to nonexistant series. + for _, s := range rec.Series { + if err := recoverer.SetStream(rec.UserID, s); err != nil { + return err + } + + } + + for _, entries := range rec.RefEntries { + worker := int(entries.Ref % uint64(len(inputs))) + select { + case err := <-errCh: + return err + + case inputs[worker] <- recoveryInput{ + userID: rec.UserID, + data: entries, + }: + } + } + + return nil + } + + process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) { + for { + select { + case <-recoverer.Done(): + + case next, ok := <-input: + if !ok { + return + } + entries, ok := next.data.(RefEntries) + var err error + if !ok { + err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, entries) + } + if err == nil { + err = recoverer.Push(next.userID, entries) + } + + // Pass the error back, but respect the quit signal. + if err != nil { + select { + case errCh <- err: + case <-recoverer.Done(): + } + return + } + } + } + } + + return recoverGeneric( + reader, + recoverer, + dispatch, + process, + ) + +} + +func RecoverCheckpoint(reader WALReader, recoverer Recoverer) error { + dispatch := func(recoverer Recoverer, b []byte, inputs []chan recoveryInput, errCh <-chan error) error { + s := &Series{} + if err := decodeCheckpointRecord(b, s); err != nil { + return err + } + + worker := int(s.Fingerprint % uint64(len(inputs))) + select { + case err := <-errCh: + return err + + case inputs[worker] <- recoveryInput{ + userID: s.UserID, + data: s, + }: + } + + return nil + } + + process := func(recoverer Recoverer, input <-chan recoveryInput, errCh chan<- error) { + for { + select { + case <-recoverer.Done(): + + case next, ok := <-input: + if !ok { + return + } + series, ok := next.data.(*Series) + var err error + if !ok { + err = errors.Errorf("unexpected type (%T) when recovering WAL, expecting (%T)", next.data, series) + } + if err == nil { + err = recoverer.Series(series) + } + + // Pass the error back, but respect the quit signal. + if err != nil { + select { + case errCh <- err: + case <-recoverer.Done(): + } + return + } + } + } + } + + return recoverGeneric( + reader, + recoverer, + dispatch, + process, + ) +} + +type recoveryInput struct { + userID string + data interface{} +} + +// recoverGeneric enables reusing the ability to recover from WALs of different types +// by exposing the dispatch and process functions. +// Note: it explicitly does not call the Recoverer.Close function as it's possible to layer +// multiple recoveries on top of each other, as in the case of recovering from Checkpoints +// then the WAL. +func recoverGeneric( + reader WALReader, + recoverer Recoverer, + dispatch func(Recoverer, []byte, []chan recoveryInput, <-chan error) error, + process func(Recoverer, <-chan recoveryInput, chan<- error), +) error { + var wg sync.WaitGroup + var lastErr error + nWorkers := recoverer.NumWorkers() + + if nWorkers < 1 { + return errors.New("cannot recover with no workers") + } + + errCh := make(chan error) + inputs := make([]chan recoveryInput, 0, nWorkers) + wg.Add(nWorkers) + for i := 0; i < nWorkers; i++ { + inputs = append(inputs, make(chan recoveryInput)) + + go func(input <-chan recoveryInput) { + defer wg.Done() + process(recoverer, input, errCh) + }(inputs[i]) + + } + +outer: + for reader.Next() { + b := reader.Record() + if lastErr = reader.Err(); lastErr != nil { + break outer + } + + if lastErr = dispatch(recoverer, b, inputs, errCh); lastErr != nil { + break outer + } + } + + for _, w := range inputs { + close(w) + } + + // may have broken loop early + if lastErr != nil { + return lastErr + } + + finished := make(chan struct{}) + go func(finished chan<- struct{}) { + wg.Wait() + finished <- struct{}{} + }(finished) + + select { + case <-finished: + case lastErr = <-errCh: + } + + return lastErr +} diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go new file mode 100644 index 0000000000..4a0de2eef5 --- /dev/null +++ b/pkg/ingester/recovery_test.go @@ -0,0 +1,192 @@ +package ingester + +import ( + fmt "fmt" + "runtime" + "sync" + "testing" + "time" + + "github.com/pkg/errors" + "github.com/prometheus/prometheus/pkg/labels" + "github.com/prometheus/prometheus/tsdb/record" + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/logproto" +) + +type MemoryWALReader struct { + xs [][]byte + + initialized bool +} + +func (m *MemoryWALReader) Next() bool { + if len(m.xs) < 1 { + return false + } + + // don't advance on the first call + if !m.initialized { + m.initialized = true + return true + } + + m.xs = m.xs[1:] + return len(m.xs) > 0 +} + +func (m *MemoryWALReader) Err() error { return nil } + +func (m *MemoryWALReader) Record() []byte { return m.xs[0] } + +func buildMemoryReader(users, totalStreams, entriesPerStream int) (*MemoryWALReader, []*WALRecord) { + var recs []*WALRecord + reader := &MemoryWALReader{} + for i := 0; i < totalStreams; i++ { + user := fmt.Sprintf("%d", i%users) + recs = append(recs, &WALRecord{ + UserID: user, + Series: []record.RefSeries{ + { + Ref: uint64(i), + Labels: labels.FromMap( + map[string]string{ + "stream": fmt.Sprint(i), + "user": user, + }, + ), + }, + }, + }) + + var entries []logproto.Entry + for j := 0; j < entriesPerStream; j++ { + entries = append(entries, logproto.Entry{ + Timestamp: time.Unix(int64(j), 0), + Line: fmt.Sprintf("%d", j), + }) + } + recs = append(recs, &WALRecord{ + UserID: user, + RefEntries: []RefEntries{ + { + Ref: uint64(i), + Entries: entries, + }, + }, + }) + } + + for _, rec := range recs { + if len(rec.Series) > 0 { + reader.xs = append(reader.xs, rec.encodeSeries(nil)) + } + + if len(rec.RefEntries) > 0 { + reader.xs = append(reader.xs, rec.encodeEntries(nil)) + } + } + + return reader, recs + +} + +type MemRecoverer struct { + users map[string]map[uint64][]logproto.Entry + done chan struct{} + + sync.Mutex + usersCt, streamsCt, seriesCt int +} + +func NewMemRecoverer() *MemRecoverer { + return &MemRecoverer{ + users: make(map[string]map[uint64][]logproto.Entry), + done: make(chan struct{}), + } +} + +func (r *MemRecoverer) NumWorkers() int { return runtime.GOMAXPROCS(0) } + +func (r *MemRecoverer) Series(_ *Series) error { return nil } + +func (r *MemRecoverer) SetStream(userID string, series record.RefSeries) error { + r.Lock() + defer r.Unlock() + user, ok := r.users[userID] + if !ok { + user = make(map[uint64][]logproto.Entry) + r.users[userID] = user + r.usersCt++ + } + + if _, exists := user[series.Ref]; exists { + return errors.Errorf("stream (%d) already exists for user (%s)", series.Ref, userID) + } + + user[series.Ref] = make([]logproto.Entry, 0) + r.streamsCt++ + return nil +} + +func (r *MemRecoverer) Push(userID string, entries RefEntries) error { + r.Lock() + defer r.Unlock() + + user, ok := r.users[userID] + if !ok { + return errors.Errorf("unexpected user access (%s)", userID) + } + + stream, ok := user[entries.Ref] + if !ok { + return errors.Errorf("unexpected stream access") + } + + r.seriesCt += len(entries.Entries) + user[entries.Ref] = append(stream, entries.Entries...) + return nil +} + +func (r *MemRecoverer) Close() { close(r.done) } + +func (r *MemRecoverer) Done() <-chan struct{} { return r.done } + +func Test_InMemorySegmentRecover(t *testing.T) { + var ( + users = 10 + streamsCt = 1000 + entriesPerStream = 50 + ) + reader, recs := buildMemoryReader(users, streamsCt, entriesPerStream) + + recoverer := NewMemRecoverer() + + require.Nil(t, RecoverWAL(reader, recoverer)) + recoverer.Close() + + require.Equal(t, users, recoverer.usersCt) + require.Equal(t, streamsCt, recoverer.streamsCt) + require.Equal(t, streamsCt*entriesPerStream, recoverer.seriesCt) + + for _, rec := range recs { + user, ok := recoverer.users[rec.UserID] + require.Equal(t, true, ok) + + for _, s := range rec.Series { + _, ok := user[s.Ref] + require.Equal(t, true, ok) + } + + for _, entries := range rec.RefEntries { + stream, ok := user[entries.Ref] + require.Equal(t, true, ok) + + for i, entry := range entries.Entries { + require.Equal(t, entry, stream[i]) + } + } + } + +} diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go index e3d256732c..71568ca563 100644 --- a/pkg/ingester/stream.go +++ b/pkg/ingester/stream.go @@ -64,15 +64,15 @@ type stream struct { fp model.Fingerprint // possibly remapped fingerprint, used in the streams map labels labels.Labels labelsString string - factory func() chunkenc.Chunk lastLine line + metrics *ingesterMetrics tailers map[uint32]*tailer tailerMtx sync.RWMutex } type chunkDesc struct { - chunk chunkenc.Chunk + chunk *chunkenc.MemChunk closed bool synced bool flushed time.Time @@ -85,14 +85,14 @@ type entryWithError struct { e error } -func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, factory func() chunkenc.Chunk) *stream { +func newStream(cfg *Config, fp model.Fingerprint, labels labels.Labels, metrics *ingesterMetrics) *stream { return &stream{ cfg: cfg, fp: fp, labels: labels, labelsString: labels.String(), - factory: factory, tailers: map[uint32]*tailer{}, + metrics: metrics, } } @@ -111,26 +111,39 @@ func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error { return nil } -func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronizePeriod time.Duration, minUtilization float64) error { +// setChunks is used during checkpoint recovery +func (s *stream) setChunks(chunks []Chunk) (entriesAdded int, err error) { + chks, err := fromWireChunks(s.cfg, chunks) + if err != nil { + return 0, err + } + s.chunks = chks + for _, c := range s.chunks { + entriesAdded += c.chunk.Size() + } + return entriesAdded, nil +} + +func (s *stream) NewChunk() *chunkenc.MemChunk { + return chunkenc.NewMemChunk(s.cfg.parsedEncoding, s.cfg.BlockSize, s.cfg.TargetChunkSize) +} + +func (s *stream) Push( + ctx context.Context, + entries []logproto.Entry, + record *WALRecord, +) error { var lastChunkTimestamp time.Time if len(s.chunks) == 0 { s.chunks = append(s.chunks, chunkDesc{ - chunk: s.factory(), + chunk: s.NewChunk(), }) chunksCreatedTotal.Inc() } else { _, lastChunkTimestamp = s.chunks[len(s.chunks)-1].chunk.Bounds() } - s.tailerMtx.RLock() - hasTailers := len(s.tailers) != 0 - s.tailerMtx.RUnlock() - var storedEntries []logproto.Entry - if hasTailers { - storedEntries = make([]logproto.Entry, 0, len(entries)) - } - failedEntriesWithError := []entryWithError{} // Don't fail on the first append error - if samples are sent out of order, @@ -149,7 +162,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize } chunk := &s.chunks[len(s.chunks)-1] - if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, synchronizePeriod, minUtilization) { + if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, lastChunkTimestamp, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) { // If the chunk has no more space call Close to make sure anything in the head block is cut and compressed err := chunk.chunk.Close() if err != nil { @@ -164,7 +177,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize chunksCreatedTotal.Inc() s.chunks = append(s.chunks, chunkDesc{ - chunk: s.factory(), + chunk: s.NewChunk(), }) chunk = &s.chunks[len(s.chunks)-1] lastChunkTimestamp = time.Time{} @@ -172,10 +185,7 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize if err := chunk.chunk.Append(&entries[i]); err != nil { failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], err}) } else { - // send only stored entries to tailers - if hasTailers { - storedEntries = append(storedEntries, entries[i]) - } + storedEntries = append(storedEntries, entries[i]) lastChunkTimestamp = entries[i].Timestamp s.lastLine.ts = lastChunkTimestamp s.lastLine.content = entries[i].Line @@ -184,30 +194,44 @@ func (s *stream) Push(ctx context.Context, entries []logproto.Entry, synchronize } if len(storedEntries) != 0 { - go func() { - stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries} - - closedTailers := []uint32{} + // record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them). + if record != nil { + record.AddEntries(uint64(s.fp), storedEntries...) + } else { + // If record is nil, this is a WAL recovery. + s.metrics.recoveredEntriesTotal.Add(float64(len(storedEntries))) + } - s.tailerMtx.RLock() - for _, tailer := range s.tailers { - if tailer.isClosed() { - closedTailers = append(closedTailers, tailer.getID()) - continue + s.tailerMtx.RLock() + hasTailers := len(s.tailers) != 0 + s.tailerMtx.RUnlock() + if hasTailers { + go func() { + stream := logproto.Stream{Labels: s.labelsString, Entries: storedEntries} + + closedTailers := []uint32{} + + s.tailerMtx.RLock() + for _, tailer := range s.tailers { + if tailer.isClosed() { + closedTailers = append(closedTailers, tailer.getID()) + continue + } + tailer.send(stream, s.labels) } - tailer.send(stream, s.labels) - } - s.tailerMtx.RUnlock() + s.tailerMtx.RUnlock() - if len(closedTailers) != 0 { - s.tailerMtx.Lock() - defer s.tailerMtx.Unlock() + if len(closedTailers) != 0 { + s.tailerMtx.Lock() + defer s.tailerMtx.Unlock() - for _, closedTailerID := range closedTailers { - delete(s.tailers, closedTailerID) + for _, closedTailerID := range closedTailers { + delete(s.tailers, closedTailerID) + } } - } - }() + }() + } + } if len(failedEntriesWithError) > 0 { diff --git a/pkg/ingester/stream_test.go b/pkg/ingester/stream_test.go index a0f2a72b8a..a6f500e2ac 100644 --- a/pkg/ingester/stream_test.go +++ b/pkg/ingester/stream_test.go @@ -4,7 +4,6 @@ import ( "bytes" "context" "fmt" - "io" "math/rand" "net/http" "testing" @@ -16,7 +15,6 @@ import ( "github.com/weaveworks/common/httpgrpc" "github.com/grafana/loki/pkg/chunkenc" - "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" ) @@ -35,18 +33,20 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { for _, tc := range tt { t.Run(tc.name, func(t *testing.T) { + cfg := defaultConfig() + cfg.MaxReturnedErrors = tc.limit s := newStream( - &Config{MaxReturnedErrors: tc.limit}, + cfg, model.Fingerprint(0), labels.Labels{ {Name: "foo", Value: "bar"}, }, - defaultFactory, + NilMetrics, ) err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(int64(numLogs), 0), Line: "log"}, - }, 0, 0) + }, recordPool.GetRecord()) require.NoError(t, err) newLines := make([]logproto.Entry, numLogs) @@ -65,7 +65,7 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { fmt.Fprintf(&expected, "total ignored: %d out of %d", numLogs, numLogs) expectErr := httpgrpc.Errorf(http.StatusBadRequest, expected.String()) - err = s.Push(context.Background(), newLines, 0, 0) + err = s.Push(context.Background(), newLines, recordPool.GetRecord()) require.Error(t, err) require.Equal(t, expectErr.Error(), err.Error()) }) @@ -74,19 +74,19 @@ func TestMaxReturnedStreamsErrors(t *testing.T) { func TestPushDeduplication(t *testing.T) { s := newStream( - &Config{}, + defaultConfig(), model.Fingerprint(0), labels.Labels{ {Name: "foo", Value: "bar"}, }, - defaultFactory, + NilMetrics, ) err := s.Push(context.Background(), []logproto.Entry{ {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "test"}, {Timestamp: time.Unix(1, 0), Line: "newer, better test"}, - }, 0, 0) + }, recordPool.GetRecord()) require.NoError(t, err) require.Len(t, s.chunks, 1) require.Equal(t, s.chunks[0].chunk.Size(), 2, @@ -99,10 +99,9 @@ func TestStreamIterator(t *testing.T) { for _, chk := range []struct { name string - new func() chunkenc.Chunk + new func() *chunkenc.MemChunk }{ - {"dumbChunk", chunkenc.NewDumbChunk}, - {"gzipChunk", func() chunkenc.Chunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, + {"gzipChunk", func() *chunkenc.MemChunk { return chunkenc.NewMemChunk(chunkenc.EncGZIP, 256*1024, 0) }}, } { t.Run(chk.name, func(t *testing.T) { var s stream @@ -150,9 +149,7 @@ func Benchmark_PushStream(b *testing.B) { labels.Label{Name: "job", Value: "loki-dev/ingester"}, labels.Label{Name: "container", Value: "ingester"}, } - s := newStream(&Config{}, model.Fingerprint(0), ls, func() chunkenc.Chunk { - return &noopChunk{} - }) + s := newStream(&Config{}, model.Fingerprint(0), ls, NilMetrics) t, err := newTailer("foo", `{namespace="loki-dev"}`, &fakeTailServer{}) require.NoError(b, err) @@ -166,72 +163,8 @@ func Benchmark_PushStream(b *testing.B) { b.ReportAllocs() for n := 0; n < b.N; n++ { - require.NoError(b, s.Push(ctx, e, 0, 0)) + rec := recordPool.GetRecord() + require.NoError(b, s.Push(ctx, e, rec)) + recordPool.PutRecord(rec) } } - -type noopChunk struct { -} - -func (c *noopChunk) Bounds() (time.Time, time.Time) { - return time.Time{}, time.Time{} -} - -func (c *noopChunk) SpaceFor(_ *logproto.Entry) bool { - return true -} - -func (c *noopChunk) Append(entry *logproto.Entry) error { - return nil -} - -func (c *noopChunk) Size() int { - return 0 -} - -// UncompressedSize implements Chunk. -func (c *noopChunk) UncompressedSize() int { - return c.Size() -} - -// CompressedSize implements Chunk. -func (c *noopChunk) CompressedSize() int { - return 0 -} - -// Utilization implements Chunk -func (c *noopChunk) Utilization() float64 { - return 0 -} - -func (c *noopChunk) Encoding() chunkenc.Encoding { return chunkenc.EncNone } - -func (c *noopChunk) Iterator(_ context.Context, from, through time.Time, direction logproto.Direction, _ log.StreamPipeline) (iter.EntryIterator, error) { - return nil, nil -} - -func (c *noopChunk) SampleIterator(_ context.Context, from, through time.Time, _ log.StreamSampleExtractor) iter.SampleIterator { - return nil -} - -func (c *noopChunk) Bytes() ([]byte, error) { - return nil, nil -} - -func (c *noopChunk) BytesWith(_ []byte) ([]byte, error) { - return nil, nil -} - -func (c *noopChunk) WriteTo(w io.Writer) (int64, error) { return 0, nil } - -func (c *noopChunk) Blocks(_ time.Time, _ time.Time) []chunkenc.Block { - return nil -} - -func (c *noopChunk) BlockCount() int { - return 0 -} - -func (c *noopChunk) Close() error { - return nil -} diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go index f290998432..610453fd64 100644 --- a/pkg/ingester/transfer.go +++ b/pkg/ingester/transfer.go @@ -239,6 +239,12 @@ func (i *Ingester) transferOut(ctx context.Context) error { // typically streams won't have many chunks in memory so sending one at a time // shouldn't add too much overhead. for _, c := range istream.chunks { + // Close the chunk first, writing any data in the headblock to a new block. + err := c.chunk.Close() + if err != nil { + return err + } + bb, err := c.chunk.Bytes() if err != nil { return err diff --git a/pkg/ingester/wal.go b/pkg/ingester/wal.go new file mode 100644 index 0000000000..0cd7fd328d --- /dev/null +++ b/pkg/ingester/wal.go @@ -0,0 +1,210 @@ +package ingester + +import ( + "flag" + "sync" + "time" + + "github.com/cortexproject/cortex/pkg/util" + "github.com/go-kit/kit/log/level" + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/prometheus/tsdb/wal" + + "github.com/grafana/loki/pkg/logproto" +) + +var ( + // shared pool for WALRecords and []logproto.Entries + recordPool = newRecordPool() +) + +const walSegmentSize = wal.DefaultSegmentSize * 4 + +type WALConfig struct { + Enabled bool `yaml:"enabled"` + Dir string `yaml:"dir"` + Recover bool `yaml:"recover"` + CheckpointDuration time.Duration `yaml:"checkpoint_duration"` + FlushOnShutdown bool `yaml:"flush_on_shutdown"` +} + +func (cfg *WALConfig) Validate() error { + if cfg.Enabled && cfg.CheckpointDuration < 1 { + return errors.Errorf("invalid checkpoint duration: %v", cfg.CheckpointDuration) + } + return nil +} + +// RegisterFlags adds the flags required to config this to the given FlagSet +func (cfg *WALConfig) RegisterFlags(f *flag.FlagSet) { + f.StringVar(&cfg.Dir, "ingester.wal-dir", "wal", "Directory to store the WAL and/or recover from WAL.") + f.BoolVar(&cfg.Enabled, "ingester.wal-enabled", false, "Enable writing of ingested data into WAL.") + f.BoolVar(&cfg.Recover, "ingester.recover-from-wal", false, "Recover data from existing WAL irrespective of WAL enabled/disabled.") + f.DurationVar(&cfg.CheckpointDuration, "ingester.checkpoint-duration", 5*time.Minute, "Interval at which checkpoints should be created.") + f.BoolVar(&cfg.FlushOnShutdown, "ingester.flush-on-shutdown", false, "When WAL is enabled, should chunks be flushed to long-term storage on shutdown.") +} + +// WAL interface allows us to have a no-op WAL when the WAL is disabled. +type WAL interface { + // Log marshalls the records and writes it into the WAL. + Log(*WALRecord) error + // Stop stops all the WAL operations. + Stop() error +} + +type noopWAL struct{} + +func (noopWAL) Log(*WALRecord) error { return nil } +func (noopWAL) Stop() error { return nil } + +type walWrapper struct { + cfg WALConfig + wal *wal.WAL + metrics *ingesterMetrics + seriesIter SeriesIter + + wait sync.WaitGroup + quit chan struct{} +} + +// newWAL creates a WAL object. If the WAL is disabled, then the returned WAL is a no-op WAL. +func newWAL(cfg WALConfig, registerer prometheus.Registerer, metrics *ingesterMetrics, seriesIter SeriesIter) (WAL, error) { + if !cfg.Enabled { + return noopWAL{}, nil + } + + tsdbWAL, err := wal.NewSize(util.Logger, registerer, cfg.Dir, walSegmentSize, false) + if err != nil { + return nil, err + } + + w := &walWrapper{ + cfg: cfg, + quit: make(chan struct{}), + wal: tsdbWAL, + metrics: metrics, + seriesIter: seriesIter, + } + + w.wait.Add(1) + go w.run() + return w, nil +} + +func (w *walWrapper) Log(record *WALRecord) error { + if record == nil || (len(record.Series) == 0 && len(record.RefEntries) == 0) { + return nil + } + select { + case <-w.quit: + return nil + default: + buf := recordPool.GetBytes()[:0] + defer func() { + recordPool.PutBytes(buf) + }() + + // Always write series then entries. + if len(record.Series) > 0 { + buf = record.encodeSeries(buf) + if err := w.wal.Log(buf); err != nil { + return err + } + w.metrics.walRecordsLogged.Inc() + w.metrics.walLoggedBytesTotal.Add(float64(len(buf))) + buf = buf[:0] + } + if len(record.RefEntries) > 0 { + buf = record.encodeEntries(buf) + if err := w.wal.Log(buf); err != nil { + return err + } + w.metrics.walRecordsLogged.Inc() + w.metrics.walLoggedBytesTotal.Add(float64(len(buf))) + } + return nil + } +} + +func (w *walWrapper) Stop() error { + close(w.quit) + w.wait.Wait() + err := w.wal.Close() + level.Info(util.Logger).Log("msg", "stopped", "component", "wal") + return err +} + +func (w *walWrapper) checkpointWriter() *WALCheckpointWriter { + return &WALCheckpointWriter{ + metrics: w.metrics, + segmentWAL: w.wal, + } +} + +func (w *walWrapper) run() { + level.Info(util.Logger).Log("msg", "started", "component", "wal") + defer w.wait.Done() + + checkpointer := NewCheckpointer( + w.cfg.CheckpointDuration, + w.seriesIter, + w.checkpointWriter(), + w.metrics, + w.quit, + ) + checkpointer.Run() + +} + +type resettingPool struct { + rPool *sync.Pool // records + ePool *sync.Pool // entries + bPool *sync.Pool // bytes +} + +func (p *resettingPool) GetRecord() *WALRecord { + rec := p.rPool.Get().(*WALRecord) + rec.Reset() + return rec +} + +func (p *resettingPool) PutRecord(r *WALRecord) { + p.rPool.Put(r) +} + +func (p *resettingPool) GetEntries() []logproto.Entry { + return p.ePool.Get().([]logproto.Entry) +} + +func (p *resettingPool) PutEntries(es []logproto.Entry) { + p.ePool.Put(es[:0]) // nolint:staticcheck +} + +func (p *resettingPool) GetBytes() []byte { + return p.bPool.Get().([]byte) +} + +func (p *resettingPool) PutBytes(b []byte) { + p.bPool.Put(b[:0]) // nolint:staticcheck +} + +func newRecordPool() *resettingPool { + return &resettingPool{ + rPool: &sync.Pool{ + New: func() interface{} { + return &WALRecord{} + }, + }, + ePool: &sync.Pool{ + New: func() interface{} { + return make([]logproto.Entry, 0, 512) + }, + }, + bPool: &sync.Pool{ + New: func() interface{} { + return make([]byte, 0, 1<<10) // 1kb + }, + }, + } +} diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 0e999440c5..74c613e95b 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -124,6 +124,9 @@ func (c *Config) Validate(log log.Logger) error { if err := c.Ruler.Validate(); err != nil { return errors.Wrap(err, "invalid ruler config") } + if err := c.Ingester.Validate(); err != nil { + return errors.Wrap(err, "invalid ingester config") + } return nil } diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index b3b363927b..4de771ddf3 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -211,6 +211,8 @@ func (t *Loki) initIngester() (_ services.Service, err error) { logproto.RegisterIngesterServer(t.server.GRPC, t.ingester) grpc_health_v1.RegisterHealthServer(t.server.GRPC, t.ingester) t.server.HTTP.Path("/flush").Handler(http.HandlerFunc(t.ingester.FlushHandler)) + // TODO(owen-d): should this use cortex style path (/ingester/shutdown), legacy style (/shutdown), or apir prefixed (/loki/api/v1/ingester/shutdown)? + t.server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(http.HandlerFunc(t.ingester.ShutdownHandler)) return t.ingester, nil }