pkg/ingester: check that ingester is in LEAVING state when transferring chunks and claiming tokens. Required when using memberlist client. (#1300)

* On chunks transfer, verify that source ingester is LEAVING

This is important when using gossiping ring. If source ingester is not
leaving yet, than claiming its tokens will fail (merge function
will not move ownership to the target ingester), and target ingester
will end up with no tokens in the ring.

* Register ring to prometheus.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Update check using latest version from Cortex.

There is no direct usage of logging, which is consistent
with the rest of this method.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>

* Move import to group of imports with third-party packages.

Signed-off-by: Peter Štibraný <peter.stibrany@grafana.com>
pull/1396/head
Peter Štibraný 6 years ago committed by Cyril Tovena
parent 17b2ce0700
commit dd96fa146c
  1. 31
      pkg/ingester/transfer.go
  2. 2
      pkg/loki/modules.go

@ -81,6 +81,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
// Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
if err != nil {
return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
}
}
userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
@ -127,6 +133,31 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer)
return nil
}
// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet
// when using gossip ring. If we cannot see ingester is the LEAVING state yet, we don't accept this
// transfer, as claiming tokens would possibly end up with this ingester owning no tokens, due to conflict
// resolution in ring merge function. Hopefully the leaving ingester will retry transfer again.
func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error {
v, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
if err != nil {
return errors.Wrap(err, "get ring")
}
if v == nil {
return fmt.Errorf("ring not found when checking state of source ingester")
}
r, ok := v.(*ring.Desc)
if !ok || r == nil {
return fmt.Errorf("ring not found, got %T", v)
}
if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING {
return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State)
}
// all fine
return nil
}
// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
i.shutdownMtx.Lock()

@ -13,6 +13,7 @@ import (
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/middleware"
"github.com/weaveworks/common/server"
"google.golang.org/grpc/health/grpc_health_v1"
@ -120,6 +121,7 @@ func (t *Loki) initRing() (err error) {
if err != nil {
return
}
prometheus.MustRegister(t.ring)
t.server.HTTP.Handle("/ring", t.ring)
return
}

Loading…
Cancel
Save