Replace memberlist service in favor of cortex provided service (#2224)

* Bump cortex deps to include non-experimental memberlist service

* Replace memberlist service in favor of cortex provided service

* Fix route method param for querytee tool

* Register memberlist flags

* Add documentation and example for memberlist
Periklis Tsirakidis 6 years ago committed by GitHub
parent 9bf1130f08
commit 815c47533e
Changed files (number of changed lines in parentheses):

  1. cmd/querytee/main.go (20)
  2. docs/configuration/README.md (108)
  3. docs/configuration/examples.md (74)
  4. go.mod (2)
  5. go.sum (4)
  6. pkg/loki/loki.go (3)
  7. pkg/loki/modules.go (8)
  8. vendor/github.com/cortexproject/cortex/pkg/chunk/chunk_store.go (12)
  9. vendor/github.com/cortexproject/cortex/pkg/chunk/composite_store.go (5)
  10. vendor/github.com/cortexproject/cortex/pkg/chunk/purger/purger.go (87)
  11. vendor/github.com/cortexproject/cortex/pkg/chunk/series_store.go (5)
  12. vendor/github.com/cortexproject/cortex/pkg/cortex/cortex.go (2)
  13. vendor/github.com/cortexproject/cortex/pkg/cortex/modules.go (8)
  14. vendor/github.com/cortexproject/cortex/pkg/ingester/client/compat.go (31)
  15. vendor/github.com/cortexproject/cortex/pkg/querier/frontend/frontend.go (47)
  16. vendor/github.com/cortexproject/cortex/pkg/querier/queryrange/query_range.go (2)
  17. vendor/github.com/cortexproject/cortex/pkg/ring/kv/client.go (2)
  18. vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_init_service.go (63)
  19. vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/kv_state.go (38)
  20. vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/memberlist_client.go (300)
  21. vendor/github.com/cortexproject/cortex/pkg/ring/kv/memberlist/metrics.go (19)
  22. vendor/github.com/cortexproject/cortex/tools/querytee/proxy.go (10)
  23. vendor/github.com/cortexproject/cortex/tools/querytee/proxy_endpoint.go (141)
  24. vendor/github.com/cortexproject/cortex/tools/querytee/proxy_metrics.go (18)
  25. vendor/modules.txt (2)

@ -63,15 +63,15 @@ func lokiReadRoutes() []querytee.Route {
samplesComparator.RegisterSamplesType(loghttp.ResultTypeStream, compareStreams)
return []querytee.Route{
{Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: "GET", ResponseComparator: samplesComparator},
{Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: "GET", ResponseComparator: samplesComparator},
{Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: "GET", ResponseComparator: nil},
{Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: "GET", ResponseComparator: nil},
{Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: "GET", ResponseComparator: nil},
{Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: "GET", ResponseComparator: nil},
{Path: "/api/prom/query", RouteName: "api_prom_query", Methods: "GET", ResponseComparator: samplesComparator},
{Path: "/api/prom/label", RouteName: "api_prom_label", Methods: "GET", ResponseComparator: nil},
{Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: "GET", ResponseComparator: nil},
{Path: "/api/prom/series", RouteName: "api_prom_series", Methods: "GET", ResponseComparator: nil},
{Path: "/loki/api/v1/query_range", RouteName: "api_v1_query_range", Methods: []string{"GET"}, ResponseComparator: samplesComparator},
{Path: "/loki/api/v1/query", RouteName: "api_v1_query", Methods: []string{"GET"}, ResponseComparator: samplesComparator},
{Path: "/loki/api/v1/label", RouteName: "api_v1_label", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/loki/api/v1/labels", RouteName: "api_v1_labels", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/loki/api/v1/label/{name}/values", RouteName: "api_v1_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/loki/api/v1/series", RouteName: "api_v1_series", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/api/prom/query", RouteName: "api_prom_query", Methods: []string{"GET"}, ResponseComparator: samplesComparator},
{Path: "/api/prom/label", RouteName: "api_prom_label", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/api/prom/label/{name}/values", RouteName: "api_prom_label_name_values", Methods: []string{"GET"}, ResponseComparator: nil},
{Path: "/api/prom/series", RouteName: "api_prom_series", Methods: []string{"GET"}, ResponseComparator: nil},
}
}

@ -15,6 +15,7 @@ Configuration examples can be found in the [Configuration Examples](examples.md)
* [ingester_config](#ingester_config)
* [lifecycler_config](#lifecycler_config)
* [ring_config](#ring_config)
* [memberlist_config](#memberlist_config)
* [storage_config](#storage_config)
* [cache_config](#cache_config)
* [chunk_store_config](#chunk_store_config)
@ -78,6 +79,10 @@ Supported contents and default values of `loki.yaml`:
# key value store.
[ingester: <ingester_config>]
# Configuration for a memberlist gossip ring. Only applies if
# the store is "memberlist".
[memberlist: <memberlist_config>]
# Configures where Loki will store data.
[storage_config: <storage_config>]
@ -371,7 +376,7 @@ The `ring_config` is used to discover and connect to Ingesters.
```yaml
kvstore:
# The backend storage to use for the ring. Supported values are
# consul, etcd, inmemory
# consul, etcd, inmemory, memberlist
store: <string>
# The prefix for the keys in the store. Should end with a /.
@ -414,6 +419,107 @@ kvstore:
[replication_factor: <int> | default = 3]
```
## memberlist_config
The `memberlist_config` block configures the gossip ring that distributors, ingesters
and queriers use to discover each other and form a single shared ring. The same
configuration must be given to all three components so that they join the same ring.
A minimal example follows the option reference below.
```yaml
# Name of the node in memberlist cluster. Defaults to hostname.
# CLI flag: -memberlist.nodename
[node_name: <string> | default = ""]

# Add random suffix to the node name.
# CLI flag: -memberlist.randomize-node-name
[randomize_node_name: <boolean> | default = true]

# The timeout for establishing a connection with a remote node, and for
# read/write operations. Uses memberlist LAN defaults if 0.
# CLI flag: -memberlist.stream-timeout
[stream_timeout: <duration> | default = 0s]

# Multiplication factor used when sending out messages (factor * log(N+1)).
# CLI flag: -memberlist.retransmit-factor
[retransmit_factor: <int> | default = 0]

# How often to use pull/push sync. Uses memberlist LAN defaults if 0.
# CLI flag: -memberlist.pullpush-interval
[pull_push_interval: <duration> | default = 0s]

# How often to gossip. Uses memberlist LAN defaults if 0.
# CLI flag: -memberlist.gossip-interval
[gossip_interval: <duration> | default = 0s]

# How many nodes to gossip to. Uses memberlist LAN defaults if 0.
# CLI flag: -memberlist.gossip-nodes
[gossip_nodes: <int> | default = 0]

# How long to keep gossiping to dead nodes, to give them a chance to refute
# their death. Uses memberlist LAN defaults if 0.
# CLI flag: -memberlist.gossip-to-dead-nodes-time
[gossip_to_dead_nodes_time: <duration> | default = 0s]

# How soon a dead node's name can be reclaimed by a new address. Defaults to 0,
# which disables reclaiming.
# CLI flag: -memberlist.dead-node-reclaim-time
[dead_node_reclaim_time: <duration> | default = 0s]

# Other cluster members to join. Can be specified multiple times.
# CLI flag: -memberlist.join
[join_members: <list of string> | default = ]

# Min backoff duration to join other cluster members.
# CLI flag: -memberlist.min-join-backoff
[min_join_backoff: <duration> | default = 1s]

# Max backoff duration to join other cluster members.
# CLI flag: -memberlist.max-join-backoff
[max_join_backoff: <duration> | default = 1m]

# Max number of retries to join other cluster members.
# CLI flag: -memberlist.max-join-retries
[max_join_retries: <int> | default = 10]

# If this node fails to join the memberlist cluster, abort.
# CLI flag: -memberlist.abort-if-join-fails
[abort_if_cluster_join_fails: <boolean> | default = true]

# If not 0, how often to rejoin the cluster. An occasional rejoin can help to
# fix cluster split issues, and is harmless otherwise. For example, when using
# only a few components as seed nodes (via -memberlist.join), it is recommended
# to use rejoin. If -memberlist.join points to a dynamic service that resolves
# to all gossiping nodes (e.g. a Kubernetes headless service), then rejoin is
# not needed.
# CLI flag: -memberlist.rejoin-interval
[rejoin_interval: <duration> | default = 0s]

# How long to keep LEFT ingesters in the ring.
# CLI flag: -memberlist.left-ingesters-timeout
[left_ingesters_timeout: <duration> | default = 5m]

# Timeout for leaving memberlist cluster.
# CLI flag: -memberlist.leave-timeout
[leave_timeout: <duration> | default = 5s]

# IP address to listen on for gossip messages. Multiple addresses may be
# specified. Defaults to 0.0.0.0.
# CLI flag: -memberlist.bind-addr
[bind_addr: <list of string> | default = ]

# Port to listen on for gossip messages.
# CLI flag: -memberlist.bind-port
[bind_port: <int> | default = 7946]

# Timeout used when connecting to other nodes to send packet.
# CLI flag: -memberlist.packet-dial-timeout
[packet_dial_timeout: <duration> | default = 5s]

# Timeout for writing 'packet' data.
# CLI flag: -memberlist.packet-write-timeout
[packet_write_timeout: <duration> | default = 5s]
```
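
As a quick orientation, here is a minimal sketch of how these options are typically wired up together with the ring configuration. The seed-node hostnames `loki-1` and `loki-2` are placeholders, not defaults; a fuller example lives in the [Configuration Examples](examples.md).

```yaml
ingester:
  lifecycler:
    ring:
      kvstore:
        # Use the gossip ring instead of consul/etcd.
        store: memberlist

memberlist:
  # Placeholder seed addresses, reachable on the gossip port.
  join_members:
  - loki-1:7946
  - loki-2:7946
  bind_port: 7946
```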
## storage_config
The `storage_config` block configures one of many possible stores for both the

@ -4,7 +4,8 @@
2. [Google Cloud Storage](#google-cloud-storage)
3. [Cassandra Index](#cassandra-index)
4. [AWS](#aws)
5. [Using the query-frontend](#query-frontend)
5. [Almost zero dependencies setup with Memberlist and BoltDB Shipper](#almost-zero-dependencies-setup)
6. [Using the query-frontend](#query-frontend)
## Complete Local config
@ -146,6 +147,77 @@ storage_config:
s3forcepathstyle: true
```
## Almost zero dependencies setup
This configuration runs Loki with the object store as its only external dependency,
e.g. an S3-compatible API such as MinIO. The ring is built on gossip memberlist and the
index is shipped to the object store via [boltdb-shipper](../operations/storage/boltdb-shipper.md).
```yaml
auth_enabled: false

server:
  http_listen_port: 3100

distributor:
  ring:
    kvstore:
      store: memberlist

ingester:
  lifecycler:
    ring:
      kvstore:
        store: memberlist
      replication_factor: 1
    final_sleep: 0s
  chunk_idle_period: 5m
  chunk_retain_period: 30s

memberlist:
  abort_if_cluster_join_fails: false

  # Expose this port on all distributor, ingester
  # and querier replicas.
  bind_port: 7946

  # You can use a headless k8s service for all distributor,
  # ingester and querier components (see the Service sketch
  # after this example).
  join_members:
  - loki-gossip-ring.loki.svc.cluster.local:7946

  max_join_backoff: 1m
  max_join_retries: 10
  min_join_backoff: 1s

schema_config:
  configs:
  - from: 2020-05-15
    store: boltdb_shipper
    object_store: s3
    schema: v11
    index:
      prefix: index_
      period: 168h

storage_config:
  boltdb_shipper:
    active_index_directory: /data/loki/index
    cache_location: /data/loki/index_cache
    resync_interval: 5s
    shared_store: s3

  aws:
    s3: s3://access_key:secret_access_key@custom_endpoint/bucket_name
    s3forcepathstyle: true

limits_config:
  enforce_metric_name: false
  reject_old_samples: true
  reject_old_samples_max_age: 168h
```
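
The `join_members` address above points at a headless Kubernetes Service named `loki-gossip-ring` in the `loki` namespace. A minimal sketch of such a Service follows; the `app: loki` selector label is an assumption for illustration and must match the labels on your distributor, ingester and querier pods.

```yaml
apiVersion: v1
kind: Service
metadata:
  name: loki-gossip-ring
  namespace: loki
spec:
  # Headless: DNS returns the IPs of all selected pods instead of a virtual IP.
  clusterIP: None
  selector:
    app: loki   # assumed label; adjust to your deployment
  ports:
  - name: gossip
    port: 7946
    targetPort: 7946
    protocol: TCP
```

Because the Service is headless, `loki-gossip-ring.loki.svc.cluster.local:7946` resolves to every matching replica, which is why a single `join_members` entry is enough to bootstrap the gossip ring.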
## Query Frontend
[example configuration](./query-frontend.md)

@ -10,7 +10,7 @@ require (
github.com/containerd/containerd v1.3.2 // indirect
github.com/containerd/fifo v0.0.0-20190226154929-a9fb20d87448 // indirect
github.com/coreos/go-systemd v0.0.0-20191104093116-d3cd4ed1dbcf
github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a
github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3
github.com/davecgh/go-spew v1.1.1
github.com/docker/distribution v2.7.1+incompatible // indirect
github.com/docker/docker v0.7.3-0.20190817195342-4760db040282

@ -233,8 +233,8 @@ github.com/coreos/pkg v0.0.0-20160727233714-3ac0863d7acf/go.mod h1:E3G3o1h8I7cfc
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f h1:lBNOc5arjvs8E5mO2tbpBpLoyyu8B6e44T7hJy6potg=
github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f/go.mod h1:E3G3o1h8I7cfcXa63jLwjI0eiQQMgzzUDFVpN/nH/eA=
github.com/cortexproject/cortex v0.6.1-0.20200228110116-92ab6cbe0995/go.mod h1:3Xa3DjJxtpXqxcMGdk850lcIRb81M0fyY1MQ6udY134=
github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a h1:7K5ZOo5c+G+nyhG0CNeL6g5LnyjTUMykA8ceC2YgTYs=
github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a/go.mod h1:gySKqWpKaHBAFML0dOQyglOhxlJ/Eyfslf4QNBJjPoY=
github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3 h1:PebLohuxr0MSgWYAc/iUCFpxIdZyISk7jKUuQKTRbRw=
github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3/go.mod h1:gySKqWpKaHBAFML0dOQyglOhxlJ/Eyfslf4QNBJjPoY=
github.com/cpuguy83/go-md2man/v2 v2.0.0-20190314233015-f79a8a8ca69d/go.mod h1:maD7wRr/U5Z6m/iR4s+kqSMx2CaBsrgA7czyZG/E6dU=
github.com/creack/pty v1.1.7/go.mod h1:lj5s0c3V2DBrqTV7llrYr5NG6My20zk30Fl46Y7DoTY=
github.com/cznic/b v0.0.0-20180115125044-35e9bbe41f07/go.mod h1:URriBxXwVq5ijiJ12C7iIZqlA69nTlI+LgI6/pwftG8=

@ -83,6 +83,7 @@ func (c *Config) RegisterFlags(f *flag.FlagSet) {
c.Worker.RegisterFlags(f)
c.QueryRange.RegisterFlags(f)
c.RuntimeConfig.RegisterFlags(f)
c.MemberlistKV.RegisterFlags(f, "")
c.Tracing.RegisterFlags(f)
}
@ -123,7 +124,7 @@ type Loki struct {
frontend *frontend.Frontend
stopper queryrange.Stopper
runtimeConfig *runtimeconfig.Manager
memberlistKV *memberlist.KVInit
memberlistKV *memberlist.KVInitService
httpAuthMiddleware middleware.Interface
}

@ -324,11 +324,9 @@ func (t *Loki) initMemberlistKV() (services.Service, error) {
t.cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
}
t.memberlistKV = memberlist.NewKVInit(&t.cfg.MemberlistKV)
return services.NewIdleService(nil, func(_ error) error {
t.memberlistKV.Stop()
return nil
}), nil
t.memberlistKV = memberlist.NewKVInitService(&t.cfg.MemberlistKV)
return t.memberlistKV, nil
}
// activePeriodConfig type returns index type which would be applicable to logs that would be pushed starting now

@ -505,6 +505,11 @@ func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQ
log, ctx := spanlogger.New(ctx, "store.lookupEntriesByQueries")
defer log.Span.Finish()
// Nothing to do if there are no queries.
if len(queries) == 0 {
return nil, nil
}
var lock sync.Mutex
var entries []IndexEntry
err := c.index.QueryPages(ctx, queries, func(query IndexQuery, resp ReadBatch) bool {
@ -527,7 +532,12 @@ func (c *baseStore) lookupEntriesByQueries(ctx context.Context, queries []IndexQ
return entries, err
}
func (c *baseStore) parseIndexEntries(ctx context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
func (c *baseStore) parseIndexEntries(_ context.Context, entries []IndexEntry, matcher *labels.Matcher) ([]string, error) {
// Nothing to do if there are no entries.
if len(entries) == 0 {
return nil, nil
}
result := make([]string, 0, len(entries))
for _, entry := range entries {
chunkKey, labelValue, _, err := parseChunkTimeRangeValue(entry.RangeValue, entry.Value)

@ -162,6 +162,11 @@ func (c compositeStore) GetChunkRefs(ctx context.Context, userID string, from, t
return err
}
// Skip it if there are no chunks.
if len(ids) == 0 {
return nil
}
chunkIDs = append(chunkIDs, ids...)
fetchers = append(fetchers, fetcher...)
return nil

@ -102,6 +102,13 @@ type DataPurger struct {
inProcessRequestIDs map[string]string
inProcessRequestIDsMtx sync.RWMutex
// We do not want to limit pulling new delete requests to a fixed interval which otherwise would limit number of delete requests we process per user.
// While loading delete requests if we find more requests from user pending to be processed, we just set their id in usersWithPendingRequests and
// when a user's delete request gets processed we just check this map to see whether we want to load more requests without waiting for next ticker to load new batch.
usersWithPendingRequests map[string]struct{}
usersWithPendingRequestsMtx sync.Mutex
pullNewRequestsChan chan struct{}
pendingPlansCount map[string]int // per request pending plan count
pendingPlansCountMtx sync.Mutex
@ -113,31 +120,23 @@ func NewDataPurger(cfg Config, deleteStore *DeleteStore, chunkStore chunk.Store,
util.WarnExperimentalUse("Delete series API")
dataPurger := DataPurger{
cfg: cfg,
deleteStore: deleteStore,
chunkStore: chunkStore,
objectClient: storageClient,
metrics: newPurgerMetrics(registerer),
executePlansChan: make(chan deleteRequestWithLogger, 50),
workerJobChan: make(chan workerJob, 50),
inProcessRequestIDs: map[string]string{},
pendingPlansCount: map[string]int{},
cfg: cfg,
deleteStore: deleteStore,
chunkStore: chunkStore,
objectClient: storageClient,
metrics: newPurgerMetrics(registerer),
pullNewRequestsChan: make(chan struct{}, 1),
executePlansChan: make(chan deleteRequestWithLogger, 50),
workerJobChan: make(chan workerJob, 50),
inProcessRequestIDs: map[string]string{},
usersWithPendingRequests: map[string]struct{}{},
pendingPlansCount: map[string]int{},
}
dataPurger.Service = services.NewTimerService(time.Hour, dataPurger.init, dataPurger.runOneIteration, dataPurger.stop)
dataPurger.Service = services.NewBasicService(dataPurger.init, dataPurger.loop, dataPurger.stop)
return &dataPurger, nil
}
// Run keeps pulling delete requests for planning after initializing necessary things
func (dp *DataPurger) runOneIteration(ctx context.Context) error {
err := dp.pullDeleteRequestsToPlanDeletes()
if err != nil {
level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
}
// Don't return error here, or Timer service will stop.
return nil
}
// init starts workers, scheduler and then loads in process delete requests
func (dp *DataPurger) init(ctx context.Context) error {
for i := 0; i < dp.cfg.NumWorkers; i++ {
@ -151,6 +150,29 @@ func (dp *DataPurger) init(ctx context.Context) error {
return dp.loadInprocessDeleteRequests()
}
func (dp *DataPurger) loop(ctx context.Context) error {
loadRequestsTicker := time.NewTicker(time.Hour)
defer loadRequestsTicker.Stop()
loadRequests := func() {
err := dp.pullDeleteRequestsToPlanDeletes()
if err != nil {
level.Error(util.Logger).Log("msg", "error pulling delete requests for building plans", "err", err)
}
}
for {
select {
case <-loadRequestsTicker.C:
loadRequests()
case <-dp.pullNewRequestsChan:
loadRequests()
case <-ctx.Done():
return nil
}
}
}
// Stop waits until all background tasks stop.
func (dp *DataPurger) stop(_ error) error {
dp.wg.Wait()
@ -183,6 +205,21 @@ func (dp *DataPurger) workerJobCleanup(job workerJob) {
dp.inProcessRequestIDsMtx.Lock()
delete(dp.inProcessRequestIDs, job.userID)
dp.inProcessRequestIDsMtx.Unlock()
// request loading of more delete request if
// - user has more pending requests and
// - we do not have a pending request to load more requests
dp.usersWithPendingRequestsMtx.Lock()
defer dp.usersWithPendingRequestsMtx.Unlock()
if _, ok := dp.usersWithPendingRequests[job.userID]; ok {
delete(dp.usersWithPendingRequests, job.userID)
select {
case dp.pullNewRequestsChan <- struct{}{}:
// sent
default:
// already sent
}
}
} else {
dp.pendingPlansCountMtx.Unlock()
}
@ -345,12 +382,16 @@ func (dp *DataPurger) pullDeleteRequestsToPlanDeletes() error {
}
dp.inProcessRequestIDsMtx.RLock()
inprocessDeleteRequstID := dp.inProcessRequestIDs[deleteRequest.UserID]
inprocessDeleteRequestID := dp.inProcessRequestIDs[deleteRequest.UserID]
dp.inProcessRequestIDsMtx.RUnlock()
if inprocessDeleteRequstID != "" {
if inprocessDeleteRequestID != "" {
dp.usersWithPendingRequestsMtx.Lock()
dp.usersWithPendingRequests[deleteRequest.UserID] = struct{}{}
dp.usersWithPendingRequestsMtx.Unlock()
level.Debug(util.Logger).Log("msg", "skipping delete request processing for now since another request from same user is already in process",
"inprocess_request_id", inprocessDeleteRequstID,
"inprocess_request_id", inprocessDeleteRequestID,
"skipped_request_id", deleteRequest.RequestID, "user_id", deleteRequest.UserID)
continue
}

@ -182,6 +182,11 @@ func (c *seriesStore) GetChunkRefs(ctx context.Context, userID string, from, thr
level.Debug(log).Log("chunks-post-filtering", len(chunks))
chunksPerQuery.Observe(float64(len(chunks)))
// We should return an empty chunks slice if there are no chunks.
if len(chunks) == 0 {
return [][]Chunk{}, []*Fetcher{}, nil
}
return [][]Chunk{chunks}, []*Fetcher{c.baseStore.Fetcher}, nil
}

@ -216,7 +216,7 @@ type Cortex struct {
Alertmanager *alertmanager.MultitenantAlertmanager
Compactor *compactor.Compactor
StoreGateway *storegateway.StoreGateway
MemberlistKV *memberlist.KVInit
MemberlistKV *memberlist.KVInitService
// Queryable that the querier should use to query the long
// term storage. It depends on the storage engine used.

@ -475,12 +475,8 @@ func (t *Cortex) initMemberlistKV() (services.Service, error) {
t.Cfg.MemberlistKV.Codecs = []codec.Codec{
ring.GetCodec(),
}
t.MemberlistKV = memberlist.NewKVInit(&t.Cfg.MemberlistKV)
return services.NewIdleService(nil, func(_ error) error {
t.MemberlistKV.Stop()
return nil
}), nil
t.MemberlistKV = memberlist.NewKVInitService(&t.Cfg.MemberlistKV)
return t.MemberlistKV, nil
}
func (t *Cortex) initDataPurger() (services.Service, error) {

@ -229,19 +229,34 @@ func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
// get in input labels whose data structure is reused.
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
result := make(labels.Labels, len(input))
size := 0
for _, l := range input {
size += len(l.Name)
size += len(l.Value)
}
// Copy all strings into the buffer, and use 'yoloString' to convert buffer
// slices to strings.
buf := make([]byte, size)
for i, l := range input {
result[i] = labels.Label{
Name: copyString(l.Name),
Value: copyString(l.Value),
}
result[i].Name, buf = copyStringToBuffer(l.Name, buf)
result[i].Value, buf = copyStringToBuffer(l.Value, buf)
}
return result
}
func copyString(in string) string {
out := make([]byte, len(in))
copy(out, in)
return string(out)
// Copies string to buffer (which must be big enough), and converts buffer slice containing
// the string copy into new string.
func copyStringToBuffer(in string, buf []byte) (string, []byte) {
l := len(in)
c := copy(buf, in)
if c != l {
panic("not copied full string")
}
return yoloString(buf[0:l]), buf[l:]
}
// FromLabelsToLabelAdapters casts labels.Labels to []LabelAdapter.

@ -51,7 +51,7 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.MaxOutstandingPerTenant, "querier.max-outstanding-requests-per-tenant", 100, "Maximum number of outstanding requests per tenant per frontend; requests beyond this error with HTTP 429.")
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. 0 to disable.")
f.DurationVar(&cfg.LogQueriesLongerThan, "frontend.log-queries-longer-than", 0, "Log queries that are slower than the specified duration. Set to 0 to disable. Set to < 0 to enable on all queries.")
}
// Frontend queues HTTP requests, dispatches them to backends, and handles retries
@ -164,34 +164,41 @@ func (f *Frontend) handle(w http.ResponseWriter, r *http.Request) {
resp, err := f.roundTripper.RoundTrip(r)
queryResponseTime := time.Since(startTime)
if f.cfg.LogQueriesLongerThan > 0 && queryResponseTime > f.cfg.LogQueriesLongerThan {
if err != nil {
writeError(w, err)
} else {
hs := w.Header()
for h, vs := range resp.Header {
hs[h] = vs
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
// If LogQueriesLongerThan is set to <0 we log every query, if it is set to 0 query logging
// is disabled
if f.cfg.LogQueriesLongerThan != 0 && queryResponseTime > f.cfg.LogQueriesLongerThan {
logMessage := []interface{}{
"msg", "slow query",
"msg", "slow query detected",
"method", r.Method,
"host", r.Host,
"path", r.URL.Path,
"time_taken", queryResponseTime.String(),
}
for k, v := range r.URL.Query() {
logMessage = append(logMessage, fmt.Sprintf("qs_%s", k), strings.Join(v, ","))
}
pf := r.PostForm.Encode()
if pf != "" {
logMessage = append(logMessage, "body", pf)
// Ensure the form has been parsed so all the parameters are present
err = r.ParseForm()
if err != nil {
level.Warn(util.WithContext(r.Context(), f.log)).Log("msg", "unable to parse form for request", "error", err)
}
level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
}
if err != nil {
writeError(w, err)
return
}
// Attempt to iterate through the Form to log any filled in values
for k, v := range r.Form {
logMessage = append(logMessage, fmt.Sprintf("param_%s", k), strings.Join(v, ","))
}
hs := w.Header()
for h, vs := range resp.Header {
hs[h] = vs
level.Info(util.WithContext(r.Context(), f.log)).Log(logMessage...)
}
w.WriteHeader(resp.StatusCode)
io.Copy(w, resp.Body)
}
func writeError(w http.ResponseWriter, err error) {

@ -266,6 +266,8 @@ func (prometheusCodec) EncodeResponse(ctx context.Context, res Response) (*http.
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "invalid response format")
}
sp.LogFields(otlog.Int("series", len(a.Data.Result)))
b, err := json.Marshal(a)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusInternalServerError, "error encoding response: %v", err)

@ -61,7 +61,7 @@ func (cfg *Config) RegisterFlagsWithPrefix(flagsPrefix, defaultPrefix string, f
flagsPrefix = "ring."
}
f.StringVar(&cfg.Prefix, flagsPrefix+"prefix", defaultPrefix, "The prefix for the keys in the store. Should end with a /.")
f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, multi, memberlist (experimental).")
f.StringVar(&cfg.Store, flagsPrefix+"store", "consul", "Backend storage to use for the ring. Supported values are: consul, etcd, inmemory, memberlist, multi.")
}
// Client is a high-level client for key-value stores (such as Etcd and

@ -0,0 +1,63 @@
package memberlist
import (
"context"
"sync"
"github.com/cortexproject/cortex/pkg/util/services"
)
// This service initialized memberlist.KV on first call to GetMemberlistKV, and starts it. On stop,
// KV is stopped too. If KV fails, error is reported from the service.
type KVInitService struct {
services.Service
// config used for initialization
cfg *KVConfig
// init function, to avoid multiple initializations.
init sync.Once
// state
kv *KV
err error
watcher *services.FailureWatcher
}
func NewKVInitService(cfg *KVConfig) *KVInitService {
kvinit := &KVInitService{
cfg: cfg,
watcher: services.NewFailureWatcher(),
}
kvinit.Service = services.NewBasicService(nil, kvinit.running, kvinit.stopping)
return kvinit
}
// This method will initialize Memberlist.KV on first call, and add it to service failure watcher.
func (kvs *KVInitService) GetMemberlistKV() (*KV, error) {
kvs.init.Do(func() {
kvs.kv = NewKV(*kvs.cfg)
kvs.watcher.WatchService(kvs.kv)
kvs.err = kvs.kv.StartAsync(context.Background())
})
return kvs.kv, kvs.err
}
func (kvs *KVInitService) running(ctx context.Context) error {
select {
case <-ctx.Done():
return nil
case err := <-kvs.watcher.Chan():
// Only happens if KV service was actually initialized in GetMemberlistKV and it fails.
return err
}
}
func (kvs *KVInitService) stopping(_ error) error {
if kvs.kv == nil {
return nil
}
return services.StopAndAwaitTerminated(context.Background(), kvs.kv)
}

@ -1,38 +0,0 @@
package memberlist
import (
"sync"
)
// This struct holds state of initialization of memberlist.KV instance.
type KVInit struct {
// config used for initialization
cfg *KVConfig
// init function, to avoid multiple initializations.
init sync.Once
// state
kv *KV
err error
}
func NewKVInit(cfg *KVConfig) *KVInit {
return &KVInit{
cfg: cfg,
}
}
func (kvs *KVInit) GetMemberlistKV() (*KV, error) {
kvs.init.Do(func() {
kvs.kv, kvs.err = NewKV(*kvs.cfg)
})
return kvs.kv, kvs.err
}
func (kvs *KVInit) Stop() {
if kvs.kv != nil {
kvs.kv.Stop()
}
}

@ -3,6 +3,7 @@ package memberlist
import (
"bytes"
"context"
"crypto/rand"
"encoding/binary"
"errors"
"flag"
@ -15,11 +16,11 @@ import (
"github.com/go-kit/kit/log/level"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
)
const (
@ -48,11 +49,21 @@ func NewClient(kv *KV, codec codec.Codec) (*Client, error) {
// List is part of kv.Client interface.
func (c *Client) List(ctx context.Context, prefix string) ([]string, error) {
err := c.awaitKVRunningOrStopping(ctx)
if err != nil {
return nil, err
}
return c.kv.List(prefix), nil
}
// Get is part of kv.Client interface.
func (c *Client) Get(ctx context.Context, key string) (interface{}, error) {
err := c.awaitKVRunningOrStopping(ctx)
if err != nil {
return nil, err
}
return c.kv.Get(key, c.codec)
}
@ -63,24 +74,57 @@ func (c *Client) Delete(ctx context.Context, key string) error {
// CAS is part of kv.Client interface
func (c *Client) CAS(ctx context.Context, key string, f func(in interface{}) (out interface{}, retry bool, err error)) error {
err := c.awaitKVRunningOrStopping(ctx)
if err != nil {
return err
}
return c.kv.CAS(ctx, key, c.codec, f)
}
// WatchKey is part of kv.Client interface.
func (c *Client) WatchKey(ctx context.Context, key string, f func(interface{}) bool) {
err := c.awaitKVRunningOrStopping(ctx)
if err != nil {
return
}
c.kv.WatchKey(ctx, key, c.codec, f)
}
// WatchPrefix calls f whenever any value stored under prefix changes.
// Part of kv.Client interface.
func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string, interface{}) bool) {
err := c.awaitKVRunningOrStopping(ctx)
if err != nil {
return
}
c.kv.WatchPrefix(ctx, prefix, c.codec, f)
}
// We want to use KV in Running and Stopping states.
func (c *Client) awaitKVRunningOrStopping(ctx context.Context) error {
s := c.kv.State()
switch s {
case services.Running, services.Stopping:
return nil
case services.New, services.Starting:
err := c.kv.AwaitRunning(ctx)
if ns := c.kv.State(); ns == services.Stopping {
return nil
}
return err
default:
return fmt.Errorf("unexpected state: %v", s)
}
}
// KVConfig is a config for memberlist.KV
type KVConfig struct {
// Memberlist options.
NodeName string `yaml:"node_name"`
RandomizeNodeName bool `yaml:"randomize_node_name"`
StreamTimeout time.Duration `yaml:"stream_timeout"`
RetransmitMult int `yaml:"retransmit_factor"`
PushPullInterval time.Duration `yaml:"pull_push_interval"`
@ -91,7 +135,11 @@ type KVConfig struct {
// List of members to join
JoinMembers flagext.StringSlice `yaml:"join_members"`
MinJoinBackoff time.Duration `yaml:"min_join_backoff"`
MaxJoinBackoff time.Duration `yaml:"max_join_backoff"`
MaxJoinRetries int `yaml:"max_join_retries"`
AbortIfJoinFails bool `yaml:"abort_if_cluster_join_fails"`
RejoinInterval time.Duration `yaml:"rejoin_interval"`
// Remove LEFT ingesters from ring after this timeout.
LeftIngestersTimeout time.Duration `yaml:"left_ingesters_timeout"`
@ -113,10 +161,15 @@ type KVConfig struct {
func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) {
// "Defaults to hostname" -- memberlist sets it to hostname by default.
f.StringVar(&cfg.NodeName, prefix+"memberlist.nodename", "", "Name of the node in memberlist cluster. Defaults to hostname.") // memberlist.DefaultLANConfig will put hostname here.
f.BoolVar(&cfg.RandomizeNodeName, prefix+"memberlist.randomize-node-name", true, "Add random suffix to the node name.")
f.DurationVar(&cfg.StreamTimeout, prefix+"memberlist.stream-timeout", 0, "The timeout for establishing a connection with a remote node, and for read/write operations. Uses memberlist LAN defaults if 0.")
f.IntVar(&cfg.RetransmitMult, prefix+"memberlist.retransmit-factor", 0, "Multiplication factor used when sending out messages (factor * log(N+1)).")
f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times. Memberlist store is EXPERIMENTAL.")
f.Var(&cfg.JoinMembers, prefix+"memberlist.join", "Other cluster members to join. Can be specified multiple times.")
f.DurationVar(&cfg.MinJoinBackoff, prefix+"memberlist.min-join-backoff", 1*time.Second, "Min backoff duration to join other cluster members.")
f.DurationVar(&cfg.MaxJoinBackoff, prefix+"memberlist.max-join-backoff", 1*time.Minute, "Max backoff duration to join other cluster members.")
f.IntVar(&cfg.MaxJoinRetries, prefix+"memberlist.max-join-retries", 10, "Max number of retries to join other cluster members.")
f.BoolVar(&cfg.AbortIfJoinFails, prefix+"memberlist.abort-if-join-fails", true, "If this node fails to join memberlist cluster, abort.")
f.DurationVar(&cfg.RejoinInterval, prefix+"memberlist.rejoin-interval", 0, "If not 0, how often to rejoin the cluster. Occasional rejoin can help to fix the cluster split issue, and is harmless otherwise. For example when using only few components as a seed nodes (via -memberlist.join), then it's recommended to use rejoin. If -memberlist.join points to dynamic service that resolves to all gossiping nodes (eg. Kubernetes headless service), then rejoin is not needed.")
f.DurationVar(&cfg.LeftIngestersTimeout, prefix+"memberlist.left-ingesters-timeout", 5*time.Minute, "How long to keep LEFT ingesters in the ring.")
f.DurationVar(&cfg.LeaveTimeout, prefix+"memberlist.leave-timeout", 5*time.Second, "Timeout for leaving memberlist cluster.")
f.DurationVar(&cfg.GossipInterval, prefix+"memberlist.gossip-interval", 0, "How often to gossip. Uses memberlist LAN defaults if 0.")
@ -128,16 +181,31 @@ func (cfg *KVConfig) RegisterFlags(f *flag.FlagSet, prefix string) {
cfg.TCPTransport.RegisterFlags(f, prefix)
}
func generateRandomSuffix() string {
suffix := make([]byte, 4)
_, err := rand.Read(suffix)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to generate random suffix", "err", err)
return "error"
}
return fmt.Sprintf("%2x", suffix)
}
// KV implements Key-Value store on top of memberlist library. KV store has API similar to kv.Client,
// except methods also need explicit codec for each operation.
// KV is a Service. It needs to be started first, and is only usable once it enters Running state.
// If joining of the cluster if configured, it is done in Running state, and if join fails and Abort flag is set, service
// fails.
type KV struct {
cfg KVConfig
services.Service
cfg KVConfig
// Protects access to memberlist and broadcasts fields.
initWG sync.WaitGroup
memberlist *memberlist.Memberlist
broadcasts *memberlist.TransmitLimitedQueue
// Disabled on Stop()
casBroadcastsEnabled *atomic.Bool
// KV Store.
storeMu sync.Mutex
store map[string]valueDesc
@ -199,46 +267,70 @@ var (
errTooManyRetries = errors.New("too many retries")
)
// NewKV creates new Client instance. If cfg.JoinMembers is set, it will also try to connect
// to these members and join the cluster. If that fails and AbortIfJoinFails is true, error is returned and no
// client is created.
func NewKV(cfg KVConfig) (*KV, error) {
util.WarnExperimentalUse("Gossip memberlist ring")
// NewKV creates new gossip-based KV service. Note that service needs to be started, until then it doesn't initialize
// gossiping part. Only after service is in Running state, it is really gossiping.
// Starting the service will also trigger connecting to the existing memberlist cluster, if cfg.JoinMembers is set.
// If that fails and AbortIfJoinFails is true, error is returned and service enters Failed state.
func NewKV(cfg KVConfig) *KV {
cfg.TCPTransport.MetricsRegisterer = cfg.MetricsRegisterer
cfg.TCPTransport.MetricsNamespace = cfg.MetricsNamespace
tr, err := NewTCPTransport(cfg.TCPTransport)
mlkv := &KV{
cfg: cfg,
store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
}
mlkv.createAndRegisterMetrics()
for _, c := range cfg.Codecs {
mlkv.codecs[c.CodecID()] = c
}
mlkv.Service = services.NewBasicService(mlkv.starting, mlkv.running, mlkv.stopping)
return mlkv
}
func (m *KV) buildMemberlistConfig() (*memberlist.Config, error) {
tr, err := NewTCPTransport(m.cfg.TCPTransport)
if err != nil {
return nil, fmt.Errorf("failed to create transport: %v", err)
}
mlCfg := memberlist.DefaultLANConfig()
mlCfg.Delegate = m
if cfg.StreamTimeout != 0 {
mlCfg.TCPTimeout = cfg.StreamTimeout
if m.cfg.StreamTimeout != 0 {
mlCfg.TCPTimeout = m.cfg.StreamTimeout
}
if m.cfg.RetransmitMult != 0 {
mlCfg.RetransmitMult = m.cfg.RetransmitMult
}
if cfg.RetransmitMult != 0 {
mlCfg.RetransmitMult = cfg.RetransmitMult
if m.cfg.PushPullInterval != 0 {
mlCfg.PushPullInterval = m.cfg.PushPullInterval
}
if cfg.PushPullInterval != 0 {
mlCfg.PushPullInterval = cfg.PushPullInterval
if m.cfg.GossipInterval != 0 {
mlCfg.GossipInterval = m.cfg.GossipInterval
}
if cfg.GossipInterval != 0 {
mlCfg.GossipInterval = cfg.GossipInterval
if m.cfg.GossipNodes != 0 {
mlCfg.GossipNodes = m.cfg.GossipNodes
}
if cfg.GossipNodes != 0 {
mlCfg.GossipNodes = cfg.GossipNodes
if m.cfg.GossipToTheDeadTime > 0 {
mlCfg.GossipToTheDeadTime = m.cfg.GossipToTheDeadTime
}
if cfg.GossipToTheDeadTime > 0 {
mlCfg.GossipToTheDeadTime = cfg.GossipToTheDeadTime
if m.cfg.DeadNodeReclaimTime > 0 {
mlCfg.DeadNodeReclaimTime = m.cfg.DeadNodeReclaimTime
}
if cfg.DeadNodeReclaimTime > 0 {
mlCfg.DeadNodeReclaimTime = cfg.DeadNodeReclaimTime
if m.cfg.NodeName != "" {
mlCfg.Name = m.cfg.NodeName
}
if cfg.NodeName != "" {
mlCfg.Name = cfg.NodeName
if m.cfg.RandomizeNodeName {
mlCfg.Name = mlCfg.Name + "-" + generateRandomSuffix()
level.Info(util.Logger).Log("msg", "Using memberlist cluster node name", "name", mlCfg.Name)
}
mlCfg.LogOutput = newMemberlistLoggerAdapter(util.Logger, false)
@ -247,55 +339,80 @@ func NewKV(cfg KVConfig) (*KV, error) {
// Memberlist uses UDPBufferSize to figure out how many messages it can put into single "packet".
// As we don't use UDP for sending packets, we can use higher value here.
mlCfg.UDPBufferSize = 10 * 1024 * 1024
return mlCfg, nil
}
mlkv := &KV{
cfg: cfg,
store: make(map[string]valueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
shutdown: make(chan struct{}),
maxCasRetries: maxCasRetries,
casBroadcastsEnabled: atomic.NewBool(true),
}
func (m *KV) starting(_ context.Context) error {
util.WarnExperimentalUse("Gossip memberlist ring")
mlCfg.Delegate = mlkv
mlCfg, err := m.buildMemberlistConfig()
if err != nil {
return err
}
// Wait for memberlist and broadcasts fields creation because
// memberlist may start calling delegate methods if it
// receives traffic.
// See https://godoc.org/github.com/hashicorp/memberlist#Delegate
//
// Note: We cannot check for Starting state, as we want to use delegate during cluster joining process
// that happens in Starting state.
m.initWG.Add(1)
list, err := memberlist.Create(mlCfg)
if err != nil {
return nil, fmt.Errorf("failed to create memberlist: %v", err)
return fmt.Errorf("failed to create memberlist: %v", err)
}
// finish delegate initialization
mlkv.memberlist = list
mlkv.broadcasts = &memberlist.TransmitLimitedQueue{
// Finish delegate initialization.
m.memberlist = list
m.broadcasts = &memberlist.TransmitLimitedQueue{
NumNodes: list.NumMembers,
RetransmitMult: cfg.RetransmitMult,
RetransmitMult: m.cfg.RetransmitMult,
}
m.initWG.Done()
// Almost ready...
mlkv.createAndRegisterMetrics()
for _, c := range cfg.Codecs {
mlkv.codecs[c.CodecID()] = c
}
return nil
}
// Join the cluster
if len(cfg.JoinMembers) > 0 {
reached, err := mlkv.JoinMembers(cfg.JoinMembers)
if err != nil && cfg.AbortIfJoinFails {
_ = mlkv.memberlist.Shutdown()
return nil, err
}
var errFailedToJoinCluster = errors.New("failed to join memberlist cluster on startup")
func (m *KV) running(ctx context.Context) error {
// Join the cluster, if configured. We want this to happen in Running state, because started memberlist
// is good enough for usage from Client (which checks for Running state), even before it connects to the cluster.
if len(m.cfg.JoinMembers) > 0 {
err := m.joinMembersOnStartup(ctx, m.cfg.JoinMembers)
if err != nil {
level.Error(util.Logger).Log("msg", "failed to join memberlist cluster", "err", err)
} else {
level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached)
if m.cfg.AbortIfJoinFails {
return errFailedToJoinCluster
}
}
}
return mlkv, nil
var tickerChan <-chan time.Time = nil
if m.cfg.RejoinInterval > 0 {
t := time.NewTicker(m.cfg.RejoinInterval)
defer t.Stop()
tickerChan = t.C
}
for {
select {
case <-tickerChan:
reached, err := m.memberlist.Join(m.cfg.JoinMembers)
if err == nil {
level.Info(util.Logger).Log("msg", "re-joined memberlist cluster", "reached_nodes", reached)
} else {
// Don't report error from rejoin, otherwise KV service would be stopped completely.
level.Warn(util.Logger).Log("msg", "re-joining memberlist cluster failed", "err", err)
}
case <-ctx.Done():
return nil
}
}
}
// GetCodec returns codec for given ID or nil.
@ -304,23 +421,65 @@ func (m *KV) GetCodec(codecID string) codec.Codec {
}
// GetListeningPort returns port used for listening for memberlist communication. Useful when BindPort is set to 0.
// This call is only valid after KV service has been started.
func (m *KV) GetListeningPort() int {
return int(m.memberlist.LocalNode().Port)
}
// JoinMembers joins the cluster with given members.
// See https://godoc.org/github.com/hashicorp/memberlist#Memberlist.Join
// This call is only valid after KV service has been started and is still running.
func (m *KV) JoinMembers(members []string) (int, error) {
if m.State() != services.Running {
return 0, fmt.Errorf("service not Running")
}
return m.memberlist.Join(members)
}
// Stop tries to leave memberlist cluster and then shutdown memberlist client.
func (m *KV) joinMembersOnStartup(ctx context.Context, members []string) error {
reached, err := m.memberlist.Join(m.cfg.JoinMembers)
if err == nil {
level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached)
return nil
}
if m.cfg.MaxJoinRetries <= 0 {
return err
}
level.Debug(util.Logger).Log("msg", "attempt to join memberlist cluster failed", "retries", 0, "err", err)
lastErr := err
cfg := util.BackoffConfig{
MinBackoff: m.cfg.MinJoinBackoff,
MaxBackoff: m.cfg.MaxJoinBackoff,
MaxRetries: m.cfg.MaxJoinRetries,
}
backoff := util.NewBackoff(ctx, cfg)
for backoff.Ongoing() {
backoff.Wait()
reached, err := m.memberlist.Join(members)
if err != nil {
lastErr = err
level.Debug(util.Logger).Log("msg", "attempt to join memberlist cluster failed", "retries", backoff.NumRetries(), "err", err)
continue
}
level.Info(util.Logger).Log("msg", "joined memberlist cluster", "reached_nodes", reached)
return nil
}
return lastErr
}
// While Stopping, we try to leave memberlist cluster and then shutdown memberlist client.
// We do this in order to send out last messages, typically that ingester has LEFT the ring.
func (m *KV) Stop() {
func (m *KV) stopping(_ error) error {
level.Info(util.Logger).Log("msg", "leaving memberlist cluster")
m.casBroadcastsEnabled.Store(false)
// Wait until broadcast queue is empty, but don't wait for too long.
// Also don't wait if there is just one node left.
// Problem is that broadcast queue is also filled up by state changes received from other nodes,
@ -348,6 +507,7 @@ func (m *KV) Stop() {
if err != nil {
level.Error(util.Logger).Log("msg", "error when shutting down memberlist client", "err", err)
}
return nil
}
// List returns all known keys under a given prefix.
@ -581,7 +741,7 @@ outer:
m.casSuccesses.Inc()
m.notifyWatchers(key)
if m.casBroadcastsEnabled.Load() {
if m.State() == services.Running {
m.broadcastNewValue(key, change, newver, codec)
} else {
level.Warn(util.Logger).Log("msg", "skipped broadcasting CAS update because memberlist KV is shutting down", "key", key)
@ -680,6 +840,8 @@ func (m *KV) NodeMeta(limit int) []byte {
// NotifyMsg is method from Memberlist Delegate interface
// Called when single message is received, i.e. what our broadcastNewValue has sent.
func (m *KV) NotifyMsg(msg []byte) {
m.initWG.Wait()
m.numberOfReceivedMessages.Inc()
m.totalSizeOfReceivedMessages.Add(float64(len(msg)))
@ -740,6 +902,8 @@ func (m *KV) queueBroadcast(key string, content []string, version uint, message
// GetBroadcasts is method from Memberlist Delegate interface
// It returns all pending broadcasts (within the size limit)
func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
m.initWG.Wait()
return m.broadcasts.GetBroadcasts(overhead, limit)
}
@ -749,6 +913,8 @@ func (m *KV) GetBroadcasts(overhead, limit int) [][]byte {
// Here we dump our entire state -- all keys and their values. There is no limit on message size here,
// as Memberlist uses 'stream' operations for transferring this state.
func (m *KV) LocalState(join bool) []byte {
m.initWG.Wait()
m.numberOfPulls.Inc()
m.storeMu.Lock()
@ -799,6 +965,8 @@ func (m *KV) LocalState(join bool) []byte {
//
// Data is full state of remote KV store, as generated by `LocalState` method (run on another node).
func (m *KV) MergeRemoteState(data []byte, join bool) {
m.initWG.Wait()
m.numberOfPushes.Inc()
m.totalSizeOfPushes.Add(float64(len(data)))

@ -9,6 +9,7 @@ import (
"github.com/prometheus/client_golang/prometheus"
"github.com/cortexproject/cortex/pkg/util"
"github.com/cortexproject/cortex/pkg/util/services"
)
func (m *KV) createAndRegisterMetrics() {
@ -69,7 +70,11 @@ func (m *KV) createAndRegisterMetrics() {
Name: "messages_in_broadcast_queue",
Help: "Number of user messages in the broadcast queue",
}, func() float64 {
return float64(m.broadcasts.NumQueued())
// m.broadcasts is not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.broadcasts.NumQueued())
}
return 0
})
m.totalSizeOfBroadcastMessagesInQueue = prometheus.NewGauge(prometheus.GaugeOpts{
@ -116,7 +121,11 @@ func (m *KV) createAndRegisterMetrics() {
Name: "cluster_members_count",
Help: "Number of members in memberlist cluster",
}, func() float64 {
return float64(m.memberlist.NumMembers())
// m.memberlist is not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.memberlist.NumMembers())
}
return 0
})
m.memberlistHealthScore = prometheus.NewGaugeFunc(prometheus.GaugeOpts{
@ -125,7 +134,11 @@ func (m *KV) createAndRegisterMetrics() {
Name: "cluster_node_health_score",
Help: "Health score of this cluster. Lower value is better. 0 = healthy",
}, func() float64 {
return float64(m.memberlist.GetHealthScore())
// m.memberlist is not set before Starting state
if m.State() == services.Running || m.State() == services.Stopping {
return float64(m.memberlist.GetHealthScore())
}
return 0
})
m.watchPrefixDroppedNotifications = prometheus.NewCounterVec(prometheus.CounterOpts{

@ -34,7 +34,7 @@ type ProxyConfig struct {
func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
f.IntVar(&cfg.ServerServicePort, "server.service-port", 80, "The port where the query-tee service listens to.")
f.StringVar(&cfg.BackendEndpoints, "backend.endpoints", "", "Comma separated list of backend endpoints to query.")
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client.")
f.StringVar(&cfg.PreferredBackend, "backend.preferred", "", "The hostname of the preferred backend when selecting the response to send back to the client. If no preferred backend is configured then the query-tee will send back to the client the first successful response received without waiting for other backends.")
f.DurationVar(&cfg.BackendReadTimeout, "backend.read-timeout", 90*time.Second, "The timeout when reading the response from a backend.")
f.BoolVar(&cfg.CompareResponses, "proxy.compare-responses", false, "Compare responses between preferred and secondary endpoints for supported routes.")
}
@ -42,7 +42,7 @@ func (cfg *ProxyConfig) RegisterFlags(f *flag.FlagSet) {
type Route struct {
Path string
RouteName string
Methods string
Methods []string
ResponseComparator ResponsesComparator
}
@ -63,7 +63,7 @@ type Proxy struct {
func NewProxy(cfg ProxyConfig, logger log.Logger, routes []Route, registerer prometheus.Registerer) (*Proxy, error) {
if cfg.CompareResponses && cfg.PreferredBackend == "" {
return nil, fmt.Errorf("when enabling comparion of results -backend.preferred flag must be set to hostname of preferred backend")
return nil, fmt.Errorf("when enabling comparison of results -backend.preferred flag must be set to hostname of preferred backend")
}
p := &Proxy{
@ -134,12 +134,12 @@ func (p *Proxy) Start() error {
}))
// register routes
var comparator ResponsesComparator
for _, route := range p.routes {
var comparator ResponsesComparator
if p.cfg.CompareResponses {
comparator = route.ResponseComparator
}
router.Path(route.Path).Methods(route.Methods).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator))
router.Path(route.Path).Methods(route.Methods...).Handler(NewProxyEndpoint(p.backends, route.RouteName, p.metrics, p.logger, comparator))
}
p.srvListener = listener

@ -7,10 +7,10 @@ import (
"sync"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/cortexproject/cortex/pkg/util"
)
type ResponsesComparator interface {
@ -23,17 +23,29 @@ type ProxyEndpoint struct {
logger log.Logger
comparator ResponsesComparator
// Whether for this endpoint there's a preferred backend configured.
hasPreferredBackend bool
// The route name used to track metrics.
routeName string
}
func NewProxyEndpoint(backends []*ProxyBackend, routeName string, metrics *ProxyMetrics, logger log.Logger, comparator ResponsesComparator) *ProxyEndpoint {
hasPreferredBackend := false
for _, backend := range backends {
if backend.preferred {
hasPreferredBackend = true
break
}
}
return &ProxyEndpoint{
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
backends: backends,
routeName: routeName,
metrics: metrics,
logger: logger,
comparator: comparator,
hasPreferredBackend: hasPreferredBackend,
}
}
@ -41,9 +53,29 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
level.Debug(p.logger).Log("msg", "Received request", "path", r.URL.Path, "query", r.URL.RawQuery)
// Send the same request to all backends.
resCh := make(chan *backendResponse, len(p.backends))
go p.executeBackendRequests(r, resCh)
// Wait for the first response that's feasible to be sent back to the client.
downstreamRes := p.waitBackendResponseForDownstream(resCh)
if downstreamRes.err != nil {
http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError)
} else {
w.WriteHeader(downstreamRes.status)
if _, err := w.Write(downstreamRes.body); err != nil {
level.Warn(p.logger).Log("msg", "Unable to write response", "err", err)
}
}
p.metrics.responsesTotal.WithLabelValues(downstreamRes.backend.name, r.Method, p.routeName).Inc()
}
func (p *ProxyEndpoint) executeBackendRequests(r *http.Request, resCh chan *backendResponse) {
responses := make([]*backendResponse, 0, len(p.backends))
wg := sync.WaitGroup{}
wg.Add(len(p.backends))
resCh := make(chan *backendResponse, len(p.backends))
for _, b := range p.backends {
b := b
@ -60,9 +92,7 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
status: status,
body: body,
err: err,
elapsed: elapsed,
}
resCh <- res
// Log with a level based on the backend response.
lvl := level.Debug
@ -71,6 +101,14 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
}
lvl(p.logger).Log("msg", "Backend response", "path", r.URL.Path, "query", r.URL.RawQuery, "backend", b.name, "status", status, "elapsed", elapsed)
p.metrics.requestDuration.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(elapsed.Seconds())
// Keep track of the response if required.
if p.comparator != nil {
responses = append(responses, res)
}
resCh <- res
}()
}
@ -78,59 +116,55 @@ func (p *ProxyEndpoint) ServeHTTP(w http.ResponseWriter, r *http.Request) {
wg.Wait()
close(resCh)
// Collect all responses and track metrics for each of them.
responses := make([]*backendResponse, 0, len(p.backends))
for res := range resCh {
responses = append(responses, res)
p.metrics.durationMetric.WithLabelValues(res.backend.name, r.Method, p.routeName, strconv.Itoa(res.statusCode())).Observe(res.elapsed.Seconds())
}
// Select the response to send back to the client.
downstreamRes := p.pickResponseForDownstream(responses)
if downstreamRes.err != nil {
http.Error(w, downstreamRes.err.Error(), http.StatusInternalServerError)
} else {
w.WriteHeader(downstreamRes.status)
if _, err := w.Write(downstreamRes.body); err != nil {
level.Warn(p.logger).Log("msg", "Unable to write response", "err", err)
}
}
// Compare responses.
if p.comparator != nil {
go func() {
expectedResponse := responses[0]
actualResponse := responses[1]
if responses[1].backend.preferred {
expectedResponse, actualResponse = actualResponse, expectedResponse
}
expectedResponse := responses[0]
actualResponse := responses[1]
if responses[1].backend.preferred {
expectedResponse, actualResponse = actualResponse, expectedResponse
}
result := resultSuccess
err := p.compareResponses(expectedResponse, actualResponse)
if err != nil {
level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = resultFailed
}
result := comparisonSuccess
err := p.compareResponses(expectedResponse, actualResponse)
if err != nil {
level.Error(util.Logger).Log("msg", "response comparison failed", "route-name", p.routeName,
"query", r.URL.RawQuery, "err", err)
result = comparisonFailed
}
p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc()
}()
p.metrics.responsesComparedTotal.WithLabelValues(p.routeName, result).Inc()
}
}
func (p *ProxyEndpoint) pickResponseForDownstream(responses []*backendResponse) *backendResponse {
// Look for a successful response from the preferred backend.
for _, res := range responses {
if res.backend.preferred && res.succeeded() {
func (p *ProxyEndpoint) waitBackendResponseForDownstream(resCh chan *backendResponse) *backendResponse {
var (
responses = make([]*backendResponse, 0, len(p.backends))
preferredResponseReceived = false
)
for res := range resCh {
// If the response is successful we can immediately return it if:
// - There's no preferred backend configured
// - Or this response is from the preferred backend
// - Or the preferred backend response has already been received and wasn't successful
if res.succeeded() && (!p.hasPreferredBackend || res.backend.preferred || preferredResponseReceived) {
return res
}
}
// Look for any other successful response.
for _, res := range responses {
if res.succeeded() {
return res
// If we received a non successful response from the preferred backend, then we can
// return the first successful response received so far (if any).
if res.backend.preferred && !res.succeeded() {
preferredResponseReceived = true
for _, prevRes := range responses {
if prevRes.succeeded() {
return prevRes
}
}
}
// Otherwise we keep track of it for later.
responses = append(responses, res)
}
// No successful response, so let's pick the first one.
@ -159,7 +193,6 @@ type backendResponse struct {
status int
body []byte
err error
elapsed time.Duration
}
func (r *backendResponse) succeeded() bool {

@ -7,28 +7,34 @@ import (
)
const (
resultSuccess = "success"
resultFailed = "fail"
comparisonSuccess = "success"
comparisonFailed = "fail"
)
type ProxyMetrics struct {
durationMetric *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
responsesTotal *prometheus.CounterVec
responsesComparedTotal *prometheus.CounterVec
}
func NewProxyMetrics(registerer prometheus.Registerer) *ProxyMetrics {
m := &ProxyMetrics{
durationMetric: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
requestDuration: promauto.With(registerer).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex_querytee",
Name: "request_duration_seconds",
Help: "Time (in seconds) spent serving HTTP requests.",
Buckets: instrument.DefBuckets,
}, []string{"backend", "method", "route", "status_code"}),
responsesTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_querytee",
Name: "responses_total",
Help: "Total number of responses sent back to the client by the selected backend.",
}, []string{"backend", "method", "route"}),
responsesComparedTotal: promauto.With(registerer).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex_querytee",
Name: "responses_compared_total",
Help: "Total number of responses compared per route name by result",
}, []string{"route_name", "result"}),
Help: "Total number of responses compared per route name by result.",
}, []string{"route", "result"}),
}
return m

@ -142,7 +142,7 @@ github.com/coreos/go-systemd/journal
github.com/coreos/go-systemd/sdjournal
# github.com/coreos/pkg v0.0.0-20180928190104-399ea9e2e55f
github.com/coreos/pkg/capnslog
# github.com/cortexproject/cortex v1.1.1-0.20200609120740-6bd667db776a
# github.com/cortexproject/cortex v1.1.1-0.20200616130854-34b45d1180c3
github.com/cortexproject/cortex/pkg/alertmanager
github.com/cortexproject/cortex/pkg/alertmanager/alerts
github.com/cortexproject/cortex/pkg/alertmanager/alerts/configdb
