pkg/ingester: prevent shutdowns from proceeding during joining handoff

This commit fixes a race condition where an ingester that is shut down
during the joining handoff (i.e., while receiving chunks from a leaving
ingester but before claiming its tokens) hangs and can never shut down
cleanly. A shutdown mutex is introduced: it is acquired at the start of
the handoff and released once the handoff completes, so a shutdown
cannot proceed mid-handoff. The race also left the leaving ingester
unable to complete its own shutdown, since it waits for the joining
ingester to claim its tokens.

This means that if a brand-new ingester is shut down, it will always
have finished receiving chunks from, and claiming the tokens of, the
ingester attempting to leave.
pull/1137/head
Robert Fratto, committed by Ed Welch
parent b75965e915
commit 2cb3c82cc8
1. pkg/ingester/ingester.go (1 line changed)
2. pkg/ingester/transfer.go (9 lines changed)
3. pkg/ingester/transfer_test.go (23 lines changed)
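
Stripped down to the locking pattern, the change can be sketched as follows. This is a minimal, self-contained sketch rather than the real ingester: the shutdownMtx field and the TransferChunks and StopIncomingRequests names come from the diff below, while the simplified struct, method bodies, and timing are illustrative assumptions.

package main

import (
	"fmt"
	"sync"
	"time"
)

// ingester is a pared-down stand-in for the real Ingester type; only the
// shutdown mutex matters for the pattern being illustrated.
type ingester struct {
	shutdownMtx sync.Mutex // held for the entire joining handoff
	readonly    bool
}

// TransferChunks stands in for the joining-side handoff handler. Holding
// shutdownMtx for its whole duration means a concurrent shutdown cannot
// proceed while chunks are being received and tokens claimed.
func (i *ingester) TransferChunks() {
	i.shutdownMtx.Lock()
	defer i.shutdownMtx.Unlock()

	time.Sleep(100 * time.Millisecond) // pretend to receive chunks and claim tokens
	fmt.Println("handoff complete")
}

// StopIncomingRequests is the hook the lifecycler invokes on shutdown; by
// taking the same mutex it blocks until any in-flight handoff has finished.
func (i *ingester) StopIncomingRequests() {
	i.shutdownMtx.Lock()
	defer i.shutdownMtx.Unlock()

	i.readonly = true
	fmt.Println("read-only; safe to continue shutting down")
}

func main() {
	var ing ingester
	done := make(chan struct{})

	go func() {
		ing.TransferChunks()
		close(done)
	}()

	time.Sleep(10 * time.Millisecond) // let the handoff grab the lock first
	ing.StopIncomingRequests()        // blocks until TransferChunks releases shutdownMtx
	<-done
}

Taking the mutex in StopIncomingRequests is what makes the shutdown wait: the lifecycler calls it as part of shutting down (which is what the updated test exercises by calling lifecycler.Shutdown mid-transfer), so a shutdown that arrives mid-handoff parks there until TransferChunks releases the lock, and the leaving ingester can in turn finish once its tokens are claimed.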

pkg/ingester/ingester.go

@@ -70,6 +70,7 @@ type Ingester struct {
 	cfg          Config
 	clientConfig client.Config
+	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
 	instancesMtx sync.RWMutex
 	instances    map[string]*instance
 	readonly     bool

pkg/ingester/transfer.go

@@ -33,6 +33,11 @@ var (
 // TransferChunks receives all chunks from another ingester. The Ingester
 // must be in PENDING state or else the call will fail.
 func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
+	// Prevent a shutdown from happening until we've completely finished a handoff
+	// from a leaving ingester.
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	// Enter JOINING state (only valid from PENDING)
 	if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
 		return err
@@ -122,8 +127,12 @@ func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
 // StopIncomingRequests implements ring.Lifecycler.
 func (i *Ingester) StopIncomingRequests() {
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	i.instancesMtx.Lock()
 	defer i.instancesMtx.Unlock()
+
 	i.readonly = true
 }

pkg/ingester/transfer_test.go

@@ -176,18 +176,37 @@ type testIngesterClient struct {
 func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
 	chunkCh := make(chan *logproto.TimeSeriesChunk)
 	respCh := make(chan *logproto.TransferChunksResponse)
+	waitCh := make(chan bool)
 
-	client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
+	client := &testTransferChunksClient{ch: chunkCh, resp: respCh, wait: waitCh}
 
 	go func() {
 		server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
 		err := c.i.TransferChunks(server)
 		require.NoError(c.t, err)
 	}()
 
+	// After 50ms, we try killing the target ingester's lifecycler to verify
+	// that it obtained a lock on the shutdown process. This operation should
+	// block until the transfer completes.
+	//
+	// Then after another 50ms, we also allow data to start sending. This tests an issue
+	// where an ingester is shut down before it completes the handoff and ends up in an
+	// unhealthy state, permanently stuck in the handler for claiming tokens.
+	go func() {
+		time.Sleep(time.Millisecond * 50)
+		c.i.lifecycler.Shutdown()
+	}()
+
+	go func() {
+		time.Sleep(time.Millisecond * 100)
+		close(waitCh)
+	}()
+
 	return client, nil
 }
 
 type testTransferChunksClient struct {
+	wait chan bool
 	ch   chan *logproto.TimeSeriesChunk
 	resp chan *logproto.TransferChunksResponse
@@ -195,11 +214,13 @@ type testTransferChunksClient struct {
 }
 
 func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
+	<-c.wait
 	c.ch <- chunk
 	return nil
 }
 
 func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
+	<-c.wait
 	close(c.ch)
 	resp := <-c.resp
 	close(c.resp)
