Lazy load chunks (#435)

* Lazy load chunks for queries.

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Add context to the lazy chunk calls

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Make sure labels are plumbed through

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Pre-load non-overlapping chunk iterators

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>

* Review feedback

Signed-off-by: Goutham Veeramachaneni <gouthamve@gmail.com>
Committed by Goutham Veeramachaneni via GitHub
parent 43d13109dc
commit 27ca58e79f
Changed files (18):

  Gopkg.lock (7 lines changed)
  Gopkg.toml (3 lines changed)
  pkg/chunkenc/lazy_chunk.go (87 lines changed)
  pkg/ingester/instance.go (6 lines changed)
  pkg/iter/iterator.go (8 lines changed)
  pkg/querier/store.go (192 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/aws/aws_autoscaling.go (2 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go (5 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go (21 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go (19 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go (30 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go (108 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go (16 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go (75 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go (4 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go (1 line changed)
  vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go (179 lines changed)
  vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go (15 lines changed)

Gopkg.lock (generated)

@ -186,8 +186,8 @@
version = "v2.0.0"
[[projects]]
branch = "master"
digest = "1:5750dfa5a8160b51a01c02d6d7841b50fe7d2d97344c200fc4cdc3c46b3953f8"
branch = "lazy-load-chunks"
digest = "1:ec8e0308d1e557f50317a6437073a7a859d73e4cf8e4c20a60d7009e352353c6"
name = "github.com/cortexproject/cortex"
packages = [
"pkg/chunk",
@ -214,7 +214,8 @@
"pkg/util/wire",
]
pruneopts = "UT"
revision = "ff51bd3c7267184042ea4cf347e6d1fa24934c91"
revision = "161f6716cba9a32f07f359c4f9f8578e0c5d5ae8"
source = "https://github.com/grafana/cortex"
[[projects]]
digest = "1:ffe9824d294da03b391f44e1ae8281281b4afc1bdaa9588c9097785e3af10cec"

Gopkg.toml

@@ -26,7 +26,8 @@
[[constraint]]
name = "github.com/cortexproject/cortex"
branch = "master"
source = "https://github.com/grafana/cortex"
branch = "lazy-load-chunks"
[[constraint]]
name = "github.com/weaveworks/common"

pkg/chunkenc/lazy_chunk.go (new file)

@@ -0,0 +1,87 @@
package chunkenc
import (
"context"
"time"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
)
// LazyChunk loads the chunk when it is accessed.
type LazyChunk struct {
Chunk chunk.Chunk
Fetcher *chunk.Fetcher
}
func (c *LazyChunk) getChunk(ctx context.Context) (Chunk, error) {
chunks, err := c.Fetcher.FetchChunks(ctx, []chunk.Chunk{c.Chunk}, []string{c.Chunk.ExternalKey()})
if err != nil {
return nil, err
}
c.Chunk = chunks[0]
return chunks[0].Data.(*Facade).LokiChunk(), nil
}
// Iterator returns an entry iterator.
func (c LazyChunk) Iterator(ctx context.Context, from, through time.Time, direction logproto.Direction) (iter.EntryIterator, error) {
// If the chunk is already loaded, then use that.
if c.Chunk.Data != nil {
lokiChunk := c.Chunk.Data.(*Facade).LokiChunk()
return lokiChunk.Iterator(from, through, direction)
}
return &lazyIterator{
chunk: c,
from: from,
through: through,
direction: direction,
context: ctx,
}, nil
}
type lazyIterator struct {
iter.EntryIterator
chunk LazyChunk
err error
from, through time.Time
direction logproto.Direction
context context.Context
}
func (it *lazyIterator) Next() bool {
if it.err != nil {
return false
}
if it.EntryIterator != nil {
return it.EntryIterator.Next()
}
chk, err := it.chunk.getChunk(it.context)
if err != nil {
it.err = err
return false
}
it.EntryIterator, it.err = chk.Iterator(it.from, it.through, it.direction)
return it.Next()
}
func (it *lazyIterator) Labels() string {
return it.chunk.Chunk.Metric.String()
}
func (it *lazyIterator) Error() error {
if it.err != nil {
return it.err
}
return it.EntryIterator.Error()
}

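The LazyChunk above defers the Fetcher round-trip until its iterator is first advanced. For illustration, a minimal self-contained sketch of the same lazy-iterator pattern, with a hypothetical loader function standing in for chunk.Fetcher.FetchChunks (none of these names are Loki APIs):

package main

import "fmt"

// lazyIter defers loading its backing data until the first Next call.
// loader is a stand-in for the Fetcher round-trip in the real code.
type lazyIter struct {
	loader func() ([]string, error)
	lines  []string
	loaded bool
	pos    int
	err    error
}

func (it *lazyIter) Next() bool {
	if it.err != nil {
		return false
	}
	if !it.loaded {
		it.lines, it.err = it.loader() // the fetch happens here, exactly once
		it.loaded = true
		if it.err != nil {
			return false
		}
	}
	if it.pos >= len(it.lines) {
		return false
	}
	it.pos++
	return true
}

func (it *lazyIter) Entry() string { return it.lines[it.pos-1] }

func main() {
	it := &lazyIter{loader: func() ([]string, error) {
		fmt.Println("loading chunk ...") // only printed if the iterator is actually used
		return []string{"line1", "line2"}, nil
	}}
	for it.Next() {
		fmt.Println(it.Entry())
	}
}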
pkg/ingester/instance.go

@@ -121,16 +121,16 @@ func (i *instance) Query(req *logproto.QueryRequest, queryServer logproto.Querie
func (i *instance) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
var labels []string
if req.Values {
values := i.index.LabelValues(model.LabelName(req.Name))
values := i.index.LabelValues(req.Name)
labels = make([]string, len(values))
for i := 0; i < len(values); i++ {
labels[i] = string(values[i])
labels[i] = values[i]
}
} else {
names := i.index.LabelNames()
labels = make([]string, len(names))
for i := 0; i < len(names); i++ {
labels[i] = string(names[i])
labels[i] = names[i]
}
}
return &logproto.LabelResponse{

pkg/iter/iterator.go

@@ -352,11 +352,15 @@ func (i *nonOverlappingIterator) Entry() logproto.Entry {
}
func (i *nonOverlappingIterator) Labels() string {
return i.labels
if i.labels != "" {
return i.labels
}
return i.curr.Labels()
}
func (i *nonOverlappingIterator) Error() error {
return nil
return i.curr.Error()
}
func (i *nonOverlappingIterator) Close() error {

pkg/querier/store.go

@@ -5,6 +5,7 @@ import (
"sort"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
@ -27,60 +28,79 @@ func (q Querier) queryStore(ctx context.Context, req *logproto.QueryRequest) ([]
matchers = append(matchers, nameLabelMatcher)
from, through := model.TimeFromUnixNano(req.Start.UnixNano()), model.TimeFromUnixNano(req.End.UnixNano())
chunks, err := q.store.Get(ctx, from, through, matchers...)
chks, fetchers, err := q.store.GetChunkRefs(ctx, from, through, matchers...)
if err != nil {
return nil, err
}
return partitionBySeriesChunks(req, chunks)
}
func partitionBySeriesChunks(req *logproto.QueryRequest, chunks []chunk.Chunk) ([]iter.EntryIterator, error) {
chunksByFp := map[model.Fingerprint][]chunk.Chunk{}
metricByFp := map[model.Fingerprint]model.Metric{}
for _, c := range chunks {
fp := c.Metric.Fingerprint()
chunksByFp[fp] = append(chunksByFp[fp], c)
delete(c.Metric, "__name__")
metricByFp[fp] = c.Metric
for i := range chks {
chks[i] = filterChunksByTime(from, through, chks[i])
}
iters := make([]iter.EntryIterator, 0, len(chunksByFp))
for fp := range chunksByFp {
iterators, err := partitionOverlappingChunks(req, metricByFp[fp].String(), chunksByFp[fp])
if err != nil {
return nil, err
}
iterator := iter.NewHeapIterator(iterators, req.Direction)
iters = append(iters, iterator)
chksBySeries := partitionBySeriesChunks(chks, fetchers)
// Make sure the initial chunks are loaded. This is not one chunk
// per series, but rather a chunk per non-overlapping iterator.
if err := loadFirstChunks(ctx, chksBySeries); err != nil {
return nil, err
}
return iters, nil
// Now that we have the first chunk for each series loaded,
// we can proceed to filter the series that don't match.
chksBySeries = filterSeriesByMatchers(chksBySeries, matchers)
return buildIterators(ctx, req, chksBySeries)
}
func partitionOverlappingChunks(req *logproto.QueryRequest, labels string, chunks []chunk.Chunk) ([]iter.EntryIterator, error) {
sort.Sort(byFrom(chunks))
func filterChunksByTime(from, through model.Time, chunks []chunk.Chunk) []chunk.Chunk {
filtered := make([]chunk.Chunk, 0, len(chunks))
keys := make([]string, 0, len(chunks))
for _, chunk := range chunks {
if chunk.Through < from || through < chunk.From {
continue
}
filtered = append(filtered, chunk)
keys = append(keys, chunk.ExternalKey())
}
return filtered
}
css := [][]chunk.Chunk{}
func filterSeriesByMatchers(chks map[model.Fingerprint][][]chunkenc.LazyChunk, matchers []*labels.Matcher) map[model.Fingerprint][][]chunkenc.LazyChunk {
outer:
for _, c := range chunks {
for i, cs := range css {
if cs[len(cs)-1].Through.Before(c.From) {
css[i] = append(css[i], c)
for fp, chunks := range chks {
for _, matcher := range matchers {
if !matcher.Matches(string(chunks[0][0].Chunk.Metric[model.LabelName(matcher.Name)])) {
delete(chks, fp)
continue outer
}
}
cs := make([]chunk.Chunk, 0, len(chunks)/(len(css)+1))
cs = append(cs, c)
css = append(css, cs)
}
result := make([]iter.EntryIterator, 0, len(css))
for i := range css {
iterators := make([]iter.EntryIterator, 0, len(css[i]))
for j := range css[i] {
lokiChunk := css[i][j].Data.(*chunkenc.Facade).LokiChunk()
iterator, err := lokiChunk.Iterator(req.Start, req.End, req.Direction)
return chks
}
func buildIterators(ctx context.Context, req *logproto.QueryRequest, chks map[model.Fingerprint][][]chunkenc.LazyChunk) ([]iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
for _, chunks := range chks {
iterator, err := buildHeapIterator(ctx, req, chunks)
if err != nil {
return nil, err
}
result = append(result, iterator)
}
return result, nil
}
func buildHeapIterator(ctx context.Context, req *logproto.QueryRequest, chks [][]chunkenc.LazyChunk) (iter.EntryIterator, error) {
result := make([]iter.EntryIterator, 0, len(chks))
labels := chks[0][0].Chunk.Metric.String()
for i := range chks {
iterators := make([]iter.EntryIterator, 0, len(chks[i]))
for j := range chks[i] {
iterator, err := chks[i][j].Iterator(ctx, req.Start, req.End, req.Direction)
if err != nil {
return nil, err
}
@ -92,13 +112,101 @@ outer:
}
iterators = append(iterators, iterator)
}
result = append(result, iter.NewNonOverlappingIterator(iterators, labels))
}
return result, nil
return iter.NewHeapIterator(result, req.Direction), nil
}
func loadFirstChunks(ctx context.Context, chks map[model.Fingerprint][][]chunkenc.LazyChunk) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "loadFirstChunks")
defer sp.Finish()
// If chunks span buckets, then we'll have different fetchers for each bucket.
chksByFetcher := map[*chunk.Fetcher][]*chunkenc.LazyChunk{}
for _, lchks := range chks {
for _, lchk := range lchks {
if len(lchk) == 0 {
continue
}
chksByFetcher[lchk[0].Fetcher] = append(chksByFetcher[lchk[0].Fetcher], &lchk[0])
}
}
errChan := make(chan error)
for fetcher, chunks := range chksByFetcher {
go func(fetcher *chunk.Fetcher, chunks []*chunkenc.LazyChunk) {
keys := make([]string, 0, len(chunks))
chks := make([]chunk.Chunk, 0, len(chunks))
for _, chk := range chunks {
keys = append(keys, chk.Chunk.ExternalKey())
chks = append(chks, chk.Chunk)
}
chks, err := fetcher.FetchChunks(ctx, chks, keys)
if err != nil {
errChan <- err
return
}
for i, chk := range chks {
chunks[i].Chunk = chk
}
errChan <- nil
}(fetcher, chunks)
}
var lastErr error
for i := 0; i < len(chksByFetcher); i++ {
if err := <-errChan; err != nil {
lastErr = err
}
}
return lastErr
}
type byFrom []chunk.Chunk
func partitionBySeriesChunks(chunks [][]chunk.Chunk, fetchers []*chunk.Fetcher) map[model.Fingerprint][][]chunkenc.LazyChunk {
chunksByFp := map[model.Fingerprint][]chunkenc.LazyChunk{}
for i, chks := range chunks {
for _, c := range chks {
fp := c.Metric.Fingerprint()
chunksByFp[fp] = append(chunksByFp[fp], chunkenc.LazyChunk{Chunk: c, Fetcher: fetchers[i]})
delete(c.Metric, "__name__")
}
}
result := make(map[model.Fingerprint][][]chunkenc.LazyChunk, len(chunksByFp))
func (b byFrom) Len() int { return len(b) }
func (b byFrom) Swap(i, j int) { b[i], b[j] = b[j], b[i] }
func (b byFrom) Less(i, j int) bool { return b[i].From < b[j].From }
for fp, chks := range chunksByFp {
result[fp] = partitionOverlappingChunks(chks)
}
return result
}
// partitionOverlappingChunks splits the list of chunks into different non-overlapping lists.
func partitionOverlappingChunks(chunks []chunkenc.LazyChunk) [][]chunkenc.LazyChunk {
sort.Slice(chunks, func(i, j int) bool {
return chunks[i].Chunk.From < chunks[j].Chunk.From
})
css := [][]chunkenc.LazyChunk{}
outer:
for _, c := range chunks {
for i, cs := range css {
// If the chunk doesn't overlap with the current list, then add it to it.
if cs[len(cs)-1].Chunk.Through.Before(c.Chunk.From) {
css[i] = append(css[i], c)
continue outer
}
}
// If the chunk overlaps with every existing list, then create a new list.
cs := make([]chunkenc.LazyChunk, 0, len(chunks)/(len(css)+1))
cs = append(cs, c)
css = append(css, cs)
}
return css
}

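partitionOverlappingChunks above uses a greedy scheme: sort by start time, then place each chunk into the first list whose last chunk ends strictly before it, otherwise open a new list. A self-contained sketch of that scheme over plain integer intervals (a stand-in for the LazyChunk from/through times, not the real types):

package main

import (
	"fmt"
	"sort"
)

type interval struct{ from, through int64 }

// partition splits intervals into lists whose members do not overlap,
// mirroring partitionOverlappingChunks in the diff above.
func partition(chunks []interval) [][]interval {
	sort.Slice(chunks, func(i, j int) bool { return chunks[i].from < chunks[j].from })
	var lists [][]interval
outer:
	for _, c := range chunks {
		for i, l := range lists {
			if l[len(l)-1].through < c.from { // no overlap: extend this list
				lists[i] = append(lists[i], c)
				continue outer
			}
		}
		lists = append(lists, []interval{c}) // overlaps every list: start a new one
	}
	return lists
}

func main() {
	fmt.Println(partition([]interval{{0, 10}, {5, 15}, {11, 20}, {16, 25}}))
	// [[{0 10} {11 20}] [{5 15} {16 25}]]
}

Each resulting list is then wrapped in a non-overlapping iterator, which is what buildHeapIterator does with the real chunks.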
vendor/github.com/cortexproject/cortex/pkg/chunk/aws/aws_autoscaling.go

@@ -44,7 +44,7 @@ func newAWSAutoscale(cfg DynamoDBConfig, callManager callManager) (*awsAutoscale
return nil, err
}
return &awsAutoscale{
call: callManager,
call: callManager,
ApplicationAutoScaling: applicationautoscaling.New(session),
}, nil
}

vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go

@@ -10,6 +10,7 @@ import (
"time"
"github.com/go-kit/kit/log/level"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
@ -191,6 +192,10 @@ func (c *store) Get(ctx context.Context, from, through model.Time, allMatchers .
return c.getMetricNameChunks(ctx, from, through, matchers, metricName)
}
func (c *store) GetChunkRefs(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
return nil, nil, errors.New("not implemented")
}
func (c *store) validateQuery(ctx context.Context, from model.Time, through *model.Time, matchers []*labels.Matcher) (string, []*labels.Matcher, bool, error) {
log, ctx := spanlogger.New(ctx, "store.validateQuery")
defer log.Span.Finish()

vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go

@@ -14,7 +14,8 @@ import (
type Store interface {
Put(ctx context.Context, chunks []Chunk) error
PutOne(ctx context.Context, from, through model.Time, chunk Chunk) error
Get(tx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
Get(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([]Chunk, error)
GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error)
Stop()
}
@ -45,7 +46,7 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index
var store Store
var err error
switch cfg.Schema {
case "v9":
case "v9", "v10":
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
default:
store, err = newStore(storeCfg, schema, index, chunks, limits)
@ -88,6 +89,22 @@ func (c compositeStore) Get(ctx context.Context, from, through model.Time, match
return results, err
}
func (c compositeStore) GetChunkRefs(ctx context.Context, from, through model.Time, matchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
chunkIDs := [][]Chunk{}
fetchers := []*Fetcher{}
err := c.forStores(from, through, func(from, through model.Time, store Store) error {
ids, fetcher, err := store.GetChunkRefs(ctx, from, through, matchers...)
if err != nil {
return err
}
chunkIDs = append(chunkIDs, ids...)
fetchers = append(fetchers, fetcher...)
return nil
})
return chunkIDs, fetchers, err
}
func (c compositeStore) Stop() {
for _, store := range c.stores {
store.Stop()

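compositeStore.GetChunkRefs fans the query out to one store per schema period and collects a fetcher per store. A rough standalone sketch, assuming forStores (not shown in this diff) simply clips the query range to each period's boundaries:

package main

import "fmt"

// splitByPeriods cuts [from, through] at each period start time, roughly how
// the composite store hands sub-ranges to per-period stores. Period starts
// are assumed sorted ascending; timestamps are plain int64 millis here.
func splitByPeriods(from, through int64, starts []int64) [][2]int64 {
	var out [][2]int64
	for i, start := range starts {
		end := through
		if i+1 < len(starts) && starts[i+1]-1 < through {
			end = starts[i+1] - 1
		}
		if end < from || start > through {
			continue // period entirely outside the query
		}
		lo := from
		if start > lo {
			lo = start
		}
		out = append(out, [2]int64{lo, end})
	}
	return out
}

func main() {
	// Two schema periods starting at t=0 and t=100; the query spans the boundary.
	fmt.Println(splitByPeriods(50, 150, []int64{0, 100})) // [[50 99] [100 150]]
}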
vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go

@@ -53,15 +53,18 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
func (b *bigchunk) addNextChunk(start model.Time) error {
// To save memory, we "compact" the previous chunk - the array backing the slice
will be up to 2x too big, and we can save this space.
const chunkCapacityExcess = 32 // don't bother copying if it's within this range
if l := len(b.chunks); l > 0 {
c := b.chunks[l-1].XORChunk
buf := make([]byte, len(c.Bytes()))
copy(buf, c.Bytes())
compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf)
if err != nil {
return err
oldBuf := b.chunks[l-1].XORChunk.Bytes()
if cap(oldBuf) > len(oldBuf)+chunkCapacityExcess {
buf := make([]byte, len(oldBuf))
copy(buf, oldBuf)
compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf)
if err != nil {
return err
}
b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk)
}
b.chunks[l-1].XORChunk = compacted.(*chunkenc.XORChunk)
}
chunk := chunkenc.NewXORChunk()
@ -110,7 +113,7 @@ func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
return err
}
b.chunks = make([]smallChunk, 0, numChunks)
b.chunks = make([]smallChunk, 0, numChunks+1) // allow one extra space in case we want to add new data
for i := uint16(0); i < numChunks; i++ {
chunkLen, err := r.ReadUint16()
if err != nil {

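The bigchunk change only re-copies the previous chunk's buffer when the backing array is meaningfully larger than the data it holds. A tiny standalone sketch of that guard (the 32-byte threshold mirrors chunkCapacityExcess; the helper name is made up):

package main

import "fmt"

// compact copies buf into a right-sized backing array only when the capacity
// excess is worth reclaiming, mirroring the chunkCapacityExcess check above.
func compact(buf []byte, excess int) []byte {
	if cap(buf) <= len(buf)+excess {
		return buf // not worth the copy
	}
	out := make([]byte, len(buf))
	copy(out, buf)
	return out
}

func main() {
	b := make([]byte, 10, 100)
	c := compact(b, 32)
	fmt.Println(cap(b), cap(c)) // typically prints: 100 10
}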
vendor/github.com/cortexproject/cortex/pkg/chunk/fixtures.go

@@ -8,23 +8,23 @@ import (
// BenchmarkMetric is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated
var BenchmarkMetric = model.Metric{
model.MetricNameLabel: "container_cpu_usage_seconds_total",
"beta_kubernetes_io_arch": "amd64",
"beta_kubernetes_io_instance_type": "c3.somesize",
"beta_kubernetes_io_os": "linux",
"container_name": "some-name",
"cpu": "cpu01",
model.MetricNameLabel: "container_cpu_usage_seconds_total",
"beta_kubernetes_io_arch": "amd64",
"beta_kubernetes_io_instance_type": "c3.somesize",
"beta_kubernetes_io_os": "linux",
"container_name": "some-name",
"cpu": "cpu01",
"failure_domain_beta_kubernetes_io_region": "somewhere-1",
"failure_domain_beta_kubernetes_io_zone": "somewhere-1b",
"id": "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28",
"image": "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506",
"instance": "ip-111-11-1-11.ec2.internal",
"job": "kubernetes-cadvisor",
"kubernetes_io_hostname": "ip-111-11-1-11",
"monitor": "prod",
"name": "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0",
"namespace": "kube-system",
"pod_name": "some-other-name-5j8s8",
"id": "/kubepods/burstable/pod6e91c467-e4c5-11e7-ace3-0a97ed59c75e/a3c8498918bd6866349fed5a6f8c643b77c91836427fb6327913276ebc6bde28",
"image": "registry/organisation/name@sha256:dca3d877a80008b45d71d7edc4fd2e44c0c8c8e7102ba5cbabec63a374d1d506",
"instance": "ip-111-11-1-11.ec2.internal",
"job": "kubernetes-cadvisor",
"kubernetes_io_hostname": "ip-111-11-1-11",
"monitor": "prod",
"name": "k8s_some-name_some-other-name-5j8s8_kube-system_6e91c467-e4c5-11e7-ace3-0a97ed59c75e_0",
"namespace": "kube-system",
"pod_name": "some-other-name-5j8s8",
}
// DefaultSchemaConfig creates a simple schema config for testing

vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go

@@ -1,6 +1,7 @@
package chunk
import (
"encoding/binary"
"errors"
"fmt"
"strings"
@ -540,7 +541,7 @@ func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
type v9Entries struct {
}
func (e v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
func (v9Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
return nil, ErrNotSupported
}
@ -630,3 +631,108 @@ func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuer
},
}, nil
}
// v10Entries builds on v9 by sharding index rows to reduce their size.
type v10Entries struct {
rowShards uint32
}
func (v10Entries) GetWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
return nil, ErrNotSupported
}
func (s v10Entries) GetLabelWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
seriesID := sha256bytes(labels.String())
// read first 32 bits of the hash and use this to calculate the shard
shard := binary.BigEndian.Uint32(seriesID) % s.rowShards
entries := []IndexEntry{
// Entry for metricName -> seriesID
{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s", shard, bucket.hashKey, string(metricName)),
RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1),
},
}
// Entries for metricName:labelName -> hash(value):seriesID
// We use a hash of the value to limit its length.
for key, value := range labels {
if key == model.MetricNameLabel {
continue
}
valueHash := sha256bytes(string(value))
entries = append(entries, IndexEntry{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey, metricName, key),
RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1),
Value: []byte(value),
})
}
return entries, nil
}
func (v10Entries) GetChunkWriteEntries(bucket Bucket, metricName model.LabelValue, labels model.Metric, chunkID string) ([]IndexEntry, error) {
seriesID := sha256bytes(labels.String())
encodedThroughBytes := encodeTime(bucket.through)
entries := []IndexEntry{
// Entry for seriesID -> chunkID
{
TableName: bucket.tableName,
HashValue: bucket.hashKey + ":" + string(seriesID),
RangeValue: encodeRangeKey(encodedThroughBytes, nil, []byte(chunkID), chunkTimeRangeKeyV3),
},
}
return entries, nil
}
func (s v10Entries) GetReadMetricQueries(bucket Bucket, metricName model.LabelValue) ([]IndexQuery, error) {
result := make([]IndexQuery, 0, s.rowShards)
for i := uint32(0); i < s.rowShards; i++ {
result = append(result, IndexQuery{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s", i, bucket.hashKey, string(metricName)),
})
}
return result, nil
}
func (s v10Entries) GetReadMetricLabelQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName) ([]IndexQuery, error) {
result := make([]IndexQuery, 0, s.rowShards)
for i := uint32(0); i < s.rowShards; i++ {
result = append(result, IndexQuery{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s:%s", i, bucket.hashKey, metricName, labelName),
})
}
return result, nil
}
func (s v10Entries) GetReadMetricLabelValueQueries(bucket Bucket, metricName model.LabelValue, labelName model.LabelName, labelValue model.LabelValue) ([]IndexQuery, error) {
valueHash := sha256bytes(string(labelValue))
result := make([]IndexQuery, 0, s.rowShards)
for i := uint32(0); i < s.rowShards; i++ {
result = append(result, IndexQuery{
TableName: bucket.tableName,
HashValue: fmt.Sprintf("%02d:%s:%s:%s", i, bucket.hashKey, metricName, labelName),
RangeValueStart: encodeRangeKey(valueHash),
ValueEqual: []byte(labelValue),
})
}
return result, nil
}
func (v10Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) {
encodedFromBytes := encodeTime(bucket.from)
return []IndexQuery{
{
TableName: bucket.tableName,
HashValue: bucket.hashKey + ":" + string(seriesID),
RangeValueStart: encodeRangeKey(encodedFromBytes),
},
}, nil
}

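The v10 schema prefixes each hash key with a shard computed from the first 32 bits of the series ID, so index reads fan out over rowShards smaller rows. A standalone sketch of the shard calculation, assuming the series ID is the raw SHA-256 of the label string (the real sha256bytes helper may encode the digest first, so exact shard numbers can differ; this only illustrates the mechanism):

package main

import (
	"crypto/sha256"
	"encoding/binary"
	"fmt"
)

// shardFor derives the index row shard from a series' label string, as the
// v10 schema does: take 32 bits from the series ID and mod by rowShards.
func shardFor(labelString string, rowShards uint32) uint32 {
	seriesID := sha256.Sum256([]byte(labelString))
	return binary.BigEndian.Uint32(seriesID[:4]) % rowShards
}

func main() {
	shard := shardFor(`{__name__="logs", app="nginx"}`, 16)
	// The shard becomes a two-digit prefix on the hash key, following the
	// "%02d:%s:%s" format in GetLabelWriteEntries (bucket hash key omitted here).
	fmt.Printf("%02d\n", shard)
}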
vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go

@@ -3,17 +3,17 @@ package chunk
import (
"flag"
"fmt"
"github.com/go-kit/kit/log/level"
"os"
"strconv"
"time"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/mtime"
yaml "gopkg.in/yaml.v2"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/weaveworks/common/mtime"
)
const (
@ -32,6 +32,7 @@ type PeriodConfig struct {
Schema string `yaml:"schema"`
IndexTables PeriodicTableConfig `yaml:"index"`
ChunkTables PeriodicTableConfig `yaml:"chunks,omitempty"`
RowShards uint32 `yaml:"row_shards"`
}
// SchemaConfig contains the config for our chunk index schemas
@ -181,6 +182,15 @@ func (cfg PeriodConfig) createSchema() Schema {
s = schema{cfg.dailyBuckets, v6Entries{}}
case "v9":
s = schema{cfg.dailyBuckets, v9Entries{}}
case "v10":
rowShards := uint32(16)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
}
s = schema{cfg.dailyBuckets, v10Entries{
rowShards: rowShards,
}}
}
return s
}
@ -424,7 +434,7 @@ func (cfg *PeriodicTableConfig) periodicTables(from, through model.Time, pCfg Pr
// ChunkTableFor calculates the chunk table shard for a given point in time.
func (cfg SchemaConfig) ChunkTableFor(t model.Time) (string, error) {
for i := range cfg.Configs {
if t > cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) {
if t >= cfg.Configs[i].From && (i+1 == len(cfg.Configs) || t < cfg.Configs[i+1].From) {
return cfg.Configs[i].ChunkTables.TableFor(t), nil
}
}

vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go

@@ -108,38 +108,17 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
return nil, err
}
// Validate the query is within reasonable bounds.
metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers)
if err != nil {
return nil, err
} else if shortcut {
return nil, nil
}
level.Debug(log).Log("metric", metricName)
// Fetch the series IDs from the index, based on non-empty matchers from
// the query.
_, matchers = util.SplitFiltersAndMatchers(matchers)
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricName, matchers)
chks, _, err := c.GetChunkRefs(ctx, from, through, allMatchers...)
if err != nil {
return nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))
// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, err
if len(chks) == 0 {
// Shortcut
return nil, nil
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))
chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs)
if err != nil {
level.Error(log).Log("err", "convertChunkIDsToChunks", "err", err)
return nil, err
}
chunks := chks[0]
// Filter out chunks that are not in the selected time range.
filtered, keys := filterChunksByTime(from, through, chunks)
level.Debug(log).Log("chunks-post-filtering", len(chunks))
@ -147,8 +126,8 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
// Protect ourselves against OOMing.
maxChunksPerQuery := c.limits.MaxChunksPerQuery(userID)
if maxChunksPerQuery > 0 && len(chunkIDs) > maxChunksPerQuery {
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunkIDs), maxChunksPerQuery)
if maxChunksPerQuery > 0 && len(chunks) > maxChunksPerQuery {
err := httpgrpc.Errorf(http.StatusBadRequest, "Query %v fetched too many chunks (%d > %d)", allMatchers, len(chunks), maxChunksPerQuery)
level.Error(log).Log("err", err)
return nil, err
}
@ -165,6 +144,46 @@ func (c *seriesStore) Get(ctx context.Context, from, through model.Time, allMatc
return filteredChunks, nil
}
func (c *seriesStore) GetChunkRefs(ctx context.Context, from, through model.Time, allMatchers ...*labels.Matcher) ([][]Chunk, []*Fetcher, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.GetChunkRefs")
defer log.Span.Finish()
// Validate the query is within reasonable bounds.
metricName, matchers, shortcut, err := c.validateQuery(ctx, from, &through, allMatchers)
if err != nil {
return nil, nil, err
} else if shortcut {
return nil, nil, nil
}
level.Debug(log).Log("metric", metricName)
// Fetch the series IDs from the index, based on non-empty matchers from
// the query.
_, matchers = util.SplitFiltersAndMatchers(matchers)
seriesIDs, err := c.lookupSeriesByMetricNameMatchers(ctx, from, through, metricName, matchers)
if err != nil {
return nil, nil, err
}
level.Debug(log).Log("series-ids", len(seriesIDs))
// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, seriesIDs)
if err != nil {
level.Error(log).Log("msg", "lookupChunksBySeries", "err", err)
return nil, nil, err
}
level.Debug(log).Log("chunk-ids", len(chunkIDs))
chunks, err := c.convertChunkIDsToChunks(ctx, chunkIDs)
if err != nil {
level.Error(log).Log("op", "convertChunkIDsToChunks", "err", err)
return nil, nil, err
}
return [][]Chunk{chunks}, []*Fetcher{c.store.Fetcher}, nil
}
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()

vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go

@@ -72,8 +72,8 @@ func CreateChunks(startIndex, batchSize int, start model.Time) ([]string, []chun
func dummyChunk(now model.Time) chunk.Chunk {
return dummyChunkFor(now, model.Metric{
model.MetricNameLabel: "foo",
"bar": "baz",
"toms": "code",
"bar": "baz",
"toms": "code",
})
}

vendor/github.com/cortexproject/cortex/pkg/ingester/client/client.go

@@ -27,6 +27,7 @@ var ingesterClientRequestDuration = promauto.NewHistogramVec(prometheus.Histogra
type HealthAndIngesterClient interface {
IngesterClient
grpc_health_v1.HealthClient
Close() error
}
type closableHealthAndIngesterClient struct {

vendor/github.com/cortexproject/cortex/pkg/ingester/index/index.go

@@ -24,7 +24,7 @@ type InvertedIndex struct {
func New() *InvertedIndex {
shards := make([]indexShard, indexShards)
for i := 0; i < indexShards; i++ {
shards[i].idx = map[model.LabelName]map[model.LabelValue][]model.Fingerprint{}
shards[i].idx = map[string]indexEntry{}
}
return &InvertedIndex{
shards: shards,
@ -32,9 +32,9 @@ func New() *InvertedIndex {
}
// Add a fingerprint under the specified labels.
func (ii *InvertedIndex) Add(labels []client.LabelPair, fp model.Fingerprint) {
func (ii *InvertedIndex) Add(labels []client.LabelPair, fp model.Fingerprint) labels.Labels {
shard := &ii.shards[util.HashFP(fp)%indexShards]
shard.add(labels, fp)
return shard.add(labels, fp)
}
// Lookup all fingerprints for the provided matchers.
@ -54,27 +54,27 @@ func (ii *InvertedIndex) Lookup(matchers []*labels.Matcher) []model.Fingerprint
}
// LabelNames returns all label names.
func (ii *InvertedIndex) LabelNames() model.LabelNames {
results := make([]model.LabelNames, 0, indexShards)
func (ii *InvertedIndex) LabelNames() []string {
results := make([][]string, 0, indexShards)
for i := range ii.shards {
shardResult := ii.shards[i].labelNames()
results = append(results, shardResult)
}
return mergeLabelNameLists(results)
return mergeStringSlices(results)
}
// LabelValues returns the values for the given label.
func (ii *InvertedIndex) LabelValues(name model.LabelName) model.LabelValues {
results := make([]model.LabelValues, 0, indexShards)
func (ii *InvertedIndex) LabelValues(name string) []string {
results := make([][]string, 0, indexShards)
for i := range ii.shards {
shardResult := ii.shards[i].labelValues(name)
results = append(results, shardResult)
}
return mergeLabelValueLists(results)
return mergeStringSlices(results)
}
// Delete a fingerprint with the given label pairs.
@ -84,7 +84,17 @@ func (ii *InvertedIndex) Delete(labels labels.Labels, fp model.Fingerprint) {
}
// NB slice entries are sorted in fp order.
type unlockIndex map[model.LabelName]map[model.LabelValue][]model.Fingerprint
type indexEntry struct {
name string
fps map[string]indexValueEntry
}
type indexValueEntry struct {
value string
fps []model.Fingerprint
}
type unlockIndex map[string]indexEntry
// This is the prevalent value for Intel and AMD CPUs as-at 2018.
const cacheLineSize = 64
@ -95,27 +105,38 @@ type indexShard struct {
pad [cacheLineSize - unsafe.Sizeof(sync.Mutex{}) - unsafe.Sizeof(unlockIndex{})]byte
}
func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) {
// add metric to the index; return all the name/value pairs as strings from the index, sorted
func (shard *indexShard) add(metric []client.LabelPair, fp model.Fingerprint) labels.Labels {
shard.mtx.Lock()
defer shard.mtx.Unlock()
for _, pair := range metric {
value := model.LabelValue(pair.Value)
values, ok := shard.idx[model.LabelName(pair.Name)]
internedLabels := make(labels.Labels, len(metric))
for i, pair := range metric {
values, ok := shard.idx[string(pair.Name)]
if !ok {
values = map[model.LabelValue][]model.Fingerprint{}
shard.idx[model.LabelName(pair.Name)] = values
values = indexEntry{
name: string(pair.Name),
fps: map[string]indexValueEntry{},
}
shard.idx[values.name] = values
}
fingerprints, ok := values.fps[string(pair.Value)]
if !ok {
fingerprints = indexValueEntry{value: string(pair.Value)}
}
fingerprints := values[value]
// Insert into the right position to keep fingerprints sorted
j := sort.Search(len(fingerprints), func(i int) bool {
return fingerprints[i] >= fp
j := sort.Search(len(fingerprints.fps), func(i int) bool {
return fingerprints.fps[i] >= fp
})
fingerprints = append(fingerprints, 0)
copy(fingerprints[j+1:], fingerprints[j:])
fingerprints[j] = fp
values[value] = fingerprints
fingerprints.fps = append(fingerprints.fps, 0)
copy(fingerprints.fps[j+1:], fingerprints.fps[j:])
fingerprints.fps[j] = fp
values.fps[fingerprints.value] = fingerprints
internedLabels[i] = labels.Label{Name: string(values.name), Value: string(fingerprints.value)}
}
sort.Sort(internedLabels)
return internedLabels
}
func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint {
@ -129,20 +150,20 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint
// loop invariant: result is sorted
var result []model.Fingerprint
for _, matcher := range matchers {
values, ok := shard.idx[model.LabelName(matcher.Name)]
values, ok := shard.idx[matcher.Name]
if !ok {
return nil
}
var toIntersect model.Fingerprints
if matcher.Type == labels.MatchEqual {
fps := values[model.LabelValue(matcher.Value)]
toIntersect = append(toIntersect, fps...) // deliberate copy
fps := values.fps[matcher.Value]
toIntersect = append(toIntersect, fps.fps...) // deliberate copy
} else {
// accumulate the matching fingerprints (which are all distinct)
// then sort to maintain the invariant
for value, fps := range values {
for value, fps := range values.fps {
if matcher.Matches(string(value)) {
toIntersect = append(toIntersect, fps...)
toIntersect = append(toIntersect, fps.fps...)
}
}
sort.Sort(toIntersect)
@ -156,20 +177,20 @@ func (shard *indexShard) lookup(matchers []*labels.Matcher) []model.Fingerprint
return result
}
func (shard *indexShard) labelNames() model.LabelNames {
func (shard *indexShard) labelNames() []string {
shard.mtx.RLock()
defer shard.mtx.RUnlock()
results := make(model.LabelNames, 0, len(shard.idx))
results := make([]string, 0, len(shard.idx))
for name := range shard.idx {
results = append(results, name)
}
sort.Sort(labelNames(results))
sort.Strings(results)
return results
}
func (shard *indexShard) labelValues(name model.LabelName) model.LabelValues {
func (shard *indexShard) labelValues(name string) []string {
shard.mtx.RLock()
defer shard.mtx.RUnlock()
@ -178,12 +199,12 @@ func (shard *indexShard) labelValues(name model.LabelName) model.LabelValues {
return nil
}
results := make(model.LabelValues, 0, len(values))
for val := range values {
results := make([]string, 0, len(values.fps))
for val := range values.fps {
results = append(results, val)
}
sort.Sort(labelValues(results))
sort.Strings(results)
return results
}
@ -192,28 +213,28 @@ func (shard *indexShard) delete(labels labels.Labels, fp model.Fingerprint) {
defer shard.mtx.Unlock()
for _, pair := range labels {
name, value := model.LabelName(pair.Name), model.LabelValue(pair.Value)
name, value := string(pair.Name), string(pair.Value)
values, ok := shard.idx[name]
if !ok {
continue
}
fingerprints, ok := values[value]
fingerprints, ok := values.fps[value]
if !ok {
continue
}
j := sort.Search(len(fingerprints), func(i int) bool {
return fingerprints[i] >= fp
j := sort.Search(len(fingerprints.fps), func(i int) bool {
return fingerprints.fps[i] >= fp
})
fingerprints = fingerprints[:j+copy(fingerprints[j:], fingerprints[j+1:])]
fingerprints.fps = fingerprints.fps[:j+copy(fingerprints.fps[j:], fingerprints.fps[j+1:])]
if len(fingerprints) == 0 {
delete(values, value)
if len(fingerprints.fps) == 0 {
delete(values.fps, value)
} else {
values[value] = fingerprints
values.fps[value] = fingerprints
}
if len(values) == 0 {
if len(values.fps) == 0 {
delete(shard.idx, name)
} else {
shard.idx[name] = values
@ -241,42 +262,31 @@ func intersect(a, b []model.Fingerprint) []model.Fingerprint {
return result
}
type labelValues model.LabelValues
func (a labelValues) Len() int { return len(a) }
func (a labelValues) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a labelValues) Less(i, j int) bool { return a[i] < a[j] }
type labelNames model.LabelNames
func (a labelNames) Len() int { return len(a) }
func (a labelNames) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a labelNames) Less(i, j int) bool { return a[i] < a[j] }
type fingerprints []model.Fingerprint
func (a fingerprints) Len() int { return len(a) }
func (a fingerprints) Swap(i, j int) { a[i], a[j] = a[j], a[i] }
func (a fingerprints) Less(i, j int) bool { return a[i] < a[j] }
func mergeLabelValueLists(lvss []model.LabelValues) model.LabelValues {
switch len(lvss) {
func mergeStringSlices(ss [][]string) []string {
switch len(ss) {
case 0:
return nil
case 1:
return lvss[0]
return ss[0]
case 2:
return mergeTwoLabelValueLists(lvss[0], lvss[1])
return mergeTwoStringSlices(ss[0], ss[1])
default:
n := len(lvss) / 2
left := mergeLabelValueLists(lvss[:n])
right := mergeLabelValueLists(lvss[n:])
return mergeTwoLabelValueLists(left, right)
halfway := len(ss) / 2
return mergeTwoStringSlices(
mergeStringSlices(ss[:halfway]),
mergeStringSlices(ss[halfway:]),
)
}
}
func mergeTwoLabelValueLists(a, b model.LabelValues) model.LabelValues {
result := make(model.LabelValues, 0, len(a)+len(b))
func mergeTwoStringSlices(a, b []string) []string {
result := make([]string, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i] < b[j] {
@ -286,45 +296,8 @@ func mergeTwoLabelValueLists(a, b model.LabelValues) model.LabelValues {
result = append(result, b[j])
j++
} else {
result = append(result, b[j])
i++
j++
}
}
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
func mergeLabelNameLists(lnss []model.LabelNames) model.LabelNames {
switch len(lnss) {
case 0:
return nil
case 1:
return lnss[0]
case 2:
return mergeTwoLabelNameLists(lnss[0], lnss[1])
default:
n := len(lnss) / 2
left := mergeLabelNameLists(lnss[:n])
right := mergeLabelNameLists(lnss[n:])
return mergeTwoLabelNameLists(left, right)
}
}
func mergeTwoLabelNameLists(a, b model.LabelNames) model.LabelNames {
result := make(model.LabelNames, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i] < b[j] {
result = append(result, a[i])
i++
} else if a[i] > b[j] {
result = append(result, b[j])
j++
} else {
result = append(result, b[j])
i++
j++
}
}

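The inverted index now returns label names and values as sorted []string and merges per-shard results pairwise. A standalone version of the two-way merge with de-duplication, in the spirit of mergeTwoStringSlices above:

package main

import "fmt"

// mergeTwo merges two sorted string slices, dropping duplicates.
func mergeTwo(a, b []string) []string {
	result := make([]string, 0, len(a)+len(b))
	i, j := 0, 0
	for i < len(a) && j < len(b) {
		switch {
		case a[i] < b[j]:
			result = append(result, a[i])
			i++
		case a[i] > b[j]:
			result = append(result, b[j])
			j++
		default: // equal: keep a single copy
			result = append(result, a[i])
			i++
			j++
		}
	}
	result = append(result, a[i:]...)
	result = append(result, b[j:]...)
	return result
}

func main() {
	fmt.Println(mergeTwo([]string{"app", "job", "pod"}, []string{"app", "instance", "job"}))
	// [app instance job pod]
}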
vendor/github.com/cortexproject/cortex/pkg/ring/lifecycler.go

@@ -30,6 +30,11 @@ var (
Name: "cortex_ingester_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
})
shutdownDuration = promauto.NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_shutdown_duration_seconds",
Help: "Duration (in seconds) of cortex shutdown procedure (ie transfer or flush).",
Buckets: prometheus.ExponentialBuckets(10, 2, 8), // Biggest bucket is 10*2^(8-1) = 1280s, or ~21 mins.
}, []string{"op", "status"})
)
// LifecyclerConfig is the config to build a Lifecycler.
@ -45,6 +50,7 @@ type LifecyclerConfig struct {
ClaimOnRollout bool `yaml:"claim_on_rollout,omitempty"`
NormaliseTokens bool `yaml:"normalise_tokens,omitempty"`
InfNames []string `yaml:"interface_names"`
FinalSleep time.Duration `yaml:"final_sleep"`
// For testing, you can override the address and ID of this ingester
Addr string `yaml:"address"`
@ -63,6 +69,7 @@ func (cfg *LifecyclerConfig) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MinReadyDuration, "ingester.min-ready-duration", 1*time.Minute, "Minimum duration to wait before becoming ready. This is to work around race conditions with ingesters exiting and updating the ring.")
f.BoolVar(&cfg.ClaimOnRollout, "ingester.claim-on-rollout", false, "Send chunks to PENDING ingesters on exit.")
f.BoolVar(&cfg.NormaliseTokens, "ingester.normalise-tokens", false, "Store tokens in a normalised fashion to reduce allocations.")
f.DurationVar(&cfg.FinalSleep, "ingester.final-sleep", 30*time.Second, "Duration to sleep for before exiting, to ensure metrics are scraped.")
hostname, err := os.Hostname()
if err != nil {
@ -454,16 +461,24 @@ func (i *Lifecycler) changeState(ctx context.Context, state IngesterState) error
func (i *Lifecycler) processShutdown(ctx context.Context) {
flushRequired := true
if i.cfg.ClaimOnRollout {
transferStart := time.Now()
if err := i.flushTransferer.TransferOut(ctx); err != nil {
level.Error(util.Logger).Log("msg", "Failed to transfer chunks to another ingester", "err", err)
shutdownDuration.WithLabelValues("transfer", "fail").Observe(time.Since(transferStart).Seconds())
} else {
flushRequired = false
shutdownDuration.WithLabelValues("transfer", "success").Observe(time.Since(transferStart).Seconds())
}
}
if flushRequired {
flushStart := time.Now()
i.flushTransferer.Flush()
shutdownDuration.WithLabelValues("flush", "success").Observe(time.Since(flushStart).Seconds())
}
// Sleep so the shutdownDuration metric can be collected.
time.Sleep(i.cfg.FinalSleep)
}
// unregister removes our entry from consul.

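The new shutdownDuration histogram uses prometheus.ExponentialBuckets(10, 2, 8), giving buckets from 10s up to 1280s (about 21 minutes). A quick sketch that just prints the bucket boundaries to confirm the range (requires the prometheus client library used elsewhere in this diff):

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
)

func main() {
	// Same parameters as shutdownDuration: start 10s, factor 2, 8 buckets.
	fmt.Println(prometheus.ExponentialBuckets(10, 2, 8))
	// [10 20 40 80 160 320 640 1280]
}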