Remove unused storage gateway references (#5627)

pull/5638/head
Christian Simon 3 years ago committed by GitHub
parent ec809e665f
commit b6e6003724
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 15
      pkg/querier/base/querier.go
  2. 59
      pkg/querier/base/store_gateway_client.go
  3. 81
      pkg/querier/base/store_gateway_client_test.go
  4. 249
      pkg/storegateway/storegatewaypb/gateway.pb.go
  5. 23
      pkg/storegateway/storegatewaypb/gateway.proto
  6. 372
      vendor/github.com/thanos-io/thanos/pkg/store/labelpb/label.go
  7. 705
      vendor/github.com/thanos-io/thanos/pkg/store/labelpb/types.pb.go
  8. 33
      vendor/github.com/thanos-io/thanos/pkg/store/labelpb/types.proto
  9. 519
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/custom.go
  10. 97
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/inprocess.go
  11. 3
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/README.md
  12. 1637
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/remote.pb.go
  13. 95
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/remote.proto
  14. 2505
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/types.pb.go
  15. 118
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/prompb/types.proto
  16. 3642
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.pb.go
  17. 200
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto
  18. 1412
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/types.pb.go
  19. 73
      vendor/github.com/thanos-io/thanos/pkg/store/storepb/types.proto
  20. 3
      vendor/modules.txt

@ -5,7 +5,6 @@ import (
"errors"
"flag"
"fmt"
"strings"
"sync"
"time"
@ -62,10 +61,6 @@ type Config struct {
// series is considered stale.
LookbackDelta time.Duration `yaml:"lookback_delta"`
// Blocks storage only.
StoreGatewayAddresses string `yaml:"store_gateway_addresses"`
StoreGatewayClient ClientConfig `yaml:"store_gateway_client"`
SecondStoreEngine string `yaml:"second_store_engine"`
UseSecondStoreBeforeTime flagext.Time `yaml:"use_second_store_before_time"`
@ -80,7 +75,6 @@ var (
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.StoreGatewayClient.RegisterFlagsWithPrefix("querier.store-gateway-client", f)
f.IntVar(&cfg.MaxConcurrent, "querier.max-concurrent", 20, "The maximum number of concurrent queries.")
f.DurationVar(&cfg.Timeout, "querier.timeout", 2*time.Minute, "The timeout for a query.")
f.BoolVar(&cfg.Iterators, "querier.iterators", false, "Use iterators to execute query, as opposed to fully materialising the series in memory.")
@ -94,7 +88,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.DefaultEvaluationInterval, "querier.default-evaluation-interval", time.Minute, "The default evaluation interval or step size for subqueries.")
f.DurationVar(&cfg.QueryStoreAfter, "querier.query-store-after", 0, "The time after which a metric should be queried from storage and not just ingesters. 0 means all queries are sent to store. When running the blocks storage, if this option is enabled, the time range of the query sent to the store will be manipulated to ensure the query end is not more recent than 'now - query-store-after'.")
f.StringVar(&cfg.ActiveQueryTrackerDir, "querier.active-query-tracker-dir", "./active-query-tracker", "Active query tracker monitors active queries, and writes them to the file in given directory. If Cortex discovers any queries in this log during startup, it will log them to the log file. Setting to empty value disables active query tracker, which also disables -querier.max-concurrent option.")
f.StringVar(&cfg.StoreGatewayAddresses, "querier.store-gateway-addresses", "", "Comma separated list of store-gateway addresses in DNS Service Discovery format. This option should be set when using the blocks storage and the store-gateway sharding is disabled (when enabled, the store-gateway instances form a ring and addresses are picked from the ring).")
f.DurationVar(&cfg.LookbackDelta, "querier.lookback-delta", 5*time.Minute, "Time since the last sample after which a time series is considered stale and ignored by expression evaluations.")
f.StringVar(&cfg.SecondStoreEngine, "querier.second-store-engine", "", "Second store engine to use for querying. Empty = disabled.")
f.Var(&cfg.UseSecondStoreBeforeTime, "querier.use-second-store-before-time", "If specified, second store is only used for queries before this timestamp. Default value 0 means secondary store is always queried.")
@ -119,14 +112,6 @@ func (cfg *Config) Validate() error {
return nil
}
func (cfg *Config) GetStoreGatewayAddresses() []string {
if cfg.StoreGatewayAddresses == "" {
return nil
}
return strings.Split(cfg.StoreGatewayAddresses, ",")
}
func getChunksIteratorFunction(cfg Config) chunkIteratorFunc {
if cfg.BatchIterators {
return batch.NewChunkMergeIterator

@ -4,67 +4,8 @@ import (
"flag"
"github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/storegateway/storegatewaypb"
)
func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "storegateway_client_request_duration_seconds",
Help: "Time spent executing requests to the store-gateway.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
ConstLabels: prometheus.Labels{"client": "querier"},
}, []string{"operation", "status_code"})
return func(addr string) (client.PoolClient, error) {
return dialStoreGatewayClient(clientCfg, addr, requestDuration)
}
}
func dialStoreGatewayClient(clientCfg grpcclient.Config, addr string, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
if err != nil {
return nil, err
}
conn, err := grpc.Dial(addr, opts...)
if err != nil {
return nil, errors.Wrapf(err, "failed to dial store-gateway %s", addr)
}
return &storeGatewayClient{
StoreGatewayClient: storegatewaypb.NewStoreGatewayClient(conn),
HealthClient: grpc_health_v1.NewHealthClient(conn),
conn: conn,
}, nil
}
type storeGatewayClient struct {
storegatewaypb.StoreGatewayClient
grpc_health_v1.HealthClient
conn *grpc.ClientConn
}
func (c *storeGatewayClient) Close() error {
return c.conn.Close()
}
func (c *storeGatewayClient) String() string {
return c.RemoteAddress()
}
func (c *storeGatewayClient) RemoteAddress() string {
return c.conn.Target()
}
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled"`
TLS tls.ClientConfig `yaml:",inline"`

@ -1,82 +1 @@
package base
import (
"context"
"net"
"testing"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/thanos/pkg/store/storepb"
"github.com/weaveworks/common/user"
"google.golang.org/grpc"
"github.com/grafana/loki/pkg/storegateway/storegatewaypb"
)
func Test_newStoreGatewayClientFactory(t *testing.T) {
// Create a GRPC server used to query the mocked service.
grpcServer := grpc.NewServer()
defer grpcServer.GracefulStop()
srv := &mockStoreGatewayServer{}
storegatewaypb.RegisterStoreGatewayServer(grpcServer, srv)
listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)
go func() {
require.NoError(t, grpcServer.Serve(listener))
}()
// Create a client factory and query back the mocked service
// with different clients.
cfg := grpcclient.Config{}
flagext.DefaultValues(&cfg)
reg := prometheus.NewPedanticRegistry()
factory := newStoreGatewayClientFactory(cfg, reg)
for i := 0; i < 2; i++ {
client, err := factory(listener.Addr().String())
require.NoError(t, err)
defer client.Close() //nolint:errcheck
ctx := user.InjectOrgID(context.Background(), "test")
stream, err := client.(*storeGatewayClient).Series(ctx, &storepb.SeriesRequest{})
assert.NoError(t, err)
// Read the entire response from the stream.
for _, err = stream.Recv(); err == nil; {
}
}
// Assert on the request duration metric, but since it's a duration histogram and
// we can't predict the exact time it took, we need to workaround it.
metrics, err := reg.Gather()
require.NoError(t, err)
assert.Len(t, metrics, 1)
assert.Equal(t, "cortex_storegateway_client_request_duration_seconds", metrics[0].GetName())
assert.Equal(t, dto.MetricType_HISTOGRAM, metrics[0].GetType())
assert.Len(t, metrics[0].GetMetric(), 1)
assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
}
type mockStoreGatewayServer struct{}
func (m *mockStoreGatewayServer) Series(_ *storepb.SeriesRequest, srv storegatewaypb.StoreGateway_SeriesServer) error {
return nil
}
func (m *mockStoreGatewayServer) LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, nil
}
func (m *mockStoreGatewayServer) LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
return nil, nil
}

