ingester: support chunk transfers on ingester shutdown.

This commit introduces chunk transfers, borrowing the mechanism from
Cortex's implementation: when an ingester is shut down with the
claim-on-rollout option enabled, it finds a pending ingester and
transfers all of its chunks to it.
Branch: pull/794/head
Author: Robert Fratto (7 years ago)
Parent: 3d5319e72a
Commit: d34b1f7450
Changed files (lines changed):
  1. pkg/chunkenc/gzip.go (5)
  2. pkg/ingester/client/client.go (8)
  3. pkg/ingester/flush.go (2)
  4. pkg/ingester/flush_test.go (3)
  5. pkg/ingester/ingester.go (71)
  6. pkg/ingester/ingester_test.go (3)
  7. pkg/ingester/instance.go (19)
  8. pkg/ingester/stream.go (15)
  9. pkg/ingester/transfer.go (228)
  10. pkg/logproto/logproto.pb.go (1332)
  11. pkg/logproto/logproto.proto (24)
  12. pkg/loki/modules.go (2)
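For orientation, here is a minimal, hypothetical sketch of the sender side of a transfer, written against the logproto API added in this commit; sendOneSeries is an illustrative helper, not part of the change (the real logic lives in pkg/ingester/transfer.go below).

package sketch

import (
    "github.com/grafana/loki/pkg/logproto"
    "golang.org/x/net/context"
)

// sendOneSeries sketches the sender side of a transfer for a single series.
func sendOneSeries(ctx context.Context, ic logproto.IngesterClient, fromID, userID string,
    lbls []*logproto.LabelPair, chunks []*logproto.Chunk) error {
    // Open the transfer stream to an ingester that is in the PENDING state.
    stream, err := ic.TransferChunks(ctx)
    if err != nil {
        return err
    }
    // Streaming calls cannot carry extra fields, so FromIngesterId is repeated
    // on every message; the receiver uses it to claim the sender's ring tokens.
    if err := stream.Send(&logproto.TimeSeriesChunk{
        FromIngesterId: fromID,
        UserId:         userID,
        Labels:         lbls,
        Chunks:         chunks,
    }); err != nil {
        return err
    }
    // Closing the stream last is what tells the leaving ingester it is safe
    // to shut down without flushing its chunks to the store.
    _, err = stream.CloseAndRecv()
    return err
}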

@@ -155,8 +155,9 @@ func NewMemChunk(enc Encoding) *MemChunk {
// NewByteChunk returns a MemChunk on the passed bytes.
func NewByteChunk(b []byte) (*MemChunk, error) {
bc := &MemChunk{
cPool: &Gzip,
head: &headBlock{}, // Dummy, empty headblock.
cPool: &Gzip,
encoding: EncGZIP,
head: &headBlock{}, // Dummy, empty headblock.
}
db := decbuf{b: b}

@@ -55,11 +55,13 @@ func New(cfg Config, addr string) (grpc_health_v1.HealthClient, error) {
return struct {
logproto.PusherClient
logproto.QuerierClient
logproto.IngesterClient
grpc_health_v1.HealthClient
io.Closer
}{
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
Closer: conn,
PusherClient: logproto.NewPusherClient(conn),
QuerierClient: logproto.NewQuerierClient(conn),
IngesterClient: logproto.NewIngesterClient(conn),
Closer: conn,
}, nil
}
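The return type of New is still grpc_health_v1.HealthClient, so code that needs the new ingester RPCs type-asserts the returned value to logproto.IngesterClient, which the embedded client added above now satisfies (the transfer code later in this commit does exactly this). A hypothetical sketch, with openTransferStream as an illustrative name:

package sketch

import (
    "fmt"

    "github.com/grafana/loki/pkg/ingester/client"
    "github.com/grafana/loki/pkg/logproto"
    "golang.org/x/net/context"
)

// openTransferStream shows how a caller reaches the new TransferChunks RPC
// through the health-client value returned by client.New.
func openTransferStream(ctx context.Context, cfg client.Config, addr string) (logproto.Ingester_TransferChunksClient, error) {
    c, err := client.New(cfg, addr)
    if err != nil {
        return nil, err
    }
    // The anonymous struct above embeds logproto.IngesterClient, so the
    // assertion succeeds even though the static return type is a HealthClient.
    ic, ok := c.(logproto.IngesterClient)
    if !ok {
        return nil, fmt.Errorf("%T does not implement logproto.IngesterClient", c)
    }
    return ic.TransferChunks(ctx)
}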

@@ -166,7 +166,7 @@ func (i *Ingester) flushLoop(j int) {
}
func (i *Ingester) flushUserSeries(userID string, fp model.Fingerprint, immediate bool) error {
instance, ok := i.getInstanceByID(userID)
instance, ok, _ := i.getInstanceByID(userID)
if !ok {
return nil
}

@@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/grafana/loki/pkg/chunkenc"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/prometheus/common/model"
@@ -61,7 +62,7 @@ func newTestStore(t require.TestingT, cfg Config) (*testStore, *Ingester) {
chunks: map[string][]chunk.Chunk{},
}
ing, err := New(cfg, store)
ing, err := New(cfg, client.Config{}, store)
require.NoError(t, err)
return store, ing

@@ -17,9 +17,14 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
)
// ErrReadOnly is returned when the ingester is shutting down and a push was
// attempted.
var ErrReadOnly = errors.New("Ingester is shutting down")
var readinessProbeSuccess = []byte("Ready")
var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
@@ -31,18 +36,25 @@ var flushQueueLength = promauto.NewGauge(prometheus.GaugeOpts{
type Config struct {
LifecyclerConfig ring.LifecyclerConfig `yaml:"lifecycler,omitempty"`
// Config for transferring chunks.
MaxTransferRetries int `yaml:"max_transfer_retries,omitempty"`
ConcurrentFlushes int `yaml:"concurrent_flushes"`
FlushCheckPeriod time.Duration `yaml:"flush_check_period"`
FlushOpTimeout time.Duration `yaml:"flush_op_timeout"`
RetainPeriod time.Duration `yaml:"chunk_retain_period"`
MaxChunkIdle time.Duration `yaml:"chunk_idle_period"`
BlockSize int `yaml:"chunk_block_size"`
// For testing: allows overriding the factory used to create clients for other ingesters.
ingesterClientFactory func(cfg client.Config, addr string) (grpc_health_v1.HealthClient, error)
}
// RegisterFlags registers the flags.
func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
cfg.LifecyclerConfig.RegisterFlags(f)
f.IntVar(&cfg.MaxTransferRetries, "ingester.max-transfer-retries", 10, "Number of times to try and transfer chunks before falling back to flushing.")
f.IntVar(&cfg.ConcurrentFlushes, "ingester.concurrent-flushed", 16, "")
f.DurationVar(&cfg.FlushCheckPeriod, "ingester.flush-check-period", 30*time.Second, "")
f.DurationVar(&cfg.FlushOpTimeout, "ingester.flush-op-timeout", 10*time.Second, "")
@@ -53,10 +65,12 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
// Ingester builds chunks for incoming log streams.
type Ingester struct {
cfg Config
cfg Config
clientConfig client.Config
instancesMtx sync.RWMutex
instances map[string]*instance
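// readonly is set under instancesMtx by StopIncomingRequests when the ingester starts shutting down; Push returns ErrReadOnly while it is set.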
readonly bool
lifecycler *ring.Lifecycler
store ChunkStore
@ -77,14 +91,19 @@ type ChunkStore interface {
}
// New makes a new Ingester.
func New(cfg Config, store ChunkStore) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
i := &Ingester{
cfg: cfg,
instances: map[string]*instance{},
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
cfg: cfg,
clientConfig: clientConfig,
instances: map[string]*instance{},
store: store,
quit: make(chan struct{}),
flushQueues: make([]*util.PriorityQueue, cfg.ConcurrentFlushes),
quitting: make(chan struct{}),
}
i.flushQueuesDone.Add(cfg.ConcurrentFlushes)
@@ -138,16 +157,6 @@ func (i *Ingester) Stopping() {
}
}
// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
}
// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(context.Context) error {
return nil
}
// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
@@ -155,15 +164,19 @@ func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logpro
return nil, err
}
instance := i.getOrCreateInstance(instanceID)
instance, readonly := i.getOrCreateInstance(instanceID)
if readonly {
return nil, ErrReadOnly
}
err = instance.Push(ctx, req)
return &logproto.PushResponse{}, err
}
func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
inst, ok := i.getInstanceByID(instanceID)
if ok {
return inst
func (i *Ingester) getOrCreateInstance(instanceID string) (instance *instance, readonly bool) {
inst, ok, readonly := i.getInstanceByID(instanceID)
if ok || readonly {
return inst, readonly
}
i.instancesMtx.Lock()
@@ -173,7 +186,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
inst = newInstance(instanceID, i.cfg.BlockSize)
i.instances[instanceID] = inst
}
return inst
return inst, i.readonly
}
// Query the ingester for log streams matching a set of matchers.
@@ -183,7 +196,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
return err
}
instance := i.getOrCreateInstance(instanceID)
instance, _ := i.getOrCreateInstance(instanceID)
return instance.Query(req, queryServer)
}
@@ -194,7 +207,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return nil, err
}
instance := i.getOrCreateInstance(instanceID)
instance, _ := i.getOrCreateInstance(instanceID)
return instance.Label(ctx, req)
}
@@ -223,12 +236,12 @@ func (i *Ingester) ReadinessHandler(w http.ResponseWriter, r *http.Request) {
}
}
func (i *Ingester) getInstanceByID(id string) (*instance, bool) {
func (i *Ingester) getInstanceByID(id string) (instance *instance, ok bool, readonly bool) {
i.instancesMtx.RLock()
defer i.instancesMtx.RUnlock()
inst, ok := i.instances[id]
return inst, ok
return inst, ok, i.readonly
}
func (i *Ingester) getInstances() []*instance {
@@ -255,7 +268,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
return err
}
instance := i.getOrCreateInstance(instanceID)
instance, _ := i.getOrCreateInstance(instanceID)
tailer, err := newTailer(instanceID, req.Query, req.Regex, queryServer)
if err != nil {
return err

@@ -6,6 +6,7 @@ import (
"testing"
"time"
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/stretchr/testify/require"
"github.com/weaveworks/common/user"
@@ -21,7 +22,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, store)
i, err := New(ingesterConfig, client.Config{}, store)
require.NoError(t, err)
defer i.Shutdown()

@@ -71,6 +71,25 @@ func newInstance(instanceID string, blockSize int) *instance {
}
}
// consumeChunk manually adds a chunk that was received during ingester chunk
// transfer.
func (i *instance) consumeChunk(ctx context.Context, labels []client.LabelAdapter, chunk *logproto.Chunk) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()
fp := client.FastFingerprint(labels)
stream, ok := i.streams[fp]
if !ok {
stream = newStream(fp, labels, i.blockSize)
i.index.Add(labels, fp)
i.streams[fp] = stream
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
}
return stream.consumeChunk(ctx, chunk)
}
func (i *instance) Push(ctx context.Context, req *logproto.PushRequest) error {
i.streamsMtx.Lock()
defer i.streamsMtx.Unlock()

@@ -73,6 +73,21 @@ func newStream(fp model.Fingerprint, labels []client.LabelAdapter, blockSize int
}
}
// consumeChunk manually adds a chunk to the stream that was received during
// ingester chunk transfer.
func (s *stream) consumeChunk(_ context.Context, chunk *logproto.Chunk) error {
c, err := chunkenc.NewByteChunk(chunk.Data)
if err != nil {
return err
}
s.chunks = append(s.chunks, chunkDesc{
chunk: c,
})
chunksCreatedTotal.Inc()
return nil
}
func (s *stream) Push(_ context.Context, entries []logproto.Entry) error {
if len(s.chunks) == 0 {
s.chunks = append(s.chunks, chunkDesc{

@@ -0,0 +1,228 @@
package ingester
import (
"fmt"
"io"
"os"
"time"
"github.com/cortexproject/cortex/pkg/ingester/client"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/logproto"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/user"
"golang.org/x/net/context"
)
var (
sentChunks = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_sent_chunks",
Help: "The total number of chunks sent by this ingester whilst leaving.",
})
receivedChunks = promauto.NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_received_chunks",
Help: "The total number of chunks received by this ingester whilst joining.",
})
)
// TransferChunks receives all chunks from another ingester. The Ingester
// must be in PENDING state or else the call will fail.
func (i *Ingester) TransferChunks(stream logproto.Ingester_TransferChunksServer) error {
// Enter JOINING state (only valid from PENDING)
if err := i.lifecycler.ChangeState(stream.Context(), ring.JOINING); err != nil {
return err
}
// The ingester's state effectively works as a giant mutex around this
// whole method, and as such we have to ensure we unlock the mutex.
defer func() {
state := i.lifecycler.GetState()
if i.lifecycler.GetState() == ring.ACTIVE {
return
}
level.Error(util.Logger).Log("msg", "TransferChunks failed, not in ACTIVE state.", "state", state)
// Enter PENDING state (only valid from JOINING)
if i.lifecycler.GetState() == ring.JOINING {
if err := i.lifecycler.ChangeState(stream.Context(), ring.PENDING); err != nil {
level.Error(util.Logger).Log("msg", "error rolling back failed TransferChunks", "err", err)
os.Exit(1)
}
}
}()
fromIngesterID := ""
seriesReceived := 0
for {
chunkSet, err := stream.Recv()
if err == io.EOF {
break
} else if err != nil {
return err
}
// We can't send "extra" fields with a streaming call, so we repeat
// chunkSet.FromIngesterId and assume it is the same every time around
// this loop.
if fromIngesterID == "" {
fromIngesterID = chunkSet.FromIngesterId
level.Info(util.Logger).Log("msg", "processing TransferChunks request", "from_ingester", fromIngesterID)
}
userCtx := user.InjectOrgID(stream.Context(), chunkSet.UserId)
lbls := []client.LabelAdapter{}
for _, lbl := range chunkSet.Labels {
lbls = append(lbls, client.LabelAdapter{Name: lbl.Name, Value: lbl.Value})
}
instance, _ := i.getOrCreateInstance(chunkSet.UserId)
for _, chunk := range chunkSet.Chunks {
if err := instance.consumeChunk(userCtx, lbls, chunk); err != nil {
return err
}
}
seriesReceived++
receivedChunks.Add(float64(len(chunkSet.Chunks)))
}
if seriesReceived == 0 {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no series", "from_ingester", fromIngesterID)
return fmt.Errorf("no series")
} else if fromIngesterID == "" {
level.Error(util.Logger).Log("msg", "received TransferChunks request with no ID from ingester")
return fmt.Errorf("no ingester id")
}
if err := i.lifecycler.ClaimTokensFor(stream.Context(), fromIngesterID); err != nil {
return err
}
if err := i.lifecycler.ChangeState(stream.Context(), ring.ACTIVE); err != nil {
return err
}
// Close the stream last, as this is what tells the "from" ingester that
// it's OK to shut down.
if err := stream.SendAndClose(&logproto.TransferChunksResponse{}); err != nil {
level.Error(util.Logger).Log("msg", "Error closing TransferChunks stream", "from_ingester", fromIngesterID, "err", err)
return err
}
level.Info(util.Logger).Log("msg", "Successfully transferred chunks", "from_ingester", fromIngesterID, "series_received", seriesReceived)
return nil
}
// StopIncomingRequests implements ring.Lifecycler.
func (i *Ingester) StopIncomingRequests() {
i.instancesMtx.Lock()
defer i.instancesMtx.Unlock()
i.readonly = true
}
// TransferOut implements ring.Lifecycler.
func (i *Ingester) TransferOut(ctx context.Context) error {
backoff := util.NewBackoff(ctx, util.BackoffConfig{
MinBackoff: 100 * time.Millisecond,
MaxBackoff: 5 * time.Second,
MaxRetries: i.cfg.MaxTransferRetries,
})
for backoff.Ongoing() {
err := i.transferOut(ctx)
if err == nil {
return nil
}
level.Error(util.Logger).Log("msg", "transfer failed", "err", err)
backoff.Wait()
}
return backoff.Err()
}
func (i *Ingester) transferOut(ctx context.Context) error {
targetIngester, err := i.findTransferTarget(ctx)
if err != nil {
return fmt.Errorf("cannot find ingester to transfer chunks to: %v", err)
}
level.Info(util.Logger).Log("msg", "sending chunks", "to_ingester", targetIngester.Addr)
c, err := i.cfg.ingesterClientFactory(i.clientConfig, targetIngester.Addr)
if err != nil {
return err
}
if c, ok := c.(io.Closer); ok {
defer c.Close()
}
ic := c.(logproto.IngesterClient)
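// The transfer stream itself is not tenant-scoped, so inject a placeholder org ID to satisfy the org-ID check on the target; per-tenant IDs travel in each TimeSeriesChunk's UserId field.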
ctx = user.InjectOrgID(ctx, "-1")
stream, err := ic.TransferChunks(ctx)
if err != nil {
return errors.Wrap(err, "TransferChunks")
}
for instanceID, inst := range i.instances {
for _, istream := range inst.streams {
chunks := make([]*logproto.Chunk, 0, len(istream.chunks))
for _, c := range istream.chunks {
bb, err := c.chunk.Bytes()
if err != nil {
return err
}
chunks = append(chunks, &logproto.Chunk{
Data: bb,
})
}
lbls := []*logproto.LabelPair{}
for _, lbl := range istream.labels {
lbls = append(lbls, &logproto.LabelPair{Name: lbl.Name, Value: lbl.Value})
}
stream.Send(&logproto.TimeSeriesChunk{
Chunks: chunks,
UserId: instanceID,
Labels: lbls,
FromIngesterId: i.lifecycler.ID,
})
sentChunks.Add(float64(len(chunks)))
}
}
_, err = stream.CloseAndRecv()
if err != nil {
return errors.Wrap(err, "CloseAndRecv")
}
for _, flushQueue := range i.flushQueues {
flushQueue.DiscardAndClose()
}
i.flushQueuesDone.Wait()
level.Info(util.Logger).Log("msg", "successfully sent chunks", "to_ingester", targetIngester.Addr)
return nil
}
// findTransferTarget finds an ingester in the PENDING state to transfer
// chunks to.
func (i *Ingester) findTransferTarget(ctx context.Context) (*ring.IngesterDesc, error) {
ringDesc, err := i.lifecycler.KVStore.Get(ctx, ring.ConsulKey)
if err != nil {
return nil, err
}
ingesters := ringDesc.(*ring.Desc).FindIngestersByState(ring.PENDING)
if len(ingesters) == 0 {
return nil, fmt.Errorf("no pending ingesters")
}
return &ingesters[0], nil
}

pkg/logproto/logproto.pb.go: file diff suppressed because it is too large.

@@ -15,6 +15,10 @@ service Querier {
rpc Tail(TailRequest) returns (stream TailResponse) {};
}
service Ingester {
rpc TransferChunks(stream TimeSeriesChunk) returns (TransferChunksResponse) {};
}
message PushRequest {
repeated Stream streams = 1 [(gogoproto.jsontag) = "streams"];
}
@@ -79,3 +83,23 @@ message DroppedStream {
google.protobuf.Timestamp to = 2 [(gogoproto.stdtime) = true, (gogoproto.nullable) = false];
string labels = 3;
}
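// TimeSeriesChunk is sent once per in-memory series during a transfer. The
// leaving ingester repeats from_ingester_id on every message (streaming calls
// carry no extra fields), user_id names the tenant, labels identify the
// stream, and chunks holds the serialized chunk data.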
message TimeSeriesChunk {
string from_ingester_id = 1;
string user_id = 2;
repeated LabelPair labels = 3;
repeated Chunk chunks = 4;
}
message LabelPair {
string name = 1;
string value = 2;
}
message Chunk {
bytes data = 1;
}
message TransferChunksResponse {
}

@@ -151,7 +151,7 @@ func (t *Loki) initQuerier() (err error) {
func (t *Loki) initIngester() (err error) {
t.cfg.Ingester.LifecyclerConfig.ListenPort = &t.cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(t.cfg.Ingester, t.store)
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store)
if err != nil {
return
}
