From 2cb3c82cc8fade278f9182df03e716d7afc87ef2 Mon Sep 17 00:00:00 2001
From: Robert Fratto
Date: Thu, 3 Oct 2019 11:59:22 -0400
Subject: [PATCH] pkg/ingester: prevent shutdowns from processing during
 joining handoff

This commit fixes a race condition where an ingester that is shut down
during the joining handoff (i.e., while receiving chunks from a leaving
ingester, before claiming its tokens) hangs and can never shut down
cleanly.

A shutdown mutex is introduced: it is acquired at the start of the
handoff process and released once the handoff completes.

This race condition also prevented the leaving ingester from completing
its shutdown, since it waits for the joining ingester to claim the
tokens. With this change, a brand-new ingester that is shut down will
always have finished receiving chunks from the leaving ingester and
claimed its tokens before the shutdown proceeds.
---
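A minimal, self-contained sketch of the locking pattern this change
relies on, for context; the names below (ingester, transferChunks,
stopIncomingRequests) are simplified stand-ins rather than the actual
Loki types. The handoff holds the mutex for its entire duration, so a
concurrent shutdown attempt blocks until the transfer has finished:

    package main

    import (
        "fmt"
        "sync"
        "time"
    )

    type ingester struct {
        shutdownMtx sync.Mutex
    }

    // transferChunks stands in for TransferChunks: it holds the shutdown
    // mutex while the (simulated) handoff is in progress.
    func (i *ingester) transferChunks() {
        i.shutdownMtx.Lock()
        defer i.shutdownMtx.Unlock()
        time.Sleep(100 * time.Millisecond) // simulated chunk transfer
        fmt.Println("handoff complete")
    }

    // stopIncomingRequests stands in for the shutdown path: it cannot
    // proceed until the handoff above releases the mutex.
    func (i *ingester) stopIncomingRequests() {
        i.shutdownMtx.Lock()
        defer i.shutdownMtx.Unlock()
        fmt.Println("shutdown runs only after the handoff")
    }

    func main() {
        i := &ingester{}
        go i.transferChunks()
        time.Sleep(10 * time.Millisecond) // let the handoff take the lock first
        i.stopIncomingRequests()
    }

Run as-is, the sketch prints "handoff complete" before the shutdown
message, since stopIncomingRequests cannot take the mutex until
transferChunks releases it.
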
 pkg/ingester/ingester.go      |  1 +
 pkg/ingester/transfer.go      |  9 +++++++++
 pkg/ingester/transfer_test.go | 23 ++++++++++++++++++++++-
 3 files changed, 32 insertions(+), 1 deletion(-)

diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go
index 821b02f460..42cca371e0 100644
--- a/pkg/ingester/ingester.go
+++ b/pkg/ingester/ingester.go
@@ -70,6 +70,7 @@ type Ingester struct {
 	cfg          Config
 	clientConfig client.Config
 
+	shutdownMtx  sync.Mutex // Allows processes to grab a lock and prevent a shutdown
 	instancesMtx sync.RWMutex
 	instances    map[string]*instance
 	readonly     bool
diff --git a/pkg/ingester/transfer.go b/pkg/ingester/transfer.go
index ab3f139899..7b9a66370f 100644
--- a/pkg/ingester/transfer.go
+++ b/pkg/ingester/transfer.go
@@ -33,6 +33,11 @@ var (
 // TransferChunks receives all chunks from another ingester. The Ingester
 // must be in PENDING state or else the call will fail.
 func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
+	// Prevent a shutdown from happening until we've completely finished a handoff
+	// from a leaving ingester.
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	// Entry JOINING state (only valid from PENDING)
 	if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
 		return err
@@ -122,8 +127,12 @@
 
 // StopIncomingRequests implements ring.Lifecycler.
 func (i *Ingester) StopIncomingRequests() {
+	i.shutdownMtx.Lock()
+	defer i.shutdownMtx.Unlock()
+
 	i.instancesMtx.Lock()
 	defer i.instancesMtx.Unlock()
+
 	i.readonly = true
 }
 
diff --git a/pkg/ingester/transfer_test.go b/pkg/ingester/transfer_test.go
index ea87cd54f5..dbf95c8945 100644
--- a/pkg/ingester/transfer_test.go
+++ b/pkg/ingester/transfer_test.go
@@ -176,18 +176,37 @@ type testIngesterClient struct {
 func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
 	chunkCh := make(chan *logproto.TimeSeriesChunk)
 	respCh := make(chan *logproto.TransferChunksResponse)
+	waitCh := make(chan bool)
 
-	client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
+	client := &testTransferChunksClient{ch: chunkCh, resp: respCh, wait: waitCh}
 	go func() {
 		server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
 		err := c.i.TransferChunks(server)
 		require.NoError(c.t, err)
 	}()
 
+	// After 50ms, we try killing the target ingester's lifecycler to verify
+	// that it obtained a lock on the shutdown process. This operation should
+	// block until the transfer completes.
+	//
+	// Then after another 50ms, we also allow data to start sending. This tests an issue
+	// where an ingester is shut down before it completes the handoff and ends up in an
+	// unhealthy state, permanently stuck in the handler for claiming tokens.
+	go func() {
+		time.Sleep(time.Millisecond * 50)
+		c.i.lifecycler.Shutdown()
+	}()
+
+	go func() {
+		time.Sleep(time.Millisecond * 100)
+		close(waitCh)
+	}()
+
 	return client, nil
 }
 
 type testTransferChunksClient struct {
+	wait chan bool
 	ch   chan *logproto.TimeSeriesChunk
 	resp chan *logproto.TransferChunksResponse
 
@@ -195,11 +214,13 @@ type testTransferChunksClient struct {
 }
 
 func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
+	<-c.wait
 	c.ch <- chunk
 	return nil
 }
 
 func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
+	<-c.wait
 	close(c.ch)
 	resp := <-c.resp
 	close(c.resp)