@ -1,249 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: pkg/storegateway/storegatewaypb/gateway.proto
package storegatewaypb
import (
context "context"
fmt "fmt"
proto "github.com/gogo/protobuf/proto"
storepb "github.com/thanos-io/thanos/pkg/store/storepb"
grpc "google.golang.org/grpc"
codes "google.golang.org/grpc/codes"
status "google.golang.org/grpc/status"
math "math"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
func init() {
proto.RegisterFile("pkg/storegateway/storegatewaypb/gateway.proto", fileDescriptor_3b0b40cab91a425a)
}
var fileDescriptor_3b0b40cab91a425a = []byte{
// 262 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0xd2, 0x2d, 0xc8, 0x4e, 0xd7,
0x2f, 0x2e, 0xc9, 0x2f, 0x4a, 0x4d, 0x4f, 0x2c, 0x49, 0x2d, 0x4f, 0xac, 0x44, 0xe1, 0x14, 0x24,
0xe9, 0x43, 0x59, 0x7a, 0x05, 0x45, 0xf9, 0x25, 0xf9, 0x42, 0x9c, 0x70, 0x09, 0x29, 0xf3, 0xf4,
0xcc, 0x92, 0x8c, 0xd2, 0x24, 0xbd, 0xe4, 0xfc, 0x5c, 0xfd, 0x92, 0x8c, 0xc4, 0xbc, 0xfc, 0x62,
0xdd, 0xcc, 0x7c, 0x28, 0x4b, 0x1f, 0x6e, 0x2a, 0x84, 0x2c, 0x48, 0xd2, 0x2f, 0x2a, 0x48, 0x86,
0x98, 0x61, 0x74, 0x8d, 0x91, 0x8b, 0x27, 0x18, 0x24, 0xea, 0x0e, 0x31, 0x4b, 0xc8, 0x92, 0x8b,
0x2d, 0x38, 0xb5, 0x28, 0x33, 0xb5, 0x58, 0x48, 0x54, 0x0f, 0xa2, 0x5f, 0x0f, 0xc2, 0x0f, 0x4a,
0x2d, 0x2c, 0x4d, 0x2d, 0x2e, 0x91, 0x12, 0x43, 0x17, 0x2e, 0x2e, 0xc8, 0xcf, 0x2b, 0x4e, 0x35,
0x60, 0x14, 0x72, 0xe6, 0xe2, 0xf2, 0x49, 0x4c, 0x4a, 0xcd, 0xf1, 0x4b, 0xcc, 0x4d, 0x2d, 0x16,
0x92, 0x84, 0xa9, 0x43, 0x88, 0xc1, 0x8c, 0x90, 0xc2, 0x26, 0x05, 0x31, 0x46, 0xc8, 0x8d, 0x8b,
0x1b, 0x2c, 0x1a, 0x96, 0x98, 0x53, 0x9a, 0x5a, 0x2c, 0x84, 0xaa, 0x14, 0x22, 0x08, 0x33, 0x46,
0x1a, 0xab, 0x1c, 0xc4, 0x1c, 0x27, 0x97, 0x0b, 0x0f, 0xe5, 0x18, 0x6e, 0x3c, 0x94, 0x63, 0xf8,
0xf0, 0x50, 0x8e, 0xb1, 0xe1, 0x91, 0x1c, 0xe3, 0x8a, 0x47, 0x72, 0x8c, 0x27, 0x1e, 0xc9, 0x31,
0x5e, 0x78, 0x24, 0xc7, 0xf8, 0xe0, 0x91, 0x1c, 0xe3, 0x8b, 0x47, 0x72, 0x0c, 0x1f, 0x1e, 0xc9,
0x31, 0x4e, 0x78, 0x2c, 0xc7, 0x70, 0xe1, 0xb1, 0x1c, 0xc3, 0x8d, 0xc7, 0x72, 0x0c, 0x51, 0x7c,
0xa8, 0x01, 0x9e, 0xc4, 0x06, 0x0e, 0x25, 0x63, 0x40, 0x00, 0x00, 0x00, 0xff, 0xff, 0x5e, 0xed,
0xb4, 0x59, 0x9a, 0x01, 0x00, 0x00,
}
// Reference imports to suppress errors if they are not otherwise used.
var _ context.Context
var _ grpc.ClientConn
// This is a compile-time assertion to ensure that this generated file
// is compatible with the grpc package it is being compiled against.
const _ = grpc.SupportPackageIsVersion4
// StoreGatewayClient is the client API for StoreGateway service.
//
// For semantics around ctx use and closing/ending streaming RPCs, please refer to https://godoc.org/google.golang.org/grpc#ClientConn.NewStream.
type StoreGatewayClient interface {
// Series streams each Series for given label matchers and time range.
//
// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more data will
// be sent for previous one.
//
// Series are sorted.
Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (StoreGateway_SeriesClient, error)
// LabelNames returns all label names that is available.
LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error)
// LabelValues returns all label values for given label name.
LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error)
}
type storeGatewayClient struct {
cc *grpc.ClientConn
}
func NewStoreGatewayClient(cc *grpc.ClientConn) StoreGatewayClient {
return &storeGatewayClient{cc}
}
func (c *storeGatewayClient) Series(ctx context.Context, in *storepb.SeriesRequest, opts ...grpc.CallOption) (StoreGateway_SeriesClient, error) {
stream, err := c.cc.NewStream(ctx, &_StoreGateway_serviceDesc.Streams[0], "/gatewaypb.StoreGateway/Series", opts...)
if err != nil {
return nil, err
}
x := &storeGatewaySeriesClient{stream}
if err := x.ClientStream.SendMsg(in); err != nil {
return nil, err
}
if err := x.ClientStream.CloseSend(); err != nil {
return nil, err
}
return x, nil
}
type StoreGateway_SeriesClient interface {
Recv() (*storepb.SeriesResponse, error)
grpc.ClientStream
}
type storeGatewaySeriesClient struct {
grpc.ClientStream
}
func (x *storeGatewaySeriesClient) Recv() (*storepb.SeriesResponse, error) {
m := new(storepb.SeriesResponse)
if err := x.ClientStream.RecvMsg(m); err != nil {
return nil, err
}
return m, nil
}
func (c *storeGatewayClient) LabelNames(ctx context.Context, in *storepb.LabelNamesRequest, opts ...grpc.CallOption) (*storepb.LabelNamesResponse, error) {
out := new(storepb.LabelNamesResponse)
err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelNames", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
func (c *storeGatewayClient) LabelValues(ctx context.Context, in *storepb.LabelValuesRequest, opts ...grpc.CallOption) (*storepb.LabelValuesResponse, error) {
out := new(storepb.LabelValuesResponse)
err := c.cc.Invoke(ctx, "/gatewaypb.StoreGateway/LabelValues", in, out, opts...)
if err != nil {
return nil, err
}
return out, nil
}
// StoreGatewayServer is the server API for StoreGateway service.
type StoreGatewayServer interface {
// Series streams each Series for given label matchers and time range.
//
// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more data will
// be sent for previous one.
//
// Series are sorted.
Series(*storepb.SeriesRequest, StoreGateway_SeriesServer) error
// LabelNames returns all label names that is available.
LabelNames(context.Context, *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error)
// LabelValues returns all label values for given label name.
LabelValues(context.Context, *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error)
}
// UnimplementedStoreGatewayServer can be embedded to have forward compatible implementations.
type UnimplementedStoreGatewayServer struct {
}
func (*UnimplementedStoreGatewayServer) Series(req *storepb.SeriesRequest, srv StoreGateway_SeriesServer) error {
return status.Errorf(codes.Unimplemented, "method Series not implemented")
}
func (*UnimplementedStoreGatewayServer) LabelNames(ctx context.Context, req *storepb.LabelNamesRequest) (*storepb.LabelNamesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method LabelNames not implemented")
}
func (*UnimplementedStoreGatewayServer) LabelValues(ctx context.Context, req *storepb.LabelValuesRequest) (*storepb.LabelValuesResponse, error) {
return nil, status.Errorf(codes.Unimplemented, "method LabelValues not implemented")
}
func RegisterStoreGatewayServer(s *grpc.Server, srv StoreGatewayServer) {
s.RegisterService(&_StoreGateway_serviceDesc, srv)
}
func _StoreGateway_Series_Handler(srv interface{}, stream grpc.ServerStream) error {
m := new(storepb.SeriesRequest)
if err := stream.RecvMsg(m); err != nil {
return err
}
return srv.(StoreGatewayServer).Series(m, &storeGatewaySeriesServer{stream})
}
type StoreGateway_SeriesServer interface {
Send(*storepb.SeriesResponse) error
grpc.ServerStream
}
type storeGatewaySeriesServer struct {
grpc.ServerStream
}
func (x *storeGatewaySeriesServer) Send(m *storepb.SeriesResponse) error {
return x.ServerStream.SendMsg(m)
}
func _StoreGateway_LabelNames_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(storepb.LabelNamesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreGatewayServer).LabelNames(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/gatewaypb.StoreGateway/LabelNames",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreGatewayServer).LabelNames(ctx, req.(*storepb.LabelNamesRequest))
}
return interceptor(ctx, in, info, handler)
}
func _StoreGateway_LabelValues_Handler(srv interface{}, ctx context.Context, dec func(interface{}) error, interceptor grpc.UnaryServerInterceptor) (interface{}, error) {
in := new(storepb.LabelValuesRequest)
if err := dec(in); err != nil {
return nil, err
}
if interceptor == nil {
return srv.(StoreGatewayServer).LabelValues(ctx, in)
}
info := &grpc.UnaryServerInfo{
Server: srv,
FullMethod: "/gatewaypb.StoreGateway/LabelValues",
}
handler := func(ctx context.Context, req interface{}) (interface{}, error) {
return srv.(StoreGatewayServer).LabelValues(ctx, req.(*storepb.LabelValuesRequest))
}
return interceptor(ctx, in, info, handler)
}
var _StoreGateway_serviceDesc = grpc.ServiceDesc{
ServiceName: "gatewaypb.StoreGateway",
HandlerType: (*StoreGatewayServer)(nil),
Methods: []grpc.MethodDesc{
{
MethodName: "LabelNames",
Handler: _StoreGateway_LabelNames_Handler,
},
{
MethodName: "LabelValues",
Handler: _StoreGateway_LabelValues_Handler,
},
},
Streams: []grpc.StreamDesc{
{
StreamName: "Series",
Handler: _StoreGateway_Series_Handler,
ServerStreams: true,
},
},
Metadata: "pkg/storegateway/storegatewaypb/gateway.proto",
}

