feature: Add ingester handler to shut down and forget tokens (#6179)

* Update grafana/dskit to 07166f9

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add /ingester/shutdown handler

This handler replaces the deprecated /ingester/flush_shutdown handler
and can be used to gracefully shut down a Loki instance and delete the
file that persists the tokens of the ingester ring.

In production environments you usually want to persist ring tokens so
that during a restart or rollout of an ingester instance its tokens are
not redistributed to other instances, but kept, so that the same streams
end up on the same instance once it is up and running again. For that,
the tokens are written to a file that can be specified via the
`-ingester.tokens-file-path` argument.

In certain cases, however, you want to forget the tokens and
redistribute them when shutting down an ingester instance. This was
already possible by calling `/ingester/flush_shutdown`, deleting the
tokens file, and terminating the process. The new `/ingester/shutdown`
endpoint combines these manual steps into a single handler.

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>

* Add changelog entry

Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
19 files changed (diffstat):

  CHANGELOG.md | 1
  docs/sources/api/_index.md | 26
  go.mod | 2
  go.sum | 4
  pkg/ingester/ingester.go | 65
  pkg/loki/modules.go | 12
  pkg/util/http.go | 12
  vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go | 219
  vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go | 197
  vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go | 67
  vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml | 143
  vendor/github.com/grafana/dskit/ring/basic_lifecycler.go | 9
  vendor/github.com/grafana/dskit/ring/http.go | 2
  vendor/github.com/grafana/dskit/ring/lifecycler.go | 78
  vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go | 12
  vendor/github.com/grafana/dskit/ring/ring.go | 40
  vendor/github.com/grafana/dskit/services/basic_service.go | 2
  vendor/github.com/grafana/dskit/spanlogger/spanlogger.go | 4
  vendor/modules.txt | 2

@@ -2,6 +2,7 @@
* [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields
* [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target
* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully
* [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage
* [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization
* [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail

@@ -48,7 +48,8 @@ These endpoints are exposed by the distributor:
These endpoints are exposed by the ingester:
- [`POST /flush`](#post-flush)
- [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
- **Deprecated** [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown)
- [`POST /ingester/shutdown`](#post-ingestershutdown)
The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably.
@@ -807,13 +808,34 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester.
## `POST /ingester/flush_shutdown`
**Deprecated**: Please use `/ingester/shutdown?flush=true` instead.
`/ingester/flush_shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in-memory chunks it holds.
This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned,
but instead flushed to our chunk backend.
In microservices mode, the `/ingester/flush_shutdown` endpoint is exposed by the ingester.
### `GET /distributor/ring`
## `POST /ingester/shutdown`
`/ingester/shutdown` is similar to the [`/ingester/flush_shutdown`](#post-ingesterflush_shutdown)
endpoint, but accepts three URL query parameters `flush`, `delete_ring_tokens`, and `terminate`.
**URL query parameters:**
* `flush=<bool>`:
Flag to control whether to flush any in-memory chunks the ingester holds. Defaults to `true`.
* `delete_ring_tokens=<bool>`:
Flag to control whether to delete the file that contains the ingester ring tokens of the instance, if `-ingester.tokens-file-path` is specified. Defaults to `false`.
* `terminate=<bool>`:
Flag to control whether to terminate the Loki process after service shutdown. Defaults to `true`.
This handler, in contrast to the `/ingester/flush_shutdown` handler, terminates the Loki process by default.
This behaviour can be changed by setting the `terminate` query parameter to `false`.
In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester.
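For illustration, here is a minimal Go sketch that calls the new endpoint during a scale-down. The address assumes Loki's default HTTP port (3100); adjust it for your deployment.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Flush in-memory chunks, delete the ring tokens file, and let Loki
	// terminate itself (defaults: flush=true, delete_ring_tokens=false,
	// terminate=true).
	url := "http://localhost:3100/ingester/shutdown?flush=true&delete_ring_tokens=true&terminate=true"

	resp, err := http.Post(url, "", nil)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	// A 204 No Content status indicates the shutdown was triggered.
	body, _ := io.ReadAll(resp.Body)
	fmt.Println(resp.Status, string(body))
}
```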
## `GET /distributor/ring`
Displays a web page with the distributor hash ring status, including the state, healthy and last heartbeat time of each distributor.

@@ -49,7 +49,7 @@ require (
github.com/google/uuid v1.2.0
github.com/gorilla/mux v1.8.0
github.com/gorilla/websocket v1.4.2
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96
github.com/grafana/go-gelf/v2 v2.0.1
github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0

@@ -1039,8 +1039,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY=
github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk=
github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0=
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 h1:mZluMeUp1vLHKb1nSrMnA0mfupSpBeUkZqDDpfHabrQ=
github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0=
github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak=
github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90=
github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY=

@@ -11,6 +11,7 @@ import (
"time"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/modules"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
@@ -172,8 +173,10 @@ type Interface interface {
logproto.QuerierServer
CheckReady(ctx context.Context) error
FlushHandler(w http.ResponseWriter, _ *http.Request)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
GetOrCreateInstance(instanceID string) (*instance, error)
// deprecated
LegacyShutdownHandler(w http.ResponseWriter, r *http.Request)
ShutdownHandler(w http.ResponseWriter, r *http.Request)
}
// Ingester builds chunks for incoming log streams.
@@ -209,6 +212,10 @@ type Ingester struct {
// Denotes whether the ingester should flush on shutdown.
// Currently only used by the WAL to signal when the disk is full.
flushOnShutdownSwitch *OnceSwitch
// Flag for whether stopping the ingester service should also terminate the
// loki process.
// This is set when calling the shutdown handler.
terminateOnShutdown bool
// Only used by WAL & flusher to coordinate backpressure during replay.
replayController *replayController
@@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
tailersQuit: make(chan struct{}),
metrics: metrics,
flushOnShutdownSwitch: &OnceSwitch{},
terminateOnShutdown: false,
}
i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i})
@@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error {
}
i.flushQueuesDone.Wait()
// In case the flag to terminate on shutdown is set we need to mark the
// ingester service as "failed", so Loki will shut down entirely.
// The module manager logs the failure `modules.ErrStopProcess` in a special way.
if i.terminateOnShutdown && errs.Err() == nil {
return modules.ErrStopProcess
}
return errs.Err()
}
@@ -526,10 +540,16 @@ func (i *Ingester) loop() {
}
}
// ShutdownHandler triggers the following set of operations in order:
// LegacyShutdownHandler triggers the following set of operations in order:
// * Change the state of the ring to stop accepting writes.
// * Flush all the chunks.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
// Note: This handler does not trigger a termination of the Loki process,
// despite its name. Instead, the ingester service is stopped, so an external
// source can trigger a safe termination through a signal to the process.
// The handler is deprecated and usage is discouraged. Use ShutdownHandler
// instead.
func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) {
level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.")
originalState := i.lifecycler.FlushOnShutdown()
// We want to flush the chunks if transfer fails irrespective of original flag.
i.lifecycler.SetFlushOnShutdown(true)
@@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
w.WriteHeader(http.StatusNoContent)
}
// ShutdownHandler handles a graceful shutdown of the ingester service and
// termination of the Loki process.
func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
// Don't allow calling the shutdown handler multiple times
if i.State() != services.Running {
w.WriteHeader(http.StatusServiceUnavailable)
_, _ = w.Write([]byte("Ingester is stopping or already stopped."))
return
}
params := r.URL.Query()
doFlush := util.FlagFromValues(params, "flush", true)
doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false)
doTerminate := util.FlagFromValues(params, "terminate", true)
err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens)
// Stopping the module will return the modules.ErrStopProcess error. This is
// needed so the Loki process is shut down completely.
if err == nil || err == modules.ErrStopProcess {
w.WriteHeader(http.StatusNoContent)
} else {
w.WriteHeader(http.StatusInternalServerError)
_, _ = w.Write([]byte(err.Error()))
}
}
// handleShutdown triggers the following operations:
// * Change the state of the ring to stop accepting writes.
// * optional: Flush all the chunks.
// * optional: Delete ring tokens file
// * Unregister from KV store
// * optional: Terminate process (handled by service manager in loki.go)
func (i *Ingester) handleShutdown(terminate, flush, del bool) error {
i.lifecycler.SetFlushOnShutdown(flush)
i.lifecycler.SetClearTokensOnShutdown(del)
i.lifecycler.SetUnregisterOnShutdown(true)
i.terminateOnShutdown = terminate
return services.StopAndAwaitTerminated(context.Background(), i)
}
// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := tenant.TenantID(ctx)

@@ -342,9 +342,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
httpMiddleware := middleware.Merge(
serverutil.RecoveryHTTPMiddleware,
)
t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)))
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)))
t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.LegacyShutdownHandler)),
)
t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler(
httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)),
)
return t.Ingester, nil
}

@@ -9,6 +9,7 @@ import (
"html/template"
"io"
"net/http"
"net/url"
"strings"
"github.com/go-kit/log"
@@ -285,3 +286,14 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi
}
return nil
}
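// FlagFromValues returns the boolean value for key in values. It accepts
// t/true/1 and f/false/0 (case-insensitive) and falls back to the default d
// when the key is absent or not a recognized boolean.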
func FlagFromValues(values url.Values, key string, d bool) bool {
switch strings.ToLower(values.Get(key)) {
case "t", "true", "1":
return true
case "f", "false", "0":
return false
default:
return d
}
}
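A brief usage sketch of the helper above; the query string and the commented results are illustrative:

```go
package main

import (
	"fmt"
	"net/url"

	"github.com/grafana/loki/pkg/util"
)

func main() {
	values, _ := url.ParseQuery("flush=0&terminate=TRUE")

	fmt.Println(util.FlagFromValues(values, "flush", true))               // false: "0" parses as false
	fmt.Println(util.FlagFromValues(values, "terminate", true))           // true: matching is case-insensitive
	fmt.Println(util.FlagFromValues(values, "delete_ring_tokens", false)) // false: key absent, default returned
}
```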

@@ -0,0 +1,219 @@
package memberlist
import (
_ "embed"
"encoding/json"
"fmt"
"html/template"
"net/http"
"sort"
"strconv"
"strings"
"time"
"github.com/hashicorp/memberlist"
)
// HTTPStatusHandler is a http.Handler with status information about memberlist.
type HTTPStatusHandler struct {
kvs *KVInitService
tpl *template.Template
}
// StatusPageData represents the data passed to the template rendered by HTTPStatusHandler
type StatusPageData struct {
Now time.Time
Memberlist *memberlist.Memberlist
SortedMembers []*memberlist.Node
Store map[string]ValueDesc
MessageHistoryBufferBytes int
SentMessages []Message
ReceivedMessages []Message
}
// NewHTTPStatusHandler creates a new HTTPStatusHandler that will render the provided template using the data from StatusPageData.
func NewHTTPStatusHandler(kvs *KVInitService, tpl *template.Template) HTTPStatusHandler {
return HTTPStatusHandler{kvs, tpl}
}
func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kv := h.kvs.getKV()
if kv == nil {
w.Header().Set("Content-Type", "text/plain")
// Ignore inactionable errors.
_, _ = w.Write([]byte("This instance doesn't use memberlist."))
return
}
const (
downloadKeyParam = "downloadKey"
viewKeyParam = "viewKey"
viewMsgParam = "viewMsg"
deleteMessagesParam = "deleteMessages"
)
if err := req.ParseForm(); err == nil {
if req.Form[downloadKeyParam] != nil {
downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
return
}
if req.Form[viewKeyParam] != nil {
viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
return
}
if req.Form[viewMsgParam] != nil {
msgID, err := strconv.Atoi(req.Form[viewMsgParam][0])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
sent, received := kv.getSentAndReceivedMessages()
for _, m := range append(sent, received...) {
if m.ID == msgID {
viewMessage(w, kv, m, getFormat(req))
return
}
}
http.Error(w, "message not found", http.StatusNotFound)
return
}
if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" {
kv.deleteSentReceivedMessages()
// Redirect back.
w.Header().Set("Location", "?"+deleteMessagesParam+"=false")
w.WriteHeader(http.StatusFound)
return
}
}
members := kv.memberlist.Members()
sort.Slice(members, func(i, j int) bool {
return members[i].Name < members[j].Name
})
sent, received := kv.getSentAndReceivedMessages()
v := StatusPageData{
Now: time.Now(),
Memberlist: kv.memberlist,
SortedMembers: members,
Store: kv.storeCopy(),
MessageHistoryBufferBytes: kv.cfg.MessageHistoryBufferBytes,
SentMessages: sent,
ReceivedMessages: received,
}
accept := req.Header.Get("Accept")
if strings.Contains(accept, "application/json") {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "text/html")
if err := h.tpl.Execute(w, v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func getFormat(req *http.Request) string {
const viewFormat = "format"
format := ""
if len(req.Form[viewFormat]) > 0 {
format = req.Form[viewFormat][0]
}
return format
}
func viewMessage(w http.ResponseWriter, kv *KV, msg Message, format string) {
c := kv.GetCodec(msg.Pair.Codec)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}
val, err := c.Decode(msg.Pair.Value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
return
}
formatValue(w, val, format)
}
func viewKey(w http.ResponseWriter, store map[string]ValueDesc, key string, format string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}
formatValue(w, store[key].value, format)
}
func formatValue(w http.ResponseWriter, val interface{}, format string) {
w.WriteHeader(200)
w.Header().Add("content-type", "text/plain")
switch format {
case "json", "json-pretty":
enc := json.NewEncoder(w)
if format == "json-pretty" {
enc.SetIndent("", " ")
}
err := enc.Encode(val)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
default:
_, _ = fmt.Fprintf(w, "%#v", val)
}
}
func downloadKey(w http.ResponseWriter, kv *KV, store map[string]ValueDesc, key string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}
val := store[key]
c := kv.GetCodec(store[key].CodecID)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}
encoded, err := c.Encode(val.value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError)
return
}
w.Header().Add("content-type", "application/octet-stream")
// Set content-length so that client knows whether it has received full response or not.
w.Header().Add("content-length", strconv.Itoa(len(encoded)))
w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.Version, key))
w.WriteHeader(200)
// Ignore errors, we cannot do anything about them.
_, _ = w.Write(encoded)
}
//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"StringsJoin": strings.Join,
}).Parse(defaultPageContent))
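Because `ServeHTTP` branches on the `Accept` header, the status page can also be scraped as JSON. A minimal client sketch, assuming the handler is mounted at Loki's `/memberlist` path on the default port:

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	req, err := http.NewRequest(http.MethodGet, "http://localhost:3100/memberlist", nil)
	if err != nil {
		log.Fatal(err)
	}
	// Requesting JSON makes the handler encode StatusPageData instead of
	// rendering the HTML template.
	req.Header.Set("Accept", "application/json")

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		log.Fatal(err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body))
}
```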

@@ -2,19 +2,10 @@ package memberlist
import (
"context"
_ "embed"
"encoding/json"
"fmt"
"html/template"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"time"
"github.com/go-kit/log"
"github.com/hashicorp/memberlist"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
@@ -95,191 +86,5 @@ func (kvs *KVInitService) stopping(_ error) error {
}
func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) {
kv := kvs.getKV()
if kv == nil {
w.Header().Set("Content-Type", "text/plain")
// Ignore inactionable errors.
_, _ = w.Write([]byte("This instance doesn't use memberlist."))
return
}
const (
downloadKeyParam = "downloadKey"
viewKeyParam = "viewKey"
viewMsgParam = "viewMsg"
deleteMessagesParam = "deleteMessages"
)
if err := req.ParseForm(); err == nil {
if req.Form[downloadKeyParam] != nil {
downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest.
return
}
if req.Form[viewKeyParam] != nil {
viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req))
return
}
if req.Form[viewMsgParam] != nil {
msgID, err := strconv.Atoi(req.Form[viewMsgParam][0])
if err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
sent, received := kv.getSentAndReceivedMessages()
for _, m := range append(sent, received...) {
if m.ID == msgID {
viewMessage(w, kv, m, getFormat(req))
return
}
}
http.Error(w, "message not found", http.StatusNotFound)
return
}
if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" {
kv.deleteSentReceivedMessages()
// Redirect back.
w.Header().Set("Location", "?"+deleteMessagesParam+"=false")
w.WriteHeader(http.StatusFound)
return
}
}
members := kv.memberlist.Members()
sort.Slice(members, func(i, j int) bool {
return members[i].Name < members[j].Name
})
sent, received := kv.getSentAndReceivedMessages()
v := pageData{
Now: time.Now(),
Memberlist: kv.memberlist,
SortedMembers: members,
Store: kv.storeCopy(),
SentMessages: sent,
ReceivedMessages: received,
}
accept := req.Header.Get("Accept")
if strings.Contains(accept, "application/json") {
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
return
}
w.Header().Set("Content-Type", "text/html")
if err := defaultPageTemplate.Execute(w, v); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func getFormat(req *http.Request) string {
const viewFormat = "format"
format := ""
if len(req.Form[viewFormat]) > 0 {
format = req.Form[viewFormat][0]
}
return format
}
func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) {
c := kv.GetCodec(msg.Pair.Codec)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}
val, err := c.Decode(msg.Pair.Value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError)
return
}
formatValue(w, val, format)
}
func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}
formatValue(w, store[key].value, format)
}
func formatValue(w http.ResponseWriter, val interface{}, format string) {
w.WriteHeader(200)
w.Header().Add("content-type", "text/plain")
switch format {
case "json", "json-pretty":
enc := json.NewEncoder(w)
if format == "json-pretty" {
enc.SetIndent("", " ")
}
err := enc.Encode(val)
if err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
default:
_, _ = fmt.Fprintf(w, "%#v", val)
}
}
func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) {
if store[key].value == nil {
http.Error(w, "value not found", http.StatusNotFound)
return
}
val := store[key]
c := kv.GetCodec(store[key].codecID)
if c == nil {
http.Error(w, "codec not found", http.StatusNotFound)
return
}
encoded, err := c.Encode(val.value)
if err != nil {
http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError)
return
}
w.Header().Add("content-type", "application/octet-stream")
// Set content-length so that client knows whether it has received full response or not.
w.Header().Add("content-length", strconv.Itoa(len(encoded)))
w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key))
w.WriteHeader(200)
// Ignore errors, we cannot do anything about them.
_, _ = w.Write(encoded)
NewHTTPStatusHandler(kvs, defaultPageTemplate).ServeHTTP(w, req)
}
type pageData struct {
Now time.Time
Memberlist *memberlist.Memberlist
SortedMembers []*memberlist.Node
Store map[string]valueDesc
SentMessages []message
ReceivedMessages []message
}
//go:embed status.gohtml
var defaultPageContent string
var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{
"StringsJoin": strings.Join,
}).Parse(defaultPageContent))

@@ -232,7 +232,7 @@ type KV struct {
// KV Store.
storeMu sync.Mutex
store map[string]valueDesc
store map[string]ValueDesc
// Codec registry
codecs map[string]codec.Codec
@@ -245,9 +245,9 @@ type KV struct {
// Buffers with sent and received messages. Used for troubleshooting only.
// New messages are appended, old messages (based on configured size limit) removed from the front.
messagesMu sync.Mutex
sentMessages []message
sentMessages []Message
sentMessagesSize int
receivedMessages []message
receivedMessages []Message
receivedMessagesSize int
messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI.
@@ -285,7 +285,7 @@ type KV struct {
// Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message.
// Fields are exported for templating to work.
type message struct {
type Message struct {
ID int // Unique local ID of the message.
Time time.Time // Time when message was sent or received.
Size int // Message size
@@ -296,21 +296,22 @@ type message struct {
Changes []string // List of changes in this message (as computed by *this* node).
}
type valueDesc struct {
// ValueDesc stores the value along with its codec and local version.
type ValueDesc struct {
// We store the decoded value here to prevent decoding the entire state for every
// update we receive. Whilst the updates are small and fast to decode,
// the total state can be quite large.
// The CAS function is passed a deep copy because it modifies in-place.
value Mergeable
// version (local only) is used to keep track of what we're gossiping about, and invalidate old messages
version uint
// Version (local only) is used to keep track of what we're gossiping about, and invalidate old messages.
Version uint
// ID of codec used to write this value. Only used when sending full state.
codecID string
CodecID string
}
func (v valueDesc) Clone() (result valueDesc) {
func (v ValueDesc) Clone() (result ValueDesc) {
result = v
if v.value != nil {
result.value = v.value.Clone()
@@ -318,8 +319,8 @@ func (v valueDesc) Clone() (result valueDesc) {
return
}
func (v valueDesc) String() string {
return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID)
func (v ValueDesc) String() string {
return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID)
}
var (
@@ -343,7 +344,7 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer
registerer: registerer,
provider: dnsProvider,
store: make(map[string]valueDesc),
store: make(map[string]ValueDesc),
codecs: make(map[string]codec.Codec),
watchers: make(map[string][]chan string),
prefixWatchers: make(map[string][]chan string),
@@ -642,7 +643,7 @@ func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint,
_, _ = v.value.RemoveTombstones(time.Time{})
}
return v.value, v.version, nil
return v.value, v.Version, nil
}
// WatchKey watches for value changes for given key. When value changes, 'f' function is called with the
@@ -909,7 +910,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec
return
}
m.addSentMessage(message{
m.addSentMessage(Message{
Time: time.Now(),
Size: len(pairData),
Pair: kvPair,
@@ -964,7 +965,7 @@ func (m *KV) NotifyMsg(msg []byte) {
changes = mod.MergeContent()
}
m.addReceivedMessage(message{
m.addReceivedMessage(Message{
Time: time.Now(),
Size: len(msg),
Pair: kvPair,
@@ -1033,9 +1034,9 @@ func (m *KV) LocalState(join bool) []byte {
continue
}
codec := m.GetCodec(val.codecID)
codec := m.GetCodec(val.CodecID)
if codec == nil {
level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key)
level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key)
continue
}
@@ -1048,7 +1049,7 @@ func (m *KV) LocalState(join bool) []byte {
kvPair.Reset()
kvPair.Key = key
kvPair.Value = encoded
kvPair.Codec = val.codecID
kvPair.Codec = val.CodecID
ser, err := kvPair.Marshal()
if err != nil {
@@ -1068,11 +1069,11 @@
}
buf.Write(ser)
m.addSentMessage(message{
m.addSentMessage(Message{
Time: sent,
Size: len(ser),
Pair: kvPair, // Makes a copy of kvPair.
Version: val.version,
Version: val.Version,
})
}
@@ -1136,7 +1137,7 @@ func (m *KV) MergeRemoteState(data []byte, join bool) {
changes = change.MergeContent()
}
m.addReceivedMessage(message{
m.addReceivedMessage(Message{
Time: received,
Size: int(kvPairLength),
Pair: kvPair, // Makes a copy of kvPair.
@@ -1184,7 +1185,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui
// the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS).
curr := m.store[key]
// if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set.
if casVersion > 0 && curr.version != casVersion {
if casVersion > 0 && curr.Version != casVersion {
return nil, 0, errVersionMismatch
}
result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0)
@@ -1215,11 +1216,11 @@
}
}
newVersion := curr.version + 1
m.store[key] = valueDesc{
newVersion := curr.Version + 1
m.store[key] = ValueDesc{
value: result,
version: newVersion,
codecID: codec.CodecID(),
Version: newVersion,
CodecID: codec.CodecID(),
}
// The "changes" returned by Merge() can contain references to the "result"
@@ -1240,17 +1241,17 @@ func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable,
return oldVal, change, err
}
func (m *KV) storeCopy() map[string]valueDesc {
func (m *KV) storeCopy() map[string]ValueDesc {
m.storeMu.Lock()
defer m.storeMu.Unlock()
result := make(map[string]valueDesc, len(m.store))
result := make(map[string]ValueDesc, len(m.store))
for k, v := range m.store {
result[k] = v.Clone()
}
return result
}
func (m *KV) addReceivedMessage(msg message) {
func (m *KV) addReceivedMessage(msg Message) {
if m.cfg.MessageHistoryBufferBytes == 0 {
return
}
@@ -1264,7 +1265,7 @@ func (m *KV) addReceivedMessage(msg message) {
m.receivedMessages, m.receivedMessagesSize = addMessageToBuffer(m.receivedMessages, m.receivedMessagesSize, m.cfg.MessageHistoryBufferBytes, msg)
}
func (m *KV) addSentMessage(msg message) {
func (m *KV) addSentMessage(msg Message) {
if m.cfg.MessageHistoryBufferBytes == 0 {
return
}
@@ -1278,12 +1279,12 @@ func (m *KV) addSentMessage(msg message) {
m.sentMessages, m.sentMessagesSize = addMessageToBuffer(m.sentMessages, m.sentMessagesSize, m.cfg.MessageHistoryBufferBytes, msg)
}
func (m *KV) getSentAndReceivedMessages() (sent, received []message) {
func (m *KV) getSentAndReceivedMessages() (sent, received []Message) {
m.messagesMu.Lock()
defer m.messagesMu.Unlock()
// Make copy of both slices.
return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...)
return append([]Message(nil), m.sentMessages...), append([]Message(nil), m.receivedMessages...)
}
func (m *KV) deleteSentReceivedMessages() {
@@ -1296,7 +1297,7 @@ func (m *KV) deleteSentReceivedMessages() {
m.receivedMessagesSize = 0
}
func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) {
func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) {
msgs = append(msgs, msg)
size += msg.Size

@@ -1,4 +1,4 @@
{{- /*gotype: github.com/grafana/dskit/kv/memberlist.statusPageData */ -}}
{{- /*gotype: github.com/grafana/dskit/kv/memberlist.StatusPageData */ -}}
<!DOCTYPE html>
<html>
<head>
@@ -20,7 +20,8 @@
<thead>
<tr>
<th>Key</th>
<th>Value Details</th>
<th>Codec</th>
<th>Version</th>
<th>Actions</th>
</tr>
</thead>
@@ -29,7 +30,8 @@
{{ range $k, $v := .Store }}
<tr>
<td>{{ $k }}</td>
<td>{{ $v }}</td>
<td>{{ $v.CodecID }}</td>
<td>{{ $v.Version }}</td>
<td>
<a href="?viewKey={{ $k }}&format=json">json</a>
| <a href="?viewKey={{ $k }}&format=json-pretty">json-pretty</a>
@@ -68,76 +70,83 @@
<p>State: 0 = Alive, 1 = Suspect, 2 = Dead, 3 = Left</p>
<h2>Received Messages</h2>
<h2>Message History</h2>
<a href="?deleteMessages=true">Delete All Messages (received and sent)</a>
{{ if .MessageHistoryBufferBytes }}
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Time</th>
<th>Key</th>
<th>Value in the Message</th>
<th>Version After Update (0 = no change)</th>
<th>Changes</th>
<th>Actions</th>
</tr>
</thead>
<h3>Received Messages</h3>
<tbody>
{{ range .ReceivedMessages }}
<a href="?deleteMessages=true">Delete All Messages (received and sent)</a>
<table width="100%" border="1">
<thead>
<tr>
<td>{{ .ID }}</td>
<td>{{ .Time.Format "15:04:05.000" }}</td>
<td>{{ .Pair.Key }}</td>
<td>size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}</td>
<td>{{ .Version }}</td>
<td>{{ StringsJoin .Changes ", " }}</td>
<td>
<a href="?viewMsg={{ .ID }}&format=json">json</a>
| <a href="?viewMsg={{ .ID }}&format=json-pretty">json-pretty</a>
| <a href="?viewMsg={{ .ID }}&format=struct">struct</a>
</td>
<th>ID</th>
<th>Time</th>
<th>Key</th>
<th>Value in the Message</th>
<th>Version After Update (0 = no change)</th>
<th>Changes</th>
<th>Actions</th>
</tr>
{{ end }}
</tbody>
</table>
<h2>Sent Messages</h2>
<a href="?deleteMessages=true">Delete All Messages (received and sent)</a>
<table width="100%" border="1">
<thead>
<tr>
<th>ID</th>
<th>Time</th>
<th>Key</th>
<th>Value</th>
<th>Version</th>
<th>Changes</th>
<th>Actions</th>
</tr>
</thead>
<tbody>
{{ range .SentMessages }}
</thead>
<tbody>
{{ range .ReceivedMessages }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Time.Format "15:04:05.000" }}</td>
<td>{{ .Pair.Key }}</td>
<td>size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}</td>
<td>{{ .Version }}</td>
<td>{{ StringsJoin .Changes ", " }}</td>
<td>
<a href="?viewMsg={{ .ID }}&format=json">json</a>
| <a href="?viewMsg={{ .ID }}&format=json-pretty">json-pretty</a>
| <a href="?viewMsg={{ .ID }}&format=struct">struct</a>
</td>
</tr>
{{ end }}
</tbody>
</table>
<h3>Sent Messages</h3>
<a href="?deleteMessages=true">Delete All Messages (received and sent)</a>
<table width="100%" border="1">
<thead>
<tr>
<td>{{ .ID }}</td>
<td>{{ .Time.Format "15:04:05.000" }}</td>
<td>{{ .Pair.Key }}</td>
<td>size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}</td>
<td>{{ .Version }}</td>
<td>{{ StringsJoin .Changes ", " }}</td>
<td>
<a href="?viewMsg={{ .ID }}&format=json">json</a>
| <a href="?viewMsg={{ .ID }}&format=json-pretty">json-pretty</a>
| <a href="?viewMsg={{ .ID }}&format=struct">struct</a>
</td>
<th>ID</th>
<th>Time</th>
<th>Key</th>
<th>Value</th>
<th>Version</th>
<th>Changes</th>
<th>Actions</th>
</tr>
{{ end }}
</tbody>
</table>
</thead>
<tbody>
{{ range .SentMessages }}
<tr>
<td>{{ .ID }}</td>
<td>{{ .Time.Format "15:04:05.000" }}</td>
<td>{{ .Pair.Key }}</td>
<td>size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}</td>
<td>{{ .Version }}</td>
<td>{{ StringsJoin .Changes ", " }}</td>
<td>
<a href="?viewMsg={{ .ID }}&format=json">json</a>
| <a href="?viewMsg={{ .ID }}&format=json-pretty">json-pretty</a>
| <a href="?viewMsg={{ .ID }}&format=struct">struct</a>
</td>
</tr>
{{ end }}
</tbody>
</table>
{{ else }}
<p><i>Message history buffer is disabled; refer to the configuration to enable it in order to troubleshoot the message history.</i></p>
{{ end }}
</body>
</html>

@@ -405,8 +405,13 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc,
// This could happen if the backend store restarted (and content deleted)
// or the instance has been forgotten. In this case, we do re-insert it.
if !ok {
level.Warn(l.logger).Log("msg", "instance missing in the ring, adding it back", "ring", l.ringName)
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), l.GetRegisteredAt())
level.Warn(l.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", l.ringName)
// Due to how shuffle sharding works, an instance missing from the ring for some period of time
// could have caused a resharding of tenants among instances: to guarantee query correctness we
// need to update the registration timestamp to the current time.
registeredAt := time.Now()
instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt)
}
prevTimestamp := instanceDesc.Timestamp

@@ -93,7 +93,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) {
http.Error(w, err.Error(), http.StatusInternalServerError)
return
}
_, ownedTokens := ringDesc.countTokens()
ownedTokens := ringDesc.countTokens()
var ingesterIDs []string
for id := range ringDesc.Ingesters {

@@ -13,7 +13,6 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/pkg/errors"
perrors "github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"go.uber.org/atomic"
@@ -110,8 +109,9 @@ type Lifecycler struct {
Zone string
// Whether to flush if transfer fails on shutdown.
flushOnShutdown *atomic.Bool
unregisterOnShutdown *atomic.Bool
flushOnShutdown *atomic.Bool
unregisterOnShutdown *atomic.Bool
clearTokensOnShutdown *atomic.Bool
// We need to remember the ingester state, tokens and registered timestamp just in case the KV store
// goes away and comes back empty. The state changes during lifecycle of instance.
@@ -160,23 +160,22 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa
}
l := &Lifecycler{
cfg: cfg,
flushTransferer: flushTransferer,
KVStore: store,
Addr: fmt.Sprintf("%s:%d", addr, port),
ID: cfg.ID,
RingName: ringName,
RingKey: ringKey,
flushOnShutdown: atomic.NewBool(flushOnShutdown),
unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown),
Zone: cfg.Zone,
actorChan: make(chan func()),
state: PENDING,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
}
l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens))
cfg: cfg,
flushTransferer: flushTransferer,
KVStore: store,
Addr: fmt.Sprintf("%s:%d", addr, port),
ID: cfg.ID,
RingName: ringName,
RingKey: ringKey,
flushOnShutdown: atomic.NewBool(flushOnShutdown),
unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown),
clearTokensOnShutdown: atomic.NewBool(false),
Zone: cfg.Zone,
actorChan: make(chan func()),
state: PENDING,
lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg),
logger: logger,
}
l.BasicService = services.
NewBasicService(nil, l.loop, l.stopping).
@@ -304,8 +303,6 @@ func (i *Lifecycler) getTokens() Tokens {
}
func (i *Lifecycler) setTokens(tokens Tokens) {
i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens)))
i.stateMtx.Lock()
defer i.stateMtx.Unlock()
@@ -397,7 +394,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
// First, see if we exist in the cluster, update our state to match if we do,
// and add ourselves (without tokens) if we don't.
if err := i.initRing(context.Background()); err != nil {
return perrors.Wrapf(err, "failed to join the ring %s", i.RingName)
return errors.Wrapf(err, "failed to join the ring %s", i.RingName)
}
// We do various periodic tasks
@@ -420,14 +417,14 @@
// let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING
// ingesters, but we also signal that it is not fully functional yet.
if err := i.autoJoin(context.Background(), JOINING); err != nil {
return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}
level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName)
observeChan = time.After(i.cfg.ObservePeriod)
} else {
if err := i.autoJoin(context.Background(), ACTIVE); err != nil {
return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName)
}
}
}
@@ -514,11 +511,18 @@ heartbeatLoop:
if i.ShouldUnregisterOnShutdown() {
if err := i.unregister(context.Background()); err != nil {
return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName)
}
level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName)
}
if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() {
if err := os.Remove(i.cfg.TokensFilePath); err != nil {
return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath)
}
level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath)
}
return nil
}
@@ -738,9 +742,13 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error {
}
instanceDesc, ok := ringDesc.Ingesters[i.ID]
if !ok {
// consul must have restarted
level.Info(i.logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName)
// If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding
// works, an instance missing for some period of time could have caused a resharding of tenants among
// instances: to guarantee query correctness we need to update the registration timestamp to the current time.
level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName)
i.setRegisteredAt(time.Now())
ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt())
} else {
instanceDesc.Timestamp = time.Now().Unix()
@@ -825,8 +833,20 @@ func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool) {
i.unregisterOnShutdown.Store(enabled)
}
// ClearTokensOnShutdown returns if persisted tokens should be cleared on shutdown.
func (i *Lifecycler) ClearTokensOnShutdown() bool {
return i.clearTokensOnShutdown.Load()
}
// SetClearTokensOnShutdown enables/disables deletion of tokens on shutdown.
// Set to `true` to clear on shutdown tokens that are otherwise persisted,
// e.g. useful in custom shutdown handlers.
func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool) {
i.clearTokensOnShutdown.Store(enabled)
}
func (i *Lifecycler) processShutdown(ctx context.Context) {
flushRequired := i.flushOnShutdown.Load()
flushRequired := i.FlushOnShutdown()
transferStart := time.Now()
if err := i.flushTransferer.TransferOut(ctx); err != nil {
if err == ErrTransferDisabled {

@@ -7,8 +7,6 @@ import (
type LifecyclerMetrics struct {
consulHeartbeats prometheus.Counter
tokensOwned prometheus.Gauge
tokensToOwn prometheus.Gauge
shutdownDuration *prometheus.HistogramVec
}
@@ -19,16 +17,6 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle
Help: "The total number of heartbeats sent to consul.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_owned",
Help: "The number of tokens owned in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{
Name: "member_ring_tokens_to_own",
Help: "The number of tokens to own in the ring.",
ConstLabels: prometheus.Labels{"name": ringName},
}),
shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "shutdown_duration_seconds",
Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).",

@@ -183,12 +183,9 @@ type Ring struct {
// If set to nil, no caching is done (used by tests, and subrings).
shuffledSubringCache map[subringCacheKey]*Ring
memberOwnershipGaugeVec *prometheus.GaugeVec
numMembersGaugeVec *prometheus.GaugeVec
totalTokensGauge prometheus.Gauge
numTokensGaugeVec *prometheus.GaugeVec
oldestTimestampGaugeVec *prometheus.GaugeVec
reportedOwners map[string]struct{}
logger log.Logger
}
@@ -227,11 +224,6 @@ strategy: strategy,
strategy: strategy,
ringDesc: &Desc{},
shuffledSubringCache: map[subringCacheKey]*Ring{},
memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_member_ownership_percent",
Help: "The percent ownership of the ring by member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_members",
Help: "Number of members in the ring",
@@ -241,11 +233,6 @@ Name: "ring_tokens_total",
Name: "ring_tokens_total",
Help: "Number of tokens in the ring",
ConstLabels: map[string]string{"name": name}}),
numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_tokens_owned",
Help: "The number of tokens in the ring owned by the member",
ConstLabels: map[string]string{"name": name}},
[]string{"member"}),
oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{
Name: "ring_oldest_member_timestamp",
Help: "Timestamp of the oldest member in the ring.",
@@ -514,12 +501,10 @@ }, nil
}, nil
}
// countTokens returns the number of tokens and tokens within the range for each instance.
func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) {
// countTokens returns the number of tokens within the range for each instance.
func (r *Desc) countTokens() map[string]uint32 {
var (
owned = map[string]uint32{}
numTokens = map[string]uint32{}
owned = map[string]uint32{}
ringTokens = r.GetTokens()
ringInstanceByToken = r.getTokensInfo()
)
@@ -535,7 +520,6 @@ }
}
info := ringInstanceByToken[token]
numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1
owned[info.InstanceID] = owned[info.InstanceID] + diff
}
@@ -543,11 +527,10 @@ for id := range r.Ingesters {
for id := range r.Ingesters {
if _, ok := owned[id]; !ok {
owned[id] = 0
numTokens[id] = 0
}
}
return numTokens, owned
return owned
}
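The ownership fraction that the removed `ring_member_ownership_percent` metric used to report can still be derived from `countTokens`. A sketch of a hypothetical helper; it would have to live in package `ring`, since `countTokens` is unexported:

```go
// ownershipFractions is a hypothetical helper reproducing what the removed
// ring_member_ownership_percent metric reported: the share of the uint32
// token space owned by each instance. Assumes "math" is imported.
func ownershipFractions(d *Desc) map[string]float64 {
	fractions := make(map[string]float64)
	for id, owned := range d.countTokens() {
		fractions[id] = float64(owned) / float64(math.MaxUint32)
	}
	return fractions
}
```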
// updateRingMetrics updates ring metrics. Caller must be holding the Write lock!
@@ -587,21 +570,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) {
return
}
prevOwners := r.reportedOwners
r.reportedOwners = make(map[string]struct{})
numTokens, ownedRange := r.ringDesc.countTokens()
for id, totalOwned := range ownedRange {
r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32))
r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id]))
delete(prevOwners, id)
r.reportedOwners[id] = struct{}{}
}
for k := range prevOwners {
r.memberOwnershipGaugeVec.DeleteLabelValues(k)
r.numTokensGaugeVec.DeleteLabelValues(k)
}
r.totalTokensGauge.Set(float64(len(r.ringTokens)))
}

@@ -15,7 +15,7 @@ import (
type StartingFn func(serviceContext context.Context) error
// RunningFn function is called when service enters Running state. When it returns, service will move to Stopping state.
// If RunningFn or Stopping return error, Service will end in Failed state, otherwise if both functions return without
// If RunningFn or StoppingFn return error, Service will end in Failed state, otherwise if both functions return without
// error, service will end in Terminated state.
type RunningFn func(serviceContext context.Context) error

@@ -97,11 +97,9 @@
}
func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver) log.Logger {
// Weaveworks uses "orgs" and "orgID" to represent Cortex users,
// even though the code-base generally uses `userID` to refer to the same thing.
userID, err := resolver.TenantID(ctx)
if err == nil && userID != "" {
logger = log.With(logger, "org_id", userID)
logger = log.With(logger, "user", userID)
}
traceID, ok := tracing.ExtractSampledTraceID(ctx)

@@ -528,7 +528,7 @@ github.com/gorilla/mux
# github.com/gorilla/websocket v1.4.2
## explicit; go 1.12
github.com/gorilla/websocket
# github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca
# github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96
## explicit; go 1.17
github.com/grafana/dskit/backoff
github.com/grafana/dskit/concurrency
