Remove dependency on Bigchunk, which is a leftover of Cortex and is only used in tests (#10354)

**What this PR does / why we need it**:

This PR removes the dependency on "Bigchunk", which is a "default" implementation of the `chunk.Data` interface and used only in tests.

**Special notes for your reviewer**:

The PR replaces the usage of this chunk type with the Loki implementation (`chunkenc.MemChunk` wrapped into `chunkenc.Facade`) where possible. In case where it was not possible (tests in `pkg/storage/chunk` package) due to otherwise circular import, it was replaces by the `chunk.dummyChunk` implementation, which is just a no-op that implements `chunk.Data` interface)

The PR also removes the protos for the ingester client (`pkg/ingester/client/ingester.proto`) which were only referenced in the removed `pkg/util/chunkcompat/compat.go` file.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
pull/10394/head
Christian Haudum 2 years ago committed by GitHub
parent 92ba055ae5
commit 31981bd4af
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      pkg/ingester/client/compat.go
  2. 7585
      pkg/ingester/client/ingester.pb.go
  3. 167
      pkg/ingester/client/ingester.proto
  4. 256
      pkg/storage/chunk/bigchunk.go
  5. 15
      pkg/storage/chunk/cache/cache_test.go
  6. 24
      pkg/storage/chunk/chunk_test.go
  7. 15
      pkg/storage/chunk/client/grpc/grpc_client_test.go
  8. 2
      pkg/storage/chunk/client/grpc/grpc_server_mock_test.go
  9. 9
      pkg/storage/chunk/client/testutils/testutils.go
  10. 54
      pkg/storage/chunk/dummy.go
  11. 58
      pkg/storage/chunk/factory.go
  12. 33
      pkg/storage/chunk/fetcher/fetcher_test.go
  13. 47
      pkg/storage/config/schema_config_test.go
  14. 216
      pkg/storage/stores/series/series_store_test.go
  15. 53
      pkg/util/chunkcompat/compat.go

@ -19,16 +19,6 @@ func hashNew() uint64 {
return offset64
}
// LabelsToKeyString is used to form a string to be used as
// the hashKey. Don't print, use l.String() for printing.
func LabelsToKeyString(l labels.Labels) string {
// We are allocating 1024, even though most series are less than 600b long.
// But this is not an issue as this function is being inlined when called in a loop
// and buffer allocated is a static buffer and not a dynamic buffer on the heap.
b := make([]byte, 0, 1024)
return string(l.Bytes(b))
}
// FastFingerprint runs the same algorithm as Prometheus labelSetToFastFingerprint()
func FastFingerprint(ls []logproto.LabelAdapter) model.Fingerprint {
if len(ls) == 0 {

File diff suppressed because it is too large Load Diff

@ -1,167 +0,0 @@
syntax = "proto3";
package ingesterpb;
import "gogoproto/gogo.proto";
import "pkg/logproto/logproto.proto";
import "pkg/logproto/metrics.proto";
option go_package = "github.com/grafana/loki/pkg/ingester/client";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
service Ingester {
rpc Push(logproto.WriteRequest) returns (logproto.WriteResponse) {}
rpc Query(QueryRequest) returns (QueryResponse) {}
rpc QueryStream(QueryRequest) returns (stream QueryStreamResponse) {}
rpc QueryExemplars(ExemplarQueryRequest) returns (ExemplarQueryResponse) {}
rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse) {}
rpc LabelNames(LabelNamesRequest) returns (LabelNamesResponse) {}
rpc UserStats(UserStatsRequest) returns (UserStatsResponse) {}
rpc AllUserStats(UserStatsRequest) returns (UsersStatsResponse) {}
rpc MetricsForLabelMatchers(MetricsForLabelMatchersRequest) returns (MetricsForLabelMatchersResponse) {}
rpc MetricsMetadata(MetricsMetadataRequest) returns (MetricsMetadataResponse) {}
// TransferChunks allows leaving ingester (client) to stream chunks directly to joining ingesters (server).
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {}
}
message ReadRequest {
repeated QueryRequest queries = 1;
}
message ReadResponse {
repeated QueryResponse results = 1;
}
message QueryRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatcher matchers = 3;
}
message ExemplarQueryRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatchers matchers = 3;
}
message QueryResponse {
repeated logproto.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}
// QueryStreamResponse contains a batch of timeseries chunks or timeseries. Only one of these series will be populated.
message QueryStreamResponse {
repeated TimeSeriesChunk chunkseries = 1 [(gogoproto.nullable) = false];
repeated logproto.TimeSeries timeseries = 2 [(gogoproto.nullable) = false];
}
message ExemplarQueryResponse {
repeated logproto.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
}
message LabelValuesRequest {
string label_name = 1;
int64 start_timestamp_ms = 2;
int64 end_timestamp_ms = 3;
LabelMatchers matchers = 4;
}
message LabelValuesResponse {
repeated string label_values = 1;
}
message LabelNamesRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
}
message LabelNamesResponse {
repeated string label_names = 1;
}
message UserStatsRequest {}
message UserStatsResponse {
double ingestion_rate = 1;
uint64 num_series = 2;
double api_ingestion_rate = 3;
double rule_ingestion_rate = 4;
}
message UserIDStatsResponse {
string user_id = 1;
UserStatsResponse data = 2;
}
message UsersStatsResponse {
repeated UserIDStatsResponse stats = 1;
}
message MetricsForLabelMatchersRequest {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatchers matchers_set = 3;
}
message MetricsForLabelMatchersResponse {
repeated logproto.Metric metric = 1;
}
message MetricsMetadataRequest {}
message MetricsMetadataResponse {
repeated logproto.MetricMetadata metadata = 1;
}
message TimeSeriesChunk {
string from_ingester_id = 1;
string user_id = 2;
repeated logproto.LegacyLabelPair labels = 3 [
(gogoproto.nullable) = false,
(gogoproto.customtype) = "github.com/grafana/loki/pkg/logproto.LabelAdapter"
];
repeated Chunk chunks = 4 [(gogoproto.nullable) = false];
}
message Chunk {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
int32 encoding = 3;
bytes data = 4;
}
message TransferChunksResponse {}
message LabelMatchers {
repeated LabelMatcher matchers = 1;
}
enum MatchType {
EQUAL = 0;
NOT_EQUAL = 1;
REGEX_MATCH = 2;
REGEX_NO_MATCH = 3;
}
message LabelMatcher {
MatchType type = 1;
string name = 2;
string value = 3;
}
message TimeSeriesFile {
string from_ingester_id = 1;
string user_id = 2;
string filename = 3;
bytes data = 4;
}