@ -1,23 +0,0 @@
syntax = "proto3";
package gatewaypb;
import "github.com/thanos-io/thanos/pkg/store/storepb/rpc.proto";
option go_package = "storegatewaypb";
service StoreGateway {
// Series streams each Series for given label matchers and time range.
//
// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more data will
// be sent for previous one.
//
// Series are sorted.
rpc Series(thanos.SeriesRequest) returns (stream thanos.SeriesResponse);
// LabelNames returns all label names that is available.
rpc LabelNames(thanos.LabelNamesRequest) returns (thanos.LabelNamesResponse);
// LabelValues returns all label values for given label name.
rpc LabelValues(thanos.LabelValuesRequest) returns (thanos.LabelValuesResponse);
}

@ -1,372 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
// Package containing proto and JSON serializable Labels and ZLabels (no copy) structs used to
// identify series. This package expose no-copy converters to Prometheus labels.Labels.
package labelpb
import (
"encoding/json"
"fmt"
"io"
"sort"
"strings"
"unsafe"
"github.com/cespare/xxhash/v2"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
)
var sep = []byte{'\xff'}
func noAllocString(buf []byte) string {
return *(*string)(unsafe.Pointer(&buf))
}
func noAllocBytes(buf string) []byte {
return *(*[]byte)(unsafe.Pointer(&buf))
}
// ZLabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
// It reuses the same memory. Caller should abort using passed labels.Labels.
func ZLabelsFromPromLabels(lset labels.Labels) []ZLabel {
return *(*[]ZLabel)(unsafe.Pointer(&lset))
}
// ZLabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner.
// It reuses the same memory. Caller should abort using passed []ZLabel.
// NOTE: Use with care. ZLabels holds memory from the whole protobuf unmarshal, so the returned
// Prometheus Labels will hold this memory as well.
func ZLabelsToPromLabels(lset []ZLabel) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
}
// ReAllocZLabelsStrings re-allocates all underlying bytes for string, detaching it from bigger memory pool.
func ReAllocZLabelsStrings(lset *[]ZLabel) {
for j, l := range *lset {
// NOTE: This trick converts from string to byte without copy, but copy when creating string.
(*lset)[j].Name = string(noAllocBytes(l.Name))
(*lset)[j].Value = string(noAllocBytes(l.Value))
}
}
// LabelsFromPromLabels converts Prometheus labels to slice of labelpb.ZLabel in type unsafe manner.
// It reuses the same memory. Caller should abort using passed labels.Labels.
func LabelsFromPromLabels(lset labels.Labels) []Label {
return *(*[]Label)(unsafe.Pointer(&lset))
}
// LabelsToPromLabels convert slice of labelpb.ZLabel to Prometheus labels in type unsafe manner.
// It reuses the same memory. Caller should abort using passed []Label.
func LabelsToPromLabels(lset []Label) labels.Labels {
return *(*labels.Labels)(unsafe.Pointer(&lset))
}
// ZLabelSetsToPromLabelSets converts slice of labelpb.ZLabelSet to slice of Prometheus labels.
func ZLabelSetsToPromLabelSets(lss ...ZLabelSet) []labels.Labels {
res := make([]labels.Labels, 0, len(lss))
for _, ls := range lss {
res = append(res, ls.PromLabels())
}
return res
}
// ZLabel is a Label (also easily transformable to Prometheus labels.Labels) that can be unmarshalled from protobuf
// reusing the same memory address for string bytes.
// NOTE: While unmarshalling it uses exactly same bytes that were allocated for protobuf. This mean that *whole* protobuf
// bytes will be not GC-ed as long as ZLabels are referenced somewhere. Use it carefully, only for short living
// protobuf message processing.
type ZLabel Label
func (m *ZLabel) MarshalTo(data []byte) (int, error) {
f := Label(*m)
return f.MarshalTo(data)
}
func (m *ZLabel) MarshalToSizedBuffer(data []byte) (int, error) {
f := Label(*m)
return f.MarshalToSizedBuffer(data)
}
// Unmarshal unmarshalls gRPC protobuf into ZLabel struct. ZLabel string is directly using bytes passed in `data`.
// To use it add (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel" to proto field tag.
// NOTE: This exists in internal Google protobuf implementation, but not in open source one: https://news.ycombinator.com/item?id=23588882
func (m *ZLabel) Unmarshal(data []byte) error {
l := len(data)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ZLabel: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ZLabel: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = noAllocString(data[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := data[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = noAllocString(data[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(data[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ZLabel) UnmarshalJSON(entry []byte) error {
f := Label(*m)
if err := json.Unmarshal(entry, &f); err != nil {
return errors.Wrapf(err, "labels: label field unmarshal: %v", string(entry))
}
*m = ZLabel(f)
return nil
}
func (m *ZLabel) Marshal() ([]byte, error) {
f := Label(*m)
return f.Marshal()
}
func (m *ZLabel) MarshalJSON() ([]byte, error) {
return json.Marshal(Label(*m))
}
// Size implements proto.Sizer.
func (m *ZLabel) Size() (n int) {
f := Label(*m)
return f.Size()
}
// Equal implements proto.Equaler.
func (m *ZLabel) Equal(other ZLabel) bool {
return m.Name == other.Name && m.Value == other.Value
}
// Compare implements proto.Comparer.
func (m *ZLabel) Compare(other ZLabel) int {
if c := strings.Compare(m.Name, other.Name); c != 0 {
return c
}
return strings.Compare(m.Value, other.Value)
}
// ExtendSortedLabels extend given labels by extend in labels format.
// The type conversion is done safely, which means we don't modify extend labels underlying array.
//
// In case of existing labels already present in given label set, it will be overwritten by external one.
// NOTE: Labels and extend has to be sorted.
func ExtendSortedLabels(lset, extend labels.Labels) labels.Labels {
ret := make(labels.Labels, 0, len(lset)+len(extend))
// Inject external labels in place.
for len(lset) > 0 && len(extend) > 0 {
d := strings.Compare(lset[0].Name, extend[0].Name)
if d == 0 {
// Duplicate, prefer external labels.
// NOTE(fabxc): Maybe move it to a prefixed version to still ensure uniqueness of series?
ret = append(ret, extend[0])
lset, extend = lset[1:], extend[1:]
} else if d < 0 {
ret = append(ret, lset[0])
lset = lset[1:]
} else if d > 0 {
ret = append(ret, extend[0])
extend = extend[1:]
}
}
// Append all remaining elements.
ret = append(ret, lset...)
ret = append(ret, extend...)
return ret
}
func PromLabelSetsToString(lsets []labels.Labels) string {
s := []string{}
for _, ls := range lsets {
s = append(s, ls.String())
}
sort.Strings(s)
return strings.Join(s, ",")
}
func (m *ZLabelSet) UnmarshalJSON(entry []byte) error {
lbls := labels.Labels{}
if err := lbls.UnmarshalJSON(entry); err != nil {
return errors.Wrapf(err, "labels: labels field unmarshal: %v", string(entry))
}
sort.Sort(lbls)
m.Labels = ZLabelsFromPromLabels(lbls)
return nil
}
func (m *ZLabelSet) MarshalJSON() ([]byte, error) {
return m.PromLabels().MarshalJSON()
}
// PromLabels return Prometheus labels.Labels without extra allocation.
func (m *ZLabelSet) PromLabels() labels.Labels {
return ZLabelsToPromLabels(m.Labels)
}
// DeepCopy copies labels and each label's string to separate bytes.
func DeepCopy(lbls []ZLabel) []ZLabel {
ret := make([]ZLabel, len(lbls))
for i := range lbls {
ret[i].Name = string(noAllocBytes(lbls[i].Name))
ret[i].Value = string(noAllocBytes(lbls[i].Value))
}
return ret
}
// HashWithPrefix returns a hash for the given prefix and labels.
func HashWithPrefix(prefix string, lbls []ZLabel) uint64 {
// Use xxhash.Sum64(b) for fast path as it's faster.
b := make([]byte, 0, 1024)
b = append(b, prefix...)
b = append(b, sep[0])
for i, v := range lbls {
if len(b)+len(v.Name)+len(v.Value)+2 >= cap(b) {
// If labels entry is 1KB allocate do not allocate whole entry.
h := xxhash.New()
_, _ = h.Write(b)
for _, v := range lbls[i:] {
_, _ = h.WriteString(v.Name)
_, _ = h.Write(sep)
_, _ = h.WriteString(v.Value)
_, _ = h.Write(sep)
}
return h.Sum64()
}
b = append(b, v.Name...)
b = append(b, sep[0])
b = append(b, v.Value...)
b = append(b, sep[0])
}
return xxhash.Sum64(b)
}
// ZLabelSets is a sortable list of ZLabelSet. It assumes the label pairs in each ZLabelSet element are already sorted.
type ZLabelSets []ZLabelSet
func (z ZLabelSets) Len() int { return len(z) }
func (z ZLabelSets) Swap(i, j int) { z[i], z[j] = z[j], z[i] }
func (z ZLabelSets) Less(i, j int) bool {
l := 0
r := 0
var result int
lenI, lenJ := len(z[i].Labels), len(z[j].Labels)
for l < lenI && r < lenJ {
result = z[i].Labels[l].Compare(z[j].Labels[r])
if result == 0 {
l++
r++
continue
}
return result < 0
}
return l == lenI
}

@ -1,705 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: store/labelpb/types.proto
package labelpb
import (
fmt "fmt"
io "io"
math "math"
math_bits "math/bits"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type Label struct {
Name string `protobuf:"bytes,1,opt,name=name,proto3" json:"name,omitempty"`
Value string `protobuf:"bytes,2,opt,name=value,proto3" json:"value,omitempty"`
}
func (m *Label) Reset() { *m = Label{} }
func (m *Label) String() string { return proto.CompactTextString(m) }
func (*Label) ProtoMessage() {}
func (*Label) Descriptor() ([]byte, []int) {
return fileDescriptor_cdcc9e7dae4870e8, []int{0}
}
func (m *Label) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *Label) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_Label.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *Label) XXX_Merge(src proto.Message) {
xxx_messageInfo_Label.Merge(m, src)
}
func (m *Label) XXX_Size() int {
return m.Size()
}
func (m *Label) XXX_DiscardUnknown() {
xxx_messageInfo_Label.DiscardUnknown(m)
}
var xxx_messageInfo_Label proto.InternalMessageInfo
type LabelSet struct {
Labels []Label `protobuf:"bytes,1,rep,name=labels,proto3" json:"labels"`
}
func (m *LabelSet) Reset() { *m = LabelSet{} }
func (m *LabelSet) String() string { return proto.CompactTextString(m) }
func (*LabelSet) ProtoMessage() {}
func (*LabelSet) Descriptor() ([]byte, []int) {
return fileDescriptor_cdcc9e7dae4870e8, []int{1}
}
func (m *LabelSet) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *LabelSet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_LabelSet.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *LabelSet) XXX_Merge(src proto.Message) {
xxx_messageInfo_LabelSet.Merge(m, src)
}
func (m *LabelSet) XXX_Size() int {
return m.Size()
}
func (m *LabelSet) XXX_DiscardUnknown() {
xxx_messageInfo_LabelSet.DiscardUnknown(m)
}
var xxx_messageInfo_LabelSet proto.InternalMessageInfo
type ZLabelSet struct {
Labels []ZLabel `protobuf:"bytes,1,rep,name=labels,proto3,customtype=ZLabel" json:"labels"`
}
func (m *ZLabelSet) Reset() { *m = ZLabelSet{} }
func (m *ZLabelSet) String() string { return proto.CompactTextString(m) }
func (*ZLabelSet) ProtoMessage() {}
func (*ZLabelSet) Descriptor() ([]byte, []int) {
return fileDescriptor_cdcc9e7dae4870e8, []int{2}
}
func (m *ZLabelSet) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ZLabelSet) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ZLabelSet.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ZLabelSet) XXX_Merge(src proto.Message) {
xxx_messageInfo_ZLabelSet.Merge(m, src)
}
func (m *ZLabelSet) XXX_Size() int {
return m.Size()
}
func (m *ZLabelSet) XXX_DiscardUnknown() {
xxx_messageInfo_ZLabelSet.DiscardUnknown(m)
}
var xxx_messageInfo_ZLabelSet proto.InternalMessageInfo
func init() {
proto.RegisterType((*Label)(nil), "thanos.Label")
proto.RegisterType((*LabelSet)(nil), "thanos.LabelSet")
proto.RegisterType((*ZLabelSet)(nil), "thanos.ZLabelSet")
}
func init() { proto.RegisterFile("store/labelpb/types.proto", fileDescriptor_cdcc9e7dae4870e8) }
var fileDescriptor_cdcc9e7dae4870e8 = []byte{
// 212 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x92, 0x2c, 0x2e, 0xc9, 0x2f,
0x4a, 0xd5, 0xcf, 0x49, 0x4c, 0x4a, 0xcd, 0x29, 0x48, 0xd2, 0x2f, 0xa9, 0x2c, 0x48, 0x2d, 0xd6,
0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0x62, 0x2b, 0xc9, 0x48, 0xcc, 0xcb, 0x2f, 0x96, 0x12, 0x49,
0xcf, 0x4f, 0xcf, 0x07, 0x0b, 0xe9, 0x83, 0x58, 0x10, 0x59, 0x25, 0x43, 0x2e, 0x56, 0x1f, 0x90,
0x26, 0x21, 0x21, 0x2e, 0x96, 0xbc, 0xc4, 0xdc, 0x54, 0x09, 0x46, 0x05, 0x46, 0x0d, 0xce, 0x20,
0x30, 0x5b, 0x48, 0x84, 0x8b, 0xb5, 0x2c, 0x31, 0xa7, 0x34, 0x55, 0x82, 0x09, 0x2c, 0x08, 0xe1,
0x28, 0x99, 0x73, 0x71, 0x80, 0xb5, 0x04, 0xa7, 0x96, 0x08, 0x69, 0x73, 0xb1, 0x81, 0xed, 0x2c,
0x96, 0x60, 0x54, 0x60, 0xd6, 0xe0, 0x36, 0xe2, 0xd5, 0x83, 0xd8, 0xa6, 0x07, 0x56, 0xe1, 0xc4,
0x72, 0xe2, 0x9e, 0x3c, 0x43, 0x10, 0x54, 0x89, 0x92, 0x13, 0x17, 0x67, 0x14, 0x5c, 0xa7, 0x29,
0x7e, 0x9d, 0x7c, 0x20, 0x9d, 0xb7, 0xee, 0xc9, 0xb3, 0x41, 0x74, 0xc0, 0xcc, 0x70, 0x52, 0x3d,
0xf1, 0x50, 0x8e, 0xe1, 0xc4, 0x23, 0x39, 0xc6, 0x0b, 0x8f, 0xe4, 0x18, 0x1f, 0x3c, 0x92, 0x63,
0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xd8, 0xa1,
0x01, 0x90, 0xc4, 0x06, 0xf6, 0x9d, 0x31, 0x20, 0x00, 0x00, 0xff, 0xff, 0xd0, 0x80, 0xe8, 0x16,
0x18, 0x01, 0x00, 0x00,
}
func (m *Label) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *Label) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *Label) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Value) > 0 {
i -= len(m.Value)
copy(dAtA[i:], m.Value)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Value)))
i--
dAtA[i] = 0x12
}
if len(m.Name) > 0 {
i -= len(m.Name)
copy(dAtA[i:], m.Name)
i = encodeVarintTypes(dAtA, i, uint64(len(m.Name)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func (m *LabelSet) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *LabelSet) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *LabelSet) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size, err := m.Labels[iNdEx].MarshalToSizedBuffer(dAtA[:i])
if err != nil {
return 0, err
}
i -= size
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func (m *ZLabelSet) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ZLabelSet) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ZLabelSet) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if len(m.Labels) > 0 {
for iNdEx := len(m.Labels) - 1; iNdEx >= 0; iNdEx-- {
{
size := m.Labels[iNdEx].Size()
i -= size
if _, err := m.Labels[iNdEx].MarshalTo(dAtA[i:]); err != nil {
return 0, err
}
i = encodeVarintTypes(dAtA, i, uint64(size))
}
i--
dAtA[i] = 0xa
}
}
return len(dAtA) - i, nil
}
func encodeVarintTypes(dAtA []byte, offset int, v uint64) int {
offset -= sovTypes(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *Label) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Name)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
l = len(m.Value)
if l > 0 {
n += 1 + l + sovTypes(uint64(l))
}
return n
}
func (m *LabelSet) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
return n
}
func (m *ZLabelSet) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
if len(m.Labels) > 0 {
for _, e := range m.Labels {
l = e.Size()
n += 1 + l + sovTypes(uint64(l))
}
}
return n
}
func sovTypes(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozTypes(x uint64) (n int) {
return sovTypes(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (m *Label) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: Label: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: Label: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Name", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Name = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Value", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Value = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *LabelSet) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: LabelSet: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: LabelSet: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = append(m.Labels, Label{})
if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func (m *ZLabelSet) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ZLabelSet: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ZLabelSet: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
}
var msglen int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowTypes
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
msglen |= int(b&0x7F) << shift
if b < 0x80 {
break
}
}
if msglen < 0 {
return ErrInvalidLengthTypes
}
postIndex := iNdEx + msglen
if postIndex < 0 {
return ErrInvalidLengthTypes
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Labels = append(m.Labels, ZLabel{})
if err := m.Labels[len(m.Labels)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
return err
}
iNdEx = postIndex
default:
iNdEx = preIndex
skippy, err := skipTypes(dAtA[iNdEx:])
if err != nil {
return err
}
if (skippy < 0) || (iNdEx+skippy) < 0 {
return ErrInvalidLengthTypes
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipTypes(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
depth := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
case 1:
iNdEx += 8
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowTypes
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthTypes
}
iNdEx += length
case 3:
depth++
case 4:
if depth == 0 {
return 0, ErrUnexpectedEndOfGroupTypes
}
depth--
case 5:
iNdEx += 4
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
if iNdEx < 0 {
return 0, ErrInvalidLengthTypes
}
if depth == 0 {
return iNdEx, nil
}
}
return 0, io.ErrUnexpectedEOF
}
var (
ErrInvalidLengthTypes = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowTypes = fmt.Errorf("proto: integer overflow")
ErrUnexpectedEndOfGroupTypes = fmt.Errorf("proto: unexpected end of group")
)

