diff --git a/CHANGELOG.md b/CHANGELOG.md index 93c7978512..9d2cc6eaa6 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -2,6 +2,7 @@ * [6372](https://github.com/grafana/loki/pull/6372) **splitice**: Add support for numbers in JSON fields * [6105](https://github.com/grafana/loki/pull/6105) **rutgerke** Export metrics for the promtail journal target +* [6179](https://github.com/grafana/loki/pull/6179) **chaudum**: Add new HTTP endpoint to delete ingester ring token file and shutdown process gracefully * [6099](https://github.com/grafana/loki/pull/6099/files) **cstyan**: Drop lines with malformed JSON in Promtail JSON pipeline stage * [6136](https://github.com/grafana/loki/pull/6136) **periklis**: Add support for alertmanager header authorization * [6102](https://github.com/grafana/loki/pull/6102) **timchenko-a**: Add multi-tenancy support to lambda-promtail diff --git a/docs/sources/api/_index.md b/docs/sources/api/_index.md index 2b113b96d6..55ac2e123e 100644 --- a/docs/sources/api/_index.md +++ b/docs/sources/api/_index.md @@ -48,7 +48,8 @@ These endpoints are exposed by the distributor: These endpoints are exposed by the ingester: - [`POST /flush`](#post-flush) -- [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown) +- **Deprecated** [`POST /ingester/flush_shutdown`](#post-ingesterflush_shutdown) +- [`POST /ingester/shutdown`](#post-ingestershutdown) The API endpoints starting with `/loki/` are [Prometheus API-compatible](https://prometheus.io/docs/prometheus/latest/querying/api/) and the result formats can be used interchangeably. @@ -807,13 +808,34 @@ In microservices mode, the `/flush` endpoint is exposed by the ingester. ## `POST /ingester/flush_shutdown` +**Deprecated**: Please use `/ingester/shutdown?flush=true` instead. + `/ingester/flush_shutdown` triggers a shutdown of the ingester and notably will _always_ flush any in memory chunks it holds. This is helpful for scaling down WAL-enabled ingesters where we want to ensure old WAL directories are not orphaned, but instead flushed to our chunk backend. In microservices mode, the `/ingester/flush_shutdown` endpoint is exposed by the ingester. -### `GET /distributor/ring` +## `POST /ingester/shutdown` + +`/ingester/shutdown` is similar to the [`/ingester/flush_shutdown`](#post-ingesterflush_shutdown) +endpoint, but accepts three URL query parameters: `flush`, `delete_ring_tokens`, and `terminate`. + +**URL query parameters:** + +* `flush=<boolean>`: + Flag to control whether to flush any in-memory chunks the ingester holds. Defaults to `true`. +* `delete_ring_tokens=<boolean>`: + Flag to control whether to delete the file that contains the ingester ring tokens of the instance, if the `-ingester.token-file-path` flag is specified. Defaults to `false`. +* `terminate=<boolean>`: + Flag to control whether to terminate the Loki process after service shutdown. Defaults to `true`. + +This handler, in contrast to the `/ingester/flush_shutdown` handler, terminates the Loki process by default. +This behaviour can be changed by setting the `terminate` query parameter to `false`. + +In microservices mode, the `/ingester/shutdown` endpoint is exposed by the ingester. + +## `GET /distributor/ring` Displays a web page with the distributor hash ring status, including the state, healthy and last heartbeat time of each distributor.
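As a usage illustration of the new endpoint, here is a minimal Go sketch that asks an ingester to flush, delete its ring tokens file, and terminate. The ingester address (`localhost:3100`) and the chosen parameter values are assumptions for illustration, not part of the change.

```go
package main

import (
	"fmt"
	"io"
	"log"
	"net/http"
)

func main() {
	// Flush in-memory chunks, delete the ring tokens file and let the
	// process terminate afterwards (flush and terminate default to true,
	// delete_ring_tokens defaults to false).
	url := "http://localhost:3100/ingester/shutdown?flush=true&delete_ring_tokens=true&terminate=true"

	// The route only accepts POST; the request body is unused.
	resp, err := http.Post(url, "text/plain", nil)
	if err != nil {
		log.Fatalf("shutdown request failed: %v", err)
	}
	defer resp.Body.Close()

	body, _ := io.ReadAll(resp.Body)
	// 204 No Content: shutdown accepted. 503: the ingester is already
	// stopping or stopped. Anything else carries the error in the body.
	fmt.Printf("status=%d body=%q\n", resp.StatusCode, string(body))
}
```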
diff --git a/go.mod b/go.mod index 025cd165a3..0e9d70364e 100644 --- a/go.mod +++ b/go.mod @@ -49,7 +49,7 @@ require ( github.com/google/uuid v1.2.0 github.com/gorilla/mux v1.8.0 github.com/gorilla/websocket v1.4.2 - github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca + github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 github.com/grafana/go-gelf/v2 v2.0.1 github.com/grafana/regexp v0.0.0-20220304100321-149c8afcd6cb github.com/grpc-ecosystem/go-grpc-middleware v1.3.0 diff --git a/go.sum b/go.sum index d0a1d0bbdd..76d6d459b1 100644 --- a/go.sum +++ b/go.sum @@ -1039,8 +1039,8 @@ github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0U github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gotestyourself/gotestyourself v2.2.0+incompatible/go.mod h1:zZKM6oeNM8k+FRljX1mnzVYeS8wiGgQyvST1/GafPbY= github.com/grafana/dskit v0.0.0-20211021180445-3bd016e9d7f1/go.mod h1:uPG2nyK4CtgNDmWv7qyzYcdI+S90kHHRWvHnBtEMBXM= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca h1:0qHzm6VS0bCsSWKHuyfpt+pdpyScdZbzY/IFIyKSYOk= -github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca/go.mod h1:q51XdMLLHNZJSG6KOGujC20ed2OoLFdx0hBmOEVfRs0= +github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 h1:mZluMeUp1vLHKb1nSrMnA0mfupSpBeUkZqDDpfHabrQ= +github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96/go.mod h1:9It/K30QPyj/FuTqBb/SYnaS4/BJCP5YL4SRfXB7dG0= github.com/grafana/go-gelf/v2 v2.0.1 h1:BOChP0h/jLeD+7F9mL7tq10xVkDG15he3T1zHuQaWak= github.com/grafana/go-gelf/v2 v2.0.1/go.mod h1:lexHie0xzYGwCgiRGcvZ723bSNyNI8ZRD4s0CLobh90= github.com/grafana/gocql v0.0.0-20200605141915-ba5dc39ece85 h1:xLuzPoOzdfNb/RF/IENCw+oLVdZB4G21VPhkHBgwSHY= diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index f01461e8ab..894354b208 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( "time" "github.com/go-kit/log/level" + "github.com/grafana/dskit/modules" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/grafana/dskit/tenant" @@ -172,8 +173,10 @@ type Interface interface { logproto.QuerierServer CheckReady(ctx context.Context) error FlushHandler(w http.ResponseWriter, _ *http.Request) - ShutdownHandler(w http.ResponseWriter, r *http.Request) GetOrCreateInstance(instanceID string) (*instance, error) + // deprecated + LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) + ShutdownHandler(w http.ResponseWriter, r *http.Request) } // Ingester builds chunks for incoming log streams. @@ -209,6 +212,10 @@ type Ingester struct { // Denotes whether the ingester should flush on shutdown. // Currently only used by the WAL to signal when the disk is full. flushOnShutdownSwitch *OnceSwitch + // Flag for whether stopping the ingester service should also terminate the + // loki process. + // This is set when calling the shutdown handler. + terminateOnShutdown bool // Only used by WAL & flusher to coordinate backpressure during replay. 
replayController *replayController @@ -245,6 +252,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid tailersQuit: make(chan struct{}), metrics: metrics, flushOnShutdownSwitch: &OnceSwitch{}, + terminateOnShutdown: false, } i.replayController = newReplayController(metrics, cfg.WAL, &replayFlusher{i}) @@ -506,6 +514,12 @@ func (i *Ingester) stopping(_ error) error { } i.flushQueuesDone.Wait() + // In case the flag to terminate on shutdown is set we need to mark the + // ingester service as "failed", so Loki will shut down entirely. + // The module manager logs the failure `modules.ErrStopProcess` in a special way. + if i.terminateOnShutdown && errs.Err() == nil { + return modules.ErrStopProcess + } return errs.Err() } @@ -526,10 +540,16 @@ func (i *Ingester) loop() { } } -// ShutdownHandler triggers the following set of operations in order: +// LegacyShutdownHandler triggers the following set of operations in order: // * Change the state of ring to stop accepting writes. // * Flush all the chunks. -func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { +// Note: This handler does not trigger a termination of the Loki process, +// despite its name. Instead, the ingester service is stopped, so an external +// source can trigger a safe termination through a signal to the process. +// The handler is deprecated and usage is discouraged. Use ShutdownHandler +// instead. +func (i *Ingester) LegacyShutdownHandler(w http.ResponseWriter, r *http.Request) { + level.Warn(util_log.Logger).Log("msg", "The handler /ingester/flush_shutdown is deprecated and usage is discouraged. Please use /ingester/shutdown?flush=true instead.") originalState := i.lifecycler.FlushOnShutdown() // We want to flush the chunks if transfer fails irrespective of original flag. i.lifecycler.SetFlushOnShutdown(true) @@ -538,6 +558,45 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { w.WriteHeader(http.StatusNoContent) } +// ShutdownHandler handles a graceful shutdown of the ingester service and +// termination of the Loki process. +func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { + // Don't allow calling the shutdown handler multiple times + if i.State() != services.Running { + w.WriteHeader(http.StatusServiceUnavailable) + _, _ = w.Write([]byte("Ingester is stopping or already stopped.")) + return + } + params := r.URL.Query() + doFlush := util.FlagFromValues(params, "flush", true) + doDeleteRingTokens := util.FlagFromValues(params, "delete_ring_tokens", false) + doTerminate := util.FlagFromValues(params, "terminate", true) + err := i.handleShutdown(doTerminate, doFlush, doDeleteRingTokens) + + // Stopping the module will return the modules.ErrStopProcess error. This is + // needed so the Loki process is shut down completely. + if err == nil || err == modules.ErrStopProcess { + w.WriteHeader(http.StatusNoContent) + } else { + w.WriteHeader(http.StatusInternalServerError) + _, _ = w.Write([]byte(err.Error())) + } +} + +// handleShutdown triggers the following operations: +// * Change the state of ring to stop accepting writes. +// * optional: Flush all the chunks. 
+// * optional: Delete ring tokens file +// * Unregister from KV store +// * optional: Terminate process (handled by service manager in loki.go) +func (i *Ingester) handleShutdown(terminate, flush, del bool) error { + i.lifecycler.SetFlushOnShutdown(flush) + i.lifecycler.SetClearTokensOnShutdown(del) + i.lifecycler.SetUnregisterOnShutdown(true) + i.terminateOnShutdown = terminate + return services.StopAndAwaitTerminated(context.Background(), i) +} + // Push implements logproto.Pusher. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { instanceID, err := tenant.TenantID(ctx) diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index d58c0a3d97..4485256d8a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -342,9 +342,15 @@ func (t *Loki) initIngester() (_ services.Service, err error) { httpMiddleware := middleware.Merge( serverutil.RecoveryHTTPMiddleware, ) - t.Server.HTTP.Path("/flush").Methods("GET", "POST").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler))) - t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler(httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler))) - + t.Server.HTTP.Methods("GET", "POST").Path("/flush").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.FlushHandler)), + ) + t.Server.HTTP.Methods("POST").Path("/ingester/flush_shutdown").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.LegacyShutdownHandler)), + ) + t.Server.HTTP.Methods("POST").Path("/ingester/shutdown").Handler( + httpMiddleware.Wrap(http.HandlerFunc(t.Ingester.ShutdownHandler)), + ) return t.Ingester, nil } diff --git a/pkg/util/http.go b/pkg/util/http.go index 3305a9410a..ed1c5a4220 100644 --- a/pkg/util/http.go +++ b/pkg/util/http.go @@ -9,6 +9,7 @@ import ( "html/template" "io" "net/http" + "net/url" "strings" "github.com/go-kit/log" @@ -285,3 +286,14 @@ func SerializeProtoResponse(w http.ResponseWriter, resp proto.Message, compressi } return nil } + +func FlagFromValues(values url.Values, key string, d bool) bool { + switch strings.ToLower(values.Get(key)) { + case "t", "true", "1": + return true + case "f", "false", "0": + return false + default: + return d + } +} diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go new file mode 100644 index 0000000000..3ecd83e387 --- /dev/null +++ b/vendor/github.com/grafana/dskit/kv/memberlist/http_status_handler.go @@ -0,0 +1,219 @@ +package memberlist + +import ( + _ "embed" + "encoding/json" + "fmt" + "html/template" + "net/http" + "sort" + "strconv" + "strings" + "time" + + "github.com/hashicorp/memberlist" +) + +// HTTPStatusHandler is a http.Handler with status information about memberlist. +type HTTPStatusHandler struct { + kvs *KVInitService + tpl *template.Template +} + +// StatusPageData represents the data passed to the template rendered by HTTPStatusHandler +type StatusPageData struct { + Now time.Time + Memberlist *memberlist.Memberlist + SortedMembers []*memberlist.Node + Store map[string]ValueDesc + MessageHistoryBufferBytes int + SentMessages []Message + ReceivedMessages []Message +} + +// NewHTTPStatusHandler creates a new HTTPStatusHandler that will render the provided template using the data from StatusPageData. 
+func NewHTTPStatusHandler(kvs *KVInitService, tpl *template.Template) HTTPStatusHandler { + return HTTPStatusHandler{kvs, tpl} +} + +func (h HTTPStatusHandler) ServeHTTP(w http.ResponseWriter, req *http.Request) { + kv := h.kvs.getKV() + if kv == nil { + w.Header().Set("Content-Type", "text/plain") + // Ignore inactionable errors. + _, _ = w.Write([]byte("This instance doesn't use memberlist.")) + return + } + + const ( + downloadKeyParam = "downloadKey" + viewKeyParam = "viewKey" + viewMsgParam = "viewMsg" + deleteMessagesParam = "deleteMessages" + ) + + if err := req.ParseForm(); err == nil { + if req.Form[downloadKeyParam] != nil { + downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. + return + } + + if req.Form[viewKeyParam] != nil { + viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) + return + } + + if req.Form[viewMsgParam] != nil { + msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) + if err != nil { + http.Error(w, err.Error(), http.StatusBadRequest) + return + } + + sent, received := kv.getSentAndReceivedMessages() + + for _, m := range append(sent, received...) { + if m.ID == msgID { + viewMessage(w, kv, m, getFormat(req)) + return + } + } + + http.Error(w, "message not found", http.StatusNotFound) + return + } + + if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { + kv.deleteSentReceivedMessages() + + // Redirect back. + w.Header().Set("Location", "?"+deleteMessagesParam+"=false") + w.WriteHeader(http.StatusFound) + return + } + } + + members := kv.memberlist.Members() + sort.Slice(members, func(i, j int) bool { + return members[i].Name < members[j].Name + }) + + sent, received := kv.getSentAndReceivedMessages() + + v := StatusPageData{ + Now: time.Now(), + Memberlist: kv.memberlist, + SortedMembers: members, + Store: kv.storeCopy(), + MessageHistoryBufferBytes: kv.cfg.MessageHistoryBufferBytes, + SentMessages: sent, + ReceivedMessages: received, + } + + accept := req.Header.Get("Accept") + if strings.Contains(accept, "application/json") { + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + return + } + + w.Header().Set("Content-Type", "text/html") + if err := h.tpl.Execute(w, v); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +func getFormat(req *http.Request) string { + const viewFormat = "format" + + format := "" + if len(req.Form[viewFormat]) > 0 { + format = req.Form[viewFormat][0] + } + return format +} + +func viewMessage(w http.ResponseWriter, kv *KV, msg Message, format string) { + c := kv.GetCodec(msg.Pair.Codec) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + val, err := c.Decode(msg.Pair.Value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) + return + } + + formatValue(w, val, format) +} + +func viewKey(w http.ResponseWriter, store map[string]ValueDesc, key string, format string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + formatValue(w, store[key].value, format) +} + +func formatValue(w http.ResponseWriter, val interface{}, format string) { + w.WriteHeader(200) + w.Header().Add("content-type", "text/plain") + + switch format { + case "json", "json-pretty": + enc := json.NewEncoder(w) + if format == "json-pretty" { + 
enc.SetIndent("", " ") + } + + err := enc.Encode(val) + if err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } + + default: + _, _ = fmt.Fprintf(w, "%#v", val) + } +} + +func downloadKey(w http.ResponseWriter, kv *KV, store map[string]ValueDesc, key string) { + if store[key].value == nil { + http.Error(w, "value not found", http.StatusNotFound) + return + } + + val := store[key] + + c := kv.GetCodec(store[key].CodecID) + if c == nil { + http.Error(w, "codec not found", http.StatusNotFound) + return + } + + encoded, err := c.Encode(val.value) + if err != nil { + http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) + return + } + + w.Header().Add("content-type", "application/octet-stream") + // Set content-length so that client knows whether it has received full response or not. + w.Header().Add("content-length", strconv.Itoa(len(encoded))) + w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.Version, key)) + w.WriteHeader(200) + + // Ignore errors, we cannot do anything about them. + _, _ = w.Write(encoded) +} + +//go:embed status.gohtml +var defaultPageContent string +var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ + "StringsJoin": strings.Join, +}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go index 1a8313cded..5b505a5488 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/kv_init_service.go @@ -2,19 +2,10 @@ package memberlist import ( "context" - _ "embed" - "encoding/json" - "fmt" - "html/template" "net/http" - "sort" - "strconv" - "strings" "sync" - "time" "github.com/go-kit/log" - "github.com/hashicorp/memberlist" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -95,191 +86,5 @@ func (kvs *KVInitService) stopping(_ error) error { } func (kvs *KVInitService) ServeHTTP(w http.ResponseWriter, req *http.Request) { - kv := kvs.getKV() - if kv == nil { - w.Header().Set("Content-Type", "text/plain") - // Ignore inactionable errors. - _, _ = w.Write([]byte("This instance doesn't use memberlist.")) - return - } - - const ( - downloadKeyParam = "downloadKey" - viewKeyParam = "viewKey" - viewMsgParam = "viewMsg" - deleteMessagesParam = "deleteMessages" - ) - - if err := req.ParseForm(); err == nil { - if req.Form[downloadKeyParam] != nil { - downloadKey(w, kv, kv.storeCopy(), req.Form[downloadKeyParam][0]) // Use first value, ignore the rest. - return - } - - if req.Form[viewKeyParam] != nil { - viewKey(w, kv.storeCopy(), req.Form[viewKeyParam][0], getFormat(req)) - return - } - - if req.Form[viewMsgParam] != nil { - msgID, err := strconv.Atoi(req.Form[viewMsgParam][0]) - if err != nil { - http.Error(w, err.Error(), http.StatusBadRequest) - return - } - - sent, received := kv.getSentAndReceivedMessages() - - for _, m := range append(sent, received...) { - if m.ID == msgID { - viewMessage(w, kv, m, getFormat(req)) - return - } - } - - http.Error(w, "message not found", http.StatusNotFound) - return - } - - if len(req.Form[deleteMessagesParam]) > 0 && req.Form[deleteMessagesParam][0] == "true" { - kv.deleteSentReceivedMessages() - - // Redirect back. 
- w.Header().Set("Location", "?"+deleteMessagesParam+"=false") - w.WriteHeader(http.StatusFound) - return - } - } - - members := kv.memberlist.Members() - sort.Slice(members, func(i, j int) bool { - return members[i].Name < members[j].Name - }) - - sent, received := kv.getSentAndReceivedMessages() - - v := pageData{ - Now: time.Now(), - Memberlist: kv.memberlist, - SortedMembers: members, - Store: kv.storeCopy(), - SentMessages: sent, - ReceivedMessages: received, - } - - accept := req.Header.Get("Accept") - if strings.Contains(accept, "application/json") { - w.Header().Set("Content-Type", "application/json") - - if err := json.NewEncoder(w).Encode(v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - return - } - - w.Header().Set("Content-Type", "text/html") - if err := defaultPageTemplate.Execute(w, v); err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } -} - -func getFormat(req *http.Request) string { - const viewFormat = "format" - - format := "" - if len(req.Form[viewFormat]) > 0 { - format = req.Form[viewFormat][0] - } - return format -} - -func viewMessage(w http.ResponseWriter, kv *KV, msg message, format string) { - c := kv.GetCodec(msg.Pair.Codec) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - val, err := c.Decode(msg.Pair.Value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to decode: %v", err), http.StatusInternalServerError) - return - } - - formatValue(w, val, format) -} - -func viewKey(w http.ResponseWriter, store map[string]valueDesc, key string, format string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - formatValue(w, store[key].value, format) -} - -func formatValue(w http.ResponseWriter, val interface{}, format string) { - w.WriteHeader(200) - w.Header().Add("content-type", "text/plain") - - switch format { - case "json", "json-pretty": - enc := json.NewEncoder(w) - if format == "json-pretty" { - enc.SetIndent("", " ") - } - - err := enc.Encode(val) - if err != nil { - http.Error(w, err.Error(), http.StatusInternalServerError) - } - - default: - _, _ = fmt.Fprintf(w, "%#v", val) - } -} - -func downloadKey(w http.ResponseWriter, kv *KV, store map[string]valueDesc, key string) { - if store[key].value == nil { - http.Error(w, "value not found", http.StatusNotFound) - return - } - - val := store[key] - - c := kv.GetCodec(store[key].codecID) - if c == nil { - http.Error(w, "codec not found", http.StatusNotFound) - return - } - - encoded, err := c.Encode(val.value) - if err != nil { - http.Error(w, fmt.Sprintf("failed to encode: %v", err), http.StatusInternalServerError) - return - } - - w.Header().Add("content-type", "application/octet-stream") - // Set content-length so that client knows whether it has received full response or not. - w.Header().Add("content-length", strconv.Itoa(len(encoded))) - w.Header().Add("content-disposition", fmt.Sprintf("attachment; filename=%d-%s", val.version, key)) - w.WriteHeader(200) - - // Ignore errors, we cannot do anything about them. 
- _, _ = w.Write(encoded) + NewHTTPStatusHandler(kvs, defaultPageTemplate).ServeHTTP(w, req) } - -type pageData struct { - Now time.Time - Memberlist *memberlist.Memberlist - SortedMembers []*memberlist.Node - Store map[string]valueDesc - SentMessages []message - ReceivedMessages []message -} - -//go:embed status.gohtml -var defaultPageContent string -var defaultPageTemplate = template.Must(template.New("webpage").Funcs(template.FuncMap{ - "StringsJoin": strings.Join, -}).Parse(defaultPageContent)) diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go index 30f0992d35..23c40ac764 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go +++ b/vendor/github.com/grafana/dskit/kv/memberlist/memberlist_client.go @@ -232,7 +232,7 @@ type KV struct { // KV Store. storeMu sync.Mutex - store map[string]valueDesc + store map[string]ValueDesc // Codec registry codecs map[string]codec.Codec @@ -245,9 +245,9 @@ type KV struct { // Buffers with sent and received messages. Used for troubleshooting only. // New messages are appended, old messages (based on configured size limit) removed from the front. messagesMu sync.Mutex - sentMessages []message + sentMessages []Message sentMessagesSize int - receivedMessages []message + receivedMessages []Message receivedMessagesSize int messageCounter int // Used to give each message in the sentMessages and receivedMessages a unique ID, for UI. @@ -285,7 +285,7 @@ type KV struct { // Message describes incoming or outgoing message, and local state after applying incoming message, or state when sending message. // Fields are exported for templating to work. -type message struct { +type Message struct { ID int // Unique local ID of the message. Time time.Time // Time when message was sent or received. Size int // Message size @@ -296,21 +296,22 @@ type message struct { Changes []string // List of changes in this message (as computed by *this* node). } -type valueDesc struct { +// ValueDesc stores the value along with it's codec and local version. +type ValueDesc struct { // We store the decoded value here to prevent decoding the entire state for every // update we receive. Whilst the updates are small and fast to decode, // the total state can be quite large. // The CAS function is passed a deep copy because it modifies in-place. value Mergeable - // version (local only) is used to keep track of what we're gossiping about, and invalidate old messages - version uint + // Version (local only) is used to keep track of what we're gossiping about, and invalidate old messages. + Version uint // ID of codec used to write this value. Only used when sending full state. 
- codecID string + CodecID string } -func (v valueDesc) Clone() (result valueDesc) { +func (v ValueDesc) Clone() (result ValueDesc) { result = v if v.value != nil { result.value = v.value.Clone() @@ -318,8 +319,8 @@ func (v valueDesc) Clone() (result valueDesc) { return } -func (v valueDesc) String() string { - return fmt.Sprintf("version: %d, codec: %s", v.version, v.codecID) +func (v ValueDesc) String() string { + return fmt.Sprintf("version: %d, codec: %s", v.Version, v.CodecID) } var ( @@ -343,7 +344,7 @@ func NewKV(cfg KVConfig, logger log.Logger, dnsProvider DNSProvider, registerer registerer: registerer, provider: dnsProvider, - store: make(map[string]valueDesc), + store: make(map[string]ValueDesc), codecs: make(map[string]codec.Codec), watchers: make(map[string][]chan string), prefixWatchers: make(map[string][]chan string), @@ -642,7 +643,7 @@ func (m *KV) get(key string, codec codec.Codec) (out interface{}, version uint, _, _ = v.value.RemoveTombstones(time.Time{}) } - return v.value, v.version, nil + return v.value, v.Version, nil } // WatchKey watches for value changes for given key. When value changes, 'f' function is called with the @@ -909,7 +910,7 @@ func (m *KV) broadcastNewValue(key string, change Mergeable, version uint, codec return } - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: time.Now(), Size: len(pairData), Pair: kvPair, @@ -964,7 +965,7 @@ func (m *KV) NotifyMsg(msg []byte) { changes = mod.MergeContent() } - m.addReceivedMessage(message{ + m.addReceivedMessage(Message{ Time: time.Now(), Size: len(msg), Pair: kvPair, @@ -1033,9 +1034,9 @@ func (m *KV) LocalState(join bool) []byte { continue } - codec := m.GetCodec(val.codecID) + codec := m.GetCodec(val.CodecID) if codec == nil { - level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.codecID, "key", key) + level.Error(m.logger).Log("msg", "failed to encode remote state: unknown codec for key", "codec", val.CodecID, "key", key) continue } @@ -1048,7 +1049,7 @@ func (m *KV) LocalState(join bool) []byte { kvPair.Reset() kvPair.Key = key kvPair.Value = encoded - kvPair.Codec = val.codecID + kvPair.Codec = val.CodecID ser, err := kvPair.Marshal() if err != nil { @@ -1068,11 +1069,11 @@ func (m *KV) LocalState(join bool) []byte { } buf.Write(ser) - m.addSentMessage(message{ + m.addSentMessage(Message{ Time: sent, Size: len(ser), Pair: kvPair, // Makes a copy of kvPair. - Version: val.version, + Version: val.Version, }) } @@ -1136,7 +1137,7 @@ func (m *KV) MergeRemoteState(data []byte, join bool) { changes = change.MergeContent() } - m.addReceivedMessage(message{ + m.addReceivedMessage(Message{ Time: received, Size: int(kvPairLength), Pair: kvPair, // Makes a copy of kvPair. @@ -1184,7 +1185,7 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui // the full state anywhere as is done elsewhere (i.e. Get/WatchKey/CAS). curr := m.store[key] // if casVersion is 0, then there was no previous value, so we will just do normal merge, without localCAS flag set. 
- if casVersion > 0 && curr.version != casVersion { + if casVersion > 0 && curr.Version != casVersion { return nil, 0, errVersionMismatch } result, change, err := computeNewValue(incomingValue, curr.value, casVersion > 0) @@ -1215,11 +1216,11 @@ func (m *KV) mergeValueForKey(key string, incomingValue Mergeable, casVersion ui } } - newVersion := curr.version + 1 - m.store[key] = valueDesc{ + newVersion := curr.Version + 1 + m.store[key] = ValueDesc{ value: result, - version: newVersion, - codecID: codec.CodecID(), + Version: newVersion, + CodecID: codec.CodecID(), } // The "changes" returned by Merge() can contain references to the "result" @@ -1240,17 +1241,17 @@ func computeNewValue(incoming Mergeable, oldVal Mergeable, cas bool) (Mergeable, return oldVal, change, err } -func (m *KV) storeCopy() map[string]valueDesc { +func (m *KV) storeCopy() map[string]ValueDesc { m.storeMu.Lock() defer m.storeMu.Unlock() - result := make(map[string]valueDesc, len(m.store)) + result := make(map[string]ValueDesc, len(m.store)) for k, v := range m.store { result[k] = v.Clone() } return result } -func (m *KV) addReceivedMessage(msg message) { +func (m *KV) addReceivedMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1264,7 +1265,7 @@ func (m *KV) addReceivedMessage(msg message) { m.receivedMessages, m.receivedMessagesSize = addMessageToBuffer(m.receivedMessages, m.receivedMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) addSentMessage(msg message) { +func (m *KV) addSentMessage(msg Message) { if m.cfg.MessageHistoryBufferBytes == 0 { return } @@ -1278,12 +1279,12 @@ func (m *KV) addSentMessage(msg message) { m.sentMessages, m.sentMessagesSize = addMessageToBuffer(m.sentMessages, m.sentMessagesSize, m.cfg.MessageHistoryBufferBytes, msg) } -func (m *KV) getSentAndReceivedMessages() (sent, received []message) { +func (m *KV) getSentAndReceivedMessages() (sent, received []Message) { m.messagesMu.Lock() defer m.messagesMu.Unlock() // Make copy of both slices. - return append([]message(nil), m.sentMessages...), append([]message(nil), m.receivedMessages...) + return append([]Message(nil), m.sentMessages...), append([]Message(nil), m.receivedMessages...) } func (m *KV) deleteSentReceivedMessages() { @@ -1296,7 +1297,7 @@ func (m *KV) deleteSentReceivedMessages() { m.receivedMessagesSize = 0 } -func addMessageToBuffer(msgs []message, size int, limit int, msg message) ([]message, int) { +func addMessageToBuffer(msgs []Message, size int, limit int, msg Message) ([]Message, int) { msgs = append(msgs, msg) size += msg.Size diff --git a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml index 3ab6d09363..6f845b6e06 100644 --- a/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml +++ b/vendor/github.com/grafana/dskit/kv/memberlist/status.gohtml @@ -1,4 +1,4 @@ -{{- /*gotype: github.com/grafana/dskit/kv/memberlist.statusPageData */ -}} +{{- /*gotype: github.com/grafana/dskit/kv/memberlist.StatusPageData */ -}} @@ -20,7 +20,8 @@ Key - Value Details + Codec + Version Actions @@ -29,7 +30,8 @@ {{ range $k, $v := .Store }} {{ $k }} - {{ $v }} + {{ $v.CodecID }} + {{ $v.Version }} json | json-pretty @@ -68,76 +70,83 @@

State: 0 = Alive, 1 = Suspect, 2 = Dead, 3 = Left

-

Received Messages

+

Message History

-Delete All Messages (received and sent) +{{ if .MessageHistoryBufferBytes }} - - - - - - - - - - - - +

Received Messages

- - {{ range .ReceivedMessages }} + Delete All Messages (received and sent) + +
IDTimeKeyValue in the MessageVersion After Update (0 = no change)ChangesActions
+ - - - - - - - + + + + + + + - {{ end }} - -
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct - IDTimeKeyValue in the MessageVersion After Update (0 = no change)ChangesActions
- -

Sent Messages

- -Delete All Messages (received and sent) - - - - - - - - - - - - - - - - {{ range .SentMessages }} + + + + {{ range .ReceivedMessages }} + + + + + + + + + + {{ end }} + +
IDTimeKeyValueVersionChangesActions
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} + json + | json-pretty + | struct +
+ +

Sent Messages

+ + Delete All Messages (received and sent) + + + - - - - - - - + + + + + + + - {{ end }} - -
{{ .ID }}{{ .Time.Format "15:04:05.000" }}{{ .Pair.Key }}size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }}{{ .Version }}{{ StringsJoin .Changes ", " }} - json - | json-pretty - | struct - IDTimeKeyValueVersionChangesActions
+ + + + {{ range .SentMessages }} + + {{ .ID }} + {{ .Time.Format "15:04:05.000" }} + {{ .Pair.Key }} + size: {{ .Pair.Value | len }}, codec: {{ .Pair.Codec }} + {{ .Version }} + {{ StringsJoin .Changes ", " }} + + json + | json-pretty + | struct + + + {{ end }} + + +{{ else }} +

Message history buffer is disabled, refer to the configuration to enable it in order to troubleshoot the message history.

+{{ end }} \ No newline at end of file diff --git a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go index 32775c9829..1bb95c0837 100644 --- a/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/basic_lifecycler.go @@ -405,8 +405,13 @@ func (l *BasicLifecycler) updateInstance(ctx context.Context, update func(*Desc, // This could happen if the backend store restarted (and content deleted) // or the instance has been forgotten. In this case, we do re-insert it. if !ok { - level.Warn(l.logger).Log("msg", "instance missing in the ring, adding it back", "ring", l.ringName) - instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), l.GetRegisteredAt()) + level.Warn(l.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", l.ringName) + + // Due to how shuffle sharding work, the missing instance for some period of time could have cause + // a resharding of tenants among instances: to guarantee query correctness we need to update the + // registration timestamp to current time. + registeredAt := time.Now() + instanceDesc = ringDesc.AddIngester(l.cfg.ID, l.cfg.Addr, l.cfg.Zone, l.GetTokens(), l.GetState(), registeredAt) } prevTimestamp := instanceDesc.Timestamp diff --git a/vendor/github.com/grafana/dskit/ring/http.go b/vendor/github.com/grafana/dskit/ring/http.go index 18a56177cb..bcf3d1cc89 100644 --- a/vendor/github.com/grafana/dskit/ring/http.go +++ b/vendor/github.com/grafana/dskit/ring/http.go @@ -93,7 +93,7 @@ func (h *ringPageHandler) handle(w http.ResponseWriter, req *http.Request) { http.Error(w, err.Error(), http.StatusInternalServerError) return } - _, ownedTokens := ringDesc.countTokens() + ownedTokens := ringDesc.countTokens() var ingesterIDs []string for id := range ringDesc.Ingesters { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler.go b/vendor/github.com/grafana/dskit/ring/lifecycler.go index 602e1fdb58..2479ad03c8 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler.go @@ -13,7 +13,6 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" "github.com/pkg/errors" - perrors "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "go.uber.org/atomic" @@ -110,8 +109,9 @@ type Lifecycler struct { Zone string // Whether to flush if transfer fails on shutdown. - flushOnShutdown *atomic.Bool - unregisterOnShutdown *atomic.Bool + flushOnShutdown *atomic.Bool + unregisterOnShutdown *atomic.Bool + clearTokensOnShutdown *atomic.Bool // We need to remember the ingester state, tokens and registered timestamp just in case the KV store // goes away and comes back empty. The state changes during lifecycle of instance. 
@@ -160,23 +160,22 @@ func NewLifecycler(cfg LifecyclerConfig, flushTransferer FlushTransferer, ringNa } l := &Lifecycler{ - cfg: cfg, - flushTransferer: flushTransferer, - KVStore: store, - Addr: fmt.Sprintf("%s:%d", addr, port), - ID: cfg.ID, - RingName: ringName, - RingKey: ringKey, - flushOnShutdown: atomic.NewBool(flushOnShutdown), - unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), - Zone: cfg.Zone, - actorChan: make(chan func()), - state: PENDING, - lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), - logger: logger, - } - - l.lifecyclerMetrics.tokensToOwn.Set(float64(cfg.NumTokens)) + cfg: cfg, + flushTransferer: flushTransferer, + KVStore: store, + Addr: fmt.Sprintf("%s:%d", addr, port), + ID: cfg.ID, + RingName: ringName, + RingKey: ringKey, + flushOnShutdown: atomic.NewBool(flushOnShutdown), + unregisterOnShutdown: atomic.NewBool(cfg.UnregisterOnShutdown), + clearTokensOnShutdown: atomic.NewBool(false), + Zone: cfg.Zone, + actorChan: make(chan func()), + state: PENDING, + lifecyclerMetrics: NewLifecyclerMetrics(ringName, reg), + logger: logger, + } l.BasicService = services. NewBasicService(nil, l.loop, l.stopping). @@ -304,8 +303,6 @@ func (i *Lifecycler) getTokens() Tokens { } func (i *Lifecycler) setTokens(tokens Tokens) { - i.lifecyclerMetrics.tokensOwned.Set(float64(len(tokens))) - i.stateMtx.Lock() defer i.stateMtx.Unlock() @@ -397,7 +394,7 @@ func (i *Lifecycler) loop(ctx context.Context) error { // First, see if we exist in the cluster, update our state to match if we do, // and add ourselves (without tokens) if we don't. if err := i.initRing(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to join the ring %s", i.RingName) + return errors.Wrapf(err, "failed to join the ring %s", i.RingName) } // We do various period tasks @@ -420,14 +417,14 @@ func (i *Lifecycler) loop(ctx context.Context) error { // let's observe the ring. By using JOINING state, this ingester will be ignored by LEAVING // ingesters, but we also signal that it is not fully functional yet. 
if err := i.autoJoin(context.Background(), JOINING); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "observing tokens before going ACTIVE", "ring", i.RingName) observeChan = time.After(i.cfg.ObservePeriod) } else { if err := i.autoJoin(context.Background(), ACTIVE); err != nil { - return perrors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to pick tokens in the KV store, ring: %s", i.RingName) } } } @@ -514,11 +511,18 @@ heartbeatLoop: if i.ShouldUnregisterOnShutdown() { if err := i.unregister(context.Background()); err != nil { - return perrors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) + return errors.Wrapf(err, "failed to unregister from the KV store, ring: %s", i.RingName) } level.Info(i.logger).Log("msg", "instance removed from the KV store", "ring", i.RingName) } + if i.cfg.TokensFilePath != "" && i.ClearTokensOnShutdown() { + if err := os.Remove(i.cfg.TokensFilePath); err != nil { + return errors.Wrapf(err, "failed to delete tokens file %s", i.cfg.TokensFilePath) + } + level.Info(i.logger).Log("msg", "removed tokens file from disk", "path", i.cfg.TokensFilePath) + } + return nil } @@ -738,9 +742,13 @@ func (i *Lifecycler) updateConsul(ctx context.Context) error { } instanceDesc, ok := ringDesc.Ingesters[i.ID] + if !ok { - // consul must have restarted - level.Info(i.logger).Log("msg", "found empty ring, inserting tokens", "ring", i.RingName) + // If the instance is missing in the ring, we need to add it back. However, due to how shuffle sharding work, + // the missing instance for some period of time could have cause a resharding of tenants among instances: + // to guarantee query correctness we need to update the registration timestamp to current time. + level.Info(i.logger).Log("msg", "instance is missing in the ring (e.g. the ring backend storage has been reset), registering the instance with an updated registration timestamp", "ring", i.RingName) + i.setRegisteredAt(time.Now()) ringDesc.AddIngester(i.ID, i.Addr, i.Zone, i.getTokens(), i.GetState(), i.getRegisteredAt()) } else { instanceDesc.Timestamp = time.Now().Unix() @@ -825,8 +833,20 @@ func (i *Lifecycler) SetUnregisterOnShutdown(enabled bool) { i.unregisterOnShutdown.Store(enabled) } +// ClearTokensOnShutdown returns if persisted tokens should be cleared on shutdown. +func (i *Lifecycler) ClearTokensOnShutdown() bool { + return i.clearTokensOnShutdown.Load() +} + +// SetClearTokensOnShutdown enables/disables deletions of tokens on shutdown. +// Set to `true` in case one wants to clear tokens on shutdown which are +// otherwise persisted, e.g. useful in custom shutdown handlers. 
+func (i *Lifecycler) SetClearTokensOnShutdown(enabled bool) { + i.clearTokensOnShutdown.Store(enabled) +} + func (i *Lifecycler) processShutdown(ctx context.Context) { - flushRequired := i.flushOnShutdown.Load() + flushRequired := i.FlushOnShutdown() transferStart := time.Now() if err := i.flushTransferer.TransferOut(ctx); err != nil { if err == ErrTransferDisabled { diff --git a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go index 422a564c18..fe29cdfd5f 100644 --- a/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go +++ b/vendor/github.com/grafana/dskit/ring/lifecycler_metrics.go @@ -7,8 +7,6 @@ import ( type LifecyclerMetrics struct { consulHeartbeats prometheus.Counter - tokensOwned prometheus.Gauge - tokensToOwn prometheus.Gauge shutdownDuration *prometheus.HistogramVec } @@ -19,16 +17,6 @@ func NewLifecyclerMetrics(ringName string, reg prometheus.Registerer) *Lifecycle Help: "The total number of heartbeats sent to consul.", ConstLabels: prometheus.Labels{"name": ringName}, }), - tokensOwned: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_owned", - Help: "The number of tokens owned in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), - tokensToOwn: promauto.With(reg).NewGauge(prometheus.GaugeOpts{ - Name: "member_ring_tokens_to_own", - Help: "The number of tokens to own in the ring.", - ConstLabels: prometheus.Labels{"name": ringName}, - }), shutdownDuration: promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "shutdown_duration_seconds", Help: "Duration (in seconds) of shutdown procedure (ie transfer or flush).", diff --git a/vendor/github.com/grafana/dskit/ring/ring.go b/vendor/github.com/grafana/dskit/ring/ring.go index 6c7e4a49fc..be78fee598 100644 --- a/vendor/github.com/grafana/dskit/ring/ring.go +++ b/vendor/github.com/grafana/dskit/ring/ring.go @@ -183,12 +183,9 @@ type Ring struct { // If set to nil, no caching is done (used by tests, and subrings). 
shuffledSubringCache map[subringCacheKey]*Ring - memberOwnershipGaugeVec *prometheus.GaugeVec numMembersGaugeVec *prometheus.GaugeVec totalTokensGauge prometheus.Gauge - numTokensGaugeVec *prometheus.GaugeVec oldestTimestampGaugeVec *prometheus.GaugeVec - reportedOwners map[string]struct{} logger log.Logger } @@ -227,11 +224,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client strategy: strategy, ringDesc: &Desc{}, shuffledSubringCache: map[subringCacheKey]*Ring{}, - memberOwnershipGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_member_ownership_percent", - Help: "The percent ownership of the ring by member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), numMembersGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_members", Help: "Number of members in the ring", @@ -241,11 +233,6 @@ func NewWithStoreClientAndStrategy(cfg Config, name, key string, store kv.Client Name: "ring_tokens_total", Help: "Number of tokens in the ring", ConstLabels: map[string]string{"name": name}}), - numTokensGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ - Name: "ring_tokens_owned", - Help: "The number of tokens in the ring owned by the member", - ConstLabels: map[string]string{"name": name}}, - []string{"member"}), oldestTimestampGaugeVec: promauto.With(reg).NewGaugeVec(prometheus.GaugeOpts{ Name: "ring_oldest_member_timestamp", Help: "Timestamp of the oldest member in the ring.", @@ -514,12 +501,10 @@ func (r *Ring) GetReplicationSetForOperation(op Operation) (ReplicationSet, erro }, nil } -// countTokens returns the number of tokens and tokens within the range for each instance. -func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { +// countTokens returns the number tokens within the range for each instance. +func (r *Desc) countTokens() map[string]uint32 { var ( - owned = map[string]uint32{} - numTokens = map[string]uint32{} - + owned = map[string]uint32{} ringTokens = r.GetTokens() ringInstanceByToken = r.getTokensInfo() ) @@ -535,7 +520,6 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { } info := ringInstanceByToken[token] - numTokens[info.InstanceID] = numTokens[info.InstanceID] + 1 owned[info.InstanceID] = owned[info.InstanceID] + diff } @@ -543,11 +527,10 @@ func (r *Desc) countTokens() (map[string]uint32, map[string]uint32) { for id := range r.Ingesters { if _, ok := owned[id]; !ok { owned[id] = 0 - numTokens[id] = 0 } } - return numTokens, owned + return owned } // updateRingMetrics updates ring metrics. Caller must be holding the Write lock! 
@@ -587,21 +570,6 @@ func (r *Ring) updateRingMetrics(compareResult CompareResult) { return } - prevOwners := r.reportedOwners - r.reportedOwners = make(map[string]struct{}) - numTokens, ownedRange := r.ringDesc.countTokens() - for id, totalOwned := range ownedRange { - r.memberOwnershipGaugeVec.WithLabelValues(id).Set(float64(totalOwned) / float64(math.MaxUint32)) - r.numTokensGaugeVec.WithLabelValues(id).Set(float64(numTokens[id])) - delete(prevOwners, id) - r.reportedOwners[id] = struct{}{} - } - - for k := range prevOwners { - r.memberOwnershipGaugeVec.DeleteLabelValues(k) - r.numTokensGaugeVec.DeleteLabelValues(k) - } - r.totalTokensGauge.Set(float64(len(r.ringTokens))) } diff --git a/vendor/github.com/grafana/dskit/services/basic_service.go b/vendor/github.com/grafana/dskit/services/basic_service.go index ead611a3f9..6ced33aabf 100644 --- a/vendor/github.com/grafana/dskit/services/basic_service.go +++ b/vendor/github.com/grafana/dskit/services/basic_service.go @@ -15,7 +15,7 @@ import ( type StartingFn func(serviceContext context.Context) error // RunningFn function is called when service enters Running state. When it returns, service will move to Stopping state. -// If RunningFn or Stopping return error, Service will end in Failed state, otherwise if both functions return without +// If RunningFn or StoppingFn return error, Service will end in Failed state, otherwise if both functions return without // error, service will end in Terminated state. type RunningFn func(serviceContext context.Context) error diff --git a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go index 91876a7281..a639460bbe 100644 --- a/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go +++ b/vendor/github.com/grafana/dskit/spanlogger/spanlogger.go @@ -97,11 +97,9 @@ func (s *SpanLogger) Error(err error) error { } func withContext(ctx context.Context, logger log.Logger, resolver TenantResolver) log.Logger { - // Weaveworks uses "orgs" and "orgID" to represent Cortex users, - // even though the code-base generally uses `userID` to refer to the same thing. userID, err := resolver.TenantID(ctx) if err == nil && userID != "" { - logger = log.With(logger, "org_id", userID) + logger = log.With(logger, "user", userID) } traceID, ok := tracing.ExtractSampledTraceID(ctx) diff --git a/vendor/modules.txt b/vendor/modules.txt index a0b0f90ebf..bae41a2d13 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -528,7 +528,7 @@ github.com/gorilla/mux # github.com/gorilla/websocket v1.4.2 ## explicit; go 1.12 github.com/gorilla/websocket -# github.com/grafana/dskit v0.0.0-20220331160727-49faf69f72ca +# github.com/grafana/dskit v0.0.0-20220518152339-07166f9e6d96 ## explicit; go 1.17 github.com/grafana/dskit/backoff github.com/grafana/dskit/concurrency
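The boolean query parameters of the shutdown endpoint are parsed by the small `FlagFromValues` helper added to `pkg/util/http.go` above. Below is a test-style sketch of its behaviour; the `github.com/grafana/loki/pkg/util` import path follows the module path, and the test itself is illustrative rather than part of the change.

```go
package util_test

import (
	"net/url"
	"testing"

	"github.com/grafana/loki/pkg/util"
)

// TestFlagFromValues spells out the parsing rules of the helper:
// "t"/"true"/"1" yield true, "f"/"false"/"0" yield false (case-insensitive),
// and anything else, including a missing key, falls back to the default.
func TestFlagFromValues(t *testing.T) {
	values := url.Values{
		"flush":              []string{"FALSE"},
		"delete_ring_tokens": []string{"1"},
		"terminate":          []string{"maybe"},
	}

	if got := util.FlagFromValues(values, "flush", true); got {
		t.Errorf("flush: got %v, want false", got)
	}
	if got := util.FlagFromValues(values, "delete_ring_tokens", false); !got {
		t.Errorf("delete_ring_tokens: got %v, want true", got)
	}
	// Unparseable value: the default wins.
	if got := util.FlagFromValues(values, "terminate", true); !got {
		t.Errorf("terminate: got %v, want true", got)
	}
	// Missing key: the default wins as well.
	if got := util.FlagFromValues(values, "missing", false); got {
		t.Errorf("missing: got %v, want false", got)
	}
}
```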