@ -1,256 +0,0 @@
package chunk
import (
"bytes"
"encoding/binary"
"errors"
"io"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/tsdb/chunkenc"
"github.com/grafana/loki/pkg/util/filter"
)
const samplesPerChunk = 120
var errOutOfBounds = errors.New("out of bounds")
type smallChunk struct {
chunkenc.XORChunk
start int64
}
// bigchunk is a set of prometheus/tsdb chunks. It grows over time and has no
// upperbound on number of samples it can contain.
type bigchunk struct {
chunks []smallChunk
appender chunkenc.Appender
remainingSamples int
}
func newBigchunk() *bigchunk {
return &bigchunk{}
}
// TODO(owen-d): remove bigchunk from our code, we don't use it.
// Hack an Entries() impl
func (b *bigchunk) Entries() int { return 0 }
func (b *bigchunk) Add(sample model.SamplePair) (Data, error) {
if b.remainingSamples == 0 {
if bigchunkSizeCapBytes > 0 && b.Size() > bigchunkSizeCapBytes {
return addToOverflowChunk(sample)
}
if err := b.addNextChunk(sample.Timestamp); err != nil {
return nil, err
}
}
b.appender.Append(int64(sample.Timestamp), float64(sample.Value))
b.remainingSamples--
return nil, nil
}
// addNextChunk adds a new XOR "subchunk" to the internal list of chunks.
func (b *bigchunk) addNextChunk(start model.Time) error {
// To save memory, we "compact" the previous chunk - the array backing the slice
// will be upto 2x too big, and we can save this space.
const chunkCapacityExcess = 32 // don't bother copying if it's within this range
if l := len(b.chunks); l > 0 {
oldBuf := b.chunks[l-1].XORChunk.Bytes()
if cap(oldBuf) > len(oldBuf)+chunkCapacityExcess {
buf := make([]byte, len(oldBuf))
copy(buf, oldBuf)
compacted, err := chunkenc.FromData(chunkenc.EncXOR, buf)
if err != nil {
return err
}
b.chunks[l-1].XORChunk = *compacted.(*chunkenc.XORChunk)
}
}
// Explicitly reallocate slice to avoid up to 2x overhead if we let append() do it
if len(b.chunks)+1 > cap(b.chunks) {
newChunks := make([]smallChunk, len(b.chunks), len(b.chunks)+1)
copy(newChunks, b.chunks)
b.chunks = newChunks
}
b.chunks = append(b.chunks, smallChunk{
XORChunk: *chunkenc.NewXORChunk(),
start: int64(start),
})
appender, err := b.chunks[len(b.chunks)-1].Appender()
if err != nil {
return err
}
b.appender = appender
b.remainingSamples = samplesPerChunk
return nil
}
func (b *bigchunk) Rebound(_, _ model.Time, _ filter.Func) (Data, error) {
return nil, errors.New("not implemented")
}
func (b *bigchunk) Marshal(wio io.Writer) error {
w := writer{wio}
if err := w.WriteVarInt16(uint16(len(b.chunks))); err != nil {
return err
}
for _, chunk := range b.chunks {
buf := chunk.Bytes()
if err := w.WriteVarInt16(uint16(len(buf))); err != nil {
return err
}
if _, err := w.Write(buf); err != nil {
return err
}
}
return nil
}
func (b *bigchunk) MarshalToBuf(buf []byte) error {
writer := bytes.NewBuffer(buf)
return b.Marshal(writer)
}
func (b *bigchunk) UnmarshalFromBuf(buf []byte) error {
r := reader{buf: buf}
numChunks, err := r.ReadUint16()
if err != nil {
return err
}
b.chunks = make([]smallChunk, 0, numChunks+1) // allow one extra space in case we want to add new data
var reuseIter chunkenc.Iterator
for i := uint16(0); i < numChunks; i++ {
chunkLen, err := r.ReadUint16()
if err != nil {
return err
}
chunkBuf, err := r.ReadBytes(int(chunkLen))
if err != nil {
return err
}
chunk, err := chunkenc.FromData(chunkenc.EncXOR, chunkBuf)
if err != nil {
return err
}
var start int64
start, reuseIter, err = firstTime(chunk, reuseIter)
if err != nil {
return err
}
b.chunks = append(b.chunks, smallChunk{
XORChunk: *chunk.(*chunkenc.XORChunk),
start: start,
})
}
return nil
}
func (b *bigchunk) Encoding() Encoding {
return Bigchunk
}
func (b *bigchunk) Utilization() float64 {
return 1.0
}
func (b *bigchunk) Len() int {
sum := 0
for _, c := range b.chunks {
sum += c.NumSamples()
}
return sum
}
// Unused, but for compatibility
func (b *bigchunk) UncompressedSize() int { return b.Size() }
func (b *bigchunk) Size() int {
sum := 2 // For the number of sub chunks.
for _, c := range b.chunks {
sum += 2 // For the length of the sub chunk.
sum += len(c.Bytes())
}
return sum
}
func (b *bigchunk) Slice(start, end model.Time) Data {
i, j := 0, len(b.chunks)
for k := 0; k < len(b.chunks); k++ {
if b.chunks[k].start <= int64(start) {
i = k
}
if b.chunks[k].start > int64(end) {
j = k
break
}
}
return &bigchunk{
chunks: b.chunks[i:j],
}
}
type writer struct {
io.Writer
}
func (w writer) WriteVarInt16(i uint16) error {
var b [2]byte
binary.LittleEndian.PutUint16(b[:], i)
_, err := w.Write(b[:])
return err
}
type reader struct {
i int
buf []byte
}
func (r *reader) ReadUint16() (uint16, error) {
if r.i+2 > len(r.buf) {
return 0, errOutOfBounds
}
result := binary.LittleEndian.Uint16(r.buf[r.i:])
r.i += 2
return result, nil
}
func (r *reader) ReadBytes(count int) ([]byte, error) {
if r.i+count > len(r.buf) {
return nil, errOutOfBounds
}
result := r.buf[r.i : r.i+count]
r.i += count
return result, nil
}
// addToOverflowChunk is a utility function that creates a new chunk as overflow
// chunk, adds the provided sample to it, and returns a chunk slice containing
// the provided old chunk followed by the new overflow chunk.
func addToOverflowChunk(s model.SamplePair) (Data, error) {
overflowChunk := New()
_, err := overflowChunk.(*bigchunk).Add(s)
if err != nil {
return nil, err
}
return overflowChunk, nil
}
func firstTime(c chunkenc.Chunk, iter chunkenc.Iterator) (int64, chunkenc.Iterator, error) {
var first int64
iter = c.Iterator(iter)
if iter.Next() != chunkenc.ValNone {
first, _ = iter.At()
}
return first, iter, iter.Err()
}