@ -1,33 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
syntax = "proto3";
package thanos;
option go_package = "labelpb";
import "gogoproto/gogo.proto";
option (gogoproto.sizer_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
// Do not generate XXX fields to reduce memory footprint and opening a door
// for zero-copy casts to/from prometheus data types.
option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;
message Label {
string name = 1;
string value = 2;
}
message LabelSet {
repeated Label labels = 1 [(gogoproto.nullable) = false];
}
message ZLabelSet {
repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "ZLabel"];
}

@ -1,519 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package storepb
import (
"bytes"
"encoding/binary"
"fmt"
"sort"
"strconv"
"strings"
"github.com/gogo/protobuf/types"
"github.com/pkg/errors"
"github.com/prometheus/prometheus/model/labels"
"github.com/thanos-io/thanos/pkg/store/labelpb"
)
var PartialResponseStrategyValues = func() []string {
var s []string
for k := range PartialResponseStrategy_value {
s = append(s, k)
}
sort.Strings(s)
return s
}()
func NewWarnSeriesResponse(err error) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Warning{
Warning: err.Error(),
},
}
}
func NewSeriesResponse(series *Series) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Series{
Series: series,
},
}
}
func NewHintsSeriesResponse(hints *types.Any) *SeriesResponse {
return &SeriesResponse{
Result: &SeriesResponse_Hints{
Hints: hints,
},
}
}
type emptySeriesSet struct{}
func (emptySeriesSet) Next() bool { return false }
func (emptySeriesSet) At() (labels.Labels, []AggrChunk) { return nil, nil }
func (emptySeriesSet) Err() error { return nil }
// EmptySeriesSet returns a new series set that contains no series.
func EmptySeriesSet() SeriesSet {
return emptySeriesSet{}
}
// MergeSeriesSets takes all series sets and returns as a union single series set.
// It assumes series are sorted by labels within single SeriesSet, similar to remote read guarantees.
// However, they can be partial: in such case, if the single SeriesSet returns the same series within many iterations,
// MergeSeriesSets will merge those into one.
//
// It also assumes in a "best effort" way that chunks are sorted by min time. It's done as an optimization only, so if input
// series' chunks are NOT sorted, the only consequence is that the duplicates might be not correctly removed. This is double checked
// which on just-before PromQL level as well, so the only consequence is increased network bandwidth.
// If all chunks were sorted, MergeSeriesSet ALSO returns sorted chunks by min time.
//
// Chunks within the same series can also overlap (within all SeriesSet
// as well as single SeriesSet alone). If the chunk ranges overlap, the *exact* chunk duplicates will be removed
// (except one), and any other overlaps will be appended into on chunks slice.
func MergeSeriesSets(all ...SeriesSet) SeriesSet {
switch len(all) {
case 0:
return emptySeriesSet{}
case 1:
return newUniqueSeriesSet(all[0])
}
h := len(all) / 2
return newMergedSeriesSet(
MergeSeriesSets(all[:h]...),
MergeSeriesSets(all[h:]...),
)
}
// SeriesSet is a set of series and their corresponding chunks.
// The set is sorted by the label sets. Chunks may be overlapping or expected of order.
type SeriesSet interface {
Next() bool
At() (labels.Labels, []AggrChunk)
Err() error
}
// mergedSeriesSet takes two series sets as a single series set.
type mergedSeriesSet struct {
a, b SeriesSet
lset labels.Labels
chunks []AggrChunk
adone, bdone bool
}
func newMergedSeriesSet(a, b SeriesSet) *mergedSeriesSet {
s := &mergedSeriesSet{a: a, b: b}
// Initialize first elements of both sets as Next() needs
// one element look-ahead.
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return s
}
func (s *mergedSeriesSet) At() (labels.Labels, []AggrChunk) {
return s.lset, s.chunks
}
func (s *mergedSeriesSet) Err() error {
if s.a.Err() != nil {
return s.a.Err()
}
return s.b.Err()
}
func (s *mergedSeriesSet) compare() int {
if s.adone {
return 1
}
if s.bdone {
return -1
}
lsetA, _ := s.a.At()
lsetB, _ := s.b.At()
return labels.Compare(lsetA, lsetB)
}
func (s *mergedSeriesSet) Next() bool {
if s.adone && s.bdone || s.Err() != nil {
return false
}
d := s.compare()
if d > 0 {
s.lset, s.chunks = s.b.At()
s.bdone = !s.b.Next()
return true
}
if d < 0 {
s.lset, s.chunks = s.a.At()
s.adone = !s.a.Next()
return true
}
// Both a and b contains the same series. Go through all chunks, remove duplicates and concatenate chunks from both
// series sets. We best effortly assume chunks are sorted by min time. If not, we will not detect all deduplicate which will
// be account on select layer anyway. We do it still for early optimization.
lset, chksA := s.a.At()
_, chksB := s.b.At()
s.lset = lset
// Slice reuse is not generally safe with nested merge iterators.
// We err on the safe side an create a new slice.
s.chunks = make([]AggrChunk, 0, len(chksA)+len(chksB))
b := 0
Outer:
for a := range chksA {
for {
if b >= len(chksB) {
// No more b chunks.
s.chunks = append(s.chunks, chksA[a:]...)
break Outer
}
cmp := chksA[a].Compare(chksB[b])
if cmp > 0 {
s.chunks = append(s.chunks, chksA[a])
break
}
if cmp < 0 {
s.chunks = append(s.chunks, chksB[b])
b++
continue
}
// Exact duplicated chunks, discard one from b.
b++
}
}
if b < len(chksB) {
s.chunks = append(s.chunks, chksB[b:]...)
}
s.adone = !s.a.Next()
s.bdone = !s.b.Next()
return true
}
// uniqueSeriesSet takes one series set and ensures each iteration contains single, full series.
type uniqueSeriesSet struct {
SeriesSet
done bool
peek *Series
lset labels.Labels
chunks []AggrChunk
}
func newUniqueSeriesSet(wrapped SeriesSet) *uniqueSeriesSet {
return &uniqueSeriesSet{SeriesSet: wrapped}
}
func (s *uniqueSeriesSet) At() (labels.Labels, []AggrChunk) {
return s.lset, s.chunks
}
func (s *uniqueSeriesSet) Next() bool {
if s.Err() != nil {
return false
}
for !s.done {
if s.done = !s.SeriesSet.Next(); s.done {
break
}
lset, chks := s.SeriesSet.At()
if s.peek == nil {
s.peek = &Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chks}
continue
}
if labels.Compare(lset, s.peek.PromLabels()) != 0 {
s.lset, s.chunks = s.peek.PromLabels(), s.peek.Chunks
s.peek = &Series{Labels: labelpb.ZLabelsFromPromLabels(lset), Chunks: chks}
return true
}
// We assume non-overlapping, sorted chunks. This is best effort only, if it's otherwise it
// will just be duplicated, but well handled by StoreAPI consumers.
s.peek.Chunks = append(s.peek.Chunks, chks...)
}
if s.peek == nil {
return false
}
s.lset, s.chunks = s.peek.PromLabels(), s.peek.Chunks
s.peek = nil
return true
}
// Compare returns positive 1 if chunk is smaller -1 if larger than b by min time, then max time.
// It returns 0 if chunks are exactly the same.
func (m AggrChunk) Compare(b AggrChunk) int {
if m.MinTime < b.MinTime {
return 1
}
if m.MinTime > b.MinTime {
return -1
}
// Same min time.
if m.MaxTime < b.MaxTime {
return 1
}
if m.MaxTime > b.MaxTime {
return -1
}
// We could use proto.Equal, but we need ordering as well.
for _, cmp := range []func() int{
func() int { return m.Raw.Compare(b.Raw) },
func() int { return m.Count.Compare(b.Count) },
func() int { return m.Sum.Compare(b.Sum) },
func() int { return m.Min.Compare(b.Min) },
func() int { return m.Max.Compare(b.Max) },
func() int { return m.Counter.Compare(b.Counter) },
} {
if c := cmp(); c == 0 {
continue
} else {
return c
}
}
return 0
}
// Compare returns positive 1 if chunk is smaller -1 if larger.
// It returns 0 if chunks are exactly the same.
func (m *Chunk) Compare(b *Chunk) int {
if m == nil && b == nil {
return 0
}
if b == nil {
return 1
}
if m == nil {
return -1
}
if m.Type < b.Type {
return 1
}
if m.Type > b.Type {
return -1
}
return bytes.Compare(m.Data, b.Data)
}
func (x *PartialResponseStrategy) UnmarshalJSON(entry []byte) error {
fieldStr, err := strconv.Unquote(string(entry))
if err != nil {
return errors.Wrapf(err, fmt.Sprintf("failed to unqote %v, in order to unmarshal as 'partial_response_strategy'. Possible values are %s", string(entry), strings.Join(PartialResponseStrategyValues, ",")))
}
if fieldStr == "" {
// NOTE: For Rule default is abort as this is recommended for alerting.
*x = PartialResponseStrategy_ABORT
return nil
}
strategy, ok := PartialResponseStrategy_value[strings.ToUpper(fieldStr)]
if !ok {
return errors.Errorf(fmt.Sprintf("failed to unmarshal %v as 'partial_response_strategy'. Possible values are %s", string(entry), strings.Join(PartialResponseStrategyValues, ",")))
}
*x = PartialResponseStrategy(strategy)
return nil
}
func (x *PartialResponseStrategy) MarshalJSON() ([]byte, error) {
return []byte(strconv.Quote(x.String())), nil
}
// PromMatchersToMatchers returns proto matchers from Prometheus matchers.
// NOTE: It allocates memory.
func PromMatchersToMatchers(ms ...*labels.Matcher) ([]LabelMatcher, error) {
res := make([]LabelMatcher, 0, len(ms))
for _, m := range ms {
var t LabelMatcher_Type
switch m.Type {
case labels.MatchEqual:
t = LabelMatcher_EQ
case labels.MatchNotEqual:
t = LabelMatcher_NEQ
case labels.MatchRegexp:
t = LabelMatcher_RE
case labels.MatchNotRegexp:
t = LabelMatcher_NRE
default:
return nil, errors.Errorf("unrecognized matcher type %d", m.Type)
}
res = append(res, LabelMatcher{Type: t, Name: m.Name, Value: m.Value})
}
return res, nil
}
// MatchersToPromMatchers returns Prometheus matchers from proto matchers.
// NOTE: It allocates memory.
func MatchersToPromMatchers(ms ...LabelMatcher) ([]*labels.Matcher, error) {
res := make([]*labels.Matcher, 0, len(ms))
for _, m := range ms {
var t labels.MatchType
switch m.Type {
case LabelMatcher_EQ:
t = labels.MatchEqual
case LabelMatcher_NEQ:
t = labels.MatchNotEqual
case LabelMatcher_RE:
t = labels.MatchRegexp
case LabelMatcher_NRE:
t = labels.MatchNotRegexp
default:
return nil, errors.Errorf("unrecognized label matcher type %d", m.Type)
}
m, err := labels.NewMatcher(t, m.Name, m.Value)
if err != nil {
return nil, err
}
res = append(res, m)
}
return res, nil
}
// MatchersToString converts label matchers to string format.
// String should be parsable as a valid PromQL query metric selector.
func MatchersToString(ms ...LabelMatcher) string {
var res string
for i, m := range ms {
res += m.PromString()
if i < len(ms)-1 {
res += ", "
}
}
return "{" + res + "}"
}
// PromMatchersToString converts prometheus label matchers to string format.
// String should be parsable as a valid PromQL query metric selector.
func PromMatchersToString(ms ...*labels.Matcher) string {
var res string
for i, m := range ms {
res += m.String()
if i < len(ms)-1 {
res += ", "
}
}
return "{" + res + "}"
}
func (m *LabelMatcher) PromString() string {
return fmt.Sprintf("%s%s%q", m.Name, m.Type.PromString(), m.Value)
}
func (x LabelMatcher_Type) PromString() string {
typeToStr := map[LabelMatcher_Type]string{
LabelMatcher_EQ: "=",
LabelMatcher_NEQ: "!=",
LabelMatcher_RE: "=~",
LabelMatcher_NRE: "!~",
}
if str, ok := typeToStr[x]; ok {
return str
}
panic("unknown match type")
}
// PromLabels return Prometheus labels.Labels without extra allocation.
func (m *Series) PromLabels() labels.Labels {
return labelpb.ZLabelsToPromLabels(m.Labels)
}
// Deprecated.
// TODO(bwplotka): Remove this once Cortex dep will stop using it.
type Label = labelpb.ZLabel
// Deprecated.
// TODO(bwplotka): Remove this in next PR. Done to reduce diff only.
type LabelSet = labelpb.ZLabelSet
// Deprecated.
// TODO(bwplotka): Remove this once Cortex dep will stop using it.
func CompareLabels(a, b []Label) int {
return labels.Compare(labelpb.ZLabelsToPromLabels(a), labelpb.ZLabelsToPromLabels(b))
}
// Deprecated.
// TODO(bwplotka): Remove this once Cortex dep will stop using it.
func LabelsToPromLabelsUnsafe(lset []Label) labels.Labels {
return labelpb.ZLabelsToPromLabels(lset)
}
// XORNumSamples return number of samples. Returns 0 if it's not XOR chunk.
func (m *Chunk) XORNumSamples() int {
if m.Type == Chunk_XOR {
return int(binary.BigEndian.Uint16(m.Data))
}
return 0
}
type SeriesStatsCounter struct {
lastSeriesHash uint64
Series int
Chunks int
Samples int
}
func (c *SeriesStatsCounter) CountSeries(seriesLabels []labelpb.ZLabel) {
seriesHash := labelpb.HashWithPrefix("", seriesLabels)
if c.lastSeriesHash != 0 || seriesHash != c.lastSeriesHash {
c.lastSeriesHash = seriesHash
c.Series++
}
}
func (c *SeriesStatsCounter) Count(series *Series) {
c.CountSeries(series.Labels)
for _, chk := range series.Chunks {
if chk.Raw != nil {
c.Chunks++
c.Samples += chk.Raw.XORNumSamples()
}
if chk.Count != nil {
c.Chunks++
c.Samples += chk.Count.XORNumSamples()
}
if chk.Counter != nil {
c.Chunks++
c.Samples += chk.Counter.XORNumSamples()
}
if chk.Max != nil {
c.Chunks++
c.Samples += chk.Max.XORNumSamples()
}
if chk.Min != nil {
c.Chunks++
c.Samples += chk.Min.XORNumSamples()
}
if chk.Sum != nil {
c.Chunks++
c.Samples += chk.Sum.XORNumSamples()
}
}
}

