Update cortex to bring v11 schema (#1201)

* Update cortex to bring v11 schema

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* Fixes change in ParseProtoReader

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>

* sync go mod

Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com>
pull/950/head
Cyril Tovena 6 years ago committed by GitHub
parent 3b802b2ce1
commit d438bda94a
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      go.mod
  2. 4
      go.sum
  3. 3
      pkg/distributor/http.go
  4. 3
      pkg/promtail/client/client_test.go
  5. 3
      pkg/promtail/promtail_test.go
  6. 11
      vendor/github.com/cortexproject/cortex/pkg/chunk/aws/dynamodb_table_client.go
  7. 11
      vendor/github.com/cortexproject/cortex/pkg/chunk/cache/redis_cache.go
  8. 4
      vendor/github.com/cortexproject/cortex/pkg/chunk/cassandra/storage_client.go
  9. 2
      vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store_utils.go
  10. 2
      vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go
  11. 6
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/bigchunk.go
  12. 36
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/chunk.go
  13. 355
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/delta.go
  14. 102
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/doubledelta.go
  15. 19
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/factory.go
  16. 108
      vendor/github.com/cortexproject/cortex/pkg/chunk/encoding/varbit.go
  17. 7
      vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/bigtable_index_client.go
  18. 21
      vendor/github.com/cortexproject/cortex/pkg/chunk/gcp/table_client.go
  19. 1
      vendor/github.com/cortexproject/cortex/pkg/chunk/inmemory_storage_client.go
  20. 1
      vendor/github.com/cortexproject/cortex/pkg/chunk/local/boltdb_index_client.go
  21. 113
      vendor/github.com/cortexproject/cortex/pkg/chunk/schema.go
  22. 87
      vendor/github.com/cortexproject/cortex/pkg/chunk/schema_caching.go
  23. 118
      vendor/github.com/cortexproject/cortex/pkg/chunk/schema_config.go
  24. 60
      vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go
  25. 2
      vendor/github.com/cortexproject/cortex/pkg/chunk/table_manager.go
  26. 13
      vendor/github.com/cortexproject/cortex/pkg/chunk/testutils/testutils.go
  27. 4
      vendor/github.com/cortexproject/cortex/pkg/ring/batch.go
  28. 69
      vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/client.go
  29. 43
      vendor/github.com/cortexproject/cortex/pkg/ring/kv/consul/mock.go
  30. 26
      vendor/github.com/cortexproject/cortex/pkg/util/http.go
  31. 2
      vendor/github.com/cortexproject/cortex/pkg/util/validation/limits.go
  32. 9
      vendor/github.com/cortexproject/cortex/pkg/util/validation/override.go
  33. 2
      vendor/modules.txt

@ -10,7 +10,7 @@ require (
github.com/bmatcuk/doublestar v1.1.1
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e
github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59
github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.0.0-20190607191414-238f8eaa31aa

@ -98,8 +98,8 @@ github.com/coreos/go-systemd v0.0.0-20190321100706-95778dfbb74e/go.mod h1:F5haX7
github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59 h1:DPfMYL5cV21JIaFtf64szezjkopANcwiQmeMZVCbStg=
github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59/go.mod h1:XLeoQLsfseLmVzRpZ6MIuoUOTAC979R7WSdBdnwe800=
github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953 h1:V6cRjz6Kx4lmv5xkWdgNzhDwWxFU4nl9ttSX+9YhJJE=
github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953/go.mod h1:XLeoQLsfseLmVzRpZ6MIuoUOTAC979R7WSdBdnwe800=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8=
github.com/cznic/fileutil v0.0.0-20180108211300-6a051e75936f/go.mod h1:8S58EK26zhXSxzv7NQFpnliaOQsmDUxvoQO3rt154Vg=

@ -1,6 +1,7 @@
package distributor
import (
"math"
"net/http"
"github.com/weaveworks/common/httpgrpc"
@ -37,7 +38,7 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
}
default:
if _, err := util.ParseProtoReader(r.Context(), r.Body, &req, util.RawSnappy); err != nil {
if _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

@ -1,6 +1,7 @@
package client
import (
"math"
"net/http"
"net/http/httptest"
"strings"
@ -243,7 +244,7 @@ func createServerHandler(receivedReqsChan chan logproto.PushRequest, status int)
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) {
// Parse the request
var pushReq logproto.PushRequest
if _, err := util.ParseProtoReader(req.Context(), req.Body, &pushReq, util.RawSnappy); err != nil {
if _, err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil {
rw.WriteHeader(500)
return
}

@ -4,6 +4,7 @@ import (
"fmt"
"io"
"io/ioutil"
"math"
"math/rand"
"net/http"
"os"
@ -435,7 +436,7 @@ type testServerHandler struct {
func (h *testServerHandler) ServeHTTP(w http.ResponseWriter, r *http.Request) {
var req logproto.PushRequest
if _, err := util.ParseProtoReader(r.Context(), r.Body, &req, util.RawSnappy); err != nil {
if _, err := util.ParseProtoReader(r.Context(), r.Body, int(r.ContentLength), math.MaxInt32, &req, util.RawSnappy); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}

@ -151,11 +151,18 @@ func (d dynamoTableClient) CreateTable(ctx context.Context, desc chunk.TableDesc
KeyType: aws.String(dynamodb.KeyTypeRange),
},
},
ProvisionedThroughput: &dynamodb.ProvisionedThroughput{
}
if desc.UseOnDemandIOMode {
input.BillingMode = aws.String(dynamodb.BillingModePayPerRequest)
} else {
input.BillingMode = aws.String(dynamodb.BillingModeProvisioned)
input.ProvisionedThroughput = &dynamodb.ProvisionedThroughput{
ReadCapacityUnits: aws.Int64(desc.ProvisionedRead),
WriteCapacityUnits: aws.Int64(desc.ProvisionedWrite),
},
}
}
output, err := d.DynamoDB.CreateTableWithContext(ctx, input)
if err != nil {
return err

@ -141,14 +141,3 @@ func (c *RedisCache) ping(ctx context.Context) error {
}
return err
}
// redisStatusCode maps the outcome of a Redis call onto an HTTP-like status
// string: "200" when there is no error, "404" when the error is redis.ErrNil,
// and "500" for every other error.
func redisStatusCode(err error) string {
	if err == nil {
		return "200"
	}
	if err == redis.ErrNil {
		return "404"
	}
	return "500"
}

@ -14,10 +14,6 @@ import (
"github.com/cortexproject/cortex/pkg/chunk/util"
)
const (
maxRowReads = 100
)
// Config for a StorageClient
type Config struct {
Addresses string `yaml:"addresses,omitempty"`

@ -42,14 +42,12 @@ func labelNamesFromChunks(chunks []Chunk) []string {
var result []string
for _, c := range chunks {
for _, l := range c.Metric {
if l.Name != model.MetricNameLabel {
if _, ok := keys[string(l.Name)]; !ok {
keys[string(l.Name)] = struct{}{}
result = append(result, string(l.Name))
}
}
}
}
sort.Strings(result)
return result
}

@ -55,7 +55,7 @@ func (c *CompositeStore) AddPeriod(storeCfg StoreConfig, cfg PeriodConfig, index
var store Store
var err error
switch cfg.Schema {
case "v9", "v10":
case "v9", "v10", "v11":
store, err = newSeriesStore(storeCfg, schema, index, chunks, limits)
default:
store, err = newStore(storeCfg, schema, index, chunks, limits)

@ -32,10 +32,10 @@ func newBigchunk() *bigchunk {
return &bigchunk{}
}
func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
func (b *bigchunk) Add(sample model.SamplePair) (Chunk, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
return addToOverflowChunk(b, sample)
return addToOverflowChunk(sample)
}
if err := b.addNextChunk(sample.Timestamp); err != nil {
return nil, err
@ -44,7 +44,7 @@ func (b *bigchunk) Add(sample model.SamplePair) ([]Chunk, error) {
b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
b.remainingSamples--
return []Chunk{b}, nil
return nil, nil
}
// addNextChunk adds a new XOR "subchunk" to the internal list of chunks.

@ -31,19 +31,16 @@ const ChunkLen = 1024
var (
errChunkBoundsExceeded = errors.New("attempted access outside of chunk boundaries")
errAddedToEvictedChunk = errors.New("attempted to add sample to evicted chunk")
)
// Chunk is the interface for all chunks. Chunks are generally not
// goroutine-safe.
type Chunk interface {
// Add adds a SamplePair to the chunks, performs any necessary
// re-encoding, and adds any necessary overflow chunks. It returns the
// new version of the original chunk, followed by overflow chunks, if
// any. The first chunk returned might be the same as the original one
// or a newly allocated version. In any case, take the returned chunk as
// the relevant one and discard the original chunk.
Add(sample model.SamplePair) ([]Chunk, error)
// re-encoding, and creates any necessary overflow chunk.
// The returned Chunk is the overflow chunk if it was created.
// The returned Chunk is nil if the sample got appended to the same chunk.
Add(sample model.SamplePair) (Chunk, error)
// NewIterator returns an iterator for the chunks.
// The iterator passed as argument is for re-use. Depending on implementation,
// the iterator can be re-used or a new iterator can be allocated.
@ -123,12 +120,13 @@ func RangeValues(it Iterator, in metric.Interval) ([]model.SamplePair, error) {
// addToOverflowChunk is a utility function that creates a new chunk as overflow
// chunk, adds the provided sample to it, and returns the new overflow chunk.
func addToOverflowChunk(c Chunk, s model.SamplePair) ([]Chunk, error) {
overflowChunks, err := New().Add(s)
func addToOverflowChunk(s model.SamplePair) (Chunk, error) {
overflowChunk := New()
_, err := overflowChunk.Add(s)
if err != nil {
return nil, err
}
return []Chunk{c, overflowChunks[0]}, nil
return overflowChunk, nil
}
// transcodeAndAdd is a utility function that transcodes the dst chunk into the
@ -140,26 +138,32 @@ func transcodeAndAdd(dst Chunk, src Chunk, s model.SamplePair) ([]Chunk, error)
var (
head = dst
body, NewChunks []Chunk
newChunk Chunk
body = []Chunk{head}
err error
)
it := src.NewIterator(nil)
for it.Scan() {
if NewChunks, err = head.Add(it.Value()); err != nil {
if newChunk, err = head.Add(it.Value()); err != nil {
return nil, err
}
body = append(body, NewChunks[:len(NewChunks)-1]...)
head = NewChunks[len(NewChunks)-1]
if newChunk != nil {
body = append(body, newChunk)
head = newChunk
}
}
if it.Err() != nil {
return nil, it.Err()
}
if NewChunks, err = head.Add(s); err != nil {
if newChunk, err = head.Add(s); err != nil {
return nil, err
}
return append(body, NewChunks...), nil
if newChunk != nil {
body = append(body, newChunk)
}
return body, nil
}
// indexAccessor allows accesses to samples by index.

@ -1,355 +0,0 @@
// This file was taken from Prometheus (https://github.com/prometheus/prometheus).
// The original license header is included below:
//
// Copyright 2014 The Prometheus Authors
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
package encoding
import (
"encoding/binary"
"fmt"
"io"
"math"
"github.com/prometheus/common/model"
)
// The 21-byte header of a delta-encoded chunk looks like:
//
// - time delta bytes: 1 bytes
// - value delta bytes: 1 bytes
// - is integer: 1 byte
// - base time: 8 bytes
// - base value: 8 bytes
// - used buf bytes: 2 bytes
const (
// Total size of the header in bytes (1 + 1 + 1 + 8 + 8 + 2).
deltaHeaderBytes = 21
// Byte offsets of the individual header fields, in the order listed above.
// Offset of the byte holding the number of bytes used per time delta.
deltaHeaderTimeBytesOffset = 0
// Offset of the byte holding the number of bytes used per value delta.
deltaHeaderValueBytesOffset = 1
// Offset of the "is integer" flag byte (1 means integer value deltas).
deltaHeaderIsIntOffset = 2
// Offset of the 8-byte base time.
deltaHeaderBaseTimeOffset = 3
// Offset of the 8-byte base value.
deltaHeaderBaseValueOffset = 11
// Offset of the 2-byte count of used buffer bytes.
deltaHeaderBufLenOffset = 19
)
// A deltaEncodedChunk adaptively stores sample timestamps and values with a
// delta encoding of various types (int, float) and bit widths. However, once 8
// bytes would be needed to encode a delta value, a fall-back to the absolute
// numbers happens (so that timestamps are saved directly as int64 and values as
// float64). It implements the chunk interface.
type deltaEncodedChunk []byte
// newDeltaEncodedChunk allocates a deltaEncodedChunk with capacity length and
// only the first three header bytes (time bytes, value bytes, is-int flag)
// populated.
//
// tb and vb are the number of bytes used per time delta and per value delta.
// The is-int flag is only set when isInt is true and vb is below d8.
//
// It panics if tb is less than 1 or if length cannot hold the header plus 16
// more bytes.
func newDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *deltaEncodedChunk {
	const minLength = deltaHeaderBytes + 16

	switch {
	case tb < 1:
		panic("need at least 1 time delta byte")
	case length < minLength:
		panic(fmt.Errorf(
			"chunk length %d bytes is insufficient, need at least %d",
			length, minLength,
		))
	}

	// Only use int for fewer than 8 value delta bytes.
	var intFlag byte
	if isInt && vb < d8 {
		intFlag = 1
	}

	chunk := make(deltaEncodedChunk, deltaHeaderIsIntOffset+1, length)
	chunk[deltaHeaderTimeBytesOffset] = byte(tb)
	chunk[deltaHeaderValueBytesOffset] = byte(vb)
	chunk[deltaHeaderIsIntOffset] = intFlag
	return &chunk
}
// Add implements chunk.
//
// It appends the sample s to the chunk. Because the receiver is a value, the
// slice header is a local copy: callers only see the grown chunk through the
// returned slice, whose first element is the chunk to keep using.
//
// Depending on the state of the chunk, Add either
//   - writes the sample in place and returns a single-element slice,
//   - transcodes the existing data into a chunk with a wider encoding (when
//     the chunk is less than half full), or
//   - hands the sample to addToOverflowChunk (when there is no room left, or
//     when re-encoding a chunk that is already half full is not worthwhile).
//
// An error is returned if the sample's timestamp lies before the base time, or
// if the header holds an unsupported number of time/value bytes.
func (c deltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.Len() == 0 {
// Empty chunk: grow to the full header and record this sample as the
// base time and base value. There is no return here, so the sample is
// also appended below as a regular entry.
c = c[:deltaHeaderBytes]
binary.LittleEndian.PutUint64(c[deltaHeaderBaseTimeOffset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64(c[deltaHeaderBaseValueOffset:], math.Float64bits(float64(s.Value)))
}
remainingBytes := cap(c) - len(c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
return addToOverflowChunk(&c, s)
}
baseValue := c.baseValue()
dt := s.Timestamp - c.baseTime()
if dt < 0 {
return nil, fmt.Errorf("time delta is less than zero: %v", dt)
}
dv := s.Value - baseValue
tb := c.timeBytes()
vb := c.valueBytes()
isInt := c.isInt()
// If the new sample is incompatible with the current encoding, reencode the
// existing chunk data into new chunk(s).
// ntb, nvb and nInt hold the encoding this sample needs; they start out
// equal to the current encoding and are only widened below.
ntb, nvb, nInt := tb, vb, isInt
if isInt && !isInt64(dv) {
// int->float.
nvb = d4
nInt = false
} else if !isInt && vb == d4 && baseValue+model.SampleValue(float32(dv)) != s.Value {
// float32->float64.
nvb = d8
} else {
if tb < d8 {
// Maybe more bytes for timestamp.
ntb = max(tb, bytesNeededForUnsignedTimestampDelta(dt))
}
if c.isInt() && vb < d8 {
// Maybe more bytes for sample value.
nvb = max(vb, bytesNeededForIntegerSampleValueDelta(dv))
}
}
if tb != ntb || vb != nvb || isInt != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
return addToOverflowChunk(&c, s)
}
// The sample fits the current encoding: grow the slice by one sample and
// write the timestamp part first, followed by the value part.
offset := len(c)
c = c[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(dt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(dt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(dt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
default:
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
// Skip past the timestamp bytes that were just written.
offset += int(tb)
if c.isInt() {
switch vb {
case d0:
// No-op. Constant value is stored as base value.
case d1:
c[offset] = byte(int8(dv))
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(int16(dv)))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(dv)))
// d8 must not happen. Those samples are encoded as float64.
default:
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
}
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(dv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
default:
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []Chunk{&c}, nil
}
// Slice ignores both time bounds and returns the receiver itself, unmodified.
func (c *deltaEncodedChunk) Slice(_, _ model.Time) Chunk {
return c
}
// NewIterator implements chunk.
//
// The iterator passed in is ignored. The base time, base value, byte widths
// and is-int flag are read from the header at call time and stored in the
// accessor handed to the new iterator.
func (c *deltaEncodedChunk) NewIterator(_ Iterator) Iterator {
	chunk := *c
	sampleCount := chunk.Len()
	accessor := &deltaEncodedIndexAccessor{
		c:      chunk,
		baseT:  chunk.baseTime(),
		baseV:  chunk.baseValue(),
		tBytes: chunk.timeBytes(),
		vBytes: chunk.valueBytes(),
		isInt:  chunk.isInt(),
	}
	return newIndexAccessingChunkIterator(sampleCount, accessor)
}
// Marshal implements chunk.
//
// It stores the current length of the chunk in the header and then writes the
// whole buffer, up to its capacity, to w. An error is returned if the write
// fails or is short. It panics if the length does not fit into 16 bits.
func (c deltaEncodedChunk) Marshal(w io.Writer) error {
	used := len(c)
	if used > math.MaxUint16 {
		panic("chunk buffer length would overflow a 16 bit uint.")
	}
	binary.LittleEndian.PutUint16(c[deltaHeaderBufLenOffset:], uint16(used))

	want := cap(c)
	written, err := w.Write(c[:want])
	switch {
	case err != nil:
		return err
	case written != want:
		return fmt.Errorf("wanted to write %d bytes, wrote %d", want, written)
	}
	return nil
}
// UnmarshalFromBuf implements chunk.
//
// The chunk is extended to its full capacity, buf is copied into it, and the
// length is then restored from the header by setLen, whose error is returned.
func (c *deltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
	full := (*c)[:cap(*c)]
	copy(full, buf)
	*c = full
	return c.setLen()
}
// setLen reads the used-bytes count from the header and truncates the chunk to
// that length. Before doing so it rejects a count that exceeds the capacity or
// is smaller than the header, and any time/value byte width that is not one of
// the supported sizes.
func (c *deltaEncodedChunk) setLen() error {
	chunk := *c
	usedLen := binary.LittleEndian.Uint16(chunk[deltaHeaderBufLenOffset:])

	if int(usedLen) > cap(chunk) {
		return fmt.Errorf("delta chunk length exceeded during unmarshalling: %d", usedLen)
	}
	if int(usedLen) < deltaHeaderBytes {
		return fmt.Errorf("delta chunk length less than header size: %d < %d", usedLen, deltaHeaderBytes)
	}

	// Time deltas need at least one byte, so d0 is not acceptable here.
	if tb := chunk.timeBytes(); tb != d1 && tb != d2 && tb != d4 && tb != d8 {
		return fmt.Errorf("invalid number of time bytes in delta chunk: %d", tb)
	}
	if vb := chunk.valueBytes(); vb != d0 && vb != d1 && vb != d2 && vb != d4 && vb != d8 {
		return fmt.Errorf("invalid number of value bytes in delta chunk: %d", vb)
	}

	*c = chunk[:usedLen]
	return nil
}
// Encoding implements chunk. It always reports Delta.
func (c deltaEncodedChunk) Encoding() Encoding { return Delta }
// Utilization implements chunk. It returns the ratio of the chunk's current
// length to its capacity.
func (c deltaEncodedChunk) Utilization() float64 {
return float64(len(c)) / float64(cap(c))
}
// timeBytes returns the number of bytes used per time delta, as stored in the
// header.
func (c deltaEncodedChunk) timeBytes() deltaBytes {
return deltaBytes(c[deltaHeaderTimeBytesOffset])
}
// valueBytes returns the number of bytes used per value delta, as stored in
// the header.
func (c deltaEncodedChunk) valueBytes() deltaBytes {
return deltaBytes(c[deltaHeaderValueBytesOffset])
}
// isInt reports whether the header's is-int flag byte equals 1.
func (c deltaEncodedChunk) isInt() bool {
return c[deltaHeaderIsIntOffset] == 1
}
// baseTime decodes the little-endian 8-byte base time from the header.
func (c deltaEncodedChunk) baseTime() model.Time {
return model.Time(binary.LittleEndian.Uint64(c[deltaHeaderBaseTimeOffset:]))
}
// baseValue decodes the base value, stored in the header as the little-endian
// bit pattern of a float64.
func (c deltaEncodedChunk) baseValue() model.SampleValue {
return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(c[deltaHeaderBaseValueOffset:])))
}
// sampleSize returns the number of bytes one encoded sample occupies: the time
// bytes plus the value bytes.
func (c deltaEncodedChunk) sampleSize() int {
return int(c.timeBytes() + c.valueBytes())
}
// Len implements Chunk. Runs in constant time.
//
// It returns the number of whole samples stored after the header, or 0 when
// the chunk is shorter than the header.
func (c deltaEncodedChunk) Len() int {
	payloadBytes := len(c) - deltaHeaderBytes
	if payloadBytes < 0 {
		return 0
	}
	return payloadBytes / c.sampleSize()
}
// Size returns the current length of the chunk in bytes (header included once
// it has been written), not its capacity.
func (c deltaEncodedChunk) Size() int {
return len(c)
}
// deltaEncodedIndexAccessor implements indexAccessor.
// It decodes individual samples of a deltaEncodedChunk by index, using header
// values captured when the accessor was created.
type deltaEncodedIndexAccessor struct {
// c is the chunk whose samples are decoded.
c deltaEncodedChunk
// baseT is the chunk's base time, to which time deltas are added.
baseT model.Time
// baseV is the chunk's base value, to which value deltas are added.
baseV model.SampleValue
// tBytes and vBytes are the bytes used per time delta and per value delta.
tBytes, vBytes deltaBytes
// isInt selects integer (true) or floating point (false) value decoding.
isInt bool
// lastErr is set when a sample cannot be decoded because of an invalid byte width.
lastErr error
}
// err returns the error recorded by the most recent failed decode, or nil if
// none has been recorded.
func (acc *deltaEncodedIndexAccessor) err() error {
return acc.lastErr
}
// timestampAtIndex decodes the timestamp of the sample at position idx.
//
// For 1, 2 and 4 time bytes the stored unsigned delta is added to the base
// time; for 8 time bytes the stored value is the absolute timestamp. Any other
// width records an error on the accessor and yields model.Earliest.
func (acc *deltaEncodedIndexAccessor) timestampAtIndex(idx int) model.Time {
	pos := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes)

	var delta model.Time
	switch acc.tBytes {
	case d1:
		delta = model.Time(acc.c[pos])
	case d2:
		delta = model.Time(binary.LittleEndian.Uint16(acc.c[pos:]))
	case d4:
		delta = model.Time(binary.LittleEndian.Uint32(acc.c[pos:]))
	case d8:
		// Take absolute value for d8.
		return model.Time(binary.LittleEndian.Uint64(acc.c[pos:]))
	default:
		acc.lastErr = fmt.Errorf("invalid number of bytes for time delta: %d", acc.tBytes)
		return model.Earliest
	}
	return acc.baseT + delta
}
// sampleValueAtIndex decodes the value of the sample at position idx.
//
// Float chunks store either a float32 delta (4 bytes) that is added to the
// base value, or the absolute float64 value (8 bytes). Integer chunks store a
// signed delta of 0, 1, 2 or 4 bytes that is added to the base value; with 0
// bytes every sample equals the base value. Any other width records an error
// on the accessor and yields 0.
func (acc *deltaEncodedIndexAccessor) sampleValueAtIndex(idx int) model.SampleValue {
	// The value bytes follow the time bytes within each sample.
	pos := deltaHeaderBytes + idx*int(acc.tBytes+acc.vBytes) + int(acc.tBytes)

	if !acc.isInt {
		switch acc.vBytes {
		case d4:
			return acc.baseV + model.SampleValue(math.Float32frombits(binary.LittleEndian.Uint32(acc.c[pos:])))
		case d8:
			// Take absolute value for d8.
			return model.SampleValue(math.Float64frombits(binary.LittleEndian.Uint64(acc.c[pos:])))
		default:
			acc.lastErr = fmt.Errorf("invalid number of bytes for floating point delta: %d", acc.vBytes)
			return 0
		}
	}

	switch acc.vBytes {
	case d0:
		return acc.baseV
	case d1:
		return acc.baseV + model.SampleValue(int8(acc.c[pos]))
	case d2:
		return acc.baseV + model.SampleValue(int16(binary.LittleEndian.Uint16(acc.c[pos:])))
	case d4:
		return acc.baseV + model.SampleValue(int32(binary.LittleEndian.Uint32(acc.c[pos:])))
	// No d8 for ints.
	default:
		acc.lastErr = fmt.Errorf("invalid number of bytes for integer delta: %d", acc.vBytes)
		return 0
	}
}

@ -84,26 +84,28 @@ func newDoubleDeltaEncodedChunk(tb, vb deltaBytes, isInt bool, length int) *doub
}
// Add implements chunk.
func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
func (c *doubleDeltaEncodedChunk) Add(s model.SamplePair) (Chunk, error) {
// TODO(beorn7): Since we return &c, this method might cause an unnecessary allocation.
if c.Len() == 0 {
return c.addFirstSample(s), nil
c.addFirstSample(s)
return nil, nil
}
tb := c.timeBytes()
vb := c.valueBytes()
if c.Len() == 1 {
return c.addSecondSample(s, tb, vb)
err := c.addSecondSample(s, tb, vb)
return nil, err
}
remainingBytes := cap(c) - len(c)
remainingBytes := cap(*c) - len(*c)
sampleSize := c.sampleSize()
// Do we generally have space for another sample in this chunk? If not,
// overflow into a new one.
if remainingBytes < sampleSize {
return addToOverflowChunk(&c, s)
return addToOverflowChunk(s)
}
projectedTime := c.baseTime() + model.Time(c.Len())*c.baseTimeDelta()
@ -133,26 +135,47 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
}
}
if tb != ntb || vb != nvb || c.isInt() != nInt {
if len(c)*2 < cap(c) {
return transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(c)), &c, s)
if len(*c)*2 < cap(*c) {
result, err := transcodeAndAdd(newDoubleDeltaEncodedChunk(ntb, nvb, nInt, cap(*c)), c, s)
if err != nil {
return nil, err
}
// We cannot handle >2 chunks returned as we can only return 1 chunk.
// Ideally there won't be >2 chunks, but if it happens to be >2,
// we fall through to perform `addToOverflowChunk` instead.
if len(result) == 1 {
// Replace the current chunk with the new bigger chunk.
c0 := result[0].(*doubleDeltaEncodedChunk)
*c = *c0
return nil, nil
} else if len(result) == 2 {
// Replace the current chunk with the new bigger chunk
// and return the additional chunk.
c0 := result[0].(*doubleDeltaEncodedChunk)
c1 := result[1].(*doubleDeltaEncodedChunk)
*c = *c0
return c1, nil
}
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
return addToOverflowChunk(&c, s)
// We also perform this if `transcodeAndAdd` resulted in >2 chunks.
return addToOverflowChunk(s)
}
offset := len(c)
c = c[:offset+sampleSize]
offset := len(*c)
(*c) = (*c)[:offset+sampleSize]
switch tb {
case d1:
c[offset] = byte(ddt)
(*c)[offset] = byte(ddt)
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(ddt))
binary.LittleEndian.PutUint16((*c)[offset:], uint16(ddt))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(ddt))
binary.LittleEndian.PutUint32((*c)[offset:], uint32(ddt))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], uint64(s.Timestamp))
binary.LittleEndian.PutUint64((*c)[offset:], uint64(s.Timestamp))
default:
return nil, fmt.Errorf("invalid number of bytes for time delta: %d", tb)
}
@ -164,11 +187,11 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
case d0:
// No-op. Constant delta is stored as base value.
case d1:
c[offset] = byte(int8(ddv))
(*c)[offset] = byte(int8(ddv))
case d2:
binary.LittleEndian.PutUint16(c[offset:], uint16(int16(ddv)))
binary.LittleEndian.PutUint16((*c)[offset:], uint16(int16(ddv)))
case d4:
binary.LittleEndian.PutUint32(c[offset:], uint32(int32(ddv)))
binary.LittleEndian.PutUint32((*c)[offset:], uint32(int32(ddv)))
// d8 must not happen. Those samples are encoded as float64.
default:
return nil, fmt.Errorf("invalid number of bytes for integer delta: %d", vb)
@ -176,15 +199,15 @@ func (c doubleDeltaEncodedChunk) Add(s model.SamplePair) ([]Chunk, error) {
} else {
switch vb {
case d4:
binary.LittleEndian.PutUint32(c[offset:], math.Float32bits(float32(ddv)))
binary.LittleEndian.PutUint32((*c)[offset:], math.Float32bits(float32(ddv)))
case d8:
// Store the absolute value (no delta) in case of d8.
binary.LittleEndian.PutUint64(c[offset:], math.Float64bits(float64(s.Value)))
binary.LittleEndian.PutUint64((*c)[offset:], math.Float64bits(float64(s.Value)))
default:
return nil, fmt.Errorf("invalid number of bytes for floating point delta: %d", vb)
}
}
return []Chunk{&c}, nil
return nil, nil
}
// FirstTime implements chunk.
@ -243,15 +266,15 @@ func (c doubleDeltaEncodedChunk) MarshalToBuf(buf []byte) error {
// UnmarshalFromBuf implements chunk.
func (c *doubleDeltaEncodedChunk) UnmarshalFromBuf(buf []byte) error {
*c = (*c)[:cap(*c)]
copy(*c, buf)
(*c) = (*c)[:cap((*c))]
copy((*c), buf)
return c.setLen()
}
// setLen sets the length of the underlying slice and performs some sanity checks.
func (c *doubleDeltaEncodedChunk) setLen() error {
l := binary.LittleEndian.Uint16((*c)[doubleDeltaHeaderBufLenOffset:])
if int(l) > cap(*c) {
if int(l) > cap((*c)) {
return fmt.Errorf("doubledelta chunk length exceeded during unmarshalling: %d", l)
}
if int(l) < doubleDeltaHeaderMinBytes {
@ -269,7 +292,7 @@ func (c *doubleDeltaEncodedChunk) setLen() error {
default:
return fmt.Errorf("invalid number of value bytes in doubledelta chunk: %d", c.valueBytes())
}
*c = (*c)[:l]
(*c) = (*c)[:l]
return nil
}
@ -356,40 +379,39 @@ func (c doubleDeltaEncodedChunk) isInt() bool {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) []Chunk {
c = c[:doubleDeltaHeaderBaseValueOffset+8]
func (c *doubleDeltaEncodedChunk) addFirstSample(s model.SamplePair) {
(*c) = (*c)[:doubleDeltaHeaderBaseValueOffset+8]
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeOffset:],
(*c)[doubleDeltaHeaderBaseTimeOffset:],
uint64(s.Timestamp),
)
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueOffset:],
(*c)[doubleDeltaHeaderBaseValueOffset:],
math.Float64bits(float64(s.Value)),
)
return []Chunk{&c}
}
// addSecondSample is a helper method only used by c.add(). It calculates the
// base delta from the provided sample and adds it to the chunk.
func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) ([]Chunk, error) {
func (c *doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb deltaBytes) error {
baseTimeDelta := s.Timestamp - c.baseTime()
if baseTimeDelta < 0 {
return nil, fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
return fmt.Errorf("base time delta is less than zero: %v", baseTimeDelta)
}
c = c[:doubleDeltaHeaderBytes]
(*c) = (*c)[:doubleDeltaHeaderBytes]
if tb >= d8 || bytesNeededForUnsignedTimestampDelta(baseTimeDelta) >= d8 {
// If already the base delta needs d8 (or we are at d8
// already, anyway), we better encode this timestamp
// directly rather than as a delta and switch everything
// to d8.
c[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
(*c)[doubleDeltaHeaderTimeBytesOffset] = byte(d8)
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
(*c)[doubleDeltaHeaderBaseTimeDeltaOffset:],
uint64(s.Timestamp),
)
} else {
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseTimeDeltaOffset:],
(*c)[doubleDeltaHeaderBaseTimeDeltaOffset:],
uint64(baseTimeDelta),
)
}
@ -400,19 +422,19 @@ func (c doubleDeltaEncodedChunk) addSecondSample(s model.SamplePair, tb, vb delt
// if we are at d8 already, anyway), we better encode
// this value directly rather than as a delta and switch
// everything to d8.
c[doubleDeltaHeaderValueBytesOffset] = byte(d8)
c[doubleDeltaHeaderIsIntOffset] = 0
(*c)[doubleDeltaHeaderValueBytesOffset] = byte(d8)
(*c)[doubleDeltaHeaderIsIntOffset] = 0
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
(*c)[doubleDeltaHeaderBaseValueDeltaOffset:],
math.Float64bits(float64(s.Value)),
)
} else {
binary.LittleEndian.PutUint64(
c[doubleDeltaHeaderBaseValueDeltaOffset:],
(*c)[doubleDeltaHeaderBaseValueDeltaOffset:],
math.Float64bits(float64(baseValueDelta)),
)
}
return []Chunk{&c}, nil
return nil
}
// doubleDeltaEncodedIndexAccessor implements indexAccessor.

@ -1,6 +1,7 @@
package encoding
import (
"errors"
"flag"
"fmt"
"strconv"
@ -26,6 +27,15 @@ func (Config) RegisterFlags(f *flag.FlagSet) {
flag.IntVar(&bigchunkSizeCapBytes, "store.bigchunk-size-cap-bytes", bigchunkSizeCapBytes, "When using bigchunk encoding, start a new bigchunk if over this size (0 = unlimited)")
}
// Validate returns an error when DefaultEncoding is Delta and nil otherwise.
func (Config) Validate() error {
	// Delta is deprecated, so it is the only encoding refused here.
	if DefaultEncoding != Delta {
		return nil
	}
	return errors.New("delta encoding is deprecated")
}
// String implements flag.Value.
func (e Encoding) String() string {
if known, found := encodings[e]; found {
@ -35,7 +45,8 @@ func (e Encoding) String() string {
}
const (
// Delta encoding
// Delta encoding is no longer supported and will be automatically changed to DoubleDelta.
// It still exists here to not change the `ingester.chunk-encoding` flag values.
Delta Encoding = iota
// DoubleDelta encoding
DoubleDelta
@ -51,12 +62,6 @@ type encoding struct {
}
var encodings = map[Encoding]encoding{
Delta: {
Name: "Delta",
New: func() Chunk {
return newDeltaEncodedChunk(d1, d0, true, ChunkLen)
},
},
DoubleDelta: {
Name: "DoubleDelta",
New: func() Chunk {

@ -260,17 +260,20 @@ func newVarbitChunk(enc varbitValueEncoding) *varbitChunk {
}
// Add implements chunk.
func (c *varbitChunk) Add(s model.SamplePair) ([]Chunk, error) {
func (c *varbitChunk) Add(s model.SamplePair) (Chunk, error) {
offset := c.nextSampleOffset()
switch {
case c.closed():
return addToOverflowChunk(c, s)
return addToOverflowChunk(s)
case offset > varbitNextSampleBitOffsetThreshold:
return c.addLastSample(s), nil
c.addLastSample(s)
return nil, nil
case offset == varbitFirstSampleBitOffset:
return c.addFirstSample(s), nil
c.addFirstSample(s)
return nil, nil
case offset == varbitSecondSampleBitOffset:
return c.addSecondSample(s)
err := c.addSecondSample(s)
return nil, err
}
return c.addLaterSample(s, offset)
}
@ -492,7 +495,7 @@ func (c varbitChunk) setLastSample(s model.SamplePair) {
// addFirstSample is a helper method only used by c.add(). It adds timestamp and
// value as base time and value.
func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk {
func (c *varbitChunk) addFirstSample(s model.SamplePair) {
binary.BigEndian.PutUint64(
(*c)[varbitFirstTimeOffset:],
uint64(s.Timestamp),
@ -503,21 +506,21 @@ func (c *varbitChunk) addFirstSample(s model.SamplePair) []Chunk {
)
c.setLastSample(s) // To simplify handling of single-sample chunks.
c.setNextSampleOffset(varbitSecondSampleBitOffset)
return []Chunk{c}
}
// addSecondSample is a helper method only used by c.add(). It calculates the
// first time delta from the provided sample and adds it to the chunk together
// with the provided sample as the last sample.
func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) {
func (c *varbitChunk) addSecondSample(s model.SamplePair) error {
firstTimeDelta := s.Timestamp - c.firstTime()
if firstTimeDelta < 0 {
return nil, fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
return fmt.Errorf("first Δt is less than zero: %v", firstTimeDelta)
}
if firstTimeDelta > varbitMaxTimeDelta {
// A time delta too great. Still, we can add it as a last sample
// before overflowing.
return c.addLastSample(s), nil
c.addLastSample(s)
return nil
}
(*c)[varbitFirstTimeDeltaOffset] = byte(firstTimeDelta >> 16)
(*c)[varbitFirstTimeDeltaOffset+1] = byte(firstTimeDelta >> 8)
@ -529,7 +532,7 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) {
c.setLastSample(s)
c.setNextSampleOffset(varbitThirdSampleBitOffset)
return []Chunk{c}, nil
return nil
}
// addLastSample is a helper method only used by c.add() and in other helper
@ -538,15 +541,15 @@ func (c *varbitChunk) addSecondSample(s model.SamplePair) ([]Chunk, error) {
// adds the very last sample added to this chunk ever, while setLastSample sets
// the sample most recently added to the chunk so that it can be used for the
// calculations required to add the next sample.
func (c *varbitChunk) addLastSample(s model.SamplePair) []Chunk {
func (c *varbitChunk) addLastSample(s model.SamplePair) {
c.setLastSample(s)
(*c)[varbitFlagOffset] |= 0x80
return []Chunk{c}
return
}
// addLaterSample is a helper method only used by c.add(). It adds a third or
// later sample.
func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk, error) {
func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) (Chunk, error) {
var (
lastTime = c.lastTime()
lastTimeDelta = c.lastTimeDelta()
@ -564,39 +567,88 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
if newTimeDelta > varbitMaxTimeDelta {
// A time delta too great. Still, we can add it as a last sample
// before overflowing.
return c.addLastSample(s), nil
c.addLastSample(s)
return nil, nil
}
// Analyze worst case, does it fit? If not, set new sample as the last.
if int(offset)+varbitWorstCaseBitsPerSample[encoding] > ChunkLen*8 {
return c.addLastSample(s), nil
c.addLastSample(s)
return nil, nil
}
// Transcoding/overflow decisions first.
if encoding == varbitZeroEncoding && s.Value != lastValue {
// Cannot go on with zero encoding.
if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s)
}
if offset <= ChunkLen*4 {
var result []Chunk
var err error
if isInt32(s.Value - lastValue) {
// Trying int encoding looks promising.
return transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s)
result, err = transcodeAndAdd(newVarbitChunk(varbitIntDoubleDeltaEncoding), c, s)
} else {
result, err = transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
}
if err != nil {
return nil, err
}
return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
// We cannot handle >2 chunks returned as we can only return 1 chunk.
// Ideally there won't be >2 chunks, but if it happens to be >2,
// we fall through to perform `addToOverflowChunk` instead.
if len(result) == 1 {
// Replace the current chunk with the new bigger chunk.
c0 := result[0].(*varbitChunk)
*c = *c0
return nil, nil
} else if len(result) == 2 {
// Replace the current chunk with the new bigger chunk
// and return the additional chunk.
c0 := result[0].(*varbitChunk)
c1 := result[1].(*varbitChunk)
*c = *c0
return c1, nil
}
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
// We also perform this if `transcodeAndAdd` resulted in >2 chunks.
return addToOverflowChunk(s)
}
if encoding == varbitIntDoubleDeltaEncoding && !isInt32(s.Value-lastValue) {
// Cannot go on with int encoding.
if offset > ChunkLen*4 {
// Chunk already half full. Don't transcode, overflow instead.
return addToOverflowChunk(c, s)
if offset <= ChunkLen*4 {
result, err := transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
if err != nil {
return nil, err
}
return transcodeAndAdd(newVarbitChunk(varbitXOREncoding), c, s)
// We cannot handle >2 chunks returned as we can only return 1 chunk.
// Ideally there won't be >2 chunks, but if it happens to be >2,
// we fall through to perform `addToOverflowChunk` instead.
if len(result) == 1 {
// Replace the current chunk with the new bigger chunk.
c0 := result[0].(*varbitChunk)
*c = *c0
return nil, nil
} else if len(result) == 2 {
// Replace the current chunk with the new bigger chunk
// and return the additional chunk.
c0 := result[0].(*varbitChunk)
c1 := result[1].(*varbitChunk)
*c = *c0
return c1, nil
}
}
// Chunk is already half full. Better create a new one and save the transcoding efforts.
// We also perform this if `transcodeAndAdd` resulted in >2 chunks.
return addToOverflowChunk(s)
}
offset, overflow := c.addDDTime(offset, lastTimeDelta, newTimeDelta)
if overflow {
return c.addLastSample(s), nil
c.addLastSample(s)
return nil, nil
}
switch encoding {
case varbitZeroEncoding:
@ -613,7 +665,7 @@ func (c *varbitChunk) addLaterSample(s model.SamplePair, offset uint16) ([]Chunk
c.setNextSampleOffset(offset)
c.setLastSample(s)
return []Chunk{c}, nil
return nil, nil
}
func (c varbitChunk) prepForThirdSample(

@ -8,6 +8,7 @@ import (
"flag"
"fmt"
"strings"
"time"
"cloud.google.com/go/bigtable"
ot "github.com/opentracing/opentracing-go"
@ -26,7 +27,6 @@ const (
column = "c"
separator = "\000"
maxRowReads = 100
null = string('\xff')
)
// Config for a StorageClient
@ -38,12 +38,17 @@ type Config struct {
ColumnKey bool
DistributeKeys bool
TableCacheEnabled bool
TableCacheExpiration time.Duration
}
// RegisterFlags registers the Bigtable flags on the given FlagSet: the
// project and instance IDs, the table-info cache toggle (enabled by default)
// and its expiration (30 minutes by default). It then delegates to
// GRPCClientConfig.RegisterFlags, passing "bigtable" and the same FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Project, "bigtable.project", "", "Bigtable project ID.")
f.StringVar(&cfg.Instance, "bigtable.instance", "", "Bigtable instance ID.")
f.BoolVar(&cfg.TableCacheEnabled, "bigtable.table-cache.enabled", true, "If enabled, once a tables info is fetched, it is cached.")
f.DurationVar(&cfg.TableCacheExpiration, "bigtable.table-cache.expiration", 30*time.Minute, "Duration to cache tables before checking again.")
cfg.GRPCClientConfig.RegisterFlags("bigtable", f)
}

@ -2,6 +2,7 @@ package gcp
import (
"context"
"time"
"google.golang.org/grpc/codes"
@ -15,6 +16,9 @@ import (
// tableClient wraps a Bigtable admin client together with a cache of the
// table info it has already fetched.
type tableClient struct {
cfg Config
client *bigtable.AdminClient
// tableInfo holds previously fetched table info, keyed by table name.
tableInfo map[string]*bigtable.TableInfo
// tableExpiration is the instant after which ListTables resets tableInfo.
tableExpiration time.Time
}
// NewTableClient returns a new TableClient.
@ -27,25 +31,37 @@ func NewTableClient(ctx context.Context, cfg Config) (chunk.TableClient, error)
return &tableClient{
cfg: cfg,
client: client,
tableInfo: map[string]*bigtable.TableInfo{},
}, nil
}
// ListTables lists all of the correctly specified cortex tables in bigtable
func (c *tableClient) ListTables(ctx context.Context) ([]string, error) {
tables, err := c.client.Tables(ctx)
if err != nil {
return nil, errors.Wrap(err, "client.Tables")
}
// Check each table has the right column family. If not, omit it.
if c.tableExpiration.Before(time.Now()) {
c.tableInfo = map[string]*bigtable.TableInfo{}
c.tableExpiration = time.Now().Add(c.cfg.TableCacheExpiration)
}
output := make([]string, 0, len(tables))
for _, table := range tables {
info, err := c.client.TableInfo(ctx, table)
info, exists := c.tableInfo[table]
if !c.cfg.TableCacheEnabled || !exists {
info, err = c.client.TableInfo(ctx, table)
if err != nil {
return nil, errors.Wrap(err, "client.TableInfo")
}
}
// Check each table has the right column family. If not, omit it.
if hasColumnFamily(info.FamilyInfos) {
output = append(output, table)
c.tableInfo[table] = info
}
}
@ -86,6 +102,7 @@ func (c *tableClient) DeleteTable(ctx context.Context, name string) error {
if err := c.client.DeleteTable(ctx, name); err != nil {
return errors.Wrap(err, "client.DeleteTable")
}
delete(c.tableInfo, name)
return nil
}

@ -165,6 +165,7 @@ func (m *MockStorage) BatchWrite(ctx context.Context, batch WriteBatch) error {
itemComponents := decodeRangeKey(items[i].rangeValue)
if !bytes.Equal(itemComponents[3], metricNameRangeKeyV1) &&
!bytes.Equal(itemComponents[3], seriesRangeKeyV1) &&
!bytes.Equal(itemComponents[3], labelNamesRangeKeyV1) &&
!bytes.Equal(itemComponents[3], labelSeriesRangeKeyV1) {
return fmt.Errorf("Dupe write")
}

@ -22,7 +22,6 @@ var bucketName = []byte("index")
const (
separator = "\000"
null = string('\xff')
dbReloadPeriod = 10 * time.Minute
)

@ -7,6 +7,7 @@ import (
"fmt"
"strings"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
)
@ -22,6 +23,8 @@ var (
// For v9 schema
seriesRangeKeyV1 = []byte{'7'}
labelSeriesRangeKeyV1 = []byte{'8'}
// For v11 schema
labelNamesRangeKeyV1 = []byte{'9'}
// ErrNotSupported when a schema doesn't support that particular lookup.
ErrNotSupported = errors.New("not supported")
@ -45,6 +48,8 @@ type Schema interface {
// If the query resulted in series IDs, use this method to find chunks.
GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
// Returns queries to retrieve all label names of multiple series by id.
GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error)
}
// IndexQuery describes a query for entries
@ -78,9 +83,11 @@ type IndexEntry struct {
Value []byte
}
// schemaBucketsFunc returns the buckets for the given time range and user.
type schemaBucketsFunc func(from, through model.Time, userID string) []Bucket
// schema implements Schema given a bucketing function and a set of range key callbacks
type schema struct {
buckets func(from, through model.Time, userID string) []Bucket
buckets schemaBucketsFunc
entries entries
}
@ -194,6 +201,20 @@ func (s schema) GetChunksForSeries(from, through model.Time, userID string, seri
return result, nil
}
// GetLabelNamesForSeries gathers, for every bucket produced for the given
// time range and user, the index queries that read the label names of
// seriesID. The first error reported by the underlying entries aborts the
// lookup and is returned as-is.
func (s schema) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
	var queries []IndexQuery

	for _, b := range s.buckets(from, through, userID) {
		bucketQueries, err := s.entries.GetLabelNamesForSeries(b, seriesID)
		if err != nil {
			return nil, err
		}
		queries = append(queries, bucketQueries...)
	}

	return queries, nil
}
type entries interface {
GetWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error)
@ -203,6 +224,7 @@ type entries interface {
GetReadMetricLabelQueries(bucket Bucket, metricName string, labelName string) ([]IndexQuery, error)
GetReadMetricLabelValueQueries(bucket Bucket, metricName string, labelName string, labelValue string) ([]IndexQuery, error)
GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error)
}
// original entries:
@ -274,6 +296,10 @@ func (originalEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, err
return nil, ErrNotSupported
}
// GetLabelNamesForSeries always returns ErrNotSupported: these entries offer
// no label-names-by-series lookup.
func (originalEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v3Schema went to base64 encoded label values & a version ID
// - range key: <label name>\0<base64(label value)>\0<chunk name>\0<version 1>
@ -389,6 +415,10 @@ func (labelNameInHashKeyEntries) GetChunksForSeries(_ Bucket, _ []byte) ([]Index
return nil, ErrNotSupported
}
// GetLabelNamesForSeries always returns ErrNotSupported: these entries offer
// no label-names-by-series lookup.
func (labelNameInHashKeyEntries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v5 schema is an extension of v4, with the chunk end time in the
// range key to improve query latency. However, it did it wrong
// so the chunk end times are ignored.
@ -459,6 +489,10 @@ func (v5Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// GetLabelNamesForSeries always returns ErrNotSupported: v5 entries offer no
// label-names-by-series lookup.
func (v5Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v6Entries fixes issues with v5 time encoding being wrong (see #337), and
// moves label value out of range key (see #199).
type v6Entries struct{}
@ -535,6 +569,10 @@ func (v6Entries) GetChunksForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// GetLabelNamesForSeries always returns ErrNotSupported: v6 entries offer no
// label-names-by-series lookup.
func (v6Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v9Entries adds a layer of indirection between labels -> series -> chunks.
type v9Entries struct {
}
@ -630,6 +668,10 @@ func (v9Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQuer
}, nil
}
// GetLabelNamesForSeries always returns ErrNotSupported: v9 entries offer no
// label-names-by-series lookup.
func (v9Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v10Entries builds on v9 by sharding index rows to reduce their size.
type v10Entries struct {
rowShards uint32
@ -734,3 +776,72 @@ func (v10Entries) GetChunksForSeries(bucket Bucket, seriesID []byte) ([]IndexQue
},
}, nil
}
// GetLabelNamesForSeries always returns ErrNotSupported: v10 entries offer no
// label-names-by-series lookup.
func (v10Entries) GetLabelNamesForSeries(_ Bucket, _ []byte) ([]IndexQuery, error) {
return nil, ErrNotSupported
}
// v11Entries builds on v10 but adds index entries for each series to store respective labels.
// It embeds v10Entries, so every method it does not define itself comes from
// v10; the row shard count is read from the embedded value.
type v11Entries struct {
v10Entries
}
// GetLabelWriteEntries returns the per-series index entries for the given
// metric and label set:
//
//   - metricName -> seriesID, under a sharded hash key;
//   - seriesID -> JSON-encoded list of label names;
//   - metricName:labelName -> hash(value):seriesID for every label, under a
//     sharded hash key, with the raw label value stored as the entry value.
//
// The metric-name label is excluded from both the label-name list and the
// per-label entries. chunkID is not used. An error is returned only when the
// label names cannot be marshalled.
func (s v11Entries) GetLabelWriteEntries(bucket Bucket, metricName string, labels labels.Labels, chunkID string) ([]IndexEntry, error) {
	seriesID := labelsSeriesID(labels)

	// read first 32 bits of the hash and use this to calculate the shard
	shard := binary.BigEndian.Uint32(seriesID) % s.rowShards

	labelNames := make([]string, 0, len(labels))
	for _, l := range labels {
		if l.Name == model.MetricNameLabel {
			continue
		}
		labelNames = append(labelNames, l.Name)
	}
	data, err := jsoniter.ConfigFastest.Marshal(labelNames)
	if err != nil {
		return nil, err
	}

	// Two fixed entries plus at most one per label: size the slice up front
	// so the appends below never have to grow it.
	entries := make([]IndexEntry, 0, 2+len(labels))
	entries = append(entries,
		// Entry for metricName -> seriesID
		IndexEntry{
			TableName:  bucket.tableName,
			HashValue:  fmt.Sprintf("%02d:%s:%s", shard, bucket.hashKey, metricName),
			RangeValue: encodeRangeKey(seriesID, nil, nil, seriesRangeKeyV1),
		},
		// Entry for seriesID -> label names
		IndexEntry{
			TableName:  bucket.tableName,
			HashValue:  string(seriesID),
			RangeValue: encodeRangeKey(nil, nil, nil, labelNamesRangeKeyV1),
			Value:      data,
		},
	)

	// Entries for metricName:labelName -> hash(value):seriesID
	// We use a hash of the value to limit its length.
	for _, v := range labels {
		if v.Name == model.MetricNameLabel {
			continue
		}
		valueHash := sha256bytes(v.Value)
		entries = append(entries, IndexEntry{
			TableName:  bucket.tableName,
			HashValue:  fmt.Sprintf("%02d:%s:%s:%s", shard, bucket.hashKey, metricName, v.Name),
			RangeValue: encodeRangeKey(valueHash, seriesID, nil, labelSeriesRangeKeyV1),
			Value:      []byte(v.Value),
		})
	}

	return entries, nil
}
// GetLabelNamesForSeries builds the single index query for seriesID: it
// targets the bucket's table and uses the raw series ID as the hash value.
func (v11Entries) GetLabelNamesForSeries(bucket Bucket, seriesID []byte) ([]IndexQuery, error) {
	query := IndexQuery{
		TableName: bucket.tableName,
		HashValue: string(seriesID),
	}
	return []IndexQuery{query}, nil
}

@ -14,81 +14,60 @@ type schemaCaching struct {
}
func (s *schemaCaching) GetReadQueriesForMetric(from, through model.Time, userID string, metricName string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
})
queries, err := s.Schema.GetReadQueriesForMetric(from, through, userID, metricName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) GetReadQueriesForMetricLabel(from, through model.Time, userID string, metricName string, labelName string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
})
queries, err := s.Schema.GetReadQueriesForMetricLabel(from, through, userID, metricName, labelName)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
func (s *schemaCaching) GetReadQueriesForMetricLabelValue(from, through model.Time, userID string, metricName string, labelName string, labelValue string) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
})
queries, err := s.Schema.GetReadQueriesForMetricLabelValue(from, through, userID, metricName, labelName, labelValue)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
// If the query resulted in series IDs, use this method to find chunks.
func (s *schemaCaching) GetChunksForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
return s.splitTimesByCacheability(from, through, func(from, through model.Time) ([]IndexQuery, error) {
return s.Schema.GetChunksForSeries(from, through, userID, seriesID)
})
}
func (s *schemaCaching) splitTimesByCacheability(from, through model.Time, f func(from, through model.Time) ([]IndexQuery, error)) ([]IndexQuery, error) {
var (
cacheableQueries []IndexQuery
activeQueries []IndexQuery
err error
cacheBefore = model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
)
if from.After(cacheBefore) {
activeQueries, err = f(from, through)
if err != nil {
return nil, err
}
} else if through.Before(cacheBefore) {
cacheableQueries, err = f(from, through)
queries, err := s.Schema.GetChunksForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}
} else {
cacheableQueries, err = f(from, cacheBefore)
if err != nil {
return nil, err
return s.setImmutability(from, through, queries), nil
}
activeQueries, err = f(cacheBefore, through)
// GetLabelNamesForSeries delegates to the wrapped Schema and passes the
// resulting queries through setImmutability before returning them. Errors
// from the wrapped Schema are returned unchanged.
func (s *schemaCaching) GetLabelNamesForSeries(from, through model.Time, userID string, seriesID []byte) ([]IndexQuery, error) {
queries, err := s.Schema.GetLabelNamesForSeries(from, through, userID, seriesID)
if err != nil {
return nil, err
}
return s.setImmutability(from, through, queries), nil
}
return mergeCacheableAndActiveQueries(cacheableQueries, activeQueries), nil
}
func mergeCacheableAndActiveQueries(cacheableQueries []IndexQuery, activeQueries []IndexQuery) []IndexQuery {
finalQueries := make([]IndexQuery, 0, len(cacheableQueries)+len(activeQueries))
Outer:
for _, cq := range cacheableQueries {
for _, aq := range activeQueries {
// When deduping, the bucket values only influence TableName and HashValue
// and just checking those is enough.
if cq.TableName == aq.TableName && cq.HashValue == aq.HashValue {
continue Outer
func (s *schemaCaching) setImmutability(from, through model.Time, queries []IndexQuery) []IndexQuery {
cacheBefore := model.TimeFromUnix(mtime.Now().Add(-s.cacheOlderThan).Unix())
// If the entire query is cacheable then cache it.
// While not super effective stand-alone, when combined with query-frontend and splitting,
// old queries will mostly be all behind boundary.
// To cleanly split cacheable and non-cacheable ranges, we'd need bucket start and end times
// which we don't know.
// See: https://github.com/cortexproject/cortex/issues/1698
if through.Before(cacheBefore) {
for i := range queries {
queries[i].Immutable = true
}
}
cq.Immutable = true
finalQueries = append(finalQueries, cq)
}
finalQueries = append(finalQueries, activeQueries...)
return finalQueries
return queries
}

@ -1,6 +1,7 @@
package chunk
import (
"errors"
"flag"
"fmt"
"os"
@ -23,6 +24,11 @@ const (
millisecondsInDay = int64(24 * time.Hour / time.Millisecond)
)
// Errors returned when validating a period config.
var (
// errInvalidSchemaVersion is returned when no schema can be created for the
// configured schema version.
errInvalidSchemaVersion = errors.New("invalid schema version")
// errInvalidTablePeriod is returned when an index or chunk table period is
// not a multiple of the schema's bucket period.
errInvalidTablePeriod = errors.New("the table period must be a multiple of 24h (1h for schema v1)")
)
// PeriodConfig defines the schema and tables to use for a period of time
type PeriodConfig struct {
From DayTime `yaml:"from"` // used when working with config
@ -116,8 +122,7 @@ func (cfg *LegacySchemaConfig) RegisterFlags(f *flag.FlagSet) {
cfg.ChunkTables.RegisterFlags("dynamodb.chunk-table", "cortex_chunks_", f)
}
// translate from command-line parameters into new config data structure
func (cfg *SchemaConfig) translate() error {
func (cfg *SchemaConfig) loadFromFlags() error {
cfg.Configs = []PeriodConfig{}
add := func(t string, f model.Time) {
@ -172,6 +177,30 @@ func (cfg *SchemaConfig) translate() error {
return nil
}
// loadFromFile loads the schema config from the YAML file named by
// cfg.fileName, decoding with the decoder's strict mode enabled. It returns
// the error from opening the file or from decoding it.
func (cfg *SchemaConfig) loadFromFile() error {
	f, err := os.Open(cfg.fileName)
	if err != nil {
		return err
	}
	// Release the file descriptor once decoding is done. The file is only
	// read, so a Close error carries nothing worth surfacing.
	defer f.Close()

	decoder := yaml.NewDecoder(f)
	decoder.SetStrict(true)
	return decoder.Decode(&cfg)
}
// Validate checks each period config in order and returns the first
// validation error found, or nil when all of them pass.
func (cfg *SchemaConfig) Validate() error {
	for i := range cfg.Configs {
		if err := cfg.Configs[i].validate(); err != nil {
			return err
		}
	}
	return nil
}
// ForEachAfter will call f() on every entry after t, splitting
// entries if necessary so there is an entry starting at t
func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)) {
@ -190,33 +219,75 @@ func (cfg *SchemaConfig) ForEachAfter(t model.Time, f func(config *PeriodConfig)
// CreateSchema returns the schema defined by the PeriodConfig
func (cfg PeriodConfig) CreateSchema() Schema {
var s schema
rowShards := uint32(16)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
}
var e entries
switch cfg.Schema {
case "v1":
s = schema{cfg.hourlyBuckets, originalEntries{}}
e = originalEntries{}
case "v2":
s = schema{cfg.dailyBuckets, originalEntries{}}
e = originalEntries{}
case "v3":
s = schema{cfg.dailyBuckets, base64Entries{originalEntries{}}}
e = base64Entries{originalEntries{}}
case "v4":
s = schema{cfg.dailyBuckets, labelNameInHashKeyEntries{}}
e = labelNameInHashKeyEntries{}
case "v5":
s = schema{cfg.dailyBuckets, v5Entries{}}
e = v5Entries{}
case "v6":
s = schema{cfg.dailyBuckets, v6Entries{}}
e = v6Entries{}
case "v9":
s = schema{cfg.dailyBuckets, v9Entries{}}
e = v9Entries{}
case "v10":
rowShards := uint32(16)
if cfg.RowShards > 0 {
rowShards = cfg.RowShards
e = v10Entries{
rowShards: rowShards,
}
s = schema{cfg.dailyBuckets, v10Entries{
case "v11":
e = v11Entries{
v10Entries: v10Entries{
rowShards: rowShards,
}}
},
}
default:
return nil
}
buckets, _ := cfg.createBucketsFunc()
return schema{buckets, e}
}
// createBucketsFunc picks the bucketing function for the configured schema
// and reports the matching bucket period: hourly buckets for "v1", daily
// buckets for every other schema value.
func (cfg PeriodConfig) createBucketsFunc() (schemaBucketsFunc, time.Duration) {
	if cfg.Schema == "v1" {
		return cfg.hourlyBuckets, 1 * time.Hour
	}
	return cfg.dailyBuckets, 24 * time.Hour
}
// validate the period config
func (cfg PeriodConfig) validate() error {
// Ensure the schema version exists
schema := cfg.CreateSchema()
if schema == nil {
return errInvalidSchemaVersion
}
// Ensure the tables period is a multiple of the bucket period
_, bucketsPeriod := cfg.createBucketsFunc()
if cfg.IndexTables.Period > 0 && cfg.IndexTables.Period%bucketsPeriod != 0 {
return errInvalidTablePeriod
}
return s
if cfg.ChunkTables.Period > 0 && cfg.ChunkTables.Period%bucketsPeriod != 0 {
return errInvalidTablePeriod
}
return nil
}
// Load the yaml file, or build the config from legacy command-line flags
@ -224,18 +295,21 @@ func (cfg *SchemaConfig) Load() error {
if len(cfg.Configs) > 0 {
return nil
}
// Load config from file (if provided), falling back to CLI flags
var err error
if cfg.fileName == "" {
return cfg.translate()
err = cfg.loadFromFlags()
} else {
err = cfg.loadFromFile()
}
f, err := os.Open(cfg.fileName)
if err != nil {
return err
}
decoder := yaml.NewDecoder(f)
decoder.SetStrict(true)
return decoder.Decode(&cfg)
return cfg.Validate()
}
// PrintYaml dumps the yaml to stdout, to aid in migration

@ -4,8 +4,10 @@ import (
"context"
"fmt"
"net/http"
"sort"
"github.com/go-kit/kit/log/level"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
@ -199,6 +201,25 @@ func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, userID string
}
level.Debug(log).Log("series-ids", len(seriesIDs))
// Lookup the series in the index to get label names.
labelNames, err := c.lookupLabelNamesBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
// looking up metrics by series is not supported falling back on chunks
if err == ErrNotSupported {
return c.lookupLabelNamesByChunks(ctx, from, through, userID, seriesIDs)
}
level.Error(log).Log("msg", "lookupLabelNamesBySeries", "err", err)
return nil, err
}
level.Debug(log).Log("labelNames", len(labelNames))
return labelNames, nil
}
func (c *seriesStore) lookupLabelNamesByChunks(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupLabelNamesByChunks")
defer log.Span.Finish()
// Lookup the series in the index to get the chunks.
chunkIDs, err := c.lookupChunksBySeries(ctx, from, through, userID, seriesIDs)
if err != nil {
@ -228,7 +249,6 @@ func (c *seriesStore) LabelNamesForMetricName(ctx context.Context, userID string
}
return labelNamesFromChunks(allChunks), nil
}
func (c *seriesStore) lookupSeriesByMetricNameMatchers(ctx context.Context, from, through model.Time, userID, metricName string, matchers []*labels.Matcher) ([]string, error) {
log, ctx := spanlogger.New(ctx, "SeriesStore.lookupSeriesByMetricNameMatchers", "metricName", metricName, "matchers", len(matchers))
defer log.Span.Finish()
@ -367,6 +387,44 @@ func (c *seriesStore) lookupChunksBySeries(ctx context.Context, from, through mo
return result, err
}
// lookupLabelNamesBySeries resolves the label names of the given series
// through the index. It asks the schema for label-name queries per series ID,
// runs them, decodes each entry value as a JSON list of names, and returns
// the sorted, de-duplicated union. The metric-name label is always included.
// Schema, lookup and decoding errors are returned unchanged.
func (c *seriesStore) lookupLabelNamesBySeries(ctx context.Context, from, through model.Time, userID string, seriesIDs []string) ([]string, error) {
	log, ctx := spanlogger.New(ctx, "SeriesStore.lookupLabelNamesBySeries")
	defer log.Span.Finish()

	level.Debug(log).Log("seriesIDs", len(seriesIDs))
	queries := make([]IndexQuery, 0, len(seriesIDs))
	for _, id := range seriesIDs {
		seriesQueries, err := c.schema.GetLabelNamesForSeries(from, through, userID, []byte(id))
		if err != nil {
			return nil, err
		}
		queries = append(queries, seriesQueries...)
	}
	level.Debug(log).Log("queries", len(queries))

	entries, err := c.lookupEntriesByQueries(ctx, queries)
	if err != nil {
		return nil, err
	}
	level.Debug(log).Log("entries", len(entries))

	// Collect every distinct name first; the metric name label is always present.
	seen := map[string]struct{}{model.MetricNameLabel: {}}
	for _, entry := range entries {
		names := []string{}
		if err := jsoniter.ConfigFastest.Unmarshal(entry.Value, &names); err != nil {
			return nil, err
		}
		for _, name := range names {
			seen[name] = struct{}{}
		}
	}

	// Map iteration order is random, so the final sort fixes the output order.
	result := make([]string, 0, len(seen))
	for name := range seen {
		result = append(result, name)
	}
	sort.Strings(result)
	return result, nil
}
// Put implements ChunkStore
func (c *seriesStore) Put(ctx context.Context, chunks []Chunk) error {
for _, chunk := range chunks {

@ -217,7 +217,7 @@ func (m *TableManager) bucketRetentionLoop() {
// not and update those that need it. It is exposed for testing.
func (m *TableManager) SyncTables(ctx context.Context) error {
expected := m.calculateExpectedTables()
level.Info(util.Logger).Log("msg", "synching tables", "num_expected_tables", len(expected), "expected_tables", len(expected))
level.Info(util.Logger).Log("msg", "synching tables", "expected_tables", len(expected))
toCreate, toCheckThroughput, toDelete, err := m.partitionTables(ctx, expected)
if err != nil {

@ -71,21 +71,14 @@ func CreateChunks(startIndex, batchSize int, start model.Time) ([]string, []chun
return keys, chunks, nil
}
func dummyChunk(now model.Time) chunk.Chunk {
return dummyChunkFor(now, labels.Labels{
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
{Name: "toms", Value: "code"},
})
}
func dummyChunkFor(now model.Time, metric labels.Labels) chunk.Chunk {
cs, _ := promchunk.New().Add(model.SamplePair{Timestamp: now, Value: 0})
cs := promchunk.New()
cs.Add(model.SamplePair{Timestamp: now, Value: 0})
chunk := chunk.NewChunk(
userID,
client.Fingerprint(metric),
metric,
cs[0],
cs,
now.Add(-time.Hour),
now,
)

@ -2,6 +2,7 @@ package ring
import (
"context"
"fmt"
"sync"
"sync/atomic"
)
@ -37,6 +38,9 @@ type itemTracker struct {
//
// Not implemented as a method on Ring so we can test separately.
func DoBatch(ctx context.Context, r ReadRing, keys []uint32, callback func(IngesterDesc, []int) error, cleanup func()) error {
if r.IngesterCount() <= 0 {
return fmt.Errorf("DoBatch: IngesterCount <= 0")
}
expectedTrackers := len(keys) * (r.ReplicationFactor() + 1) / r.IngesterCount()
itemTrackers := make([]itemTracker, len(keys))
ingesters := make(map[string]ingester, r.IngesterCount())

@ -10,8 +10,9 @@ import (
"github.com/go-kit/kit/log/level"
consul "github.com/hashicorp/consul/api"
cleanhttp "github.com/hashicorp/go-cleanhttp"
"github.com/hashicorp/go-cleanhttp"
"github.com/weaveworks/common/instrument"
"golang.org/x/time/rate"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
@ -39,6 +40,8 @@ type Config struct {
ACLToken string
HTTPClientTimeout time.Duration
ConsistentReads bool
WatchKeyRateLimit float64 // Zero disables rate limit
WatchKeyBurstSize int // Burst when doing rate-limit, defaults to 1
}
type kv interface {
@ -62,6 +65,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, prefix string) {
f.StringVar(&cfg.ACLToken, prefix+"consul.acltoken", "", "ACL Token used to interact with Consul.")
f.DurationVar(&cfg.HTTPClientTimeout, prefix+"consul.client-timeout", 2*longPollDuration, "HTTP timeout when talking to Consul")
f.BoolVar(&cfg.ConsistentReads, prefix+"consul.consistent-reads", true, "Enable consistent reads to Consul.")
f.Float64Var(&cfg.WatchKeyRateLimit, prefix+"consul.watch-rate-limit", 0, "Rate limit when watching key or prefix in Consul, in requests per second. 0 disables the rate limit.")
f.IntVar(&cfg.WatchKeyBurstSize, prefix+"consul.watch-burst-size", 1, "Burst size used in rate limit. Values less than 1 are treated as 1.")
}
// NewClient returns a new Client.
@ -170,8 +175,17 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
var (
backoff = util.NewBackoff(ctx, backoffConfig)
index = uint64(0)
limiter = c.createRateLimiter()
)
for backoff.Ongoing() {
err := limiter.Wait(ctx)
if err != nil {
level.Error(util.Logger).Log("msg", "error while rate-limiting", "key", key, "err", err)
backoff.Wait()
continue
}
queryOptions := &consul.QueryOptions{
AllowStale: !c.cfg.ConsistentReads,
RequireConsistent: c.cfg.ConsistentReads,
@ -187,12 +201,11 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) b
}
backoff.Reset()
// Skip if the index is the same as last time, because the key value is
// guaranteed to be the same as last time
if index == meta.LastIndex {
skip := false
index, skip = checkLastIndex(index, meta.LastIndex)
if skip {
continue
}
index = meta.LastIndex
out, err := c.codec.Decode(kvp.Value)
if err != nil {
@ -212,8 +225,16 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
var (
backoff = util.NewBackoff(ctx, backoffConfig)
index = uint64(0)
limiter = c.createRateLimiter()
)
for backoff.Ongoing() {
err := limiter.Wait(ctx)
if err != nil {
level.Error(util.Logger).Log("msg", "error while rate-limiting", "prefix", prefix, "err", err)
backoff.Wait()
continue
}
queryOptions := &consul.QueryOptions{
AllowStale: !c.cfg.ConsistentReads,
RequireConsistent: c.cfg.ConsistentReads,
@ -228,13 +249,13 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
continue
}
backoff.Reset()
// Skip if the index is the same as last time, because the key value is
// guaranteed to be the same as last time
if index == meta.LastIndex {
skip := false
index, skip = checkLastIndex(index, meta.LastIndex)
if skip {
continue
}
index = meta.LastIndex
for _, kvp := range kvps {
out, err := c.codec.Decode(kvp.Value)
if err != nil {
@ -264,3 +285,33 @@ func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
}
return c.codec.Decode(kvp.Value)
}
// checkLastIndex compares the index used for the previous blocking query
// (index) with the index reported by the latest response (metaLastIndex).
// It returns the index to use for the next query and whether the current
// response should be skipped because nothing has changed.
func checkLastIndex(index, metaLastIndex uint64) (newIndex uint64, skip bool) {
	// See https://www.consul.io/api/features/blocking.html#implementation-details for logic behind these checks.
	switch {
	case metaLastIndex == 0:
		// A zero index must not be reused as-is: after a blocking request
		// the index we carry forward has to be at least 1.
		return 1, false
	case metaLastIndex < index:
		// The reported index went backwards, i.e. it was reset; start over
		// from zero.
		return 0, false
	case metaLastIndex == index:
		// Same index as last time means the value is guaranteed to be the
		// same as last time, so the caller can skip it.
		return metaLastIndex, true
	default:
		// The index advanced: adopt it and process the response.
		return metaLastIndex, false
	}
}
// createRateLimiter builds the limiter used to throttle watch requests from
// the client's configuration. A non-positive WatchKeyRateLimit yields an
// unlimited limiter; otherwise the configured rate is used with a burst of
// at least 1.
func (c *Client) createRateLimiter() *rate.Limiter {
	limit := c.cfg.WatchKeyRateLimit
	if limit <= 0 {
		// burst is ignored when limit = rate.Inf
		return rate.NewLimiter(rate.Inf, 0)
	}

	// Burst sizes below 1 are treated as 1.
	burst := c.cfg.WatchKeyBurstSize
	if burst < 1 {
		burst = 1
	}

	return rate.NewLimiter(rate.Limit(limit), burst)
}

@ -1,7 +1,7 @@
package consul
import (
fmt "fmt"
"fmt"
"sync"
"time"
@ -21,6 +21,11 @@ type mockKV struct {
// NewInMemoryClient makes a new mock consul client that uses the zero-value
// Config. It is a convenience wrapper around NewInMemoryClientWithConfig.
func NewInMemoryClient(codec codec.Codec) *Client {
	var defaults Config
	return NewInMemoryClientWithConfig(codec, defaults)
}
// NewInMemoryClientWithConfig makes a new mock consul client with supplied Config.
func NewInMemoryClientWithConfig(codec codec.Codec, cfg Config) *Client {
m := mockKV{
kvps: map[string]*consul.KVPair{},
}
@ -29,6 +34,7 @@ func NewInMemoryClient(codec codec.Codec) *Client {
return &Client{
kv: &m,
codec: codec,
cfg: cfg,
}
}
@ -67,6 +73,8 @@ func (m *mockKV) Put(p *consul.KVPair, q *consul.WriteOptions) (*consul.WriteMet
}
m.cond.Broadcast()
level.Debug(util.Logger).Log("msg", "Put", "key", p.Key, "value", fmt.Sprintf("%.40q", p.Value), "modify_index", m.current)
return nil, nil
}
@ -109,9 +117,16 @@ func (m *mockKV) Get(key string, q *consul.QueryOptions) (*consul.KVPair, *consu
return nil, &consul.QueryMeta{LastIndex: m.current}, nil
}
if q.WaitTime > 0 {
if q.WaitIndex >= value.ModifyIndex && q.WaitTime > 0 {
deadline := time.Now().Add(q.WaitTime)
for q.WaitIndex >= value.ModifyIndex && time.Now().Before(deadline) {
if ctxDeadline, ok := q.Context().Deadline(); ok && ctxDeadline.Before(deadline) {
// respect deadline from context, if set.
deadline = ctxDeadline
}
// simply wait until value.ModifyIndex changes. This allows us to test reporting old index values by resetting them.
startModify := value.ModifyIndex
for startModify == value.ModifyIndex && time.Now().Before(deadline) {
m.cond.Wait()
}
if time.Now().After(deadline) {
@ -144,3 +159,25 @@ func (m *mockKV) List(prefix string, q *consul.QueryOptions) (consul.KVPairs, *c
}
return result, &consul.QueryMeta{LastIndex: m.current}, nil
}
// ResetIndex sets the mock's current index (the value Get and List report as
// QueryMeta.LastIndex) back to 0, wakes every goroutine waiting on the mock's
// condition variable, and logs a debug message.
func (m *mockKV) ResetIndex() {
m.mtx.Lock()
defer m.mtx.Unlock()
m.current = 0
// Wake all waiters on m.cond so they re-evaluate their wait conditions.
m.cond.Broadcast()
level.Debug(util.Logger).Log("msg", "Reset")
}
// ResetIndexForKey sets the ModifyIndex of the entry stored under key to 0,
// if such an entry exists. Whether or not the key was found, it then wakes
// every goroutine waiting on the mock's condition variable and logs a debug
// message.
func (m *mockKV) ResetIndexForKey(key string) {
	m.mtx.Lock()
	defer m.mtx.Unlock()

	kvp, found := m.kvps[key]
	if found {
		kvp.ModifyIndex = 0
	}

	// Broadcast unconditionally so waiters re-check their conditions.
	m.cond.Broadcast()
	level.Debug(util.Logger).Log("msg", "ResetIndexForKey", "key", key)
}

@ -6,7 +6,6 @@ import (
"encoding/json"
"fmt"
"io"
"io/ioutil"
"net/http"
"github.com/blang/semver"
@ -57,31 +56,46 @@ func CompressionTypeFor(version string) CompressionType {
}
// ParseProtoReader parses a compressed proto from an io.Reader.
func ParseProtoReader(ctx context.Context, reader io.Reader, req proto.Message, compression CompressionType) ([]byte, error) {
func ParseProtoReader(ctx context.Context, reader io.Reader, expectedSize, maxSize int, req proto.Message, compression CompressionType) ([]byte, error) {
var body []byte
var err error
sp := opentracing.SpanFromContext(ctx)
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[start reading]"))
}
var buf bytes.Buffer
if expectedSize > 0 {
if expectedSize > maxSize {
return nil, fmt.Errorf("message expected size larger than max (%d vs %d)", expectedSize, maxSize)
}
buf.Grow(expectedSize + bytes.MinRead) // extra space guarantees no reallocation
}
switch compression {
case NoCompression:
body, err = ioutil.ReadAll(reader)
// Read from LimitReader with limit max+1. So if the underlying
// reader is over limit, the result will be bigger than max.
_, err = buf.ReadFrom(io.LimitReader(reader, int64(maxSize)+1))
body = buf.Bytes()
case FramedSnappy:
body, err = ioutil.ReadAll(snappy.NewReader(reader))
_, err = buf.ReadFrom(io.LimitReader(snappy.NewReader(reader), int64(maxSize)+1))
body = buf.Bytes()
case RawSnappy:
body, err = ioutil.ReadAll(reader)
_, err = buf.ReadFrom(reader)
body = buf.Bytes()
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[decompress]"),
otlog.Int("size", len(body)))
}
if err == nil {
if err == nil && len(body) <= maxSize {
body, err = snappy.Decode(nil, body)
}
}
if err != nil {
return nil, err
}
if len(body) > maxSize {
return nil, fmt.Errorf("received message larger than max (%d vs %d)", len(body), maxSize)
}
if sp != nil {
sp.LogFields(otlog.String("event", "util.ParseProtoRequest[unmarshal]"),

@ -70,7 +70,7 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&l.CardinalityLimit, "store.cardinality-limit", 1e5, "Cardinality limit for index queries.")
f.StringVar(&l.PerTenantOverrideConfig, "limits.per-user-override-config", "", "File name of per-user overrides.")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with this to reload the overrides.")
f.DurationVar(&l.PerTenantOverridePeriod, "limits.per-user-override-period", 10*time.Second, "Period with which to reload the overrides.")
}
// UnmarshalYAML implements the yaml.Unmarshaler interface.

@ -15,10 +15,6 @@ var overridesReloadSuccess = promauto.NewGauge(prometheus.GaugeOpts{
Help: "Whether the last overrides reload attempt was successful.",
})
func init() {
overridesReloadSuccess.Set(1) // Default to 1
}
// OverridesLoader loads the overrides
type OverridesLoader func(string) (map[string]interface{}, error)
@ -48,6 +44,10 @@ func NewOverridesManager(cfg OverridesManagerConfig) (*OverridesManager, error)
}
if cfg.OverridesLoadPath != "" {
if err := overridesManager.loadOverrides(); err != nil {
// Log but don't stop on error - we don't want to halt all ingesters because of a typo
level.Error(util.Logger).Log("msg", "failed to load limit overrides", "err", err)
}
go overridesManager.loop()
} else {
level.Info(util.Logger).Log("msg", "per-tenant overrides disabled")
@ -65,6 +65,7 @@ func (om *OverridesManager) loop() {
case <-ticker.C:
err := om.loadOverrides()
if err != nil {
// Log but don't stop on error - we don't want to halt all ingesters because of a typo
level.Error(util.Logger).Log("msg", "failed to load limit overrides", "err", err)
}
case <-om.quit:

@ -113,7 +113,7 @@ github.com/coreos/go-systemd/sdjournal
# github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/coreos/pkg/capnslog
github.com/coreos/pkg/dlopen
# github.com/cortexproject/cortex v0.2.1-0.20191003165238-857bb8476e59
# github.com/cortexproject/cortex v0.3.1-0.20191025190927-77a09cc7c953
github.com/cortexproject/cortex/pkg/chunk
github.com/cortexproject/cortex/pkg/chunk/aws
github.com/cortexproject/cortex/pkg/chunk/cache

Loading…
Cancel
Save