mirror of https://github.com/grafana/loki
Merge pull request #794 from rfratto/forward-ingester-chunks
ingester: support chunk transfers on ingester shutdown. (pull/804/head)
commit d21c30d260
@@ -0,0 +1,236 @@
package ingester

import (
	"fmt"
	"io"
	"os"
	"time"

	"github.com/cortexproject/cortex/pkg/ingester/client"
	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/cortexproject/cortex/pkg/util"
	"github.com/go-kit/kit/log/level"
	"github.com/grafana/loki/pkg/helpers"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/pkg/errors"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/weaveworks/common/user"
	"golang.org/x/net/context"
)

var (
	sentChunks = promauto.NewCounter(prometheus.CounterOpts{
		Name: "loki_ingester_sent_chunks",
		Help: "The total number of chunks sent by this ingester whilst leaving.",
	})
	receivedChunks = promauto.NewCounter(prometheus.CounterOpts{
		Name: "loki_ingester_received_chunks",
		Help: "The total number of chunks received by this ingester whilst joining.",
	})
)

// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
	// Enter JOINING state (only valid from PENDING)
	if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
		return err
	}

	// The ingester's state effectively works as a giant mutex around this
	// whole method, and as such we have to ensure we unlock the mutex.
	defer func() {
		state := i.lifecycler.GetState()
		if state == ring.ACTIVE {
			return
		}

		level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)

		// Enter PENDING state (only valid from JOINING)
		if i.lifecycler.GetState() == ring.JOINING {
			if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
				level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
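				// If we can't roll back to PENDING, exit the process rather than
				// keep running with a ring state that no longer matches reality.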
				os.Exit(1)
			}
		}
	}()

	fromIngesterID := ""
	seriesReceived := 0

	for {
		chunkSet, err := stream.Recv()
		if err == io.EOF {
			break
		} else if err != nil {
			return err
		}

		// We can't send "extra" fields with a streaming call, so we repeat
		// chunkSet.FromIngesterId and assume it is the same every time around
		// this loop.
		if fromIngesterID == "" {
			fromIngesterID = chunkSet.FromIngesterId
			level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
		}

		userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)

		lbls := []client.LabelAdapter{}
		for _, lbl := range chunkSet.Labels {
			lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
		}

		instance := i.getOrCreateInstance(chunkSet.UserId)
		for _, chunk := range chunkSet.Chunks {
			if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
				return err
			}
		}

		seriesReceived++
		receivedChunks.Add(float64(len(chunkSet.Chunks)))
	}

	if seriesReceived == 0 {
		level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
		return fmt.Errorf("no series")
	} else if fromIngesterID == "" {
		level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
		return fmt.Errorf("no ingester id")
	}

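	// Take over the leaving ingester's ring tokens and switch to ACTIVE so this
	// ingester starts owning the transferred series.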
	if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
		return err
	}

	if err := i.lifecycler.ChangeState(stream.Context(), ring.ACTIVE); err != nil {
		return err
	}

	// Close the stream last, as this is what tells the "from" ingester that
	// it's OK to shut down.
	if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
		level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
		return err
	}
	level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
	return nil
}

// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
	i.instancesMtx.Lock()
	defer i.instancesMtx.Unlock()
	i.readonly = true
}

// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(ctx context.Context) error {
	backoff := util.NewBackoff(ctx, util.BackoffConfig{
		MinBackoff: 100 * time.Millisecond,
		MaxBackoff: 5 * time.Second,
		MaxRetries: i.cfg.MaxTransferRetries,
	})

	for backoff.Ongoing() {
		err := i.transferOut(ctx)
		if err == nil {
			return nil
		}

		level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
		backoff.Wait()
	}

	return backoff.Err()
}

func (i *Ingester) transferOut(ctx context.Context) error {
	targetIngester, err := i.findTransferTarget(ctx)
	if err != nil {
		return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
	}

	level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
	c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
	if err != nil {
		return err
	}
	if c, ok := c.(io.Closer); ok {
		defer helpers.LogError("closing client", c.Close)
	}
	ic := c.(logproto.IngesterClient)

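	// The transfer stream spans all tenants; a placeholder org ID ("-1") is
	// injected so the request passes the org-ID checks on the receiving side.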
	ctx = user.InjectOrgID(ctx, "-1")
	stream, err := ic.TransferChunks(ctx)
	if err != nil {
		return errors.Wrap(err, "TransferChunks")
	}

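	// Send one TimeSeriesChunk message per in-memory stream: the stream's
	// serialized chunks, its labels, the owning tenant, and this ingester's ID.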
	for instanceID, inst := range i.instances {
		for _, istream := range inst.streams {
			chunks := make([]*logproto.Chunk, 0, len(istream.chunks))

			for _, c := range istream.chunks {
				bb, err := c.chunk.Bytes()
				if err != nil {
					return err
				}

				chunks = append(chunks, &logproto.Chunk{
					Data: bb,
				})
			}

			lbls := []*logproto.LabelPair{}
			for _, lbl := range istream.labels {
				lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
			}

			err := stream.Send(&logproto.TimeSeriesChunk{
				Chunks:         chunks,
				UserId:         instanceID,
				Labels:         lbls,
				FromIngesterId: i.lifecycler.ID,
			})
			if err != nil {
				level.Error(util.Logger).Log("msg", "failed sending stream's chunks to ingester", "to_ingester", targetIngester.Addr, "err", err)
				return err
			}

			sentChunks.Add(float64(len(chunks)))
		}
	}

	_, err = stream.CloseAndRecv()
	if err != nil {
		return errors.Wrap(err, "CloseAndRecv")
	}

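	// The chunks now live on the target ingester, so drop anything still queued
	// for flushing here and wait for the flush loops to finish.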
	for _, flushQueue := range i.flushQueues {
		flushQueue.DiscardAndClose()
	}
	i.flushQueuesDone.Wait()

	level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
	return nil
}

// findTransferTarget finds an ingester in the PENDING state to transfer
// chunks to.
func (i *Ingester) findTransferTarget(ctx context.Context) (*ring.IngesterDesc, error) {
	ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
	if err != nil {
		return nil, err
	}

	ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
	if len(ingesters) == 0 {
		return nil, fmt.Errorf("no pending ingesters")
	}

	return &ingesters[0], nil
}
@@ -0,0 +1,210 @@
package ingester

import (
	"fmt"
	"io"
	"io/ioutil"
	"sort"
	"testing"
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/health/grpc_health_v1"

	"github.com/cortexproject/cortex/pkg/ring"
	"github.com/stretchr/testify/assert"
	"github.com/stretchr/testify/require"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/ingester/client"
	"github.com/grafana/loki/pkg/logproto"
	"golang.org/x/net/context"
)

func TestTransferOut(t *testing.T) {
	f := newTestIngesterFactory(t)

	ing := f.getIngester(time.Duration(0))

	// Push some data into our original ingester
	ctx := user.InjectOrgID(context.Background(), "test")
	_, err := ing.Push(ctx, &logproto.PushRequest{
		Streams: []*logproto.Stream{
			{
				Entries: []logproto.Entry{
					{Line: "line 0", Timestamp: time.Unix(0, 0)},
					{Line: "line 1", Timestamp: time.Unix(1, 0)},
				},
				Labels: `{foo="bar",bar="baz1"}`,
			},
			{
				Entries: []logproto.Entry{
					{Line: "line 2", Timestamp: time.Unix(2, 0)},
					{Line: "line 3", Timestamp: time.Unix(3, 0)},
				},
				Labels: `{foo="bar",bar="baz2"}`,
			},
		},
	})
	require.NoError(t, err)

	assert.Len(t, ing.instances, 1)
	if assert.Contains(t, ing.instances, "test") {
		assert.Len(t, ing.instances["test"].streams, 2)
	}

	// Create a new ingester and transfer data to it
	ing2 := f.getIngester(time.Second * 60)
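	// Shutting down the first ingester should trigger TransferOut, handing its
	// chunks to the new, still-PENDING ingester (ClaimOnRollout is set in the
	// test config).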
	ing.Shutdown()

	assert.Len(t, ing2.instances, 1)
	if assert.Contains(t, ing2.instances, "test") {
		assert.Len(t, ing2.instances["test"].streams, 2)

		lines := []string{}

		// Get all the lines back and make sure the blocks transferred successfully
		for _, stream := range ing2.instances["test"].streams {
			it, err := stream.Iterator(
				time.Unix(0, 0),
				time.Unix(10, 0),
				logproto.FORWARD,
				func([]byte) bool { return true },
			)
			if !assert.NoError(t, err) {
				continue
			}

			for it.Next() {
				entry := it.Entry()
				lines = append(lines, entry.Line)
			}
		}

		sort.Strings(lines)

		assert.Equal(
			t,
			[]string{"line 0", "line 1", "line 2", "line 3"},
			lines,
		)
	}
}

type testIngesterFactory struct {
	t         *testing.T
	store     ring.KVClient
	n         int
	ingesters map[string]*Ingester
}

func newTestIngesterFactory(t *testing.T) *testIngesterFactory {
	return &testIngesterFactory{
		t:         t,
		store:     ring.NewInMemoryKVClient(ring.ProtoCodec{Factory: ring.ProtoDescFactory}),
		ingesters: make(map[string]*Ingester),
	}
}

func (f *testIngesterFactory) getIngester(joinAfter time.Duration) *Ingester {
	f.n++

	cfg := defaultIngesterTestConfig()
	cfg.MaxTransferRetries = 1
	cfg.LifecyclerConfig.ClaimOnRollout = true
	cfg.LifecyclerConfig.ID = fmt.Sprintf("localhost-%d", f.n)
	cfg.LifecyclerConfig.RingConfig.KVStore.Mock = f.store
	cfg.LifecyclerConfig.JoinAfter = joinAfter
	cfg.LifecyclerConfig.Addr = cfg.LifecyclerConfig.ID

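	// Swap the gRPC client factory for one that returns an in-process client
	// wired directly to another test ingester, so no network is involved.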
	cfg.ingesterClientFactory = func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error) {
		ingester, ok := f.ingesters[addr]
		if !ok {
			return nil, fmt.Errorf("no ingester %s", addr)
		}

		return struct {
			logproto.PusherClient
			logproto.QuerierClient
			logproto.IngesterClient
			grpc_health_v1.HealthClient
			io.Closer
		}{
			PusherClient:   nil,
			QuerierClient:  nil,
			IngesterClient: &testIngesterClient{t: f.t, i: ingester},
			Closer:         ioutil.NopCloser(nil),
		}, nil
	}

	_, ing := newTestStore(f.t, cfg)
	f.ingesters[fmt.Sprintf("%s:0", cfg.LifecyclerConfig.ID)] = ing

	// NB there's some kind of race condition with the in-memory KV client when
	// we don't give the ingester a little bit of time to initialize. A 100ms
	// wait time seems effective.
	time.Sleep(time.Millisecond * 100)
	return ing
}

type testIngesterClient struct {
	t *testing.T
	i *Ingester
}

func (c *testIngesterClient) TransferChunks(context.Context, ...grpc.CallOption) (logproto.Ingester_TransferChunksClient, error) {
	chunkCh := make(chan *logproto.TimeSeriesChunk)
	respCh := make(chan *logproto.TransferChunksResponse)

	client := &testTransferChunksClient{ch: chunkCh, resp: respCh}
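	// Run the server half of the transfer in the background; client and server
	// are connected through the in-memory channels above instead of a real
	// gRPC stream.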
	go func() {
		server := &testTransferChunksServer{ch: chunkCh, resp: respCh}
		err := c.i.TransferChunks(server)
		require.NoError(c.t, err)
	}()

	return client, nil
}

type testTransferChunksClient struct {
	ch   chan *logproto.TimeSeriesChunk
	resp chan *logproto.TransferChunksResponse

	grpc.ClientStream
}

func (c *testTransferChunksClient) Send(chunk *logproto.TimeSeriesChunk) error {
	c.ch <- chunk
	return nil
}

func (c *testTransferChunksClient) CloseAndRecv() (*logproto.TransferChunksResponse, error) {
	close(c.ch)
	resp := <-c.resp
	close(c.resp)
	return resp, nil
}

type testTransferChunksServer struct {
	ch   chan *logproto.TimeSeriesChunk
	resp chan *logproto.TransferChunksResponse

	grpc.ServerStream
}

func (s *testTransferChunksServer) Context() context.Context {
	return context.Background()
}

func (s *testTransferChunksServer) SendAndClose(resp *logproto.TransferChunksResponse) error {
	s.resp <- resp
	return nil
}

func (s *testTransferChunksServer) Recv() (*logproto.TimeSeriesChunk, error) {
	chunk, ok := <-s.ch
	if !ok {
		return nil, io.EOF
	}
	return chunk, nil
}