@ -1,97 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
package storepb
import (
"context"
"io"
"google.golang.org/grpc"
)
func ServerAsClient(srv StoreServer, clientReceiveBufferSize int) StoreClient {
return &serverAsClient{srv: srv, clientReceiveBufferSize: clientReceiveBufferSize}
}
// serverAsClient allows to use servers as clients.
// NOTE: Passing CallOptions does not work - it would be needed to be implemented in grpc itself (before, after are private).
type serverAsClient struct {
clientReceiveBufferSize int
srv StoreServer
}
func (s serverAsClient) Info(ctx context.Context, in *InfoRequest, _ ...grpc.CallOption) (*InfoResponse, error) {
return s.srv.Info(ctx, in)
}
func (s serverAsClient) LabelNames(ctx context.Context, in *LabelNamesRequest, _ ...grpc.CallOption) (*LabelNamesResponse, error) {
return s.srv.LabelNames(ctx, in)
}
func (s serverAsClient) LabelValues(ctx context.Context, in *LabelValuesRequest, _ ...grpc.CallOption) (*LabelValuesResponse, error) {
return s.srv.LabelValues(ctx, in)
}
func (s serverAsClient) Series(ctx context.Context, in *SeriesRequest, _ ...grpc.CallOption) (Store_SeriesClient, error) {
inSrv := &inProcessStream{recv: make(chan *SeriesResponse, s.clientReceiveBufferSize), err: make(chan error)}
inSrv.ctx, inSrv.cancel = context.WithCancel(ctx)
go func() {
inSrv.err <- s.srv.Series(in, inSrv)
close(inSrv.err)
close(inSrv.recv)
}()
return &inProcessClientStream{srv: inSrv}, nil
}
// TODO(bwplotka): Add streaming attributes, metadata etc. Currently those are disconnected. Follow up on https://github.com/grpc/grpc-go/issues/906.
// TODO(bwplotka): Use this in proxy.go and receiver multi tenant proxy.
type inProcessStream struct {
grpc.ServerStream
ctx context.Context
cancel context.CancelFunc
recv chan *SeriesResponse
err chan error
}
func (s *inProcessStream) Context() context.Context { return s.ctx }
func (s *inProcessStream) Send(r *SeriesResponse) error {
select {
case <-s.ctx.Done():
return s.ctx.Err()
case s.recv <- r:
return nil
}
}
type inProcessClientStream struct {
grpc.ClientStream
srv *inProcessStream
}
func (s *inProcessClientStream) Context() context.Context { return s.srv.ctx }
func (s *inProcessClientStream) CloseSend() error {
s.srv.cancel()
return nil
}
func (s *inProcessClientStream) Recv() (*SeriesResponse, error) {
select {
case <-s.srv.ctx.Done():
return nil, s.srv.ctx.Err()
case r, ok := <-s.srv.recv:
if !ok {
return nil, io.EOF
}
return r, nil
case err := <-s.srv.err:
if err == nil {
return nil, io.EOF
}
return nil, err
}
}