@ -2,6 +2,7 @@ package cache_test
import (
"context"
"fmt"
"math/rand"
"sort"
"strconv"
@ -13,6 +14,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
@ -31,13 +33,14 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
chunks := []chunk.Chunk{}
for i := 0; i < 111; i++ {
ts := model.TimeFromUnix(int64(i * chunkLen))
promChunk := chunk.New()
nc, err := promChunk.Add(model.SamplePair{
Timestamp: ts,
Value: model.SampleValue(i),
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt, 256*1024, 0)
err := cs.Append(&logproto.Entry{
Timestamp: ts.Time(),
Line: fmt.Sprintf("line ts=%d", ts),
})
require.NoError(t, err)
require.Nil(t, nc)
c := chunk.NewChunk(
userID,
model.Fingerprint(1),
@ -45,7 +48,7 @@ func fillCache(t *testing.T, scfg config.SchemaConfig, cache cache.Cache) ([]str
{Name: model.MetricNameLabel, Value: "foo"},
{Name: "bar", Value: "baz"},
},
promChunk,
chunkenc.NewFacade(cs, 0, 0),
ts,
ts.Add(chunkLen),
)

@ -22,17 +22,19 @@ var labelsForDummyChunks = labels.Labels{
{Name: "toms", Value: "code"},
}
func dummyChunk(now model.Time) Chunk {
return dummyChunkFor(now, labelsForDummyChunks)
// Deprecated
func dummyChunkFor(now model.Time, metric labels.Labels) Chunk {
return dummyChunkForEncoding(now, metric, 1)
}
// Deprecated
func dummyChunkForEncoding(now model.Time, metric labels.Labels, samples int) Chunk {
c, _ := NewForEncoding(Bigchunk)
c := newDummyChunk()
chunkStart := now.Add(-time.Hour)
for i := 0; i < samples; i++ {
t := time.Duration(i) * 15 * time.Second
nc, err := c.(*bigchunk).Add(model.SamplePair{Timestamp: chunkStart.Add(t), Value: model.SampleValue(i)})
nc, err := c.Add(model.SamplePair{Timestamp: chunkStart.Add(t), Value: model.SampleValue(i)})
if err != nil {
panic(err)
}
@ -57,12 +59,8 @@ func dummyChunkForEncoding(now model.Time, metric labels.Labels, samples int) Ch
return chunk
}
func dummyChunkFor(now model.Time, metric labels.Labels) Chunk {
return dummyChunkForEncoding(now, metric, 1)
}
func TestChunkCodec(t *testing.T) {
dummy := dummyChunk(model.Now())
dummy := dummyChunkFor(model.Now(), labelsForDummyChunks)
decodeContext := NewDecodeContext()
key := fmt.Sprintf("%s/%x:%x:%x:%x", dummy.ChunkRef.UserID, dummy.ChunkRef.Fingerprint, int64(dummy.ChunkRef.From), int64(dummy.ChunkRef.Through), dummy.ChunkRef.Checksum)
@ -182,12 +180,8 @@ var BenchmarkLabels = labels.Labels{
{Name: "pod_name", Value: "some-other-name-5j8s8"},
}
func benchmarkChunk(now model.Time) Chunk {
return dummyChunkFor(now, BenchmarkLabels)
}
func BenchmarkEncode(b *testing.B) {
chunk := dummyChunk(model.Now())
chunk := dummyChunkFor(model.Now(), labelsForDummyChunks)
b.ResetTimer()
@ -203,7 +197,7 @@ func BenchmarkDecode100(b *testing.B) { benchmarkDecode(b, 100) }
func BenchmarkDecode10000(b *testing.B) { benchmarkDecode(b, 10000) }
func benchmarkDecode(b *testing.B, batchSize int) {
chunk := benchmarkChunk(model.Now())
chunk := dummyChunkFor(model.Now(), BenchmarkLabels)
err := chunk.Encode()
require.NoError(b, err)
buf, err := chunk.Encoded()

@ -7,6 +7,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/config"
@ -76,6 +77,12 @@ func TestGrpcStore(t *testing.T) {
// rpc calls for storageClient
storageClient, _ := NewTestStorageClient(cfg, schemaCfg)
newChunkData := func() chunk.Data {
return chunkenc.NewFacade(
chunkenc.NewMemChunk(
chunkenc.ChunkFormatV3, chunkenc.EncNone, chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt, 256*1024, 0,
), 0, 0)
}
putChunksTestData := []chunk.Chunk{
{
@ -100,8 +107,8 @@ func TestGrpcStore(t *testing.T) {
Value: "prometheus",
},
},
Encoding: chunk.Bigchunk,
Data: chunk.New(),
Encoding: chunkenc.LogChunk,
Data: newChunkData(),
},
}
err = storageClient.PutChunks(context.Background(), putChunksTestData)
@ -130,8 +137,8 @@ func TestGrpcStore(t *testing.T) {
Value: "prometheus",
},
},
Encoding: chunk.Bigchunk,
Data: chunk.New(),
Encoding: chunkenc.LogChunk,
Data: newChunkData(),
},
}
_, err = storageClient.GetChunks(context.Background(), getChunksTestData)

@ -52,7 +52,7 @@ func (s server) DeleteIndex(_ context.Context, deletes *DeleteIndexRequest) (*em
//
// Support new and old chunk key formats
func (s server) PutChunks(_ context.Context, request *PutChunksRequest) (*empty.Empty, error) {
if request.Chunks[0].TableName == "" && (request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:e5e91607") {
if request.Chunks[0].TableName == "" && (request.Chunks[0].Key == "fake/ddf337b84e835f32:171bc00155a:171bc00155a:e6a518a") {
return &empty.Empty{}, nil
}
err := errors.New("putChunks from storageClient request doesn't match with test from gRPC client")

@ -2,6 +2,7 @@ package testutils
import (
"context"
"fmt"
"io"
"strconv"
"time"
@ -10,7 +11,9 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
chunkclient "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
@ -82,10 +85,10 @@ func CreateChunks(scfg config.SchemaConfig, startIndex, batchSize int, from mode
}
func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk {
cs := chunk.New()
cs := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncGZIP, chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt, 256*1024, 0)
for ts := from; ts <= through; ts = ts.Add(15 * time.Second) {
_, err := cs.Add(model.SamplePair{Timestamp: ts, Value: 0})
err := cs.Append(&logproto.Entry{Timestamp: ts.Time(), Line: fmt.Sprintf("line ts=%d", ts)})
if err != nil {
panic(err)
}
@ -95,7 +98,7 @@ func DummyChunkFor(from, through model.Time, metric labels.Labels) chunk.Chunk {
userID,
client.Fingerprint(metric),
metric,
cs,
chunkenc.NewFacade(cs, 0, 0),
from,
through,
)

@ -0,0 +1,54 @@
package chunk
import (
"io"
"github.com/prometheus/common/model"
"github.com/grafana/loki/pkg/util/filter"
)
func newDummyChunk() *dummyChunk {
return &dummyChunk{}
}
// dummyChunk implements chunk.Data
// It is a placeholder chunk with Encoding(0)
// It can be used in tests where the content of a chunk is irrelevant.
type dummyChunk struct{}
func (chk *dummyChunk) Add(sample model.SamplePair) (Data, error) {
return nil, nil
}
func (chk *dummyChunk) Marshal(io.Writer) error {
return nil
}
func (chk *dummyChunk) UnmarshalFromBuf([]byte) error {
return nil
}
func (chk *dummyChunk) Encoding() Encoding {
return Dummy
}
func (chk *dummyChunk) Rebound(start, end model.Time, filter filter.Func) (Data, error) {
return nil, nil
}
func (chk *dummyChunk) Size() int {
return 0
}
func (chk *dummyChunk) UncompressedSize() int {
return 0
}
func (chk *dummyChunk) Entries() int {
return 0
}
func (chk *dummyChunk) Utilization() float64 {
return 0
}

@ -1,7 +1,6 @@
package chunk
import (
"flag"
"fmt"
"strconv"
)
@ -9,26 +8,10 @@ import (
// Encoding defines which encoding we are using, delta, doubledelta, or varbit
type Encoding byte
// Config configures the behaviour of chunk encoding
type Config struct{}
var (
// DefaultEncoding exported for use in unit tests elsewhere
DefaultEncoding = Bigchunk
bigchunkSizeCapBytes = 0
const (
Dummy Encoding = iota
)
// RegisterFlags registers configuration settings.
func (Config) RegisterFlags(f *flag.FlagSet) {
f.Var(&DefaultEncoding, "ingester.chunk-encoding", "Encoding version to use for chunks.")
f.IntVar(&bigchunkSizeCapBytes, "store.bigchunk-size-cap-bytes", bigchunkSizeCapBytes, "When using bigchunk encoding, start a new bigchunk if over this size (0 = unlimited)")
}
// Validate errors out if the encoding is set to Delta.
func (Config) Validate() error {
return nil
}
// String implements flag.Value.
func (e Encoding) String() string {
if known, found := encodings[e]; found {
@ -37,25 +20,6 @@ func (e Encoding) String() string {
return fmt.Sprintf("%d", e)
}
const (
// Big chunk encoding.
Bigchunk Encoding = iota
)
type encoding struct {
Name string
New func() Data
}
var encodings = map[Encoding]encoding{
Bigchunk: {
Name: "Bigchunk",
New: func() Data {
return newBigchunk()
},
},
}
// Set implements flag.Value.
func (e *Encoding) Set(s string) error {
// First see if the name was given
@ -80,14 +44,16 @@ func (e *Encoding) Set(s string) error {
return nil
}
// New creates a new chunk according to the encoding set by the
// DefaultEncoding flag.
func New() Data {
chunk, err := NewForEncoding(DefaultEncoding)
if err != nil {
panic(err)
}
return chunk
type encoding struct {
Name string
New func() Data
}
var encodings = map[Encoding]encoding{
Dummy: {
Name: "dummy",
New: func() Data { return newDummyChunk() },
},
}
// NewForEncoding allows configuring what chunk type you want

@ -2,6 +2,7 @@ package fetcher
import (
"context"
"fmt"
"strconv"
"testing"
"time"
@ -11,6 +12,7 @@ import (
"github.com/stretchr/testify/assert"
"golang.org/x/exp/slices"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
"github.com/grafana/loki/pkg/storage/chunk/cache"
@ -317,27 +319,32 @@ type c struct {
func makeChunks(now time.Time, tpls ...c) []chunk.Chunk {
var chks []chunk.Chunk
for _, chk := range tpls {
c := chunk.Chunk{
ChunkRef: logproto.ChunkRef{
UserID: "fake",
From: model.TimeFromUnix(now.Add(-chk.from).UTC().Unix()),
Through: model.TimeFromUnix(now.Add(-chk.through).UTC().Unix()),
},
}
from := int(chk.from) / int(time.Hour)
// This is only here because it's helpful for debugging.
c.Metric = labels.Labels{labels.Label{Name: "start", Value: strconv.Itoa(from)}}
// This isn't even the write format for Loki but we dont' care for the sake of these tests.
c.Data = chunk.New()
memChk := chunkenc.NewMemChunk(chunkenc.ChunkFormatV4, chunkenc.EncNone, chunkenc.UnorderedWithNonIndexedLabelsHeadBlockFmt, 256*1024, 0)
// To make sure the fetcher doesn't swap keys and buffers each chunk is built with different, but deterministic data
for i := 0; i < from; i++ {
_, _ = c.Data.Add(model.SamplePair{
Timestamp: model.TimeFromUnix(int64(i)),
Value: model.SampleValue(from),
_ = memChk.Append(&logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf("line ts=%d", i),
})
}
data := chunkenc.NewFacade(memChk, 0, 0)
c := chunk.Chunk{
ChunkRef: logproto.ChunkRef{
UserID: "fake",
From: model.TimeFromUnix(now.Add(-chk.from).UTC().Unix()),
Through: model.TimeFromUnix(now.Add(-chk.through).UTC().Unix()),
},
Metric: labels.Labels{labels.Label{Name: "start", Value: strconv.Itoa(from)}},
Data: data,
Encoding: data.Encoding(),
}
// Encode to set the checksum
_ = c.Encode()
if err := c.Encode(); err != nil {
panic(err)
}
chks = append(chks, c)
}

@ -12,7 +12,6 @@ import (
"github.com/stretchr/testify/require"
yaml "gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/storage/chunk"
)
@ -1025,52 +1024,6 @@ var (
}
)
func TestChunkDecodeBackwardsCompatibility(t *testing.T) {
// lets build a new chunk same as what was built using code at commit b1777a50ab19
c, err := chunk.NewForEncoding(chunk.Bigchunk)
require.NoError(t, err)
nc, err := c.Add(model.SamplePair{Timestamp: fixedTimestamp, Value: 0})
require.NoError(t, err)
require.Equal(t, nil, nc, "returned chunk should be nil")
chnk := chunk.NewChunk(
userID,
client.Fingerprint(labelsForDummyChunks),
labelsForDummyChunks,
c,
fixedTimestamp.Add(-time.Hour),
fixedTimestamp,
)
// Force checksum calculation.
require.NoError(t, chnk.Encode())
// Chunk encoded using code at commit b1777a50ab19
rawData := []byte("\x00\x00\x00\xb7\xff\x06\x00\x00sNaPpY\x01\xa5\x00\x00\xfcB\xb4\xc9{\"fingerprint\":18245339272195143978,\"userID\":\"userID\",\"from\":1557650721,\"through\":1557654321,\"metric\":{\"__name__\":\"foo\",\"bar\":\"baz\",\"toms\":\"code\"},\"encoding\":0}\n\x00\x00\x00\x15\x01\x00\x11\x00\x00\x01\xd0\xdd\xf5\xb6\xd5Z\x00\x00\x00\x00\x00\x00\x00\x00\x00")
decodeContext := chunk.NewDecodeContext()
have, err := chunk.ParseExternalKey(userID, "userID/fd3477666dacf92a:16aab37c8e8:16aab6eb768:70b431bb")
require.NoError(t, err)
require.NoError(t, have.Decode(decodeContext, rawData))
want := chnk
// We can't just compare these two chunks, since the Bigchunk internals are different on construction and read-in.
// Compare the serialised version instead
require.NoError(t, have.Encode())
require.NoError(t, want.Encode())
haveEncoded, _ := have.Encoded()
wantEncoded, _ := want.Encoded()
require.Equal(t, haveEncoded, wantEncoded)
s := SchemaConfig{
Configs: []PeriodConfig{
{
From: DayTime{Time: 0},
Schema: "v11",
RowShards: 16,
},
},
}
require.Equal(t, s.ExternalKey(have.ChunkRef), s.ExternalKey(want.ChunkRef))
}
func TestChunkKeys(t *testing.T) {
for _, tc := range []struct {
name string

@ -17,7 +17,9 @@ import (
"github.com/prometheus/prometheus/promql/parser"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk"
@ -123,13 +125,6 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) {
"toms", "code",
)
fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
fooChunk3 := dummyChunkFor(now, fooMetric3)
barChunk1 := dummyChunkFor(now, barMetric1)
barChunk2 := dummyChunkFor(now, barMetric2)
for _, tc := range []struct {
metricName, labelName string
expect []string
@ -173,9 +168,17 @@ func TestChunkStore_LabelValuesForMetricName(t *testing.T) {
t.Run(fmt.Sprintf("%s / %s / %s / %s", tc.metricName, tc.labelName, schema, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelValues with metricName", tc.metricName, "with labelName", tc.labelName, "with schema", schema)
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
fooChunk1 := dummyChunkWithFormat(t, now, fooMetric1, chunkFmt, headBlockFmt)
fooChunk2 := dummyChunkWithFormat(t, now, fooMetric2, chunkFmt, headBlockFmt)
fooChunk3 := dummyChunkWithFormat(t, now, fooMetric3, chunkFmt, headBlockFmt)
barChunk1 := dummyChunkWithFormat(t, now, barMetric1, chunkFmt, headBlockFmt)
barChunk2 := dummyChunkWithFormat(t, now, barMetric2, chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{
fooChunk1,
fooChunk2,
@ -241,14 +244,6 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
"toms", "code",
)
fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
fooChunk3 := dummyChunkFor(now, fooMetric3)
fooChunk4 := dummyChunkFor(now.Add(-time.Hour), fooMetric1) // same series but different chunk
barChunk1 := dummyChunkFor(now, barMetric1)
barChunk2 := dummyChunkFor(now, barMetric2)
for _, tc := range []struct {
metricName string
expect []string
@ -267,9 +262,19 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
t.Run(fmt.Sprintf("%s / %s / %s ", tc.metricName, schema, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelNames with metricName", tc.metricName, "with schema", schema)
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
fooChunk1 := dummyChunkWithFormat(t, now, fooMetric1, chunkFmt, headBlockFmt)
fooChunk2 := dummyChunkWithFormat(t, now, fooMetric2, chunkFmt, headBlockFmt)
fooChunk3 := dummyChunkWithFormat(t, now, fooMetric3, chunkFmt, headBlockFmt)
fooChunk4 := dummyChunkWithFormat(t, now.Add(-time.Hour), fooMetric1, chunkFmt, headBlockFmt) // same series but different chunk
barChunk1 := dummyChunkWithFormat(t, now, barMetric1, chunkFmt, headBlockFmt)
barChunk2 := dummyChunkWithFormat(t, now, barMetric2, chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{
fooChunk1,
fooChunk2,
@ -312,73 +317,76 @@ func TestChunkStore_LabelNamesForMetricName(t *testing.T) {
// TestChunkStore_getMetricNameChunks tests if chunks are fetched correctly when we have the metric name
func TestChunkStore_getMetricNameChunks(t *testing.T) {
now := model.Now()
chunk1 := dummyChunkFor(now, labels.FromStrings(labels.MetricName, "foo",
"bar", "baz",
"flip", "flop",
"toms", "code",
))
chunk2 := dummyChunkFor(now, labels.FromStrings(labels.MetricName, "foo",
"bar", "beep",
"toms", "code",
))
testCases := []struct {
query string
expect []chunk.Chunk
}{
{
`foo`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{flip=""}`,
[]chunk.Chunk{chunk2},
},
{
`foo{bar="baz"}`,
[]chunk.Chunk{chunk1},
},
{
`foo{bar="beep"}`,
[]chunk.Chunk{chunk2},
},
{
`foo{toms="code"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{bar!="baz"}`,
[]chunk.Chunk{chunk2},
},
{
`foo{bar=~"beep|baz"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{bar=~"beeping|baz"}`,
[]chunk.Chunk{chunk1},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{toms="code", bar="baz"}`,
[]chunk.Chunk{chunk1},
},
}
for _, schema := range schemas {
for _, storeCase := range stores {
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
now := model.Now()
chunk1 := dummyChunkWithFormat(t, now, labels.FromStrings(labels.MetricName, "foo",
"bar", "baz",
"flip", "flop",
"toms", "code",
), chunkFmt, headBlockFmt)
chunk2 := dummyChunkWithFormat(t, now, labels.FromStrings(labels.MetricName, "foo",
"bar", "beep",
"toms", "code",
), chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{chunk1, chunk2}); err != nil {
t.Fatal(err)
}
testCases := []struct {
query string
expect []chunk.Chunk
}{
{
`foo`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{flip=""}`,
[]chunk.Chunk{chunk2},
},
{
`foo{bar="baz"}`,
[]chunk.Chunk{chunk1},
},
{
`foo{bar="beep"}`,
[]chunk.Chunk{chunk2},
},
{
`foo{toms="code"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{bar!="baz"}`,
[]chunk.Chunk{chunk2},
},
{
`foo{bar=~"beep|baz"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{bar=~"beeping|baz"}`,
[]chunk.Chunk{chunk1},
},
{
`foo{toms="code", bar=~"beep|baz"}`,
[]chunk.Chunk{chunk1, chunk2},
},
{
`foo{toms="code", bar="baz"}`,
[]chunk.Chunk{chunk1},
},
}
for _, tc := range testCases {
t.Run(fmt.Sprintf("%s / %s / %s", tc.query, schema, storeCase.name), func(t *testing.T) {
t.Log("========= Running query", tc.query, "with schema", schema)
@ -432,12 +440,10 @@ func Test_GetSeries(t *testing.T) {
"flip", "flop",
"toms", "code",
)
chunk1 := dummyChunkFor(now, ch1lbs)
ch2lbs := labels.FromStrings(labels.MetricName, "foo",
"bar", "beep",
"toms", "code",
)
chunk2 := dummyChunkFor(now, ch2lbs)
testCases := []struct {
query string
@ -500,9 +506,13 @@ func Test_GetSeries(t *testing.T) {
for _, storeCase := range stores {
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
chunk1 := dummyChunkWithFormat(t, now, ch1lbs, chunkFmt, headBlockFmt)
chunk2 := dummyChunkWithFormat(t, now, ch2lbs, chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{chunk1, chunk2}); err != nil {
t.Fatal(err)
}
@ -531,12 +541,10 @@ func Test_GetSeriesShard(t *testing.T) {
"flip", "flop",
"toms", "code",
)
chunk1 := dummyChunkFor(now, ch1lbs)
ch2lbs := labels.FromStrings(labels.MetricName, "foo",
"bar", "beep",
"toms", "code",
)
chunk2 := dummyChunkFor(now, ch2lbs)
testCases := []struct {
query string
@ -554,9 +562,13 @@ func Test_GetSeriesShard(t *testing.T) {
for _, storeCase := range stores {
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, "v12", storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, "v12", storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
chunk1 := dummyChunkWithFormat(t, now, ch1lbs, chunkFmt, headBlockFmt)
chunk2 := dummyChunkWithFormat(t, now, ch2lbs, chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{chunk1, chunk2}); err != nil {
t.Fatal(err)
}
@ -587,10 +599,12 @@ func BenchmarkIndexCaching(b *testing.B) {
storeMaker := stores[1]
storeCfg := storeMaker.configFn()
store, _ := newTestChunkStoreConfig(b, "v9", storeCfg)
store, schemaCfg := newTestChunkStoreConfig(b, "v9", storeCfg)
defer store.Stop()
fooChunk1 := dummyChunkFor(model.Time(0).Add(15*time.Second), BenchmarkLabels)
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
fooChunk1 := dummyChunkWithFormat(b, model.Time(0).Add(15*time.Second), BenchmarkLabels, chunkFmt, headBlockFmt)
b.ResetTimer()
@ -665,9 +679,6 @@ func TestSeriesStore_LabelValuesForMetricName(t *testing.T) {
"class", "secret",
)
fooChunk1 := dummyChunkFor(now, fooMetric1)
fooChunk2 := dummyChunkFor(now, fooMetric2)
for _, tc := range []struct {
metricName, labelName string
expect []string
@ -699,9 +710,13 @@ func TestSeriesStore_LabelValuesForMetricName(t *testing.T) {
t.Run(fmt.Sprintf("%s / %s / %s / %s", tc.metricName, tc.labelName, schema, storeCase.name), func(t *testing.T) {
t.Log("========= Running labelValues with metricName", tc.metricName, "with labelName", tc.labelName, "with schema", schema)
storeCfg := storeCase.configFn()
store, _ := newTestChunkStoreConfig(t, schema, storeCfg)
store, schemaCfg := newTestChunkStoreConfig(t, schema, storeCfg)
defer store.Stop()
chunkFmt, headBlockFmt, _ := schemaCfg.Configs[0].ChunkFormat()
fooChunk1 := dummyChunkWithFormat(t, now, fooMetric1, chunkFmt, headBlockFmt)
fooChunk2 := dummyChunkWithFormat(t, now, fooMetric2, chunkFmt, headBlockFmt)
if err := store.Put(ctx, []chunk.Chunk{
fooChunk1,
fooChunk2,
@ -724,41 +739,34 @@ func TestSeriesStore_LabelValuesForMetricName(t *testing.T) {
}
}
func dummyChunkForEncoding(now model.Time, metric labels.Labels, samples int) chunk.Chunk {
c, _ := chunk.NewForEncoding(chunk.Bigchunk)
func dummyChunkWithFormat(t testing.TB, now model.Time, metric labels.Labels, format byte, headfmt chunkenc.HeadBlockFmt) chunk.Chunk {
t.Helper()
samples := 1
chunkStart := now.Add(-time.Hour)
chk := chunkenc.NewMemChunk(format, chunkenc.EncGZIP, headfmt, 256*1024, 0)
for i := 0; i < samples; i++ {
t := time.Duration(i) * 15 * time.Second
nc, err := c.Add(model.SamplePair{Timestamp: chunkStart.Add(t), Value: model.SampleValue(i)})
if err != nil {
panic(err)
}
if nc != nil {
panic("returned chunk was not nil")
}
ts := time.Duration(i) * 15 * time.Second
err := chk.Append(&logproto.Entry{Timestamp: chunkStart.Time().Add(ts), Line: fmt.Sprintf("line %d", i)})
require.NoError(t, err)
}
chunk := chunk.NewChunk(
userID,
client.Fingerprint(metric),
metric,
c,
chunkenc.NewFacade(chk, 0, 0),
chunkStart,
now,
)
// Force checksum calculation.
err := chunk.Encode()
if err != nil {
panic(err)
if err := chunk.Encode(); err != nil {
t.Fatal(err)
}
return chunk
}
func dummyChunkFor(now model.Time, metric labels.Labels) chunk.Chunk {
return dummyChunkForEncoding(now, metric, 1)
}
// BenchmarkLabels is a real example from Kubernetes' embedded cAdvisor metrics, lightly obfuscated
var BenchmarkLabels = labels.FromStrings(model.MetricNameLabel, "container_cpu_usage_seconds_total",
"beta_kubernetes_io_arch", "amd64",

@ -1,53 +0,0 @@
package chunkcompat
import (
"bytes"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/storage/chunk"
)
// FromChunks converts []client.Chunk to []chunk.Chunk.
func FromChunks(userID string, metric labels.Labels, in []client.Chunk) ([]chunk.Chunk, error) {
out := make([]chunk.Chunk, 0, len(in))
for _, i := range in {
o, err := chunk.NewForEncoding(chunk.Encoding(byte(i.Encoding)))
if err != nil {
return nil, err
}
if err := o.UnmarshalFromBuf(i.Data); err != nil {
return nil, err
}
firstTime, lastTime := model.Time(i.StartTimestampMs), model.Time(i.EndTimestampMs)
// As the lifetime of this chunk is scopes to this request, we don't need
// to supply a fingerprint.
out = append(out, chunk.NewChunk(userID, 0, metric, o, firstTime, lastTime))
}
return out, nil
}
// ToChunks converts []chunk.Chunk to []client.Chunk.
func ToChunks(in []chunk.Chunk) ([]client.Chunk, error) {
out := make([]client.Chunk, 0, len(in))
for _, i := range in {
wireChunk := client.Chunk{
StartTimestampMs: int64(i.From),
EndTimestampMs: int64(i.Through),
Encoding: int32(i.Data.Encoding()),
}
buf := bytes.NewBuffer(make([]byte, 0, 1024))
if err := i.Data.Marshal(buf); err != nil {
return nil, err
}
wireChunk.Data = buf.Bytes()
out = append(out, wireChunk)
}
return out, nil
}
Loading…
Cancel
Save