diff --git a/storage/remote/queue_manager.go b/storage/remote/queue_manager.go index 1d27009069..a814421120 100644 --- a/storage/remote/queue_manager.go +++ b/storage/remote/queue_manager.go @@ -215,9 +215,8 @@ type QueueManager struct { shards *shards numShards int reshardChan chan int - - quit chan struct{} - wg sync.WaitGroup + quit chan struct{} + wg sync.WaitGroup samplesIn, samplesOut, samplesOutDuration *ewmaRate integralAccumulator float64 @@ -259,7 +258,7 @@ func NewQueueManager(logger log.Logger, walDir string, samplesIn *ewmaRate, high t.highestSentTimestampMetric = queueHighestSentTimestamp.WithLabelValues(t.queueName) t.pendingSamplesMetric = queuePendingSamples.WithLabelValues(t.queueName) t.enqueueRetriesMetric = enqueueRetriesTotal.WithLabelValues(t.queueName) - t.watcher = NewWALWatcher(logger, t, walDir, startTime) + t.watcher = NewWALWatcher(logger, client.Name(), t, walDir, startTime) t.shards = t.newShards() numShards.WithLabelValues(t.queueName).Set(float64(t.numShards)) @@ -310,11 +309,10 @@ func (t *QueueManager) Append(s []tsdb.RefSample) bool { } t.seriesMtx.Unlock() - backoff := t.cfg.MinBackoff outer: for _, sample := range tempSamples { - // This will result in spin/busy waiting if the queues are being resharded - // or shutting down. TODO backoff. + // This will only loop if the queues are being resharded. + backoff := t.cfg.MinBackoff for { select { case <-t.quit: @@ -325,6 +323,7 @@ outer: if t.shards.enqueue(sample.ref, sample.ts) { continue outer } + t.enqueueRetriesMetric.Inc() time.Sleep(time.Duration(backoff)) backoff = backoff * 2 @@ -359,18 +358,6 @@ func (t *QueueManager) Stop() { t.wg.Wait() } -func (t *QueueManager) Name() string { - return t.queueName -} - -// Find out which series are dropped after relabelling and make sure we have a metric label for them. -func (t *QueueManager) diffKeys(ref uint64, original, relabelled model.LabelSet) { - numDropped := len(original) - len(relabelled) - if numDropped == 0 { - return - } -} - // StoreSeries keeps track of which series we know about for lookups when sending samples to remote. func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { temp := make(map[uint64][]prompb.Label, len(series)) @@ -381,8 +368,6 @@ func (t *QueueManager) StoreSeries(series []tsdb.RefSeries, index int) { } t.processExternalLabels(ls) rl := relabel.Process(ls, t.relabelConfigs...) 
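The Append hunk above replaces the shared backoff with a per-sample backoff that is reset for every sample and counts each retry. Below is a minimal standalone sketch of that capped, interruptible backoff loop; enqueue, quit and the duration values are placeholders rather than the real QueueManager fields, and the MaxBackoff cap is an assumption since the visible hunk only shows the doubling.

package main

import (
	"fmt"
	"time"
)

// enqueueWithBackoff retries a non-blocking enqueue, doubling the sleep after
// every failed attempt (capped at maxBackoff) and giving up when quit is
// closed. In the real Append loop the backoff is reset for every sample and
// each retry increments enqueueRetriesMetric.
func enqueueWithBackoff(enqueue func() bool, quit <-chan struct{}, minBackoff, maxBackoff time.Duration) bool {
	backoff := minBackoff
	for {
		select {
		case <-quit:
			return false
		default:
		}
		if enqueue() {
			return true
		}
		time.Sleep(backoff)
		backoff *= 2
		if backoff > maxBackoff {
			backoff = maxBackoff
		}
	}
}

func main() {
	quit := make(chan struct{})
	attempts := 0
	ok := enqueueWithBackoff(func() bool {
		attempts++
		return attempts > 3 // succeed on the fourth try
	}, quit, 10*time.Millisecond, 100*time.Millisecond)
	fmt.Println("enqueued:", ok, "after", attempts, "attempts")
}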
- - t.diffKeys(s.Ref, ls, rl) if len(rl) == 0 { t.droppedSeries[s.Ref] = struct{}{} continue diff --git a/storage/remote/storage.go b/storage/remote/storage.go index e672cf2861..428773c053 100644 --- a/storage/remote/storage.go +++ b/storage/remote/storage.go @@ -42,6 +42,7 @@ type Storage struct { queues []*QueueManager samplesIn *ewmaRate samplesInMetric prometheus.Counter + highestTimestampMtx sync.Mutex highestTimestamp int64 highestTimestampMetric prometheus.Gauge diff --git a/storage/remote/wal_watcher.go b/storage/remote/wal_watcher.go index 3ce700ba5c..5faf843d04 100644 --- a/storage/remote/wal_watcher.go +++ b/storage/remote/wal_watcher.go @@ -14,9 +14,7 @@ package remote import ( - "context" "fmt" - "io" "os" "path" "strconv" @@ -33,6 +31,12 @@ import ( "github.com/prometheus/tsdb/wal" ) +const ( + readPeriod = 10 * time.Millisecond + checkpointPeriod = 5 * time.Second + segmentCheckPeriod = 100 * time.Millisecond +) + var ( watcherSamplesRecordsRead = prometheus.NewCounterVec( prometheus.CounterOpts{ @@ -123,14 +127,15 @@ type writeTo interface { Append([]tsdb.RefSample) bool StoreSeries([]tsdb.RefSeries, int) SeriesReset(int) - Name() string } // WALWatcher watches the TSDB WAL for a given WriteTo. type WALWatcher struct { - writer writeTo - logger log.Logger - walDir string + name string + writer writeTo + logger log.Logger + walDir string + currentSegment int lastCheckpoint string startTime int64 @@ -144,138 +149,105 @@ type WALWatcher struct { samplesSentPreTailing prometheus.Counter currentSegmentMetric prometheus.Gauge - ctx context.Context - cancel context.CancelFunc - quit chan struct{} + quit chan struct{} } // NewWALWatcher creates a new WAL watcher for a given WriteTo. -func NewWALWatcher(logger log.Logger, writer writeTo, walDir string, startTime int64) *WALWatcher { +func NewWALWatcher(logger log.Logger, name string, writer writeTo, walDir string, startTime int64) *WALWatcher { if logger == nil { logger = log.NewNopLogger() } - ctx, cancel := context.WithCancel(context.Background()) w := &WALWatcher{ logger: logger, writer: writer, walDir: path.Join(walDir, "wal"), startTime: startTime, - ctx: ctx, - cancel: cancel, quit: make(chan struct{}), } - w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(w.writer.Name()) - w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(w.writer.Name()) - w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(w.writer.Name()) - w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(w.writer.Name()) - w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(w.writer.Name()) - w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(w.writer.Name()) - w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(w.writer.Name()) - w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(w.writer.Name()) + w.samplesReadMetric = watcherSamplesRecordsRead.WithLabelValues(name) + w.seriesReadMetric = watcherSeriesRecordsRead.WithLabelValues(name) + w.tombstonesReadMetric = watcherTombstoneRecordsRead.WithLabelValues(name) + w.unknownReadMetric = watcherUnknownTypeRecordsRead.WithLabelValues(name) + w.invalidReadMetric = watcherInvalidRecordsRead.WithLabelValues(name) + w.recordDecodeFailsMetric = watcherRecordDecodeFails.WithLabelValues(name) + w.samplesSentPreTailing = watcherSamplesSentPreTailing.WithLabelValues(name) + w.currentSegmentMetric = watcherCurrentSegment.WithLabelValues(name) return w } func (w *WALWatcher) Start() { - 
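The watcher hunks here pass the queue name into NewWALWatcher so the writeTo interface no longer needs a Name() method; the constructor then binds each metric with WithLabelValues(name). A small hedged sketch of that pattern using client_golang, with an illustrative metric rather than the real watcher metrics:

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

var recordsRead = prometheus.NewCounterVec(
	prometheus.CounterOpts{
		Namespace: "example",
		Name:      "records_read_total",
		Help:      "Records read by the example watcher.",
	},
	[]string{"queue"},
)

// watcher stores the label value passed in by the caller, so the write target
// it feeds no longer has to expose a Name() method just for metric labels.
type watcher struct {
	name        string
	readCounter prometheus.Counter
}

func newWatcher(name string) *watcher {
	return &watcher{
		name:        name,
		readCounter: recordsRead.WithLabelValues(name),
	}
}

func main() {
	prometheus.MustRegister(recordsRead)
	w := newWatcher("remote-1")
	w.readCounter.Inc()
	fmt.Println("created watcher for queue", w.name)
}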
level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.writer.Name()) + level.Info(w.logger).Log("msg", "starting WAL watcher", "queue", w.name) go w.runWatcher() } func (w *WALWatcher) Stop() { - level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.writer.Name()) + level.Info(w.logger).Log("msg", "stopping WAL watcher", "queue", w.name) close(w.quit) } -// TODO: fix the exit logic for this function -// The stop param is used to stop at the end of the existing WAL on startup, -// since scraped samples may be written to the latest segment before we finish reading it. -func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) { - var ( - dec tsdb.RecordDecoder - series []tsdb.RefSeries - samples []tsdb.RefSample - ret bool - ) - for !isClosed(w.quit) { - for r.Next() { - series = series[:0] - rec := r.Record() - // If the timestamp is > start then we should Append this sample and exit readSeriesRecords, - // because this is the first sample written to the WAL after the WAL watcher was started. - typ := dec.Type(rec) - if typ == tsdb.RecordSamples { - samples, err := dec.Samples(rec, samples[:0]) - if err != nil { - continue - } - for _, s := range samples { - if s.T > w.startTime { - w.writer.Append(samples) - ret = true - w.samplesSentPreTailing.Inc() - } - } - if ret { - level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start") - level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) - return - } - } - if typ != tsdb.RecordSeries { - continue - } - - series, err := dec.Series(rec, nil) - if err != nil { - level.Error(log.With(w.logger)).Log("err", err) - break - } - - w.writer.StoreSeries(series, index) - } - // Since we only call readSeriesRecords on fully written WAL segments or checkpoints, - // Error() will only return an error if something actually went wrong when reading - // a record, either it was invalid or it was only partially written to the WAL. - if err := r.Err(); err != nil { - level.Error(w.logger).Log("err", err) - return - } - // Ensure we read all of the bytes in the segment or checkpoint. - if r.TotalRead() >= stop { - level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) - return +func (w *WALWatcher) runWatcher() { + // The WAL dir may not exist when Prometheus first starts up. + for { + if _, err := os.Stat(w.walDir); os.IsNotExist(err) { + time.Sleep(time.Second) + } else { + break } } -} -// Read all the series records from a Checkpoint directory. -func (w *WALWatcher) readCheckpoint(checkpointDir string) error { - sr, err := wal.NewSegmentsReader(checkpointDir) + nw, err := wal.New(nil, nil, w.walDir) if err != nil { - return errors.Wrap(err, "open checkpoint") + level.Error(w.logger).Log("err", err) + return } - defer sr.Close() - split := strings.Split(checkpointDir, ".") - if len(split) != 2 { - return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir) + first, last, err := nw.Segments() + if err != nil { + level.Error(w.logger).Log("err", err) + return } - i, err := strconv.Atoi(split[1]) - if err != nil { - i = w.currentSegment - 1 + if last == -1 { + level.Error(w.logger).Log("err", err) + return } - size, err := getCheckpointSize(checkpointDir) + // Read series records in the current WAL and latest checkpoint, get the segment pointer back. + // TODO: callum, handle maintaining the WAL pointer somehow across apply configs? 
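runWatcher above starts by polling until the WAL directory exists, because Prometheus may not have created it yet on first start. A hedged standalone sketch of that wait loop follows; the quit check is an illustrative addition, the loop in the hunk simply sleeps.

package main

import (
	"fmt"
	"os"
	"time"
)

// waitForDir polls until dir exists or quit is closed. The real runWatcher
// loops on os.Stat in the same way, without an interrupt; checking quit here
// shows how the wait could be made cancellable during shutdown.
func waitForDir(dir string, quit <-chan struct{}) bool {
	for {
		if _, err := os.Stat(dir); !os.IsNotExist(err) {
			return true
		}
		select {
		case <-quit:
			return false
		case <-time.After(time.Second):
		}
	}
}

func main() {
	quit := make(chan struct{})
	fmt.Println("wal dir present:", waitForDir(os.TempDir(), quit))
}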
+ segment, reader, err := w.readToEnd(w.walDir, first, last) if err != nil { - level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir) + level.Error(w.logger).Log("err", err) + return } - w.readSeriesRecords(wal.NewLiveReader(sr), i, size) - level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) - w.writer.SeriesReset(i) - return nil + w.currentSegment = last + w.currentSegmentMetric.Set(float64(w.currentSegment)) + + for { + level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) + + // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. + // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. + err := w.watch(nw, reader) + segment.Close() + if err != nil { + level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) + return + } + + w.currentSegment++ + w.currentSegmentMetric.Set(float64(w.currentSegment)) + + segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) + reader = wal.NewLiveReader(segment) + // TODO: callum, is this error really fatal? + if err != nil { + level.Error(w.logger).Log("err", err) + return + } + } } // When starting the WAL watcher, there is potentially an existing WAL. In that case, we @@ -289,6 +261,7 @@ func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*w if err != nil && err != tsdb.ErrNotFound { return nil, nil, errors.Wrap(err, "find last checkpoint") } + level.Debug(w.logger).Log("msg", "reading checkpoint", "dir", dir) if err == nil { w.lastCheckpoint = dir @@ -324,188 +297,245 @@ func (w *WALWatcher) readToEnd(walDir string, firstSegment, lastSegment int) (*w return segment, r, nil } -func (w *WALWatcher) decodeRecord(rec []byte) error { +// TODO: fix the exit logic for this function +// The stop param is used to stop at the end of the existing WAL on startup, +// since scraped samples may be written to the latest segment before we finish reading it. +func (w *WALWatcher) readSeriesRecords(r *wal.LiveReader, index int, stop int64) { var ( dec tsdb.RecordDecoder series []tsdb.RefSeries samples []tsdb.RefSample + ret bool ) - switch dec.Type(rec) { - case tsdb.RecordSeries: - series, err := dec.Series(rec, series[:0]) - if err != nil { - w.recordDecodeFailsMetric.Inc() - level.Error(log.With(w.logger)).Log("err", err) - break + + for r.Next() && !isClosed(w.quit) { + series = series[:0] + rec := r.Record() + // If the timestamp is > start then we should Append this sample and exit readSeriesRecords, + // because this is the first sample written to the WAL after the WAL watcher was started. 
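The loop at the end of runWatcher above tails one segment at a time: a nil error from watch means a newer segment appeared, so the current segment is closed and the next one is opened. A condensed sketch of that control flow, with openSegment and watch as stand-ins for wal.OpenReadSegment and WALWatcher.watch:

package main

import (
	"errors"
	"fmt"
)

// run watches segments in order. watch returning nil means "a newer segment
// exists, advance"; any error from watch or openSegment ends the watcher.
func run(openSegment func(i int) (func(), error), watch func(i int) error, start int) error {
	current := start
	closeSeg, err := openSegment(current)
	if err != nil {
		return err
	}
	for {
		werr := watch(current)
		closeSeg()
		if werr != nil {
			return werr
		}
		current++
		closeSeg, err = openSegment(current)
		if err != nil {
			return err
		}
	}
}

func main() {
	err := run(
		func(i int) (func(), error) {
			if i > 2 {
				return nil, errors.New("no more segments")
			}
			return func() { fmt.Println("closed segment", i) }, nil
		},
		func(i int) error {
			// The real watch blocks on its tickers; this fake returns
			// immediately as if a newer segment were already present.
			fmt.Println("watching segment", i)
			return nil
		},
		0,
	)
	fmt.Println("watcher stopped:", err)
}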
+ typ := dec.Type(rec) + if typ == tsdb.RecordSamples { + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + continue + } + for _, s := range samples { + if s.T > w.startTime { + w.writer.Append(samples) + ret = true + w.samplesSentPreTailing.Inc() + } + } + if ret { + level.Info(w.logger).Log("msg", "found a sample with a timestamp after the WAL watcher start") + level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) + return + } } - w.seriesReadMetric.Add(float64(len(series))) - w.writer.StoreSeries(series, w.currentSegment) - case tsdb.RecordSamples: - samples, err := dec.Samples(rec, samples[:0]) + if typ != tsdb.RecordSeries { + continue + } + + series, err := dec.Series(rec, nil) if err != nil { - w.recordDecodeFailsMetric.Inc() level.Error(log.With(w.logger)).Log("err", err) break } - w.samplesReadMetric.Add(float64(len(samples))) - // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). - w.writer.Append(samples) - case tsdb.RecordTombstones: - w.tombstonesReadMetric.Add(float64(len(samples))) - case tsdb.RecordInvalid: - w.invalidReadMetric.Add(float64(len(samples))) - return errors.New("invalid record") - default: - level.Info(w.logger).Log("msg", "unknown TSDB record type in decodeSegment") - return errors.New("unknown TSDB record type") - } - return nil -} -func (w *WALWatcher) readSegment(r *wal.LiveReader) { - for r.Next() && !isClosed(w.quit) { - err := w.decodeRecord(r.Record()) - if err != nil { - level.Error(w.logger).Log("err", err) - } + w.writer.StoreSeries(series, index) } - if err := r.Err(); err != nil && err != io.EOF { + + // Since we only call readSeriesRecords on fully written WAL segments or checkpoints, + // Error() will only return an error if something actually went wrong when reading + // a record, either it was invalid or it was only partially written to the WAL. + if err := r.Err(); err != nil { level.Error(w.logger).Log("err", err) + return + } + + // Ensure we read all of the bytes in the segment or checkpoint. + if r.TotalRead() >= stop { + level.Info(w.logger).Log("msg", "read all series records in segment/checkpoint", "index", index) + return } } func (w *WALWatcher) watch(wl *wal.WAL, reader *wal.LiveReader) error { - readTimeout := 10 * time.Millisecond - readTicker := time.NewTicker(readTimeout) + + readTicker := time.NewTicker(readPeriod) defer readTicker.Stop() - checkpointTicker := time.NewTicker(5 * time.Second) + + checkpointTicker := time.NewTicker(checkpointPeriod) defer checkpointTicker.Stop() - segmentTicker := time.NewTicker(100 * time.Millisecond) - defer segmentTicker.Stop() - currentSegmentName := fmt.Sprintf("%08d", w.currentSegment) - w.currentSegmentMetric.Set(float64(w.currentSegment)) + segmentTicker := time.NewTicker(segmentCheckPeriod) + defer segmentTicker.Stop() for { select { case <-w.quit: level.Info(w.logger).Log("msg", "quitting WAL watcher watch loop") return errors.New("quit channel") + case <-checkpointTicker.C: - // check if there is a new checkpoint + // Periodically check if there is a new checkpoint. + // As this is considered an optimisation, we ignore errors during + // checkpoint processing. 
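watch above multiplexes three tickers and a quit channel through a single select, so checkpoint detection, segment detection and tailing each run at their own period. A self-contained sketch of that shape, with placeholder work in each case and the same periods as readPeriod, checkpointPeriod and segmentCheckPeriod:

package main

import (
	"fmt"
	"time"
)

func watchLoop(quit <-chan struct{}) {
	readTicker := time.NewTicker(10 * time.Millisecond)
	defer readTicker.Stop()
	checkpointTicker := time.NewTicker(5 * time.Second)
	defer checkpointTicker.Stop()
	segmentTicker := time.NewTicker(100 * time.Millisecond)
	defer segmentTicker.Stop()

	for {
		select {
		case <-quit:
			fmt.Println("quitting watch loop")
			return
		case <-checkpointTicker.C:
			fmt.Println("check for a new checkpoint")
		case <-segmentTicker.C:
			fmt.Println("check for a new segment")
		case <-readTicker.C:
			fmt.Println("read to the end of the current segment")
		}
	}
}

func main() {
	quit := make(chan struct{})
	go watchLoop(quit)
	time.Sleep(250 * time.Millisecond)
	close(quit)
	time.Sleep(20 * time.Millisecond)
}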
+ dir, _, err := tsdb.LastCheckpoint(w.walDir) if err != nil && err != tsdb.ErrNotFound { + level.Error(w.logger).Log("msg", "error getting last checkpoint", "err", err) continue } - cn, err := checkpointNum(dir) + + if dir == w.lastCheckpoint { + continue + } + + level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir) + + d, err := checkpointNum(dir) if err != nil { + level.Error(w.logger).Log("msg", "error parsing checkpoint", "err", err) continue } - // TODO: callum, simplify the nesting here - if err == nil && dir != w.lastCheckpoint { - level.Info(w.logger).Log("msg", "new checkpoint detected", "last", w.lastCheckpoint, "new", dir) - d, err := strconv.Atoi(cn) - if err != nil { - level.Error(w.logger).Log("err", err) - } else if d < w.currentSegment { - w.lastCheckpoint = dir - // This potentially takes a long time, should we run it in another go routine? - err = w.readCheckpoint(w.lastCheckpoint) - if err != nil { - level.Error(w.logger).Log("err", err) - } - } else { - level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint for now", "current", currentSegmentName, "checkpoint", dir) - } + + if d >= w.currentSegment { + level.Info(w.logger).Log("msg", "current segment is behind the checkpoint, skipping reading of checkpoint", "current", fmt.Sprintf("%08d", w.currentSegment), "checkpoint", dir) + continue } + + w.lastCheckpoint = dir + // This potentially takes a long time, should we run it in another go routine? + err = w.readCheckpoint(w.lastCheckpoint) + if err != nil { + level.Error(w.logger).Log("err", err) + } + case <-segmentTicker.C: // check if new segments exist _, last, err := wl.Segments() if err != nil { - level.Error(w.logger).Log("err", err) + return errors.Wrap(err, "segments") + } + + if last <= w.currentSegment { continue } - if last > w.currentSegment { - w.readSegment(reader) - level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", currentSegmentName, "new", fmt.Sprintf("%08d", last)) - return nil + + if err := w.readSegment(reader); err != nil { + // Ignore errors reading to end of segment, as we're going to move to + // next segment now. + level.Error(w.logger).Log("msg", "error reading to end of segment", "err", err) } + + level.Info(w.logger).Log("msg", "a new segment exists, we should start reading it", "current", fmt.Sprintf("%08d", w.currentSegment), "new", fmt.Sprintf("%08d", last)) + return nil + case <-readTicker.C: - w.readSegment(reader) + if err := w.readSegment(reader); err != nil { + level.Error(w.logger).Log("err", err) + return err + } } } } -func (w *WALWatcher) runWatcher() { - // The WAL dir may not exist when Prometheus first starts up. - for { - if _, err := os.Stat(w.walDir); os.IsNotExist(err) { - time.Sleep(time.Second) - } else { - break +func (w *WALWatcher) readSegment(r *wal.LiveReader) error { + for r.Next() && !isClosed(w.quit) { + err := w.decodeRecord(r.Record()) + + // Intentionally skip over record decode errors. 
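The checkpointTicker case above only reads a checkpoint when it is new and its number is strictly below the segment currently being tailed. A standalone sketch of that decision, assuming the checkpoint.NNNNNN directory naming that checkpointNum parses:

package main

import (
	"fmt"
	"strconv"
	"strings"
)

// shouldReadCheckpoint mirrors the gating in the checkpointTicker case: skip
// checkpoints we have already read and checkpoints that are not yet behind
// the segment we are tailing. dir is a name like "data/wal/checkpoint.000003".
func shouldReadCheckpoint(dir, lastCheckpoint string, currentSegment int) (bool, error) {
	if dir == lastCheckpoint {
		return false, nil
	}
	parts := strings.Split(dir, ".")
	if len(parts) != 2 {
		return false, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}
	n, err := strconv.Atoi(parts[1])
	if err != nil {
		return false, fmt.Errorf("invalid checkpoint dir string: %s", dir)
	}
	if n >= currentSegment {
		// The tailing position has not moved past this checkpoint yet.
		return false, nil
	}
	return true, nil
}

func main() {
	ok, err := shouldReadCheckpoint("data/wal/checkpoint.000003", "data/wal/checkpoint.000001", 5)
	fmt.Println(ok, err) // true <nil>
}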
+ if err != nil { + level.Error(w.logger).Log("err", err) } } + return r.Err() +} - nw, err := wal.New(nil, nil, w.walDir) - if err != nil { - level.Error(w.logger).Log("err", err) - return +func (w *WALWatcher) decodeRecord(rec []byte) error { + var ( + dec tsdb.RecordDecoder + series []tsdb.RefSeries + samples []tsdb.RefSample + ) + switch dec.Type(rec) { + case tsdb.RecordSeries: + series, err := dec.Series(rec, series[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.seriesReadMetric.Add(float64(len(series))) + w.writer.StoreSeries(series, w.currentSegment) + + case tsdb.RecordSamples: + samples, err := dec.Samples(rec, samples[:0]) + if err != nil { + w.recordDecodeFailsMetric.Inc() + return err + } + w.samplesReadMetric.Add(float64(len(samples))) + // Blocks until the sample is sent to all remote write endpoints or closed (because enqueue blocks). + w.writer.Append(samples) + + case tsdb.RecordTombstones: + w.tombstonesReadMetric.Add(float64(len(samples))) + + case tsdb.RecordInvalid: + w.invalidReadMetric.Add(float64(len(samples))) + return errors.New("invalid record") + + default: + w.recordDecodeFailsMetric.Inc() + return errors.New("unknown TSDB record type") } + return nil +} - first, last, err := nw.Segments() +// Read all the series records from a Checkpoint directory. +func (w *WALWatcher) readCheckpoint(checkpointDir string) error { + sr, err := wal.NewSegmentsReader(checkpointDir) if err != nil { - level.Error(w.logger).Log("err", err) - return + return errors.Wrap(err, "open checkpoint") } + defer sr.Close() - if last == -1 { - level.Error(w.logger).Log("err", err) - return + split := strings.Split(checkpointDir, ".") + if len(split) != 2 { + return errors.Errorf("checkpoint dir name is not in the right format: %s", checkpointDir) } - // Read series records in the current WAL and latest checkpoint, get the segment pointer back. - // TODO: callum, handle maintaining the WAL pointer somehow across apply configs? - segment, reader, err := w.readToEnd(w.walDir, first, last) + i, err := strconv.Atoi(split[1]) if err != nil { - level.Error(w.logger).Log("err", err) - return + i = w.currentSegment - 1 } - w.currentSegment = last - + size, err := getCheckpointSize(checkpointDir) if err != nil { - level.Error(w.logger).Log("err", err) - return + level.Error(w.logger).Log("msg", "error getting checkpoint size", "checkpoint", checkpointDir) + return errors.Wrap(err, "get checkpoint size") } - for { - level.Info(w.logger).Log("msg", "watching segment", "segment", w.currentSegment) - // On start, after reading the existing WAL for series records, we have a pointer to what is the latest segment. - // On subsequent calls to this function, currentSegment will have been incremented and we should open that segment. - err := w.watch(nw, reader) - segment.Close() - if err != nil { - level.Error(w.logger).Log("msg", "runWatcher is ending", "err", err) - return - } - w.currentSegment++ - segment, err = wal.OpenReadSegment(wal.SegmentName(w.walDir, w.currentSegment)) - reader = wal.NewLiveReader(segment) - // TODO: callum, is this error really fatal? 
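decodeRecord above dispatches on the record type, counts decode failures, and treats unknown types as errors. The sketch below mirrors that shape with a toy one-byte record format; it does not use the real TSDB record encoding, and the handler callbacks stand in for StoreSeries and Append.

package main

import (
	"errors"
	"fmt"
)

// recordKind stands in for tsdb.RecordType in this toy format.
type recordKind byte

const (
	kindInvalid recordKind = iota
	kindSeries
	kindSamples
	kindTombstones
)

var errUnknownRecord = errors.New("unknown record type")

// decodeRecord dispatches on the first byte of a toy record: known types are
// handled or deliberately ignored, invalid and unknown types return errors.
func decodeRecord(rec []byte, onSeries, onSamples func(payload []byte)) error {
	if len(rec) == 0 {
		return errors.New("empty record")
	}
	switch recordKind(rec[0]) {
	case kindSeries:
		onSeries(rec[1:])
	case kindSamples:
		onSamples(rec[1:])
	case kindTombstones:
		// Tombstones are read but not forwarded to remote write.
	case kindInvalid:
		return errors.New("invalid record")
	default:
		return errUnknownRecord
	}
	return nil
}

func main() {
	err := decodeRecord([]byte{byte(kindSamples), 1, 2, 3},
		func(p []byte) { fmt.Println("series payload:", p) },
		func(p []byte) { fmt.Println("samples payload:", p) })
	fmt.Println("err:", err)
}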
- if err != nil { - level.Error(w.logger).Log("err", err) - return - } - } + w.readSeriesRecords(wal.NewLiveReader(sr), i, size) + level.Debug(w.logger).Log("msg", "read series references from checkpoint", "checkpoint", checkpointDir) + w.writer.SeriesReset(i) + return nil } -func checkpointNum(dir string) (string, error) { +func checkpointNum(dir string) (int, error) { // Checkpoint dir names are in the format checkpoint.000001 chunks := strings.Split(dir, ".") if len(chunks) != 2 { - return "", errors.Errorf("invalid checkpoint dir string: %s", dir) + return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) } - return chunks[1], nil + + result, err := strconv.Atoi(chunks[1]) + if err != nil { + return 0, errors.Errorf("invalid checkpoint dir string: %s", dir) + } + + return result, nil } func getCheckpointSize(dir string) (int64, error) { diff --git a/storage/remote/wal_watcher_test.go b/storage/remote/wal_watcher_test.go index 3a0b8f6da9..0b1cced9a7 100644 --- a/storage/remote/wal_watcher_test.go +++ b/storage/remote/wal_watcher_test.go @@ -70,10 +70,6 @@ func (wtm *writeToMock) SeriesReset(index int) { } } -func (wtm *writeToMock) Name() string { - return "" -} - func newWriteToMock() *writeToMock { return &writeToMock{ seriesLabels: make(map[uint64][]prompb.Label), @@ -140,7 +136,7 @@ func Test_readToEnd_noCheckpoint(t *testing.T) { wt := newWriteToMock() st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir, st) _, _, err = watcher.readToEnd(wdir, first, last) testutil.Ok(t, err) testutil.Equals(t, seriesCount, len(wt.seriesLabels)) @@ -218,7 +214,7 @@ func Test_readToEnd_withCheckpoint(t *testing.T) { wt := newWriteToMock() st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir, st) _, _, err = watcher.readToEnd(wdir, first, last) testutil.Ok(t, err) testutil.Equals(t, seriesCount*10*2, len(wt.seriesLabels)) @@ -274,7 +270,7 @@ func Test_readCheckpoint(t *testing.T) { wt := newWriteToMock() st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir, st) _, _, err = watcher.readToEnd(wdir, first, last) testutil.Ok(t, err) testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) @@ -327,7 +323,7 @@ func Test_checkpoint_seriesReset(t *testing.T) { wt := newWriteToMock() st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir, st) _, _, err = watcher.readToEnd(wdir, first, last) testutil.Ok(t, err) testutil.Equals(t, seriesCount*10, len(wt.seriesLabels)) @@ -352,7 +348,7 @@ func Test_decodeRecord(t *testing.T) { wt := newWriteToMock() st := timestamp.FromTime(time.Now()) - watcher := NewWALWatcher(nil, wt, dir, st) + watcher := NewWALWatcher(nil, "", wt, dir, st) // decode a series record enc := tsdb.RecordEncoder{} diff --git a/storage/remote/write.go b/storage/remote/write.go index c718f88103..f44662af1f 100644 --- a/storage/remote/write.go +++ b/storage/remote/write.go @@ -20,32 +20,47 @@ import ( // Appender implements scrape.Appendable. func (s *Storage) Appender() (storage.Appender, error) { - return s, nil + return ×tampTracker{ + storage: s, + }, nil +} + +type timestampTracker struct { + storage *Storage + samples int64 + highestTimestamp int64 } // Add implements storage.Appender. 
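readCheckpoint above relies on getCheckpointSize, whose body lies outside this hunk. A plausible standalone sketch follows, assuming it simply sums the sizes of the files in the checkpoint directory; this is an assumption for illustration, not the real implementation.

package main

import (
	"fmt"
	"io/ioutil"
	"os"
)

// getCheckpointSize returns the total size in bytes of the regular files in
// dir, which is what the stop offset passed to readSeriesRecords represents.
func getCheckpointSize(dir string) (int64, error) {
	infos, err := ioutil.ReadDir(dir)
	if err != nil {
		return 0, err
	}
	var size int64
	for _, fi := range infos {
		if fi.IsDir() {
			continue
		}
		size += fi.Size()
	}
	return size, nil
}

func main() {
	size, err := getCheckpointSize(os.TempDir())
	fmt.Println("bytes:", size, "err:", err)
}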
-func (s *Storage) Add(l labels.Labels, t int64, v float64) (uint64, error) { - s.samplesIn.incr(1) - s.samplesInMetric.Inc() - if t > s.highestTimestamp { - s.highestTimestamp = t +func (t *timestampTracker) Add(_ labels.Labels, ts int64, v float64) (uint64, error) { + t.samples++ + if ts > t.highestTimestamp { + t.highestTimestamp = ts } return 0, nil } // AddFast implements storage.Appender. -func (s *Storage) AddFast(l labels.Labels, _ uint64, t int64, v float64) error { - _, err := s.Add(l, t, v) +func (t *timestampTracker) AddFast(l labels.Labels, _ uint64, ts int64, v float64) error { + _, err := t.Add(l, ts, v) return err } // Commit implements storage.Appender. -func (s *Storage) Commit() error { - s.highestTimestampMetric.Set(float64(s.highestTimestamp)) +func (t *timestampTracker) Commit() error { + t.storage.samplesIn.incr(t.samples) + t.storage.samplesInMetric.Add(float64(t.samples)) + + t.storage.highestTimestampMtx.Lock() + defer t.storage.highestTimestampMtx.Unlock() + if t.highestTimestamp > t.storage.highestTimestamp { + t.storage.highestTimestamp = t.highestTimestamp + t.storage.highestTimestampMetric.Set(float64(t.highestTimestamp)) + } return nil } // Rollback implements storage.Appender. -func (*Storage) Rollback() error { +func (*timestampTracker) Rollback() error { return nil } diff --git a/vendor/modules.txt b/vendor/modules.txt index 625f27137b..d9b7f92626 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -243,12 +243,12 @@ github.com/prometheus/procfs/xfs github.com/prometheus/procfs/internal/util # github.com/prometheus/tsdb v0.4.0 github.com/prometheus/tsdb +github.com/prometheus/tsdb/fileutil +github.com/prometheus/tsdb/wal github.com/prometheus/tsdb/labels github.com/prometheus/tsdb/chunkenc github.com/prometheus/tsdb/chunks -github.com/prometheus/tsdb/fileutil github.com/prometheus/tsdb/index -github.com/prometheus/tsdb/wal # github.com/samuel/go-zookeeper v0.0.0-20161028232340-1d7be4effb13 github.com/samuel/go-zookeeper/zk # github.com/sasha-s/go-deadlock v0.0.0-20161201235124-341000892f3d
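The timestampTracker changes above move per-scrape accounting into a short-lived appender that only touches the shared Storage fields, under highestTimestampMtx, at Commit time. A self-contained sketch of that pattern, with simplified types standing in for Storage and the metrics:

package main

import (
	"fmt"
	"sync"
)

// storageState stands in for the shared remote.Storage fields guarded by
// highestTimestampMtx in the hunk above.
type storageState struct {
	mtx              sync.Mutex
	samplesIn        int64
	highestTimestamp int64
}

// tracker mirrors timestampTracker: each appender accumulates locally and
// only takes the shared lock once, on Commit.
type tracker struct {
	storage          *storageState
	samples          int64
	highestTimestamp int64
}

func (t *tracker) Add(ts int64) {
	t.samples++
	if ts > t.highestTimestamp {
		t.highestTimestamp = ts
	}
}

func (t *tracker) Commit() {
	t.storage.mtx.Lock()
	defer t.storage.mtx.Unlock()
	t.storage.samplesIn += t.samples
	if t.highestTimestamp > t.storage.highestTimestamp {
		t.storage.highestTimestamp = t.highestTimestamp
	}
}

func main() {
	s := &storageState{}
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func(base int64) {
			defer wg.Done()
			t := &tracker{storage: s}
			for ts := base; ts < base+100; ts++ {
				t.Add(ts)
			}
			t.Commit()
		}(int64(i * 1000))
	}
	wg.Wait()
	fmt.Println("samples:", s.samplesIn, "highest ts:", s.highestTimestamp)
}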