@ -1,3 +0,0 @@
NOTE(bwplotka): This excerpt of "github.com/prometheus/prometheus/prompb" reconstructed to avoid XXX fields for unsafe conversion to safe allocs.
Controlled by `make proto`

File diff suppressed because it is too large Load Diff

@ -1,95 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
// Copyright 2016 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus_copy;
option go_package = "prompb";
import "store/storepb/prompb/types.proto";
import "gogoproto/gogo.proto";
// Do not generate XXX fields to reduce memory footprint and opening a door
// for zero-copy casts to/from prometheus data types.
option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;
message WriteRequest {
repeated TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
// Cortex uses this field to determine the source of the write request.
// We reserve it to avoid any compatibility issues.
reserved 2;
repeated MetricMetadata metadata = 3 [(gogoproto.nullable) = false];
}
// ReadRequest represents a remote read request.
message ReadRequest {
repeated Query queries = 1;
enum ResponseType {
// Server will return a single ReadResponse message with matched series that includes list of raw samples.
// It's recommended to use streamed response types instead.
//
// Response headers:
// Content-Type: "application/x-protobuf"
// Content-Encoding: "snappy"
SAMPLES = 0;
// Server will stream a delimited ChunkedReadResponse message that contains XOR encoded chunks for a single series.
// Each message is following varint size and fixed size bigendian uint32 for CRC32 Castagnoli checksum.
//
// Response headers:
// Content-Type: "application/x-streamed-protobuf; proto=prometheus.ChunkedReadResponse"
// Content-Encoding: ""
STREAMED_XOR_CHUNKS = 1;
}
// accepted_response_types allows negotiating the content type of the response.
//
// Response types are taken from the list in the FIFO order. If no response type in `accepted_response_types` is
// implemented by server, error is returned.
// For request that do not contain `accepted_response_types` field the SAMPLES response type will be used.
repeated ResponseType accepted_response_types = 2;
}
// ReadResponse is a response when response_type equals SAMPLES.
message ReadResponse {
// In same order as the request's queries.
repeated QueryResult results = 1;
}
message Query {
int64 start_timestamp_ms = 1;
int64 end_timestamp_ms = 2;
repeated LabelMatcher matchers = 3;
ReadHints hints = 4;
}
message QueryResult {
// Samples within a time series must be ordered by time.
repeated TimeSeries timeseries = 1;
}
// ChunkedReadResponse is a response when response_type equals STREAMED_XOR_CHUNKS.
// We strictly stream full series after series, optionally split by time. This means that a single frame can contain
// partition of the single series, but once a new series is started to be streamed it means that no more chunks will
// be sent for previous one.
message ChunkedReadResponse {
repeated ChunkedSeries chunked_series = 1;
// query_index represents an index of the query from ReadRequest.queries these chunks relates to.
int64 query_index = 2;
}

