Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/pkg/ingester/stream.go

618 lines
19 KiB

package ingester
import (
"bytes"
"context"
"fmt"
"net/http"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/httpgrpc"
"github.com/opentracing/opentracing-go"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/v3/pkg/chunkenc"
"github.com/grafana/loki/v3/pkg/distributor/writefailures"
"github.com/grafana/loki/v3/pkg/ingester/wal"
"github.com/grafana/loki/v3/pkg/iter"
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/v3/pkg/logqlmodel/stats"
"github.com/grafana/loki/v3/pkg/util/flagext"
util_log "github.com/grafana/loki/v3/pkg/util/log"
"github.com/grafana/loki/v3/pkg/validation"
)
var ErrEntriesExist = errors.New("duplicate push - entries already exist")
type line struct {
ts time.Time
content string
}
type stream struct {
limiter *StreamRateLimiter
cfg *Config
tenant string
// Newest chunk at chunks[n-1].
// Not thread-safe; assume accesses to this are locked by caller.
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
chunks []chunkDesc
fp model.Fingerprint // possibly remapped fingerprint, used in the streams map
chunkMtx sync.RWMutex
labels labels.Labels
labelsString string
labelHash uint64
labelHashNoShard uint64
// most recently pushed line. This is used to prevent duplicate pushes.
// It also determines chunk synchronization when unordered writes are disabled.
lastLine line
// keeps track of the highest timestamp accepted by the stream.
// This is used when unordered writes are enabled to cap the validity window
// of accepted writes and for chunk synchronization.
highestTs time.Time
metrics *ingesterMetrics
tailers map[uint32]*tailer
tailerMtx sync.RWMutex
// entryCt is a counter which is incremented on each accepted entry.
// This allows us to discard WAL entries during replays which were
// already recovered via checkpoints. Historically out of order
// errors were used to detect this, but this counter has been
// introduced to facilitate removing the ordering constraint.
entryCt int64
unorderedWrites bool
streamRateCalculator *StreamRateCalculator
writeFailures *writefailures.Manager
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
chunkFormat byte
chunkHeadBlockFormat chunkenc.HeadBlockFmt
}
type chunkDesc struct {
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
chunk *chunkenc.MemChunk
closed bool
synced bool
flushed time.Time
reason string
lastUpdated time.Time
}
type entryWithError struct {
entry *logproto.Entry
e error
}
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
func newStream(
chunkFormat byte,
headBlockFmt chunkenc.HeadBlockFmt,
cfg *Config,
limits RateLimiterStrategy,
tenant string,
fp model.Fingerprint,
labels labels.Labels,
unorderedWrites bool,
streamRateCalculator *StreamRateCalculator,
metrics *ingesterMetrics,
writeFailures *writefailures.Manager,
) *stream {
hashNoShard, _ := labels.HashWithoutLabels(make([]byte, 0, 1024), ShardLbName)
return &stream{
limiter: NewStreamRateLimiter(limits, tenant, 10*time.Second),
cfg: cfg,
fp: fp,
labels: labels,
labelsString: labels.String(),
labelHash: labels.Hash(),
labelHashNoShard: hashNoShard,
tailers: map[uint32]*tailer{},
metrics: metrics,
tenant: tenant,
streamRateCalculator: streamRateCalculator,
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
unorderedWrites: unorderedWrites,
writeFailures: writeFailures,
chunkFormat: chunkFormat,
chunkHeadBlockFormat: headBlockFmt,
}
}
// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
// Must hold chunkMtx
// DEPRECATED: chunk transfers are no longer suggested and remain for compatibility.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data, s.cfg.BlockSize, s.cfg.TargetChunkSize)
if err != nil {
return err
}
s.chunks = append(s.chunks, chunkDesc{
chunk: c,
})
s.metrics.chunksCreatedTotal.Inc()
return nil
}
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
// setChunks is used during checkpoint recovery
func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err error) {
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
chks, err := fromWireChunks(s.cfg, s.chunkHeadBlockFormat, chunks)
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
if err != nil {
return 0, 0, err
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
s.chunks = chks
for _, c := range s.chunks {
entriesAdded += c.chunk.Size()
bytesAdded += c.chunk.UncompressedSize()
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
return bytesAdded, entriesAdded, nil
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
func (s *stream) NewChunk() *chunkenc.MemChunk {
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
return chunkenc.NewMemChunk(s.chunkFormat, s.cfg.parsedEncoding, s.chunkHeadBlockFormat, s.cfg.BlockSize, s.cfg.TargetChunkSize)
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
func (s *stream) Push(
ctx context.Context,
entries []logproto.Entry,
// WAL record to add push contents to.
// May be nil to disable this functionality.
record *wal.Record,
// Counter used in WAL replay to avoid duplicates.
// If this is non-zero, the stream will reject entries
// with a counter value less than or equal to it's own.
// It is set to zero and thus bypassed outside of WAL replays.
counter int64,
// Lock chunkMtx while pushing.
// If this is false, chunkMtx must be held outside Push.
lockChunk bool,
// Whether nor not to ingest all at once or not. It is a per-tenant configuration.
rateLimitWholeStream bool,
) (int, error) {
if lockChunk {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
}
isReplay := counter > 0
if isReplay && counter <= s.entryCt {
var byteCt int
for _, e := range entries {
byteCt += len(e.Line)
}
s.metrics.walReplaySamplesDropped.WithLabelValues(duplicateReason).Add(float64(len(entries)))
s.metrics.walReplayBytesDropped.WithLabelValues(duplicateReason).Add(float64(byteCt))
return 0, ErrEntriesExist
}
toStore, invalid := s.validateEntries(entries, isReplay, rateLimitWholeStream)
if rateLimitWholeStream && hasRateLimitErr(invalid) {
return 0, errorForFailedEntries(s, invalid, len(entries))
}
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
prevNumChunks := len(s.chunks)
if prevNumChunks == 0 {
s.chunks = append(s.chunks, chunkDesc{
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
chunk: s.NewChunk(),
})
s.metrics.chunksCreatedTotal.Inc()
s.metrics.chunkCreatedStats.Inc(1)
}
bytesAdded, storedEntries, entriesWithErr := s.storeEntries(ctx, toStore)
s.recordAndSendToTailers(record, storedEntries)
if len(s.chunks) != prevNumChunks {
s.metrics.memoryChunks.Add(float64(len(s.chunks) - prevNumChunks))
}
return bytesAdded, errorForFailedEntries(s, append(invalid, entriesWithErr...), len(entries))
}
func errorForFailedEntries(s *stream, failedEntriesWithError []entryWithError, totalEntries int) error {
if len(failedEntriesWithError) == 0 {
return nil
}
lastEntryWithErr := failedEntriesWithError[len(failedEntriesWithError)-1]
_, ok := lastEntryWithErr.e.(*validation.ErrStreamRateLimit)
outOfOrder := chunkenc.IsOutOfOrderErr(lastEntryWithErr.e)
if !outOfOrder && !ok {
return lastEntryWithErr.e
}
var statusCode int
if outOfOrder {
statusCode = http.StatusBadRequest
}
if ok {
statusCode = http.StatusTooManyRequests
}
// Return a http status 4xx request response with all failed entries.
buf := bytes.Buffer{}
streamName := s.labelsString
limitedFailedEntries := failedEntriesWithError
if maxIgnore := s.cfg.MaxReturnedErrors; maxIgnore > 0 && len(limitedFailedEntries) > maxIgnore {
limitedFailedEntries = limitedFailedEntries[:maxIgnore]
}
for _, entryWithError := range limitedFailedEntries {
fmt.Fprintf(&buf,
"entry with timestamp %s ignored, reason: '%s',\n",
entryWithError.entry.Timestamp.String(), entryWithError.e.Error())
}
fmt.Fprintf(&buf, "user '%s', total ignored: %d out of %d for stream: %s", s.tenant, len(failedEntriesWithError), totalEntries, streamName)
return httpgrpc.Errorf(statusCode, buf.String())
}
func hasRateLimitErr(errs []entryWithError) bool {
if len(errs) == 0 {
return false
}
lastErr := errs[len(errs)-1]
_, ok := lastErr.e.(*validation.ErrStreamRateLimit)
return ok
}
func (s *stream) recordAndSendToTailers(record *wal.Record, entries []logproto.Entry) {
if len(entries) == 0 {
return
}
// record will be nil when replaying the wal (we don't want to rewrite wal entries as we replay them).
if record != nil {
record.AddEntries(uint64(s.fp), s.entryCt, entries...)
} else {
// If record is nil, this is a WAL recovery.
s.metrics.recoveredEntriesTotal.Add(float64(len(entries)))
}
s.tailerMtx.RLock()
hasTailers := len(s.tailers) != 0
s.tailerMtx.RUnlock()
if hasTailers {
stream := logproto.Stream{Labels: s.labelsString, Entries: entries}
closedTailers := []uint32{}
s.tailerMtx.RLock()
for _, tailer := range s.tailers {
if tailer.isClosed() {
closedTailers = append(closedTailers, tailer.getID())
continue
}
tailer.send(stream, s.labels)
}
s.tailerMtx.RUnlock()
if len(closedTailers) != 0 {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
for _, closedTailerID := range closedTailers {
delete(s.tailers, closedTailerID)
}
}
}
}
func (s *stream) storeEntries(ctx context.Context, entries []logproto.Entry) (int, []logproto.Entry, []entryWithError) {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to store entries", "labels", s.labelsString)
defer sp.LogKV("event", "stream finished to store entries")
}
var bytesAdded, outOfOrderSamples, outOfOrderBytes int
var invalid []entryWithError
storedEntries := make([]logproto.Entry, 0, len(entries))
for i := 0; i < len(entries); i++ {
chunk := &s.chunks[len(s.chunks)-1]
if chunk.closed || !chunk.chunk.SpaceFor(&entries[i]) || s.cutChunkForSynchronization(entries[i].Timestamp, s.highestTs, chunk, s.cfg.SyncPeriod, s.cfg.SyncMinUtilization) {
chunk = s.cutChunk(ctx)
}
chunk.lastUpdated = time.Now()
if err := chunk.chunk.Append(&entries[i]); err != nil {
invalid = append(invalid, entryWithError{&entries[i], err})
if chunkenc.IsOutOfOrderErr(err) {
s.writeFailures.Log(s.tenant, err)
outOfOrderSamples++
outOfOrderBytes += len(entries[i].Line)
}
continue
}
s.entryCt++
s.lastLine.ts = entries[i].Timestamp
s.lastLine.content = entries[i].Line
if s.highestTs.Before(entries[i].Timestamp) {
s.highestTs = entries[i].Timestamp
}
bytesAdded += len(entries[i].Line)
storedEntries = append(storedEntries, entries[i])
}
s.reportMetrics(outOfOrderSamples, outOfOrderBytes, 0, 0)
return bytesAdded, storedEntries, invalid
}
func (s *stream) validateEntries(entries []logproto.Entry, isReplay, rateLimitWholeStream bool) ([]logproto.Entry, []entryWithError) {
var (
outOfOrderSamples, outOfOrderBytes int
rateLimitedSamples, rateLimitedBytes int
validBytes, totalBytes int
failedEntriesWithError []entryWithError
limit = s.limiter.lim.Limit()
lastLine = s.lastLine
highestTs = s.highestTs
toStore = make([]logproto.Entry, 0, len(entries))
)
for i := range entries {
// If this entry matches our last appended line's timestamp and contents,
// ignore it.
//
// This check is done at the stream level so it persists across cut and
// flushed chunks.
//
// NOTE: it's still possible for duplicates to be appended if a stream is
// deleted from inactivity.
if entries[i].Timestamp.Equal(lastLine.ts) && entries[i].Line == lastLine.content {
continue
}
lineBytes := len(entries[i].Line)
totalBytes += lineBytes
now := time.Now()
if !rateLimitWholeStream && !s.limiter.AllowN(now, len(entries[i].Line)) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(lineBytes)}})
s.writeFailures.Log(s.tenant, failedEntriesWithError[len(failedEntriesWithError)-1].e)
rateLimitedSamples++
rateLimitedBytes += lineBytes
continue
}
// The validity window for unordered writes is the highest timestamp present minus 1/2 * max-chunk-age.
cutoff := highestTs.Add(-s.cfg.MaxChunkAge / 2)
if !isReplay && s.unorderedWrites && !highestTs.IsZero() && cutoff.After(entries[i].Timestamp) {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&entries[i], chunkenc.ErrTooFarBehind(entries[i].Timestamp, cutoff)})
s.writeFailures.Log(s.tenant, fmt.Errorf("%w for stream %s", failedEntriesWithError[len(failedEntriesWithError)-1].e, s.labels))
outOfOrderSamples++
outOfOrderBytes += lineBytes
continue
}
validBytes += lineBytes
lastLine.ts = entries[i].Timestamp
lastLine.content = entries[i].Line
if highestTs.Before(entries[i].Timestamp) {
highestTs = entries[i].Timestamp
}
toStore = append(toStore, entries[i])
}
// Each successful call to 'AllowN' advances the limiter. With all-or-nothing
// ingestion, the limiter should only be advanced when the whole stream can be
// sent
now := time.Now()
if rateLimitWholeStream && !s.limiter.AllowN(now, validBytes) {
// Report that the whole stream was rate limited
rateLimitedSamples = len(toStore)
failedEntriesWithError = make([]entryWithError, 0, len(toStore))
for i := 0; i < len(toStore); i++ {
failedEntriesWithError = append(failedEntriesWithError, entryWithError{&toStore[i], &validation.ErrStreamRateLimit{RateLimit: flagext.ByteSize(limit), Labels: s.labelsString, Bytes: flagext.ByteSize(len(toStore[i].Line))}})
rateLimitedBytes += len(toStore[i].Line)
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
}
s.streamRateCalculator.Record(s.tenant, s.labelHash, s.labelHashNoShard, totalBytes)
s.reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes)
return toStore, failedEntriesWithError
}
func (s *stream) reportMetrics(outOfOrderSamples, outOfOrderBytes, rateLimitedSamples, rateLimitedBytes int) {
if outOfOrderSamples > 0 {
name := validation.OutOfOrder
if s.unorderedWrites {
name = validation.TooFarBehind
Adds WAL support (experimental) (#2981) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * prevent shared access bug with tailers and entry pool * removes stream push entry pool optimization
5 years ago
}
validation.DiscardedSamples.WithLabelValues(name, s.tenant).Add(float64(outOfOrderSamples))
validation.DiscardedBytes.WithLabelValues(name, s.tenant).Add(float64(outOfOrderBytes))
}
if rateLimitedSamples > 0 {
validation.DiscardedSamples.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedSamples))
validation.DiscardedBytes.WithLabelValues(validation.StreamRateLimit, s.tenant).Add(float64(rateLimitedBytes))
}
}
func (s *stream) cutChunk(ctx context.Context) *chunkDesc {
if sp := opentracing.SpanFromContext(ctx); sp != nil {
sp.LogKV("event", "stream started to cut chunk")
defer sp.LogKV("event", "stream finished to cut chunk")
}
// If the chunk has no more space call Close to make sure anything in the head block is cut and compressed
chunk := &s.chunks[len(s.chunks)-1]
err := chunk.chunk.Close()
if err != nil {
// This should be an unlikely situation, returning an error up the stack doesn't help much here
// so instead log this to help debug the issue if it ever arises.
level.Error(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "failed to Close chunk", "err", err)
}
chunk.closed = true
s.metrics.samplesPerChunk.Observe(float64(chunk.chunk.Size()))
s.metrics.blocksPerChunk.Observe(float64(chunk.chunk.BlockCount()))
s.metrics.chunksCreatedTotal.Inc()
s.metrics.chunkCreatedStats.Inc(1)
s.chunks = append(s.chunks, chunkDesc{
chunk: s.NewChunk(),
})
return &s.chunks[len(s.chunks)-1]
}
// Returns true, if chunk should be cut before adding new entry. This is done to make ingesters
// cut the chunk for this stream at the same moment, so that new chunk will contain exactly the same entries.
func (s *stream) cutChunkForSynchronization(entryTimestamp, latestTs time.Time, c *chunkDesc, synchronizePeriod time.Duration, minUtilization float64) bool {
// Never sync when it's not enabled, it's the first push, or if a write isn't the latest ts
// to prevent syncing many unordered writes.
if synchronizePeriod <= 0 || latestTs.IsZero() || latestTs.After(entryTimestamp) {
return false
}
// we use fingerprint as a jitter here, basically offsetting stream synchronization points to different
// this breaks if streams are mapped to different fingerprints on different ingesters, which is too bad.
cts := (uint64(entryTimestamp.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
pts := (uint64(latestTs.UnixNano()) + uint64(s.fp)) % uint64(synchronizePeriod.Nanoseconds())
// if current entry timestamp has rolled over synchronization period
if cts < pts {
if minUtilization <= 0 {
c.synced = true
return true
}
if c.chunk.Utilization() > minUtilization {
c.synced = true
return true
}
}
return false
}
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
func (s *stream) Bounds() (from, to time.Time) {
s.chunkMtx.RLock()
defer s.chunkMtx.RUnlock()
if len(s.chunks) > 0 {
from, _ = s.chunks[0].chunk.Bounds()
_, to = s.chunks[len(s.chunks)-1].chunk.Bounds()
}
return from, to
}
// Returns an iterator.
func (s *stream) Iterator(ctx context.Context, statsCtx *stats.Context, from, through time.Time, direction logproto.Direction, pipeline log.StreamPipeline) (iter.EntryIterator, error) {
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
s.chunkMtx.RLock()
defer s.chunkMtx.RUnlock()
iterators := make([]iter.EntryIterator, 0, len(s.chunks))
var lastMax time.Time
ordered := true
for _, c := range s.chunks {
mint, maxt := c.chunk.Bounds()
// skip this chunk
if through.Before(mint) || maxt.Before(from) {
continue
}
if mint.Before(lastMax) {
ordered = false
}
lastMax = maxt
itr, err := c.chunk.Iterator(ctx, from, through, direction, pipeline)
Chunking (#10) * Add checkenc without serialisation for now. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify interface and add serialisatio` Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Move away from \xFF magic to something simple Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add serialisation and Deserialisation Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Modify interface to be closer to logish interface. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Fix race b/w append and iteration. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Make iterators honour bounds Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Remove locks as safety is assured externally Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * chunkenc: Add checksums Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Add code quotes around block design. * Split headBlock into it's own type. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Simplify encoding and decoding. Signed-off-by: Goutham Veeramachaneni <cs14btech11014@iith.ac.in> * Expose flags. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use the already existing EntryIterator interface Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Use existing Chunk interface. Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Review feedback Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com> * Integrate the compressed chunk and add metrics Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
7 years ago
if err != nil {
return nil, err
}
if itr != nil {
iterators = append(iterators, itr)
}
}
if direction != logproto.FORWARD {
for left, right := 0, len(iterators)-1; left < right; left, right = left+1, right-1 {
iterators[left], iterators[right] = iterators[right], iterators[left]
}
}
if statsCtx != nil {
statsCtx.AddIngesterTotalChunkMatched(int64(len(iterators)))
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
}
if ordered {
return iter.NewNonOverlappingIterator(iterators), nil
}
return iter.NewSortEntryIterator(iterators, direction), nil
}
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
5 years ago
// Returns an SampleIterator.
func (s *stream) SampleIterator(ctx context.Context, statsCtx *stats.Context, from, through time.Time, extractor log.StreamSampleExtractor) (iter.SampleIterator, error) {
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
s.chunkMtx.RLock()
defer s.chunkMtx.RUnlock()
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
5 years ago
iterators := make([]iter.SampleIterator, 0, len(s.chunks))
var lastMax time.Time
ordered := true
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
5 years ago
for _, c := range s.chunks {
mint, maxt := c.chunk.Bounds()
// skip this chunk
if through.Before(mint) || maxt.Before(from) {
continue
}
if mint.Before(lastMax) {
ordered = false
}
lastMax = maxt
if itr := c.chunk.SampleIterator(ctx, from, through, extractor); itr != nil {
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
5 years ago
iterators = append(iterators, itr)
}
}
if statsCtx != nil {
statsCtx.AddIngesterTotalChunkMatched(int64(len(iterators)))
}
if ordered {
return iter.NewNonOverlappingSampleIterator(iterators), nil
Introduces per stream chunks mutex (#3000) * marshalable chunks * wal record types custom serialization * proto types for wal checkpoints * byteswith output unaffected by buffer * wal & record pool ifcs * wal record can hold entries from multiple series * entry pool * ingester uses noopWal * removes duplicate argument passing in ingester code. adds ingester config validation & derives chunk encoding. * segment writing * [WIP] wal recovery from segments * replay uses sync.Maps & preserves WAL fingerprints * in memory wal recovery * wal segment recovery * ingester metrics struct * wal replay locks streamsMtx in instances, adds checkpoint codec * ingester metrics * checkpointer * WAL checkpoint writer * checkpointwriter can write multiple checkpoints * reorgs checkpointing * wires up checkpointwriter to wal * ingester SeriesIter impl * wires up ingesterRecoverer to consume checkpoints * generic recovery fn * generic recovery fn * recover from both wal types * cleans up old tmp checkpoints & allows aborting in flight checkpoints * wires up wal checkpointing * more granular wal logging * fixes off by 1 wal truncation & removes double logging * adds userID to wal records correctly * wire chunk encoding tests * more granular wal metrics * checkpoint encoding test * ignores debug bins * segment replay ignores out of orders * fixes bug between WAL reading []byte validity and proto unmarshalling refs * conf validations, removes comments * flush on shutdown config * POST /ingester/shutdown * renames flush on shutdown * wal & checkpoint use same segment size * writes entries to wal regardless of tailers * makes wal checkpoing duration default to 5m * recovery metrics * encodes headchunks separately for wal purposes * merge upstream * linting * adds chunkMtx to ingester streams * all flush locking locks streams -> chunks in order to prevent deadlocks * wal integration tests * addresses pr feedback uses entry pool in stream push/tailer removes unnecessary pool interaction checkpointbytes comment fillchunk helper, record resetting in tests via pool redundant comment defers wg done in recovery s/num/count/ checkpoint wal uses a logger encodeWithTypeHeader now creates its own []byte removes pool from decodeEntries wal stop can error * cleans up merge * lint * inline functions to simplify unlocking
5 years ago
}
return iter.NewSortSampleIterator(iterators), nil
Improve metric queries by computing samples at the edges. (#2293) * First pass breaking the code appart. Wondering how we're going to achieve fast mutation of labels. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Work in progress. I realize I need hash for deduping lines. going to benchmark somes. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Tested some hash and decided which one to use. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Wip Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Starting working on ingester. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Trying to find a better hash function. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More hash testing we have a winner. xxhash it is. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Settle on xxhash Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Better params interfacing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add interface for queryparams for things that exist in both type of params. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add storage sample iterator implementations. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing tests and verifying we don't get collions for the hashing method. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing ingesters tests and refactoring utility function/tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing and testing that stats are still well computed. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixing more tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More engine tests finished. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes sharding evaluator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes more engine tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix error tests in the engine. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finish fixing all tests. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes a bug where extractor was not passed in correctly. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add notes about upgrade. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Renamed and fix a bug. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add memchunk tests and starting test for sampleIterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Test heap sample iterator. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working on test. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Finishing testing all new iterators. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Making sure all store functions are tested. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Benchmark and verify everything is working well. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Make the linter happy. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * use xxhash v2. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix a flaky test because of map. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * go.mod. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Edward Welch <edward.welch@grafana.com>
5 years ago
}
func (s *stream) addTailer(t *tailer) {
s.tailerMtx.Lock()
defer s.tailerMtx.Unlock()
s.tailers[t.getID()] = t
}
func (s *stream) resetCounter() {
s.entryCt = 0
}
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
func headBlockType(chunkfmt byte, unorderedWrites bool) chunkenc.HeadBlockFmt {
if unorderedWrites {
Pin `chunk` and `index` format to `schema` version. (#10213) We pin all three `Chunk`, `HeadBlock` and `TSDB` Version to `schema` version in period config. This is the following mapping (after being discussed with @owen-d and @sandeepsukhani ) `v12` (current existing schema) - ChunkFormatV3 (UnorderedHeadBlock) + TSDBv2 `v13` (introducing new schema) - ChunkFormatV4 (UnorderedWithNonIndexedLabelsHeadBlockFmt) + TSDBv3 Note the new schema `v13` supports the latest chunk and index format. **NOTES for Reviewer** 1. General approach is we removed the idea of `index.LiveFormat`, `chunkenc.DefaultChunkFormat` and `chunkenc.DefaultHeadBlockFmt` and made following two changes. These variables were used before to tie chunk and tsdb formats specific to Loki versions. This PR remove that coupling and pin these formats to `schema` version instead. 1. These variables were replaced with explicit chunk and index formats within those packages (and it's tests) 2. If these variables were used outside it's own packages say by ingester, compactor, etc. Then we extract correct chunk and index versions from the `schema` config. 2. Add two methods to `periodConfig`. (1) `ChunkFormat()` returning chunk and head format tied to schema (2) `TSDBFormat()` returning tsdb format tied to schema. 2. Other ideas I thought of doing but didn't end up doing is make `ChunkFormat` and `IndexFormat` as separate type (rather than `byte` and `int` currently. Similar to `HeadBlockFmt` type). But didnt' do it eventually to keep the PR small and don't want to complicate with lots of changes. 4. Moved couple of test cases from `chunkenc` to `config` package, because the test case was actually testing methods on `schemaconfig` and it was creating cycling dependencies. --------- Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com>
2 years ago
if chunkfmt >= chunkenc.ChunkFormatV3 {
return chunkenc.ChunkHeadFormatFor(chunkfmt)
}
}
return chunkenc.OrderedHeadBlockFmt
}