mirror of https://github.com/grafana/loki
Do not transfer off chunks on ingester shutdown (#10709)
This PR removes the ability of an ingester in the LEAVING state to transfer its chunks off to another ingester in the ring that is in the PENDING state prior to shutdown. The **Write Ahead Log** (WAL) supersedes the chunk transfer feature. See the upgrade notes for information on how to replace the removed setting.
Signed-off-by: Christian Haudum <christian.haudum@gmail.com>
Co-authored-by: Salva Corts <salva.corts@grafana.com>
parent da175771c0
commit 54e46446fa
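For context, the setting made obsolete by this change is the ingester's chunk hand-off retry option (`max_transfer_retries` / `-ingester.max-transfer-retries`); with transfers gone, durability across shutdowns comes from the WAL instead. A minimal sketch of what the updated ingester configuration could look like follows — the keys and values shown are assumptions based on the WAL documentation, so verify them against the upgrade notes for your Loki version:

ingester:
  # max_transfer_retries: 10   # removed by this PR; delete this setting from your config
  wal:
    enabled: true              # the WAL replaces chunk hand-off for durability (assumed key)
    dir: /loki/wal             # example path; use persistent storage for the ingester
    flush_on_shutdown: true    # optionally flush chunks on shutdown instead of replaying the WAL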
@@ -1,318 +0,0 @@
package ingester

import (
    "fmt"
    "io"
    "os"
    "time"

    "github.com/go-kit/log/level"
    "github.com/grafana/dskit/backoff"
    "github.com/grafana/dskit/ring"
    "github.com/grafana/dskit/user"
    "github.com/pkg/errors"
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prometheus/prometheus/model/labels"
    "golang.org/x/net/context"

    "github.com/grafana/loki/pkg/logproto"
    lokiutil "github.com/grafana/loki/pkg/util"
    util_log "github.com/grafana/loki/pkg/util/log"
)

var (
    sentChunks = promauto.NewCounter(prometheus.CounterOpts{
        Namespace: "loki",
        Name:      "ingester_sent_chunks",
        Help:      "The total number of chunks sent by this ingester whilst leaving.",
    })
    receivedChunks = promauto.NewCounter(prometheus.CounterOpts{
        Namespace: "loki",
        Name:      "ingester_received_chunks",
        Help:      "The total number of chunks received by this ingester whilst joining.",
    })
)

// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
    logger := util_log.WithContext(stream.Context(), util_log.Logger)
    // Prevent a shutdown from happening until we've completely finished a handoff
    // from a leaving ingester.
    i.shutdownMtx.Lock()
    defer i.shutdownMtx.Unlock()

    // Enter JOINING state (only valid from PENDING)
    if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
        return err
    }

    // The ingester's state effectively works as a giant mutex around this
    // whole method, and as such we have to ensure we unlock the mutex.
    defer func() {
        state := i.lifecycler.GetState()
        if i.lifecycler.GetState() == ring.ACTIVE {
            return
        }

        level.Error(logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)

        // Enter PENDING state (only valid from JOINING)
        if i.lifecycler.GetState() == ring.JOINING {
            // Create a new context here to attempt to update the state back to pending to allow
            // a failed transfer to try again. If we fail to set the state back to PENDING then
            // exit Loki as we will effectively be hung anyway stuck in a JOINING state and will
            // never join.
            ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute)
            if err := i.lifecycler.ChangeState(ctx, ring.PENDING); err != nil {
                level.Error(logger).Log("msg", "failed to update the ring state back to PENDING after "+
                    "a chunk transfer failure, there is nothing more Loki can do from this state "+
                    "so the process will exit...", "err", err)
                os.Exit(1)
            }
            cancel()
        }
    }()

    fromIngesterID := ""
    seriesReceived := 0

    for {
        chunkSet, err := stream.Recv()
        if err == io.EOF {
            break
        } else if err != nil {
            return err
        }

        // We can't send "extra" fields with a streaming call, so we repeat
        // chunkSet.FromIngesterId and assume it is the same every time around
        // this loop.
        if fromIngesterID == "" {
            fromIngesterID = chunkSet.FromIngesterId
            level.Info(logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)

            // Before transfer, make sure 'from' ingester is in correct state to call ClaimTokensFor later
            err := i.checkFromIngesterIsInLeavingState(stream.Context(), fromIngesterID)
            if err != nil {
                return errors.Wrap(err, "TransferChunks: checkFromIngesterIsInLeavingState")
            }
        }

        userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)

        lbls := make([]labels.Label, 0, len(chunkSet.Labels))
        for _, lbl := range chunkSet.Labels {
            lbls = append(lbls, labels.Label{Name: lbl.Name, Value: lbl.Value})
        }

        instance, err := i.GetOrCreateInstance(chunkSet.UserId)
        if err != nil {
            return err
        }
        for _, chunk := range chunkSet.Chunks {
            if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
                return err
            }
        }

        seriesReceived++
        receivedChunks.Add(float64(len(chunkSet.Chunks)))
    }

    if seriesReceived == 0 {
        level.Error(logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
        return fmt.Errorf("no series")
    } else if fromIngesterID == "" {
        level.Error(logger).Log("msg", "received TransferChunks request with no ID from ingester")
        return fmt.Errorf("no ingester id")
    }

    if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
        return err
    }

    if err := i.lifecycler.ChangeState(stream.Context(), ring.ACTIVE); err != nil {
        return err
    }

    // Close the stream last, as this is what tells the "from" ingester that
    // it's OK to shut down.
    if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
        level.Error(logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
        return err
    }
    level.Info(logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
    return nil
}

// Ring gossiping: check if "from" ingester is in LEAVING state. It should be, but we may not see that yet
// when using a gossip ring. If we cannot see that the ingester is in the LEAVING state yet, we don't accept this
// transfer, as claiming tokens would possibly end up with this ingester owning no tokens, due to conflict
// resolution in the ring merge function. Hopefully the leaving ingester will retry the transfer again.
func (i *Ingester) checkFromIngesterIsInLeavingState(ctx context.Context, fromIngesterID string) error {
    v, err := i.lifecycler.KVStore.Get(ctx, RingKey)
    if err != nil {
        return errors.Wrap(err, "get ring")
    }
    if v == nil {
        return fmt.Errorf("ring not found when checking state of source ingester")
    }
    r, ok := v.(*ring.Desc)
    if !ok || r == nil {
        return fmt.Errorf("ring not found, got %T", v)
    }

    if r.Ingesters == nil || r.Ingesters[fromIngesterID].State != ring.LEAVING {
        return fmt.Errorf("source ingester is not in a LEAVING state, found state=%v", r.Ingesters[fromIngesterID].State)
    }

    // all fine
    return nil
}

// stopIncomingRequests is called when ingester is stopping
func (i *Ingester) stopIncomingRequests() {
    i.shutdownMtx.Lock()
    defer i.shutdownMtx.Unlock()

    i.instancesMtx.Lock()
    defer i.instancesMtx.Unlock()

    i.readonly = true
}

// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(ctx context.Context) error {
    if i.cfg.MaxTransferRetries <= 0 {
        return ring.ErrTransferDisabled
    }

    backoff := backoff.New(ctx, backoff.Config{
        MinBackoff: 100 * time.Millisecond,
        MaxBackoff: 5 * time.Second,
        MaxRetries: i.cfg.MaxTransferRetries,
    })

    for backoff.Ongoing() {
        err := i.transferOut(ctx)
        if err == nil {
            return nil
        }

        level.Error(util_log.WithContext(ctx, util_log.Logger)).Log("msg", "transfer failed", "err", err)
        backoff.Wait()
    }

    return backoff.Err()
}

func (i *Ingester) transferOut(ctx context.Context) error {
    logger := util_log.WithContext(ctx, util_log.Logger)
    targetIngester, err := i.findTransferTarget(ctx)
    if err != nil {
        return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
    }

    level.Info(logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
    c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
    if err != nil {
        return err
    }
    if c, ok := c.(io.Closer); ok {
        defer lokiutil.LogErrorWithContext(ctx, "closing client", c.Close)
    }
    ic := c.(logproto.IngesterClient)

    ctx = user.InjectOrgID(ctx, "-1")
    s, err := ic.TransferChunks(ctx)
    if err != nil {
        return errors.Wrap(err, "TransferChunks")
    }

    for instanceID, inst := range i.instances {
        err := inst.streams.ForEach(func(istream *stream) (bool, error) {
            err = func() error {
                istream.chunkMtx.Lock()
                defer istream.chunkMtx.Unlock()
                lbls := []*logproto.LabelPair{}
                for _, lbl := range istream.labels {
                    lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
                }

                // We moved to sending one chunk at a time in a stream instead of sending all chunks for a stream
                // as large chunks can create large payloads of >16MB which can hit GRPC limits,
                // typically streams won't have many chunks in memory so sending one at a time
                // shouldn't add too much overhead.
                for _, c := range istream.chunks {
                    // Close the chunk first, writing any data in the headblock to a new block.
                    err := c.chunk.Close()
                    if err != nil {
                        return err
                    }

                    bb, err := c.chunk.Bytes()
                    if err != nil {
                        return err
                    }

                    chunks := make([]*logproto.Chunk, 1)
                    chunks[0] = &logproto.Chunk{
                        Data: bb,
                    }

                    err = s.Send(&logproto.TimeSeriesChunk{
                        Chunks:         chunks,
                        UserId:         instanceID,
                        Labels:         lbls,
                        FromIngesterId: i.lifecycler.ID,
                    })
                    if err != nil {
                        level.Error(logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
                        return err
                    }

                    sentChunks.Add(float64(len(chunks)))
                }
                return nil
            }()
            if err != nil {
                return false, err
            }
            return true, nil
        })
        if err != nil {
            return err
        }
    }

    _, err = s.CloseAndRecv()
    if err != nil {
        return errors.Wrap(err, "CloseAndRecv")
    }

    for _, flushQueue := range i.flushQueues {
        flushQueue.DiscardAndClose()
    }
    i.flushQueuesDone.Wait()

    level.Info(logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
    return nil
}

// findTransferTarget finds an ingester in a PENDING state to use for transferring
// chunks to.
func (i *Ingester) findTransferTarget(ctx context.Context) (*ring.InstanceDesc, error) {
    ringDesc, err := i.lifecycler.KVStore.Get(ctx, RingKey)
    if err != nil {
        return nil, err
    }

    ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
    if len(ingesters) == 0 {
        return nil, fmt.Errorf("no pending ingesters")
    }

    return &ingesters[0], nil
}

@@ -1,244 +0,0 @@
package ingester

import (
    "fmt"
    "io"
    "sort"
    "testing"
    "time"

    gokitlog "github.com/go-kit/log"
    "github.com/go-kit/log/level"
    "github.com/grafana/dskit/kv"
    "github.com/grafana/dskit/ring"
    "github.com/grafana/dskit/services"
    "github.com/grafana/dskit/user"
    "github.com/stretchr/testify/assert"
    "github.com/stretchr/testify/require"
    "golang.org/x/net/context"
    "google.golang.org/grpc"

    "github.com/grafana/loki/pkg/chunkenc"
    "github.com/grafana/loki/pkg/ingester/client"
    "github.com/grafana/loki/pkg/logproto"
    "github.com/grafana/loki/pkg/logql/log"
    util_log "github.com/grafana/loki/pkg/util/log"
)

func TestTransferOut(t *testing.T) {
    f := newTestIngesterFactory(t)

    ing := f.getIngester(time.Duration(0), t)

    // Push some data into our original ingester
    ctx := user.InjectOrgID(context.Background(), "test")
    _, err := ing.Push(ctx, &logproto.PushRequest{
        Streams: []logproto.Stream{
            {
                Entries: []logproto.Entry{
                    {Line: "line 0", Timestamp: time.Unix(0, 0)},
                    {Line: "line 1", Timestamp: time.Unix(1, 0)},
                },
                Labels: `{foo="bar",bar="baz1"}`,
            },
            {
                Entries: []logproto.Entry{
                    {Line: "line 2", Timestamp: time.Unix(2, 0)},
                    {Line: "line 3", Timestamp: time.Unix(3, 0)},
                },
                Labels: `{foo="bar",bar="baz2"}`,
            },
        },
    })
    require.NoError(t, err)

    assert.Len(t, ing.instances, 1)
    if assert.Contains(t, ing.instances, "test") {
        assert.Equal(t, ing.instances["test"].streams.Len(), 2)
    }

    // Create a new ingester and transfer data to it
    ing2 := f.getIngester(time.Second*60, t)
    defer services.StopAndAwaitTerminated(context.Background(), ing2) //nolint:errcheck
    require.NoError(t, services.StopAndAwaitTerminated(context.Background(), ing))

    assert.Len(t, ing2.instances, 1)
    if assert.Contains(t, ing2.instances, "test") {
        assert.Equal(t, ing2.instances["test"].streams.Len(), 2)

        lines := []string{}

        // Get all the lines back and make sure the blocks transferred successfully
        _ = ing2.instances["test"].streams.ForEach(func(s *stream) (bool, error) {
            it, err := s.Iterator(
                context.TODO(),
                nil,
                time.Unix(0, 0),
                time.Unix(10, 0),
                logproto.FORWARD,
                log.NewNoopPipeline().ForStream(s.labels),
            )
            if !assert.NoError(t, err) {
                return true, nil
            }

            for it.Next() {
                entry := it.Entry()
                lines = append(lines, entry.Line)
            }
            return true, nil
        })

        sort.Strings(lines)

        assert.Equal(
            t,
            []string{"line 0", "line 1", "line 2", "line 3"},
            lines,
        )
    }
}

type testIngesterFactory struct {
    t         *testing.T
    store     kv.Client
    n         int
    ingesters map[string]*Ingester
}

func newTestIngesterFactory(t *testing.T) *testIngesterFactory {
    kvClient, err := kv.NewClient(kv.Config{Store: "inmemory"}, ring.GetCodec(), nil, gokitlog.NewNopLogger())
    require.NoError(t, err)

    return &testIngesterFactory{
        t:         t,
        store:     kvClient,
        ingesters: make(map[string]*Ingester),
    }
}

func (f *testIngesterFactory) getIngester(joinAfter time.Duration, t *testing.T) *Ingester {
    f.n++

    cfg := defaultIngesterTestConfig(t)
    cfg.MaxTransferRetries = 1
    cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n)
    cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store
    cfg.LifecyclerConfig.JoinAfter = joinAfter
    cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID
    // Force a tiny chunk size and no encoding so we can guarantee multiple chunks
    // These values are also crafted around the specific use of `line _` in the log line which is 6 bytes long
    cfg.BlockSize = 3 // Block size needs to be less than chunk size so we can get more than one block per chunk
    cfg.TargetChunkSize = 24
    cfg.ChunkEncoding = chunkenc.EncNone.String()

    cfg.ingesterClientFactory = func(_ client.Config, addr string) (client.HealthAndIngesterClient, error) {
        ingester, ok := f.ingesters[addr]
        if !ok {
            return nil, fmt.Errorf("no ingester %s", addr)
        }

        return client.ClosableHealthAndIngesterClient{
            PusherClient:   nil,
            QuerierClient:  nil,
            IngesterClient: &testIngesterClient{t: f.t, i: ingester},
            Closer:         io.NopCloser(nil),
        }, nil
    }

    _, ing := newTestStore(f.t, cfg, nil)
    f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing

    // NB there's some kind of race condition with the in-memory KV client when
    // we don't give the ingester a little bit of time to initialize. a 100ms
    // wait time seems effective.
    time.Sleep(time.Millisecond * 100)
    return ing
}

type testIngesterClient struct {
    t *testing.T
    i *Ingester
}

func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
    chunkCh := make(chan *logproto.TimeSeriesChunk)
    respCh := make(chan *logproto.TransferChunksResponse)
    waitCh := make(chan bool)

    client := &testTransferChunksClient{ch: chunkCh, resp: respCh, wait: waitCh}
    go func() {
        server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
        err := c.i.TransferChunks(server)
        require.NoError(c.t, err)
    }()

    // After 50ms, we try killing the target ingester's lifecycler to verify
    // that it obtained a lock on the shutdown process. This operation should
    // block until the transfer completes.
    //
    // Then after another 50ms, we also allow data to start sending. This tests an issue
    // where an ingester is shut down before it completes the handoff and ends up in an
    // unhealthy state, permanently stuck in the handler for claiming tokens.
    go func() {
        time.Sleep(time.Millisecond * 50)
        c.i.stopIncomingRequests() // used to be called from lifecycler, now it must be called *before* stopping the lifecycler. (ingester does this on shutdown)
        err := services.StopAndAwaitTerminated(context.Background(), c.i.lifecycler)
        if err != nil {
            level.Error(util_log.Logger).Log("msg", "lifecycler failed", "err", err)
        }
    }()

    go func() {
        time.Sleep(time.Millisecond * 100)
        close(waitCh)
    }()

    return client, nil
}

type testTransferChunksClient struct {
    wait chan bool
    ch   chan *logproto.TimeSeriesChunk
    resp chan *logproto.TransferChunksResponse

    grpc.ClientStream
}

func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
    <-c.wait
    c.ch <- chunk
    return nil
}

func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
    <-c.wait
    close(c.ch)
    resp := <-c.resp
    close(c.resp)
    return resp, nil
}

type testTransferChunksServer struct {
    ch   chan *logproto.TimeSeriesChunk
    resp chan *logproto.TransferChunksResponse

    grpc.ServerStream
}

func (s *testTransferChunksServer) Context() context.Context {
    return context.Background()
}

func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error {
    s.resp <- resp
    return nil
}

func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) {
    chunk, ok := <-s.ch
    if !ok {
        return nil, io.EOF
    }
    return chunk, nil
}
File diff suppressed because it is too large