File diff suppressed because it is too large Load Diff

@ -1,118 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
// Copyright 2017 Prometheus Team
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
syntax = "proto3";
package prometheus_copy;
option go_package = "prompb";
import "gogoproto/gogo.proto";
import "store/labelpb/types.proto";
// Do not generate XXX fields to reduce memory footprint and opening a door
// for zero-copy casts to/from prometheus data types.
option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;
message MetricMetadata {
enum MetricType {
UNKNOWN = 0;
COUNTER = 1;
GAUGE = 2;
HISTOGRAM = 3;
GAUGEHISTOGRAM = 4;
SUMMARY = 5;
INFO = 6;
STATESET = 7;
}
// Represents the metric type, these match the set from Prometheus.
// Refer to pkg/textparse/interface.go for details.
MetricType type = 1;
string metric_family_name = 2;
string help = 4;
string unit = 5;
}
message Sample {
double value = 1;
int64 timestamp = 2;
}
message Exemplar {
// Optional, can be empty.
repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"];
double value = 2;
// timestamp is in ms format, see pkg/timestamp/timestamp.go for
// conversion from time.Time to Prometheus timestamp.
int64 timestamp = 3;
}
// TimeSeries represents samples and labels for a single time series.
message TimeSeries {
// Labels have to be sorted by label names and without duplicated label names.
// TODO(bwplotka): Don't use zero copy ZLabels, see https://github.com/thanos-io/thanos/pull/3279 for details.
repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"];
repeated Sample samples = 2 [(gogoproto.nullable) = false];
repeated Exemplar exemplars = 3 [(gogoproto.nullable) = false];
}
// Matcher specifies a rule, which can match or set of labels or not.
message LabelMatcher {
enum Type {
EQ = 0;
NEQ = 1;
RE = 2;
NRE = 3;
}
Type type = 1;
string name = 2;
string value = 3;
}
message ReadHints {
int64 step_ms = 1; // Query step size in milliseconds.
string func = 2; // String representation of surrounding function or aggregation.
int64 start_ms = 3; // Start time in milliseconds.
int64 end_ms = 4; // End time in milliseconds.
repeated string grouping = 5; // List of label names used in aggregation.
bool by = 6; // Indicate whether it is without or by.
int64 range_ms = 7; // Range vector selector range in milliseconds.
}
// Chunk represents a TSDB chunk.
// Time range [min, max] is inclusive.
message Chunk {
int64 min_time_ms = 1;
int64 max_time_ms = 2;
// We require this to match chunkenc.Encoding.
enum Encoding {
UNKNOWN = 0;
XOR = 1;
}
Encoding type = 3;
bytes data = 4;
}
// ChunkedSeries represents single, encoded time series.
message ChunkedSeries {
// Labels should be sorted.
repeated thanos.Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"];
// Chunks will be in start time order and may overlap.
repeated Chunk chunks = 2 [(gogoproto.nullable) = false];
}

