Copy `cortex/pkg/distributor` package dependency into Loki (#4983)

* Fork cortex `pkg/distributor/distributor_ring.go`

Copy the content of Cortex's `pkg/distributor/distributor_ring.go` into
Loki's `pkg/distributor/distributor_ring.go`.

* Fork cortex `pkg/distributor/ingester_client_pool.go`.

Copy Cortex `pkg/distributor/ingester_client_pool.go` into Loki's
`pkg/distributor/clientpool/ingester_client_pool.go`.
We couldn't copy it directly to
`pkg/distributor/ingester_client_pool.go` because the distributor imports
a package that itself imports the ingester client pool, which would lead
to an import cycle (sketched below).
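
A minimal sketch of the cycle that would arise if the pool lived directly in
`pkg/distributor`. File contents are simplified and only illustrative; the
import edges are the ones visible in the diff below:

    // pkg/ingester/client/client.go (simplified)
    package client

    import "github.com/grafana/loki/pkg/distributor" // for distributor.PoolConfig

    type Config struct {
        PoolConfig distributor.PoolConfig
    }

    // pkg/distributor/distributor.go (simplified)
    package distributor

    import "github.com/grafana/loki/pkg/ingester/client" // for client.Config

    func New(clientCfg client.Config) {}

    // distributor -> ingester/client -> distributor is a cycle, which the Go
    // compiler rejects with "import cycle not allowed". Hosting the pool in its
    // own package, pkg/distributor/clientpool, removes the distributor leg of
    // the cycle.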

* Stop using Cortex `pkg/distributor`.

Instead, use Loki's `pkg/distributor` and `pkg/distributor/clientpool`.

* Remove unused Cortex distributor pkg.

* Use random ports for Loki test.
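
The test change below adds a `getRandomPorts` helper, which relies on
listening on port 0 so the kernel assigns a free port. A standalone sketch of
that mechanism; the `freePort` name is illustrative and not part of this change:

    package main

    import (
        "fmt"
        "net"
    )

    // freePort asks the OS for an unused TCP port by listening on ":0",
    // reads back the port it was assigned, and releases the listener so a
    // test server can bind it afterwards. A small race window remains
    // (another process could grab the port first), which is generally
    // acceptable in tests.
    func freePort() (int, error) {
        l, err := net.Listen("tcp", ":0")
        if err != nil {
            return 0, err
        }
        defer l.Close()
        return l.Addr().(*net.TCPAddr).Port, nil
    }

    func main() {
        port, err := freePort()
        if err != nil {
            panic(err)
        }
        fmt.Printf("http://localhost:%d/ready\n", port)
    }
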
Authored by Dylan Guedes 3 years ago, committed by GitHub. Commit 92c05493e1 (parent d8305ecd0a).
  1. pkg/distributor/clientpool/ingester_client_pool.go (2)
  2. pkg/distributor/distributor.go (6)
  3. pkg/distributor/distributor_ring.go (0)
  4. pkg/ingester/client/client.go (4)
  5. pkg/loki/config_wrapper_test.go (2)
  6. pkg/loki/loki_test.go (40)
  7. pkg/querier/ingester_querier.go (4)
  8. pkg/querier/querier_mock_test.go (4)
  9. vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go (1111)
  10. vendor/github.com/cortexproject/cortex/pkg/distributor/ha_tracker.go (493)
  11. vendor/github.com/cortexproject/cortex/pkg/distributor/ha_tracker.pb.go (494)
  12. vendor/github.com/cortexproject/cortex/pkg/distributor/ha_tracker.proto (20)
  13. vendor/github.com/cortexproject/cortex/pkg/distributor/ha_tracker_http.go (101)
  14. vendor/github.com/cortexproject/cortex/pkg/distributor/http_admin.go (97)
  15. vendor/github.com/cortexproject/cortex/pkg/distributor/http_server.go (26)
  16. vendor/github.com/cortexproject/cortex/pkg/distributor/ingestion_rate_strategy.go (74)
  17. vendor/github.com/cortexproject/cortex/pkg/distributor/query.go (432)
  18. vendor/modules.txt (1)

@ -6,7 +6,6 @@ import (
"net/http"
"time"
cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/dskit/limiter"
"github.com/grafana/dskit/ring"
@ -22,6 +21,7 @@ import (
"go.uber.org/atomic"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/tenant"
"github.com/grafana/loki/pkg/ingester/client"
@ -38,7 +38,7 @@ var maxLabelCacheSize = 100000
// Config for a Distributor.
type Config struct {
// Distributors ring
DistributorRing cortex_distributor.RingConfig `yaml:"ring,omitempty"`
DistributorRing RingConfig `yaml:"ring,omitempty"`
// For testing.
factory ring_client.PoolFactory `yaml:"-"`
@ -123,7 +123,7 @@ func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, in
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
pool: clientpool.NewPool(clientCfg.PoolConfig, ingestersRing, factory, util_log.Logger),
ingestionRateLimiter: limiter.NewRateLimiter(ingestionRateStrategy, 10*time.Second),
labelCache: labelCache,
ingesterAppends: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{

@ -5,7 +5,6 @@ import (
"io"
"time"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/grafana/dskit/grpcclient"
dsmiddleware "github.com/grafana/dskit/middleware"
"github.com/grpc-ecosystem/grpc-opentracing/go/otgrpc"
@ -16,6 +15,7 @@ import (
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/logproto"
)
@ -41,7 +41,7 @@ type ClosableHealthAndIngesterClient struct {
// Config for an ingester client.
type Config struct {
PoolConfig distributor.PoolConfig `yaml:"pool_config,omitempty"`
PoolConfig clientpool.PoolConfig `yaml:"pool_config,omitempty"`
RemoteTimeout time.Duration `yaml:"remote_timeout,omitempty"`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config"`
GRPCUnaryClientInterceptors []grpc.UnaryClientInterceptor `yaml:"-"`

@ -10,7 +10,6 @@ import (
"testing"
"time"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
@ -19,6 +18,7 @@ import (
cortex_gcp "github.com/cortexproject/cortex/pkg/chunk/gcp"
cortex_swift "github.com/cortexproject/cortex/pkg/storage/bucket/swift"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/loki/common"
"github.com/grafana/loki/pkg/storage/chunk/storage"
"github.com/grafana/loki/pkg/util"

@ -6,6 +6,7 @@ import (
"fmt"
"io"
"io/ioutil"
"net"
"net/http"
"strings"
"testing"
@ -90,10 +91,39 @@ func TestLoki_isModuleEnabled(t1 *testing.T) {
}
}
func getRandomPorts(n int) []int {
portListeners := []net.Listener{}
for i := 0; i < n; i++ {
listener, err := net.Listen("tcp", ":0")
if err != nil {
panic(err)
}
portListeners = append(portListeners, listener)
}
portNumbers := []int{}
for i := 0; i < n; i++ {
port := portListeners[i].Addr().(*net.TCPAddr).Port
portNumbers = append(portNumbers, port)
if err := portListeners[i].Close(); err != nil {
panic(err)
}
}
return portNumbers
}
func TestLoki_CustomRunOptsBehavior(t *testing.T) {
yamlConfig := `target: querier
ports := getRandomPorts(2)
httpPort := ports[0]
grpcPort := ports[1]
yamlConfig := fmt.Sprintf(`target: querier
server:
http_listen_port: 3100
http_listen_port: %d
grpc_listen_port: %d
common:
path_prefix: /tmp/loki
ring:
@ -108,7 +138,7 @@ schema_config:
schema: v11
index:
prefix: index_
period: 24h`
period: 24h`, httpPort, grpcPort)
cfgWrapper, _, err := configWrapperFromYAML(t, yamlConfig, nil)
require.NoError(t, err)
@ -121,7 +151,7 @@ schema_config:
// retries at most 10 times (1 second in total) to avoid infinite loops when no timeout is set.
for i := 0; i < 10; i++ {
// waits until request to /ready doesn't error.
resp, err := http.DefaultClient.Get("http://localhost:3100/ready")
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/ready", httpPort))
if err != nil {
time.Sleep(time.Millisecond * 200)
continue
@ -156,7 +186,7 @@ schema_config:
err = lokiHealthCheck()
require.NoError(t, err)
resp, err := http.DefaultClient.Get("http://localhost:3100/config")
resp, err := http.DefaultClient.Get(fmt.Sprintf("http://localhost:%d/config", httpPort))
require.NoError(t, err)
defer resp.Body.Close()

@ -6,7 +6,6 @@ import (
"strings"
"time"
cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -16,6 +15,7 @@ import (
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -48,7 +48,7 @@ func NewIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryD
func newIngesterQuerier(clientCfg client.Config, ring ring.ReadRing, extraQueryDelay time.Duration, clientFactory ring_client.PoolFactory) (*IngesterQuerier, error) {
iq := IngesterQuerier{
ring: ring,
pool: cortex_distributor.NewPool(clientCfg.PoolConfig, ring, clientFactory, util_log.Logger),
pool: clientpool.NewPool(clientCfg.PoolConfig, ring, clientFactory, util_log.Logger),
extraQueryDelay: extraQueryDelay,
}

@ -6,7 +6,6 @@ import (
"fmt"
"time"
"github.com/cortexproject/cortex/pkg/distributor"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring"
ring_client "github.com/grafana/dskit/ring/client"
@ -17,6 +16,7 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"
grpc_metadata "google.golang.org/grpc/metadata"
"github.com/grafana/loki/pkg/distributor/clientpool"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -86,7 +86,7 @@ func newIngesterClientMockFactory(c *querierClientMock) ring_client.PoolFactory
// mockIngesterClientConfig returns an ingester client config suitable for testing
func mockIngesterClientConfig() client.Config {
return client.Config{
PoolConfig: distributor.PoolConfig{
PoolConfig: clientpool.PoolConfig{
ClientCleanupPeriod: 1 * time.Minute,
HealthCheckIngesters: false,
RemoteTimeout: 1 * time.Second,

File diff suppressed because it is too large: vendor/github.com/cortexproject/cortex/pkg/distributor/distributor.go (1111 lines).

@ -1,493 +0,0 @@
package distributor
import (
"context"
"errors"
"flag"
"fmt"
"math/rand"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/gogo/protobuf/proto"
"github.com/grafana/dskit/kv"
"github.com/grafana/dskit/kv/codec"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/util"
)
var (
errNegativeUpdateTimeoutJitterMax = errors.New("HA tracker max update timeout jitter shouldn't be negative")
errInvalidFailoverTimeout = "HA Tracker failover timeout (%v) must be at least 1s greater than update timeout - max jitter (%v)"
)
type haTrackerLimits interface {
// MaxHAClusters returns max number of clusters that HA tracker should track for a user.
// Samples from additional clusters are rejected.
MaxHAClusters(user string) int
}
// ProtoReplicaDescFactory makes new InstanceDescs
func ProtoReplicaDescFactory() proto.Message {
return NewReplicaDesc()
}
// NewReplicaDesc returns an empty *distributor.ReplicaDesc.
func NewReplicaDesc() *ReplicaDesc {
return &ReplicaDesc{}
}
// HATrackerConfig contains the configuration required to
// create an HA Tracker.
type HATrackerConfig struct {
EnableHATracker bool `yaml:"enable_ha_tracker"`
// We should only update the timestamp if the difference
// between the stored timestamp and the time we received a sample at
// is more than this duration.
UpdateTimeout time.Duration `yaml:"ha_tracker_update_timeout"`
UpdateTimeoutJitterMax time.Duration `yaml:"ha_tracker_update_timeout_jitter_max"`
// We should only failover to accepting samples from a replica
// other than the replica written in the KVStore if the difference
// between the stored timestamp and the time we received a sample is
// more than this duration
FailoverTimeout time.Duration `yaml:"ha_tracker_failover_timeout"`
KVStore kv.Config `yaml:"kvstore" doc:"description=Backend storage to use for the ring. Please be aware that memberlist is not supported by the HA tracker since gossip propagation is too slow for HA purposes."`
}
// RegisterFlags adds the flags required to config this to the given FlagSet.
func (cfg *HATrackerConfig) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.EnableHATracker, "distributor.ha-tracker.enable", false, "Enable the distributors HA tracker so that it can accept samples from Prometheus HA replicas gracefully (requires labels).")
f.DurationVar(&cfg.UpdateTimeout, "distributor.ha-tracker.update-timeout", 15*time.Second, "Update the timestamp in the KV store for a given cluster/replica only after this amount of time has passed since the current stored timestamp.")
f.DurationVar(&cfg.UpdateTimeoutJitterMax, "distributor.ha-tracker.update-timeout-jitter-max", 5*time.Second, "Maximum jitter applied to the update timeout, in order to spread the HA heartbeats over time.")
f.DurationVar(&cfg.FailoverTimeout, "distributor.ha-tracker.failover-timeout", 30*time.Second, "If we don't receive any samples from the accepted replica for a cluster in this amount of time we will failover to the next replica we receive a sample from. This value must be greater than the update timeout")
// We want the ability to use different Consul instances for the ring and
// for HA cluster tracking. We also customize the default keys prefix, in
// order to not clash with the ring key if they both share the same KVStore
// backend (ie. run on the same consul cluster).
cfg.KVStore.RegisterFlagsWithPrefix("distributor.ha-tracker.", "ha-tracker/", f)
}
// Validate config and returns error on failure
func (cfg *HATrackerConfig) Validate() error {
if cfg.UpdateTimeoutJitterMax < 0 {
return errNegativeUpdateTimeoutJitterMax
}
minFailureTimeout := cfg.UpdateTimeout + cfg.UpdateTimeoutJitterMax + time.Second
if cfg.FailoverTimeout < minFailureTimeout {
return fmt.Errorf(errInvalidFailoverTimeout, cfg.FailoverTimeout, minFailureTimeout)
}
return nil
}
func GetReplicaDescCodec() codec.Proto {
return codec.NewProtoCodec("replicaDesc", ProtoReplicaDescFactory)
}
// Track the replica we're accepting samples from
// for each HA cluster we know about.
type haTracker struct {
services.Service
logger log.Logger
cfg HATrackerConfig
client kv.Client
updateTimeoutJitter time.Duration
limits haTrackerLimits
electedLock sync.RWMutex
elected map[string]ReplicaDesc // Replicas we are accepting samples from. Key = "user/cluster".
clusters map[string]map[string]struct{} // Known clusters with elected replicas per user. First key = user, second key = cluster name.
electedReplicaChanges *prometheus.CounterVec
electedReplicaTimestamp *prometheus.GaugeVec
electedReplicaPropagationTime prometheus.Histogram
kvCASCalls *prometheus.CounterVec
cleanupRuns prometheus.Counter
replicasMarkedForDeletion prometheus.Counter
deletedReplicas prometheus.Counter
markingForDeletionsFailed prometheus.Counter
}
// NewClusterTracker returns a new HA cluster tracker using either Consul
// or in-memory KV store. Tracker must be started via StartAsync().
func newHATracker(cfg HATrackerConfig, limits haTrackerLimits, reg prometheus.Registerer, logger log.Logger) (*haTracker, error) {
var jitter time.Duration
if cfg.UpdateTimeoutJitterMax > 0 {
jitter = time.Duration(rand.Int63n(int64(2*cfg.UpdateTimeoutJitterMax))) - cfg.UpdateTimeoutJitterMax
}
t := &haTracker{
logger: logger,
cfg: cfg,
updateTimeoutJitter: jitter,
limits: limits,
elected: map[string]ReplicaDesc{},
clusters: map[string]map[string]struct{}{},
electedReplicaChanges: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ha_tracker_elected_replica_changes_total",
Help: "The total number of times the elected replica has changed for a user ID/cluster.",
}, []string{"user", "cluster"}),
electedReplicaTimestamp: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "cortex_ha_tracker_elected_replica_timestamp_seconds",
Help: "The timestamp stored for the currently elected replica, from the KVStore.",
}, []string{"user", "cluster"}),
electedReplicaPropagationTime: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_ha_tracker_elected_replica_change_propagation_time_seconds",
Help: "The time it for the distributor to update the replica change.",
Buckets: prometheus.DefBuckets,
}),
kvCASCalls: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_ha_tracker_kv_store_cas_total",
Help: "The total number of CAS calls to the KV store for a user ID/cluster.",
}, []string{"user", "cluster"}),
cleanupRuns: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_started_total",
Help: "Number of elected replicas cleanup loops started.",
}),
replicasMarkedForDeletion: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_marked_for_deletion_total",
Help: "Number of elected replicas marked for deletion.",
}),
deletedReplicas: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_deleted_total",
Help: "Number of elected replicas deleted from KV store.",
}),
markingForDeletionsFailed: promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_ha_tracker_replicas_cleanup_delete_failed_total",
Help: "Number of elected replicas that failed to be marked for deletion, or deleted.",
}),
}
if cfg.EnableHATracker {
client, err := kv.NewClient(
cfg.KVStore,
GetReplicaDescCodec(),
kv.RegistererWithKVName(prometheus.WrapRegistererWithPrefix("cortex_", reg), "distributor-hatracker"),
logger,
)
if err != nil {
return nil, err
}
t.client = client
}
t.Service = services.NewBasicService(nil, t.loop, nil)
return t, nil
}
// Follows pattern used by ring for WatchKey.
func (c *haTracker) loop(ctx context.Context) error {
if !c.cfg.EnableHATracker {
// don't do anything, but wait until asked to stop.
<-ctx.Done()
return nil
}
// Start cleanup loop. It will stop when context is done.
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()
c.cleanupOldReplicasLoop(ctx)
}()
// The KVStore config we gave when creating c should have contained a prefix,
// which would have given us a prefixed KVStore client. So, we can pass empty string here.
c.client.WatchPrefix(ctx, "", func(key string, value interface{}) bool {
replica := value.(*ReplicaDesc)
segments := strings.SplitN(key, "/", 2)
// Valid key would look like cluster/replica, and a key without a / such as `ring` would be invalid.
if len(segments) != 2 {
return true
}
user := segments[0]
cluster := segments[1]
c.electedLock.Lock()
defer c.electedLock.Unlock()
if replica.DeletedAt > 0 {
delete(c.elected, key)
c.electedReplicaChanges.DeleteLabelValues(user, cluster)
c.electedReplicaTimestamp.DeleteLabelValues(user, cluster)
userClusters := c.clusters[user]
if userClusters != nil {
delete(userClusters, cluster)
if len(userClusters) == 0 {
delete(c.clusters, user)
}
}
return true
}
elected, exists := c.elected[key]
if replica.Replica != elected.Replica {
c.electedReplicaChanges.WithLabelValues(user, cluster).Inc()
}
if !exists {
if c.clusters[user] == nil {
c.clusters[user] = map[string]struct{}{}
}
c.clusters[user][cluster] = struct{}{}
}
c.elected[key] = *replica
c.electedReplicaTimestamp.WithLabelValues(user, cluster).Set(float64(replica.ReceivedAt / 1000))
c.electedReplicaPropagationTime.Observe(time.Since(timestamp.Time(replica.ReceivedAt)).Seconds())
return true
})
wg.Wait()
return nil
}
const (
cleanupCyclePeriod = 30 * time.Minute
cleanupCycleJitterVariance = 0.2 // for 30 minutes, this is ±6 min
// If we have received last sample for given cluster before this timeout, we will mark selected replica for deletion.
// If selected replica is marked for deletion for this time, it is deleted completely.
deletionTimeout = 30 * time.Minute
)
func (c *haTracker) cleanupOldReplicasLoop(ctx context.Context) {
tick := time.NewTicker(util.DurationWithJitter(cleanupCyclePeriod, cleanupCycleJitterVariance))
defer tick.Stop()
for {
select {
case <-ctx.Done():
return
case t := <-tick.C:
c.cleanupRuns.Inc()
c.cleanupOldReplicas(ctx, t.Add(-deletionTimeout))
}
}
}
// Replicas marked for deletion before deadline will be deleted.
// Replicas with last-received timestamp before deadline will be marked for deletion.
func (c *haTracker) cleanupOldReplicas(ctx context.Context, deadline time.Time) {
keys, err := c.client.List(ctx, "")
if err != nil {
level.Warn(c.logger).Log("msg", "cleanup: failed to list replica keys", "err", err)
return
}
for _, key := range keys {
if ctx.Err() != nil {
return
}
val, err := c.client.Get(ctx, key)
if err != nil {
level.Warn(c.logger).Log("msg", "cleanup: failed to get replica value", "key", key, "err", err)
continue
}
desc, ok := val.(*ReplicaDesc)
if !ok {
level.Error(c.logger).Log("msg", "cleanup: got invalid replica descriptor", "key", key)
continue
}
if desc.DeletedAt > 0 {
if timestamp.Time(desc.DeletedAt).After(deadline) {
continue
}
// We're blindly deleting a key here. It may happen that value was updated since we have read it few lines above,
// in which case Distributors will have updated value in memory, but Delete will remove it from KV store anyway.
// That's not great, but should not be a problem. If KV store sends Watch notification for Delete, distributors will
// delete it from memory, and recreate on next sample with matching replica.
//
// If KV store doesn't send Watch notification for Delete, distributors *with* replica in memory will keep using it,
// while distributors *without* replica in memory will try to write it to KV store -- which will update *all*
// watching distributors.
err = c.client.Delete(ctx, key)
if err != nil {
level.Error(c.logger).Log("msg", "cleanup: failed to delete old replica", "key", key, "err", err)
c.markingForDeletionsFailed.Inc()
} else {
level.Info(c.logger).Log("msg", "cleanup: deleted old replica", "key", key)
c.deletedReplicas.Inc()
}
continue
}
// Not marked as deleted yet.
if desc.DeletedAt == 0 && timestamp.Time(desc.ReceivedAt).Before(deadline) {
err := c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
d, ok := in.(*ReplicaDesc)
if !ok || d == nil || d.DeletedAt > 0 || !timestamp.Time(desc.ReceivedAt).Before(deadline) {
return nil, false, nil
}
d.DeletedAt = timestamp.FromTime(time.Now())
return d, true, nil
})
if err != nil {
c.markingForDeletionsFailed.Inc()
level.Error(c.logger).Log("msg", "cleanup: failed to mark replica as deleted", "key", key, "err", err)
} else {
c.replicasMarkedForDeletion.Inc()
level.Info(c.logger).Log("msg", "cleanup: marked replica as deleted", "key", key)
}
}
}
}
// CheckReplica checks the cluster and replica against the backing KVStore and local cache in the
// tracker c to see if we should accept the incoming sample. It will return an error if the sample
// should not be accepted. Note that internally this function does checks against the stored values
// and may modify the stored data, for example to failover between replicas after a certain period of time.
// replicasNotMatchError is returned (from checkKVStore) if we shouldn't store this sample but are
// accepting samples from another replica for the cluster, so that there isn't a bunch of errors returned
// to customers' clients.
func (c *haTracker) checkReplica(ctx context.Context, userID, cluster, replica string, now time.Time) error {
// If HA tracking isn't enabled then accept the sample
if !c.cfg.EnableHATracker {
return nil
}
key := fmt.Sprintf("%s/%s", userID, cluster)
c.electedLock.RLock()
entry, ok := c.elected[key]
clusters := len(c.clusters[userID])
c.electedLock.RUnlock()
if ok && now.Sub(timestamp.Time(entry.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
if entry.Replica != replica {
return replicasNotMatchError{replica: replica, elected: entry.Replica}
}
return nil
}
if !ok {
// If we don't know about this cluster yet and we have reached the limit for number of clusters, we error out now.
if limit := c.limits.MaxHAClusters(userID); limit > 0 && clusters+1 > limit {
return tooManyClustersError{limit: limit}
}
}
err := c.checkKVStore(ctx, key, replica, now)
c.kvCASCalls.WithLabelValues(userID, cluster).Inc()
if err != nil {
// The callback within checkKVStore will return a replicasNotMatchError if the sample is being deduped,
// otherwise there may have been an actual error CAS'ing that we should log.
if !errors.Is(err, replicasNotMatchError{}) {
level.Error(c.logger).Log("msg", "rejecting sample", "err", err)
}
}
return err
}
func (c *haTracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
return c.client.CAS(ctx, key, func(in interface{}) (out interface{}, retry bool, err error) {
if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
// this sample at is less than updateTimeout amount of time since the timestamp in the KV store.
if desc.Replica == replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.UpdateTimeout+c.updateTimeoutJitter {
return nil, false, nil
}
// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
// is less than failover timeout amount of time since the timestamp in the KV store.
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
return nil, false, replicasNotMatchError{replica: replica, elected: desc.Replica}
}
}
// There was either invalid or no data for the key, so we now accept samples
// from this replica. Invalid could mean that the timestamp in the KV store was
// out of date based on the update and failover timeouts when compared to now.
return &ReplicaDesc{
Replica: replica,
ReceivedAt: timestamp.FromTime(now),
DeletedAt: 0,
}, true, nil
})
}
type replicasNotMatchError struct {
replica, elected string
}
func (e replicasNotMatchError) Error() string {
return fmt.Sprintf("replicas did not mach, rejecting sample: replica=%s, elected=%s", e.replica, e.elected)
}
// Needed for errors.Is to work properly.
func (e replicasNotMatchError) Is(err error) bool {
_, ok1 := err.(replicasNotMatchError)
_, ok2 := err.(*replicasNotMatchError)
return ok1 || ok2
}
// IsOperationAborted returns whether the error has been caused by an operation intentionally aborted.
func (e replicasNotMatchError) IsOperationAborted() bool {
return true
}
type tooManyClustersError struct {
limit int
}
func (e tooManyClustersError) Error() string {
return fmt.Sprintf("too many HA clusters (limit: %d)", e.limit)
}
// Needed for errors.Is to work properly.
func (e tooManyClustersError) Is(err error) bool {
_, ok1 := err.(tooManyClustersError)
_, ok2 := err.(*tooManyClustersError)
return ok1 || ok2
}
func findHALabels(replicaLabel, clusterLabel string, labels []cortexpb.LabelAdapter) (string, string) {
var cluster, replica string
var pair cortexpb.LabelAdapter
for _, pair = range labels {
if pair.Name == replicaLabel {
replica = pair.Value
}
if pair.Name == clusterLabel {
cluster = pair.Value
}
}
return cluster, replica
}
func (c *haTracker) cleanupHATrackerMetricsForUser(userID string) {
filter := map[string]string{"user": userID}
if err := util.DeleteMatchingLabels(c.electedReplicaChanges, filter); err != nil {
level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_changes_total metric for user", "user", userID, "err", err)
}
if err := util.DeleteMatchingLabels(c.electedReplicaTimestamp, filter); err != nil {
level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_elected_replica_timestamp_seconds metric for user", "user", userID, "err", err)
}
if err := util.DeleteMatchingLabels(c.kvCASCalls, filter); err != nil {
level.Warn(c.logger).Log("msg", "failed to remove cortex_ha_tracker_kv_store_cas_total metric for user", "user", userID, "err", err)
}
}

@ -1,494 +0,0 @@
// Code generated by protoc-gen-gogo. DO NOT EDIT.
// source: ha_tracker.proto
package distributor
import (
fmt "fmt"
_ "github.com/gogo/protobuf/gogoproto"
proto "github.com/gogo/protobuf/proto"
io "io"
math "math"
math_bits "math/bits"
reflect "reflect"
strings "strings"
)
// Reference imports to suppress errors if they are not otherwise used.
var _ = proto.Marshal
var _ = fmt.Errorf
var _ = math.Inf
// This is a compile-time assertion to ensure that this generated file
// is compatible with the proto package it is being compiled against.
// A compilation error at this line likely means your copy of the
// proto package needs to be updated.
const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package
type ReplicaDesc struct {
Replica string `protobuf:"bytes,1,opt,name=replica,proto3" json:"replica,omitempty"`
ReceivedAt int64 `protobuf:"varint,2,opt,name=received_at,json=receivedAt,proto3" json:"received_at,omitempty"`
// Unix timestamp in milliseconds when this entry was marked for deletion.
// Reason for doing marking first, and delete later, is to make sure that distributors
// watching the prefix will receive notification on "marking" -- at which point they can
// already remove entry from memory. Actual deletion from KV store does *not* trigger
// "watch" notification with a key for all KV stores.
DeletedAt int64 `protobuf:"varint,3,opt,name=deleted_at,json=deletedAt,proto3" json:"deleted_at,omitempty"`
}
func (m *ReplicaDesc) Reset() { *m = ReplicaDesc{} }
func (*ReplicaDesc) ProtoMessage() {}
func (*ReplicaDesc) Descriptor() ([]byte, []int) {
return fileDescriptor_86f0e7bcf71d860b, []int{0}
}
func (m *ReplicaDesc) XXX_Unmarshal(b []byte) error {
return m.Unmarshal(b)
}
func (m *ReplicaDesc) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) {
if deterministic {
return xxx_messageInfo_ReplicaDesc.Marshal(b, m, deterministic)
} else {
b = b[:cap(b)]
n, err := m.MarshalToSizedBuffer(b)
if err != nil {
return nil, err
}
return b[:n], nil
}
}
func (m *ReplicaDesc) XXX_Merge(src proto.Message) {
xxx_messageInfo_ReplicaDesc.Merge(m, src)
}
func (m *ReplicaDesc) XXX_Size() int {
return m.Size()
}
func (m *ReplicaDesc) XXX_DiscardUnknown() {
xxx_messageInfo_ReplicaDesc.DiscardUnknown(m)
}
var xxx_messageInfo_ReplicaDesc proto.InternalMessageInfo
func (m *ReplicaDesc) GetReplica() string {
if m != nil {
return m.Replica
}
return ""
}
func (m *ReplicaDesc) GetReceivedAt() int64 {
if m != nil {
return m.ReceivedAt
}
return 0
}
func (m *ReplicaDesc) GetDeletedAt() int64 {
if m != nil {
return m.DeletedAt
}
return 0
}
func init() {
proto.RegisterType((*ReplicaDesc)(nil), "distributor.ReplicaDesc")
}
func init() { proto.RegisterFile("ha_tracker.proto", fileDescriptor_86f0e7bcf71d860b) }
var fileDescriptor_86f0e7bcf71d860b = []byte{
// 224 bytes of a gzipped FileDescriptorProto
0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0xc8, 0x48, 0x8c, 0x2f,
0x29, 0x4a, 0x4c, 0xce, 0x4e, 0x2d, 0xd2, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, 0xe2, 0x4e, 0xc9,
0x2c, 0x2e, 0x29, 0xca, 0x4c, 0x2a, 0x2d, 0xc9, 0x2f, 0x92, 0xd2, 0x4d, 0xcf, 0x2c, 0xc9, 0x28,
0x4d, 0xd2, 0x4b, 0xce, 0xcf, 0xd5, 0x4f, 0xcf, 0x4f, 0xcf, 0xd7, 0x07, 0xab, 0x49, 0x2a, 0x4d,
0x03, 0xf3, 0xc0, 0x1c, 0x30, 0x0b, 0xa2, 0x57, 0x29, 0x9d, 0x8b, 0x3b, 0x28, 0xb5, 0x20, 0x27,
0x33, 0x39, 0xd1, 0x25, 0xb5, 0x38, 0x59, 0x48, 0x82, 0x8b, 0xbd, 0x08, 0xc2, 0x95, 0x60, 0x54,
0x60, 0xd4, 0xe0, 0x0c, 0x82, 0x71, 0x85, 0xe4, 0xb9, 0xb8, 0x8b, 0x52, 0x93, 0x53, 0x33, 0xcb,
0x52, 0x53, 0xe2, 0x13, 0x4b, 0x24, 0x98, 0x14, 0x18, 0x35, 0x98, 0x83, 0xb8, 0x60, 0x42, 0x8e,
0x25, 0x42, 0xb2, 0x5c, 0x5c, 0x29, 0xa9, 0x39, 0xa9, 0x25, 0x10, 0x79, 0x66, 0xb0, 0x3c, 0x27,
0x54, 0xc4, 0xb1, 0xc4, 0xc9, 0xe4, 0xc2, 0x43, 0x39, 0x86, 0x1b, 0x0f, 0xe5, 0x18, 0x3e, 0x3c,
0x94, 0x63, 0x6c, 0x78, 0x24, 0xc7, 0xb8, 0xe2, 0x91, 0x1c, 0xe3, 0x89, 0x47, 0x72, 0x8c, 0x17,
0x1e, 0xc9, 0x31, 0x3e, 0x78, 0x24, 0xc7, 0xf8, 0xe2, 0x91, 0x1c, 0xc3, 0x87, 0x47, 0x72, 0x8c,
0x13, 0x1e, 0xcb, 0x31, 0x5c, 0x78, 0x2c, 0xc7, 0x70, 0xe3, 0xb1, 0x1c, 0x43, 0x12, 0x1b, 0xd8,
0x95, 0xc6, 0x80, 0x00, 0x00, 0x00, 0xff, 0xff, 0xb3, 0xd1, 0xdd, 0x8d, 0xf5, 0x00, 0x00, 0x00,
}
func (this *ReplicaDesc) Equal(that interface{}) bool {
if that == nil {
return this == nil
}
that1, ok := that.(*ReplicaDesc)
if !ok {
that2, ok := that.(ReplicaDesc)
if ok {
that1 = &that2
} else {
return false
}
}
if that1 == nil {
return this == nil
} else if this == nil {
return false
}
if this.Replica != that1.Replica {
return false
}
if this.ReceivedAt != that1.ReceivedAt {
return false
}
if this.DeletedAt != that1.DeletedAt {
return false
}
return true
}
func (this *ReplicaDesc) GoString() string {
if this == nil {
return "nil"
}
s := make([]string, 0, 7)
s = append(s, "&distributor.ReplicaDesc{")
s = append(s, "Replica: "+fmt.Sprintf("%#v", this.Replica)+",\n")
s = append(s, "ReceivedAt: "+fmt.Sprintf("%#v", this.ReceivedAt)+",\n")
s = append(s, "DeletedAt: "+fmt.Sprintf("%#v", this.DeletedAt)+",\n")
s = append(s, "}")
return strings.Join(s, "")
}
func valueToGoStringHaTracker(v interface{}, typ string) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv)
}
func (m *ReplicaDesc) Marshal() (dAtA []byte, err error) {
size := m.Size()
dAtA = make([]byte, size)
n, err := m.MarshalToSizedBuffer(dAtA[:size])
if err != nil {
return nil, err
}
return dAtA[:n], nil
}
func (m *ReplicaDesc) MarshalTo(dAtA []byte) (int, error) {
size := m.Size()
return m.MarshalToSizedBuffer(dAtA[:size])
}
func (m *ReplicaDesc) MarshalToSizedBuffer(dAtA []byte) (int, error) {
i := len(dAtA)
_ = i
var l int
_ = l
if m.DeletedAt != 0 {
i = encodeVarintHaTracker(dAtA, i, uint64(m.DeletedAt))
i--
dAtA[i] = 0x18
}
if m.ReceivedAt != 0 {
i = encodeVarintHaTracker(dAtA, i, uint64(m.ReceivedAt))
i--
dAtA[i] = 0x10
}
if len(m.Replica) > 0 {
i -= len(m.Replica)
copy(dAtA[i:], m.Replica)
i = encodeVarintHaTracker(dAtA, i, uint64(len(m.Replica)))
i--
dAtA[i] = 0xa
}
return len(dAtA) - i, nil
}
func encodeVarintHaTracker(dAtA []byte, offset int, v uint64) int {
offset -= sovHaTracker(v)
base := offset
for v >= 1<<7 {
dAtA[offset] = uint8(v&0x7f | 0x80)
v >>= 7
offset++
}
dAtA[offset] = uint8(v)
return base
}
func (m *ReplicaDesc) Size() (n int) {
if m == nil {
return 0
}
var l int
_ = l
l = len(m.Replica)
if l > 0 {
n += 1 + l + sovHaTracker(uint64(l))
}
if m.ReceivedAt != 0 {
n += 1 + sovHaTracker(uint64(m.ReceivedAt))
}
if m.DeletedAt != 0 {
n += 1 + sovHaTracker(uint64(m.DeletedAt))
}
return n
}
func sovHaTracker(x uint64) (n int) {
return (math_bits.Len64(x|1) + 6) / 7
}
func sozHaTracker(x uint64) (n int) {
return sovHaTracker(uint64((x << 1) ^ uint64((int64(x) >> 63))))
}
func (this *ReplicaDesc) String() string {
if this == nil {
return "nil"
}
s := strings.Join([]string{`&ReplicaDesc{`,
`Replica:` + fmt.Sprintf("%v", this.Replica) + `,`,
`ReceivedAt:` + fmt.Sprintf("%v", this.ReceivedAt) + `,`,
`DeletedAt:` + fmt.Sprintf("%v", this.DeletedAt) + `,`,
`}`,
}, "")
return s
}
func valueToStringHaTracker(v interface{}) string {
rv := reflect.ValueOf(v)
if rv.IsNil() {
return "nil"
}
pv := reflect.Indirect(rv).Interface()
return fmt.Sprintf("*%v", pv)
}
func (m *ReplicaDesc) Unmarshal(dAtA []byte) error {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
preIndex := iNdEx
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHaTracker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
fieldNum := int32(wire >> 3)
wireType := int(wire & 0x7)
if wireType == 4 {
return fmt.Errorf("proto: ReplicaDesc: wiretype end group for non-group")
}
if fieldNum <= 0 {
return fmt.Errorf("proto: ReplicaDesc: illegal tag %d (wire type %d)", fieldNum, wire)
}
switch fieldNum {
case 1:
if wireType != 2 {
return fmt.Errorf("proto: wrong wireType = %d for field Replica", wireType)
}
var stringLen uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHaTracker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
stringLen |= uint64(b&0x7F) << shift
if b < 0x80 {
break
}
}
intStringLen := int(stringLen)
if intStringLen < 0 {
return ErrInvalidLengthHaTracker
}
postIndex := iNdEx + intStringLen
if postIndex < 0 {
return ErrInvalidLengthHaTracker
}
if postIndex > l {
return io.ErrUnexpectedEOF
}
m.Replica = string(dAtA[iNdEx:postIndex])
iNdEx = postIndex
case 2:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field ReceivedAt", wireType)
}
m.ReceivedAt = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHaTracker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.ReceivedAt |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
case 3:
if wireType != 0 {
return fmt.Errorf("proto: wrong wireType = %d for field DeletedAt", wireType)
}
m.DeletedAt = 0
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return ErrIntOverflowHaTracker
}
if iNdEx >= l {
return io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
m.DeletedAt |= int64(b&0x7F) << shift
if b < 0x80 {
break
}
}
default:
iNdEx = preIndex
skippy, err := skipHaTracker(dAtA[iNdEx:])
if err != nil {
return err
}
if skippy < 0 {
return ErrInvalidLengthHaTracker
}
if (iNdEx + skippy) < 0 {
return ErrInvalidLengthHaTracker
}
if (iNdEx + skippy) > l {
return io.ErrUnexpectedEOF
}
iNdEx += skippy
}
}
if iNdEx > l {
return io.ErrUnexpectedEOF
}
return nil
}
func skipHaTracker(dAtA []byte) (n int, err error) {
l := len(dAtA)
iNdEx := 0
for iNdEx < l {
var wire uint64
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHaTracker
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
wire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
wireType := int(wire & 0x7)
switch wireType {
case 0:
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHaTracker
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
iNdEx++
if dAtA[iNdEx-1] < 0x80 {
break
}
}
return iNdEx, nil
case 1:
iNdEx += 8
return iNdEx, nil
case 2:
var length int
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHaTracker
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
length |= (int(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
if length < 0 {
return 0, ErrInvalidLengthHaTracker
}
iNdEx += length
if iNdEx < 0 {
return 0, ErrInvalidLengthHaTracker
}
return iNdEx, nil
case 3:
for {
var innerWire uint64
var start int = iNdEx
for shift := uint(0); ; shift += 7 {
if shift >= 64 {
return 0, ErrIntOverflowHaTracker
}
if iNdEx >= l {
return 0, io.ErrUnexpectedEOF
}
b := dAtA[iNdEx]
iNdEx++
innerWire |= (uint64(b) & 0x7F) << shift
if b < 0x80 {
break
}
}
innerWireType := int(innerWire & 0x7)
if innerWireType == 4 {
break
}
next, err := skipHaTracker(dAtA[start:])
if err != nil {
return 0, err
}
iNdEx = start + next
if iNdEx < 0 {
return 0, ErrInvalidLengthHaTracker
}
}
return iNdEx, nil
case 4:
return iNdEx, nil
case 5:
iNdEx += 4
return iNdEx, nil
default:
return 0, fmt.Errorf("proto: illegal wireType %d", wireType)
}
}
panic("unreachable")
}
var (
ErrInvalidLengthHaTracker = fmt.Errorf("proto: negative length found during unmarshaling")
ErrIntOverflowHaTracker = fmt.Errorf("proto: integer overflow")
)

@ -1,20 +0,0 @@
syntax = "proto3";
package distributor;
import "github.com/gogo/protobuf/gogoproto/gogo.proto";
option (gogoproto.marshaler_all) = true;
option (gogoproto.unmarshaler_all) = true;
message ReplicaDesc {
string replica = 1;
int64 received_at = 2;
// Unix timestamp in milliseconds when this entry was marked for deletion.
// Reason for doing marking first, and delete later, is to make sure that distributors
// watching the prefix will receive notification on "marking" -- at which point they can
// already remove entry from memory. Actual deletion from KV store does *not* trigger
// "watch" notification with a key for all KV stores.
int64 deleted_at = 3;
}

@ -1,101 +0,0 @@
package distributor
import (
"html/template"
"net/http"
"sort"
"strings"
"time"
"github.com/prometheus/prometheus/model/timestamp"
"github.com/cortexproject/cortex/pkg/util"
)
const trackerTpl = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex HA Tracker Status</title>
</head>
<body>
<h1>Cortex HA Tracker Status</h1>
<p>Current time: {{ .Now }}</p>
<table width="100%" border="1">
<thead>
<tr>
<th>User ID</th>
<th>Cluster</th>
<th>Replica</th>
<th>Elected Time</th>
<th>Time Until Update</th>
<th>Time Until Failover</th>
</tr>
</thead>
<tbody>
{{ range .Elected }}
<tr>
<td>{{ .UserID }}</td>
<td>{{ .Cluster }}</td>
<td>{{ .Replica }}</td>
<td>{{ .ElectedAt }}</td>
<td>{{ .UpdateTime }}</td>
<td>{{ .FailoverTime }}</td>
</tr>
{{ end }}
</tbody>
</table>
</body>
</html>`
var trackerTmpl *template.Template
func init() {
trackerTmpl = template.Must(template.New("ha-tracker").Parse(trackerTpl))
}
func (h *haTracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
h.electedLock.RLock()
type replica struct {
UserID string `json:"userID"`
Cluster string `json:"cluster"`
Replica string `json:"replica"`
ElectedAt time.Time `json:"electedAt"`
UpdateTime time.Duration `json:"updateDuration"`
FailoverTime time.Duration `json:"failoverDuration"`
}
electedReplicas := []replica{}
for key, desc := range h.elected {
chunks := strings.SplitN(key, "/", 2)
electedReplicas = append(electedReplicas, replica{
UserID: chunks[0],
Cluster: chunks[1],
Replica: desc.Replica,
ElectedAt: timestamp.Time(desc.ReceivedAt),
UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)),
})
}
h.electedLock.RUnlock()
sort.Slice(electedReplicas, func(i, j int) bool {
first := electedReplicas[i]
second := electedReplicas[j]
if first.UserID != second.UserID {
return first.UserID < second.UserID
}
return first.Cluster < second.Cluster
})
util.RenderHTTPResponse(w, struct {
Elected []replica `json:"elected"`
Now time.Time `json:"now"`
}{
Elected: electedReplicas,
Now: time.Now(),
}, trackerTmpl, req)
}

@ -1,97 +0,0 @@
package distributor
import (
"encoding/json"
"fmt"
"html/template"
"net/http"
"sort"
"strings"
"time"
"github.com/cortexproject/cortex/pkg/util"
)
const tpl = `
<!DOCTYPE html>
<html>
<head>
<meta charset="UTF-8">
<title>Cortex Ingester Stats</title>
</head>
<body>
<h1>Cortex Ingester Stats</h1>
<p>Current time: {{ .Now }}</p>
<p><b>NB stats do not account for replication factor, which is currently set to {{ .ReplicationFactor }}</b></p>
<form action="" method="POST">
<input type="hidden" name="csrf_token" value="$__CSRF_TOKEN_PLACEHOLDER__">
<table border="1">
<thead>
<tr>
<th>User</th>
<th># Series</th>
<th>Total Ingest Rate</th>
<th>API Ingest Rate</th>
<th>Rule Ingest Rate</th>
</tr>
</thead>
<tbody>
{{ range .Stats }}
<tr>
<td>{{ .UserID }}</td>
<td align='right'>{{ .UserStats.NumSeries }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.IngestionRate }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.APIIngestionRate }}</td>
<td align='right'>{{ printf "%.2f" .UserStats.RuleIngestionRate }}</td>
</tr>
{{ end }}
</tbody>
</table>
</form>
</body>
</html>`
var tmpl *template.Template
func init() {
tmpl = template.Must(template.New("webpage").Parse(tpl))
}
type userStatsByTimeseries []UserIDStats
func (s userStatsByTimeseries) Len() int { return len(s) }
func (s userStatsByTimeseries) Swap(i, j int) { s[i], s[j] = s[j], s[i] }
func (s userStatsByTimeseries) Less(i, j int) bool {
return s[i].NumSeries > s[j].NumSeries ||
(s[i].NumSeries == s[j].NumSeries && s[i].UserID < s[j].UserID)
}
// AllUserStatsHandler shows stats for all users.
func (d *Distributor) AllUserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.AllUserStats(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
sort.Sort(userStatsByTimeseries(stats))
if encodings, found := r.Header["Accept"]; found &&
len(encodings) > 0 && strings.Contains(encodings[0], "json") {
if err := json.NewEncoder(w).Encode(stats); err != nil {
http.Error(w, fmt.Sprintf("Error marshalling response: %v", err), http.StatusInternalServerError)
}
return
}
util.RenderHTTPResponse(w, struct {
Now time.Time `json:"now"`
Stats []UserIDStats `json:"stats"`
ReplicationFactor int `json:"replicationFactor"`
}{
Now: time.Now(),
Stats: stats,
ReplicationFactor: d.ingestersRing.ReplicationFactor(),
}, tmpl, r)
}

@ -1,26 +0,0 @@
package distributor
import (
"net/http"
"github.com/cortexproject/cortex/pkg/util"
)
// UserStats models ingestion statistics for one user.
type UserStats struct {
IngestionRate float64 `json:"ingestionRate"`
NumSeries uint64 `json:"numSeries"`
APIIngestionRate float64 `json:"APIIngestionRate"`
RuleIngestionRate float64 `json:"RuleIngestionRate"`
}
// UserStatsHandler handles user stats to the Distributor.
func (d *Distributor) UserStatsHandler(w http.ResponseWriter, r *http.Request) {
stats, err := d.UserStats(r.Context())
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
util.WriteJSONResponse(w, stats)
}

@ -1,74 +0,0 @@
package distributor
import (
"github.com/grafana/dskit/limiter"
"golang.org/x/time/rate"
"github.com/cortexproject/cortex/pkg/util/validation"
)
// ReadLifecycler represents the read interface to the lifecycler.
type ReadLifecycler interface {
HealthyInstancesCount() int
}
type localStrategy struct {
limits *validation.Overrides
}
func newLocalIngestionRateStrategy(limits *validation.Overrides) limiter.RateLimiterStrategy {
return &localStrategy{
limits: limits,
}
}
func (s *localStrategy) Limit(tenantID string) float64 {
return s.limits.IngestionRate(tenantID)
}
func (s *localStrategy) Burst(tenantID string) int {
return s.limits.IngestionBurstSize(tenantID)
}
type globalStrategy struct {
limits *validation.Overrides
ring ReadLifecycler
}
func newGlobalIngestionRateStrategy(limits *validation.Overrides, ring ReadLifecycler) limiter.RateLimiterStrategy {
return &globalStrategy{
limits: limits,
ring: ring,
}
}
func (s *globalStrategy) Limit(tenantID string) float64 {
numDistributors := s.ring.HealthyInstancesCount()
if numDistributors == 0 {
return s.limits.IngestionRate(tenantID)
}
return s.limits.IngestionRate(tenantID) / float64(numDistributors)
}
func (s *globalStrategy) Burst(tenantID string) int {
// The meaning of burst doesn't change for the global strategy, in order
// to keep it easier to understand for users / operators.
return s.limits.IngestionBurstSize(tenantID)
}
type infiniteStrategy struct{}
func newInfiniteIngestionRateStrategy() limiter.RateLimiterStrategy {
return &infiniteStrategy{}
}
func (s *infiniteStrategy) Limit(tenantID string) float64 {
return float64(rate.Inf)
}
func (s *infiniteStrategy) Burst(tenantID string) int {
// Burst is ignored when limit = rate.Inf
return 0
}

@ -1,432 +0,0 @@
package distributor
import (
"context"
"io"
"sort"
"time"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/ring"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/instrument"
"github.com/cortexproject/cortex/pkg/cortexpb"
ingester_client "github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/querier/stats"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/extract"
"github.com/cortexproject/cortex/pkg/util/limiter"
"github.com/cortexproject/cortex/pkg/util/validation"
)
// Query multiple ingesters and returns a Matrix of samples.
func (d *Distributor) Query(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (model.Matrix, error) {
var matrix model.Matrix
err := instrument.CollectedRequest(ctx, "Distributor.Query", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}
replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
if err != nil {
return err
}
matrix, err = d.queryIngesters(ctx, replicationSet, req)
if err != nil {
return err
}
if s := opentracing.SpanFromContext(ctx); s != nil {
s.LogKV("series", len(matrix))
}
return nil
})
return matrix, err
}
func (d *Distributor) QueryExemplars(ctx context.Context, from, to model.Time, matchers ...[]*labels.Matcher) (*ingester_client.ExemplarQueryResponse, error) {
var result *ingester_client.ExemplarQueryResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryExemplars", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToExemplarQueryRequest(from, to, matchers...)
if err != nil {
return err
}
// We ask for all ingesters without passing matchers because exemplar queries take in an array of array of label matchers.
replicationSet, err := d.GetIngestersForQuery(ctx)
if err != nil {
return err
}
result, err = d.queryIngestersExemplars(ctx, replicationSet, req)
if err != nil {
return err
}
if s := opentracing.SpanFromContext(ctx); s != nil {
s.LogKV("series", len(result.Timeseries))
}
return nil
})
return result, err
}
// QueryStream multiple ingesters via the streaming interface and returns big ol' set of chunks.
func (d *Distributor) QueryStream(ctx context.Context, from, to model.Time, matchers ...*labels.Matcher) (*ingester_client.QueryStreamResponse, error) {
var result *ingester_client.QueryStreamResponse
err := instrument.CollectedRequest(ctx, "Distributor.QueryStream", d.queryDuration, instrument.ErrorCode, func(ctx context.Context) error {
req, err := ingester_client.ToQueryRequest(from, to, matchers)
if err != nil {
return err
}
replicationSet, err := d.GetIngestersForQuery(ctx, matchers...)
if err != nil {
return err
}
result, err = d.queryIngesterStream(ctx, replicationSet, req)
if err != nil {
return err
}
if s := opentracing.SpanFromContext(ctx); s != nil {
s.LogKV("chunk-series", len(result.GetChunkseries()), "time-series", len(result.GetTimeseries()))
}
return nil
})
return result, err
}
// GetIngestersForQuery returns a replication set including all ingesters that should be queried
// to fetch series matching input label matchers.
func (d *Distributor) GetIngestersForQuery(ctx context.Context, matchers ...*labels.Matcher) (ring.ReplicationSet, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return ring.ReplicationSet{}, err
}
// If shuffle sharding is enabled we should only query ingesters which are
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}
// If "shard by all labels" is disabled, we can get ingesters by metricName if exists.
if !d.cfg.ShardByAllLabels && len(matchers) > 0 {
metricNameMatcher, _, ok := extract.MetricNameMatcherFromMatchers(matchers)
if ok && metricNameMatcher.Type == labels.MatchEqual {
return d.ingestersRing.Get(shardByMetricName(userID, metricNameMatcher.Value), ring.Read, nil, nil, nil)
}
}
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}
// GetIngestersForMetadata returns a replication set including all ingesters that should be queried
// to fetch metadata (eg. label names/values or series).
func (d *Distributor) GetIngestersForMetadata(ctx context.Context) (ring.ReplicationSet, error) {
userID, err := tenant.TenantID(ctx)
if err != nil {
return ring.ReplicationSet{}, err
}
// If shuffle sharding is enabled we should only query ingesters which are
// part of the tenant's subring.
if d.cfg.ShardingStrategy == util.ShardingStrategyShuffle {
shardSize := d.limits.IngestionTenantShardSize(userID)
lookbackPeriod := d.cfg.ShuffleShardingLookbackPeriod
if shardSize > 0 && lookbackPeriod > 0 {
return d.ingestersRing.ShuffleShardWithLookback(userID, shardSize, lookbackPeriod, time.Now()).GetReplicationSetForOperation(ring.Read)
}
}
return d.ingestersRing.GetReplicationSetForOperation(ring.Read)
}
// queryIngesters queries the ingesters via the older, sample-based API.
func (d *Distributor) queryIngesters(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (model.Matrix, error) {
// Fetch samples from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
resp, err := client.(ingester_client.IngesterClient).Query(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
return nil, err
}
return ingester_client.FromQueryResponse(resp), nil
})
if err != nil {
return nil, err
}
// Merge the results into a single matrix.
fpToSampleStream := map[model.Fingerprint]*model.SampleStream{}
for _, result := range results {
for _, ss := range result.(model.Matrix) {
fp := ss.Metric.Fingerprint()
mss, ok := fpToSampleStream[fp]
if !ok {
mss = &model.SampleStream{
Metric: ss.Metric,
}
fpToSampleStream[fp] = mss
}
mss.Values = util.MergeSampleSets(mss.Values, ss.Values)
}
}
result := model.Matrix{}
for _, ss := range fpToSampleStream {
result = append(result, ss)
}
return result, nil
}
// mergeExemplarSets merges and dedupes two sets of already sorted exemplar pairs.
// Both a and b should be lists of exemplars from the same series.
// Defined here instead of pkg/util to avoid a import cycle.
func mergeExemplarSets(a, b []cortexpb.Exemplar) []cortexpb.Exemplar {
result := make([]cortexpb.Exemplar, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
// Add the rest of a or b. One of them is empty now.
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
// queryIngestersExemplars queries the ingesters for exemplars.
func (d *Distributor) queryIngestersExemplars(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.ExemplarQueryRequest) (*ingester_client.ExemplarQueryResponse, error) {
// Fetch exemplars from multiple ingesters in parallel, using the replicationSet
// to deal with consistency.
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
resp, err := client.(ingester_client.IngesterClient).QueryExemplars(ctx, req)
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
return nil, err
}
return resp, nil
})
if err != nil {
return nil, err
}
// Merge results from replication set.
var keys []string
exemplarResults := make(map[string]cortexpb.TimeSeries)
for _, result := range results {
r := result.(*ingester_client.ExemplarQueryResponse)
for _, ts := range r.Timeseries {
lbls := cortexpb.FromLabelAdaptersToLabels(ts.Labels).String()
e, ok := exemplarResults[lbls]
if !ok {
exemplarResults[lbls] = ts
keys = append(keys, lbls)
}
// Merge in any missing values from another ingester's exemplars for this series.
e.Exemplars = mergeExemplarSets(e.Exemplars, ts.Exemplars)
}
}
// Query results from each ingester were sorted, but are not necessarily still sorted after merging.
sort.Strings(keys)
result := make([]cortexpb.TimeSeries, len(exemplarResults))
for i, k := range keys {
result[i] = exemplarResults[k]
}
return &ingester_client.ExemplarQueryResponse{Timeseries: result}, nil
}
// queryIngesterStream queries the ingesters using the new streaming API.
func (d *Distributor) queryIngesterStream(ctx context.Context, replicationSet ring.ReplicationSet, req *ingester_client.QueryRequest) (*ingester_client.QueryStreamResponse, error) {
var (
queryLimiter = limiter.QueryLimiterFromContextWithFallback(ctx)
reqStats = stats.FromContext(ctx)
)
// Fetch samples from multiple ingesters
results, err := replicationSet.Do(ctx, d.cfg.ExtraQueryDelay, func(ctx context.Context, ing *ring.InstanceDesc) (interface{}, error) {
client, err := d.ingesterPool.GetClientFor(ing.Addr)
if err != nil {
return nil, err
}
d.ingesterQueries.WithLabelValues(ing.Addr).Inc()
stream, err := client.(ingester_client.IngesterClient).QueryStream(ctx, req)
if err != nil {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
return nil, err
}
defer stream.CloseSend() //nolint:errcheck
result := &ingester_client.QueryStreamResponse{}
for {
resp, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
// Do not track a failure if the context was canceled.
if !grpcutil.IsGRPCContextCanceled(err) {
d.ingesterQueryFailures.WithLabelValues(ing.Addr).Inc()
}
return nil, err
}
// Enforce the max chunks limits.
if chunkLimitErr := queryLimiter.AddChunks(resp.ChunksCount()); chunkLimitErr != nil {
return nil, validation.LimitError(chunkLimitErr.Error())
}
for _, series := range resp.Chunkseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
}
if chunkBytesLimitErr := queryLimiter.AddChunkBytes(resp.ChunksSize()); chunkBytesLimitErr != nil {
return nil, validation.LimitError(chunkBytesLimitErr.Error())
}
for _, series := range resp.Timeseries {
if limitErr := queryLimiter.AddSeries(series.Labels); limitErr != nil {
return nil, validation.LimitError(limitErr.Error())
}
}
result.Chunkseries = append(result.Chunkseries, resp.Chunkseries...)
result.Timeseries = append(result.Timeseries, resp.Timeseries...)
}
return result, nil
})
if err != nil {
return nil, err
}
hashToChunkseries := map[string]ingester_client.TimeSeriesChunk{}
hashToTimeSeries := map[string]cortexpb.TimeSeries{}
for _, result := range results {
response := result.(*ingester_client.QueryStreamResponse)
// Parse any chunk series
for _, series := range response.Chunkseries {
key := ingester_client.LabelsToKeyString(cortexpb.FromLabelAdaptersToLabels(series.Labels))
existing := hashToChunkseries[key]
existing.Labels = series.Labels
existing.Chunks = append(existing.Chunks, series.Chunks...)
hashToChunkseries[key] = existing
}
// Parse any time series
for _, series := range response.Timeseries {
key := ingester_client.LabelsToKeyString(cortexpb.FromLabelAdaptersToLabels(series.Labels))
existing := hashToTimeSeries[key]
existing.Labels = series.Labels
if existing.Samples == nil {
existing.Samples = series.Samples
} else {
existing.Samples = mergeSamples(existing.Samples, series.Samples)
}
hashToTimeSeries[key] = existing
}
}
resp := &ingester_client.QueryStreamResponse{
Chunkseries: make([]ingester_client.TimeSeriesChunk, 0, len(hashToChunkseries)),
Timeseries: make([]cortexpb.TimeSeries, 0, len(hashToTimeSeries)),
}
for _, series := range hashToChunkseries {
resp.Chunkseries = append(resp.Chunkseries, series)
}
for _, series := range hashToTimeSeries {
resp.Timeseries = append(resp.Timeseries, series)
}
reqStats.AddFetchedSeries(uint64(len(resp.Chunkseries) + len(resp.Timeseries)))
reqStats.AddFetchedChunkBytes(uint64(resp.ChunksSize()))
return resp, nil
}
// Merges and dedupes two sorted slices with samples together.
func mergeSamples(a, b []cortexpb.Sample) []cortexpb.Sample {
if sameSamples(a, b) {
return a
}
result := make([]cortexpb.Sample, 0, len(a)+len(b))
i, j := 0, 0
for i < len(a) && j < len(b) {
if a[i].TimestampMs < b[j].TimestampMs {
result = append(result, a[i])
i++
} else if a[i].TimestampMs > b[j].TimestampMs {
result = append(result, b[j])
j++
} else {
result = append(result, a[i])
i++
j++
}
}
// Add the rest of a or b. One of them is empty now.
result = append(result, a[i:]...)
result = append(result, b[j:]...)
return result
}
func sameSamples(a, b []cortexpb.Sample) bool {
if len(a) != len(b) {
return false
}
for i := 0; i < len(a); i++ {
if a[i] != b[i] {
return false
}
}
return true
}

@ -266,7 +266,6 @@ github.com/cortexproject/cortex/pkg/configs/client
github.com/cortexproject/cortex/pkg/configs/legacy_promql
github.com/cortexproject/cortex/pkg/configs/userconfig
github.com/cortexproject/cortex/pkg/cortexpb
github.com/cortexproject/cortex/pkg/distributor
github.com/cortexproject/cortex/pkg/frontend/v1/frontendv1pb
github.com/cortexproject/cortex/pkg/frontend/v2/frontendv2pb
github.com/cortexproject/cortex/pkg/ingester/client
