|
|
|
@ -611,6 +611,10 @@ func (i *Ingester) starting(ctx context.Context) (err error) { |
|
|
|
|
i.setPrepareShutdown() |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// start our flush loop: this needs to start before the partition-reader in order for chunks to be shipped in the case of Kafka catching up.
|
|
|
|
|
i.loopDone.Add(1) |
|
|
|
|
go i.loop() |
|
|
|
|
|
|
|
|
|
// When kafka ingestion is enabled, we have to make sure that reader catches up replaying the partition
|
|
|
|
|
// BEFORE the ingester ring lifecycler is started, because once the ingester ring lifecycler will start
|
|
|
|
|
// it will switch the ingester state in the ring to ACTIVE.
|
|
|
|
@ -646,9 +650,7 @@ func (i *Ingester) starting(ctx context.Context) (err error) { |
|
|
|
|
return fmt.Errorf("failed to start partition ring lifecycler: %w", err) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// start our loop
|
|
|
|
|
i.loopDone.Add(1) |
|
|
|
|
go i.loop() |
|
|
|
|
|
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|