File diff suppressed because it is too large Load Diff

@ -1,200 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
syntax = "proto3";
package thanos;
import "store/storepb/types.proto";
import "gogoproto/gogo.proto";
import "store/storepb/prompb/types.proto";
import "store/labelpb/types.proto";
import "google/protobuf/any.proto";
option go_package = "storepb";
option (gogoproto.sizer_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
// Do not generate XXX fields to reduce memory footprint and opening a door
// for zero-copy casts to/from prometheus data types.
option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;
/// Store represents API against instance that stores XOR encoded values with label set metadata (e.g Prometheus metrics).
service Store {
/// Info returns meta information about a store e.g labels that makes that store unique as well as time range that is
/// available.
rpc Info(InfoRequest) returns (InfoResponse);
/// Series streams each Series (Labels and chunk/downsampling chunk) for given label matchers and time range.
///
/// Series should strictly stream full series after series, optionally split by time. This means that a single frame can contain
/// partition of the single series, but once a new series is started to be streamed it means that no more data will
/// be sent for previous one.
/// Series has to be sorted.
///
/// There is no requirements on chunk sorting, however it is recommended to have chunk sorted by chunk min time.
/// This heavily optimizes the resource usage on Querier / Federated Queries.
rpc Series(SeriesRequest) returns (stream SeriesResponse);
/// LabelNames returns all label names constrained by the given matchers.
rpc LabelNames(LabelNamesRequest) returns (LabelNamesResponse);
/// LabelValues returns all label values for given label name.
rpc LabelValues(LabelValuesRequest) returns (LabelValuesResponse);
}
/// WriteableStore represents API against instance that stores XOR encoded values with label set metadata (e.g Prometheus metrics).
service WriteableStore {
// WriteRequest allows you to write metrics to this store via remote write
rpc RemoteWrite(WriteRequest) returns (WriteResponse) {}
}
message WriteResponse {
}
message WriteRequest {
repeated prometheus_copy.TimeSeries timeseries = 1 [(gogoproto.nullable) = false];
string tenant = 2;
int64 replica = 3;
}
message InfoRequest {}
enum StoreType {
UNKNOWN = 0;
QUERY = 1;
RULE = 2;
SIDECAR = 3;
STORE = 4;
RECEIVE = 5;
// DEBUG represents some debug StoreAPI components e.g. thanos tools store-api-serve.
DEBUG = 6;
}
message InfoResponse {
// Deprecated. Use label_sets instead.
repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"];
int64 min_time = 2;
int64 max_time = 3;
StoreType storeType = 4;
// label_sets is an unsorted list of `ZLabelSet`s.
repeated ZLabelSet label_sets = 5 [(gogoproto.nullable) = false];
}
message SeriesRequest {
int64 min_time = 1;
int64 max_time = 2;
repeated LabelMatcher matchers = 3 [(gogoproto.nullable) = false];
int64 max_resolution_window = 4;
repeated Aggr aggregates = 5;
// Deprecated. Use partial_response_strategy instead.
bool partial_response_disabled = 6;
// TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI.
PartialResponseStrategy partial_response_strategy = 7;
// skip_chunks controls whether sending chunks or not in series responses.
bool skip_chunks = 8;
// hints is an opaque data structure that can be used to carry additional information.
// The content of this field and whether it's supported depends on the
// implementation of a specific store.
google.protobuf.Any hints = 9;
// Query step size in milliseconds.
int64 step = 10;
// Range vector selector range in milliseconds.
int64 range = 11;
}
enum Aggr {
RAW = 0;
COUNT = 1;
SUM = 2;
MIN = 3;
MAX = 4;
COUNTER = 5;
}
message SeriesResponse {
oneof result {
/// series contains 1 response series. The series labels are sorted by name.
Series series = 1;
/// warning is considered an information piece in place of series for warning purposes.
/// It is used to warn store API user about suspicious cases or partial response (if enabled).
string warning = 2;
/// hints is an opaque data structure that can be used to carry additional information from
/// the store. The content of this field and whether it's supported depends on the
/// implementation of a specific store. It's also implementation specific if it's allowed that
/// multiple SeriesResponse frames contain hints for a single Series() request and how should they
/// be handled in such case (ie. merged vs keep the first/last one).
google.protobuf.Any hints = 3;
}
}
message LabelNamesRequest {
bool partial_response_disabled = 1;
// TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI.
PartialResponseStrategy partial_response_strategy = 2;
int64 start = 3;
int64 end = 4;
// hints is an opaque data structure that can be used to carry additional information.
// The content of this field and whether it's supported depends on the
// implementation of a specific store.
google.protobuf.Any hints = 5;
repeated LabelMatcher matchers = 6 [(gogoproto.nullable) = false];
}
message LabelNamesResponse {
repeated string names = 1;
repeated string warnings = 2;
/// hints is an opaque data structure that can be used to carry additional information from
/// the store. The content of this field and whether it's supported depends on the
/// implementation of a specific store.
google.protobuf.Any hints = 3;
}
message LabelValuesRequest {
string label = 1;
bool partial_response_disabled = 2;
// TODO(bwplotka): Move Thanos components to use strategy instead. Including QueryAPI.
PartialResponseStrategy partial_response_strategy = 3;
int64 start = 4;
int64 end = 5;
// hints is an opaque data structure that can be used to carry additional information.
// The content of this field and whether it's supported depends on the
// implementation of a specific store.
google.protobuf.Any hints = 6;
repeated LabelMatcher matchers = 7 [(gogoproto.nullable) = false];
}
message LabelValuesResponse {
repeated string values = 1;
repeated string warnings = 2;
/// hints is an opaque data structure that can be used to carry additional information from
/// the store. The content of this field and whether it's supported depends on the
/// implementation of a specific store.
google.protobuf.Any hints = 3;
}

File diff suppressed because it is too large Load Diff

@ -1,73 +0,0 @@
// Copyright (c) The Thanos Authors.
// Licensed under the Apache License 2.0.
syntax = "proto3";
package thanos;
option go_package = "storepb";
import "gogoproto/gogo.proto";
import "store/labelpb/types.proto";
option (gogoproto.sizer_all) = true;
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
option (gogoproto.goproto_getters_all) = false;
// Do not generate XXX fields to reduce memory footprint and opening a door
// for zero-copy casts to/from prometheus data types.
option (gogoproto.goproto_unkeyed_all) = false;
option (gogoproto.goproto_unrecognized_all) = false;
option (gogoproto.goproto_sizecache_all) = false;
message Chunk {
enum Encoding {
XOR = 0;
}
Encoding type = 1;
bytes data = 2;
}
message Series {
repeated Label labels = 1 [(gogoproto.nullable) = false, (gogoproto.customtype) = "github.com/thanos-io/thanos/pkg/store/labelpb.ZLabel"];
repeated AggrChunk chunks = 2 [(gogoproto.nullable) = false];
}
message AggrChunk {
int64 min_time = 1;
int64 max_time = 2;
Chunk raw = 3;
Chunk count = 4;
Chunk sum = 5;
Chunk min = 6;
Chunk max = 7;
Chunk counter = 8;
}
// Matcher specifies a rule, which can match or set of labels or not.
message LabelMatcher {
enum Type {
EQ = 0; // =
NEQ = 1; // !=
RE = 2; // =~
NRE = 3; // !~
}
Type type = 1;
string name = 2;
string value = 3;
}
/// PartialResponseStrategy controls partial response handling.
enum PartialResponseStrategy {
/// WARN strategy tells server to treat any error that will related to single StoreAPI (e.g missing chunk series because of underlying
/// storeAPI is temporarily not available) as warning which will not fail the whole query (still OK response).
/// Server should produce those as a warnings field in response.
WARN = 0;
/// ABORT strategy tells server to treat any error that will related to single StoreAPI (e.g missing chunk series because of underlying
/// storeAPI is temporarily not available) as the gRPC error that aborts the query.
///
/// This is especially useful for any rule/alert evaluations on top of StoreAPI which usually does not tolerate partial
/// errors.
ABORT = 1;
}

@ -986,9 +986,6 @@ github.com/thanos-io/thanos/pkg/objstore/gcs
github.com/thanos-io/thanos/pkg/objstore/s3
github.com/thanos-io/thanos/pkg/objstore/swift
github.com/thanos-io/thanos/pkg/runutil
github.com/thanos-io/thanos/pkg/store/labelpb
github.com/thanos-io/thanos/pkg/store/storepb
github.com/thanos-io/thanos/pkg/store/storepb/prompb
github.com/thanos-io/thanos/pkg/strutil
github.com/thanos-io/thanos/pkg/testutil
github.com/thanos-io/thanos/pkg/tracing

Loading…
Cancel
Save