Cleanup/unordered writes ingester config (#4192)

* handle replaying unordered WAL into ordered configs.

* ingester limiter metric, testware, correctly replayed unordered WAL into ordered config
pull/4193/head
Owen Diehl 4 years ago committed by GitHub
parent 3d571f8bce
commit ff2ced09ea
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 14
      pkg/chunkenc/memchunk.go
  2. 8
      pkg/ingester/checkpoint.go
  3. 108
      pkg/ingester/checkpoint_test.go
  4. 1
      pkg/ingester/flush_test.go
  5. 13
      pkg/ingester/ingester.go
  6. 4
      pkg/ingester/instance.go
  7. 16
      pkg/ingester/instance_test.go
  8. 17
      pkg/ingester/limiter.go
  9. 4
      pkg/ingester/limiter_test.go
  10. 6
      pkg/ingester/metrics.go
  11. 40
      pkg/ingester/recovery.go
  12. 13
      pkg/ingester/stream.go
  13. 1
      pkg/ingester/transfer_test.go

@ -724,6 +724,20 @@ func (c *MemChunk) reorder() error {
return nil
}
func (c *MemChunk) ConvertHead(desired HeadBlockFmt) error {
if c.head != nil && c.head.Format() != desired {
newH, err := c.head.Convert(desired)
if err != nil {
return err
}
c.head = newH
}
c.headFmt = desired
return nil
}
// cut a new block and add it to finished blocks.
func (c *MemChunk) cut() error {
if c.head.IsEmpty() {

@ -100,10 +100,10 @@ func fromWireChunks(conf *Config, wireChunks []Chunk) ([]chunkDesc, error) {
lastUpdated: c.LastUpdated,
}
hbType := chunkenc.OrderedHeadBlockFmt
if conf.UnorderedWrites {
hbType = chunkenc.UnorderedHeadBlockFmt
}
// Always use Unordered headblocks during replay
// to ensure Loki can effectively replay an unordered-friendly
// WAL into a new configuration that disables unordered writes.
hbType := chunkenc.UnorderedHeadBlockFmt
mc, err := chunkenc.MemchunkFromCheckpoint(c.Data, c.Head, hbType, conf.BlockSize, conf.TargetChunkSize)
if err != nil {
return nil, err

@ -450,7 +450,7 @@ func Test_SeriesIterator(t *testing.T) {
IngestionBurstSizeMB: 1e4,
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
@ -500,7 +500,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
IngestionBurstSizeMB: 1e4,
}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil, nil)
@ -575,3 +575,107 @@ func buildChunks(t testing.TB, size int) []Chunk {
}
return chks
}
func TestIngesterWALReplaysUnorderedToOrdered(t *testing.T) {
for _, waitForCheckpoint := range []bool{false, true} {
t.Run(fmt.Sprintf("checkpoint-%v", waitForCheckpoint), func(t *testing.T) {
walDir, err := ioutil.TempDir(os.TempDir(), "loki-wal")
require.Nil(t, err)
defer os.RemoveAll(walDir)
ingesterConfig := defaultIngesterTestConfigWithWAL(t, walDir)
// First launch the ingester with unordered writes enabled
dft := defaultLimitsTestConfig()
dft.UnorderedWrites = true
limits, err := validation.NewOverrides(dft, nil)
require.NoError(t, err)
newStore := func() *mockStore {
return &mockStore{
chunks: map[string][]chunk.Chunk{},
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
req := logproto.PushRequest{
Streams: []logproto.Stream{
{
Labels: `{foo="bar",bar="baz1"}`,
},
{
Labels: `{foo="bar",bar="baz2"}`,
},
},
}
start := time.Now()
steps := 10
end := start.Add(time.Second * time.Duration(steps))
// Write data out of order
for i := steps - 1; i >= 0; i-- {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: start.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", i),
})
}
ctx := user.InjectOrgID(context.Background(), "test")
_, err = i.Push(ctx, &req)
require.NoError(t, err)
if waitForCheckpoint {
// Ensure we have checkpointed now
expectCheckpoint(t, walDir, true, ingesterConfig.WAL.CheckpointDuration*2) // give a bit of buffer
// Add some more data after the checkpoint
tmp := end
end = end.Add(time.Second * time.Duration(steps))
req.Streams[0].Entries = nil
req.Streams[1].Entries = nil
// Write data out of order again
for i := steps - 1; i >= 0; i-- {
req.Streams[0].Entries = append(req.Streams[0].Entries, logproto.Entry{
Timestamp: tmp.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", steps+i),
})
req.Streams[1].Entries = append(req.Streams[1].Entries, logproto.Entry{
Timestamp: tmp.Add(time.Duration(i) * time.Second),
Line: fmt.Sprintf("line %d", steps+i),
})
}
_, err = i.Push(ctx, &req)
require.NoError(t, err)
}
ensureIngesterData(ctx, t, start, end, i)
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// Now disable unordered writes
limitCfg := defaultLimitsTestConfig()
limitCfg.UnorderedWrites = false
limits, err = validation.NewOverrides(limitCfg, nil)
require.NoError(t, err)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
// ensure we've recovered data from wal segments
ensureIngesterData(ctx, t, start, end, i)
})
}
}

@ -291,7 +291,6 @@ func defaultIngesterTestConfig(t testing.TB) Config {
cfg.LifecyclerConfig.MinReadyDuration = 0
cfg.BlockSize = 256 * 1024
cfg.TargetChunkSize = 1500 * 1024
cfg.UnorderedWrites = true
return cfg
}

@ -82,8 +82,6 @@ type Config struct {
ChunkFilterer storage.RequestChunkFilterer `yaml:"-"`
UnorderedWrites bool `yaml:"unordered_writes_enabled"`
IndexShards int `yaml:"index_shards"`
}
@ -107,7 +105,6 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.DurationVar(&cfg.MaxChunkAge, "ingester.max-chunk-age", time.Hour, "Maximum chunk age before flushing.")
f.DurationVar(&cfg.QueryStoreMaxLookBackPeriod, "ingester.query-store-max-look-back-period", 0, "How far back should an ingester be allowed to query the store for data, for use only with boltdb-shipper index and filesystem object store. -1 for infinite.")
f.BoolVar(&cfg.AutoForgetUnhealthy, "ingester.autoforget-unhealthy", false, "Enable to remove unhealthy ingesters from the ring after `ring.kvstore.heartbeat_timeout`")
f.BoolVar(&cfg.UnorderedWrites, "ingester.unordered-writes-enabled", false, "(Experimental) Allow out of order writes.")
f.IntVar(&cfg.IndexShards, "ingester.index-shards", index.DefaultIndexShards, "Shard factor used in the ingesters for the in process reverse index. This MUST be evenly divisible by ALL schema shard factors or Loki will not start.")
}
@ -231,7 +228,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
// Now that the lifecycler has been created, we can create the limiter
// which depends on it.
i.limiter = NewLimiter(limits, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
i.limiter = NewLimiter(limits, metrics, i.lifecycler, cfg.LifecyclerConfig.RingConfig.ReplicationFactor)
i.Service = services.NewBasicService(i.starting, i.running, i.stopping)
@ -328,9 +325,9 @@ func (i *Ingester) starting(ctx context.Context) error {
i.cfg.RetainPeriod = old
}()
// Disable the in process stream limit checks while replaying the WAL
i.limiter.Disable()
defer i.limiter.Enable()
// Disable the in process stream limit checks while replaying the WAL.
// It is re-enabled in the recover's Close() method.
i.limiter.DisableForWALReplay()
recoverer := newIngesterRecoverer(i)
defer recoverer.Close()
@ -381,6 +378,8 @@ func (i *Ingester) starting(ctx context.Context) error {
"errors", segmentRecoveryErr != nil,
)
level.Info(util_log.Logger).Log("msg", "closing recoverer")
recoverer.Close()
elapsed := time.Since(start)
i.metrics.walReplayDuration.Set(elapsed.Seconds())
level.Info(util_log.Logger).Log("msg", "recovery finished", "time", elapsed.String())

@ -132,7 +132,7 @@ func (i *instance) consumeChunk(ctx context.Context, ls labels.Labels, chunk *lo
if !ok {
sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(ls), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streamsByFP[fp] = stream
i.streams[stream.labelsString] = stream
i.streamsCreatedTotal.Inc()
@ -243,7 +243,7 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
fp := i.getHashForLabels(labels)
sortedLabels := i.index.Add(cortexpb.FromLabelsToLabelAdapters(labels), fp)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.limits.UnorderedWrites(i.instanceID), i.metrics)
stream = newStream(i.cfg, i.instanceID, fp, sortedLabels, i.limiter.UnorderedWrites(i.instanceID), i.metrics)
i.streams[pushReqStream.Labels] = stream
i.streamsByFP[fp] = stream

@ -39,7 +39,7 @@ var NilMetrics = newIngesterMetrics(nil)
func TestLabelsCollisions(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}, nil)
@ -66,7 +66,7 @@ func TestLabelsCollisions(t *testing.T) {
func TestConcurrentPushes(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
@ -117,7 +117,7 @@ func TestConcurrentPushes(t *testing.T) {
func TestSyncPeriod(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
const (
syncPeriod = 1 * time.Minute
@ -159,7 +159,7 @@ func TestSyncPeriod(t *testing.T) {
func Test_SeriesQuery(t *testing.T) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
// just some random values
cfg := defaultConfig()
@ -274,7 +274,7 @@ func makeRandomLabels() labels.Labels {
func Benchmark_PushInstance(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 1000}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}, nil)
ctx := context.Background()
@ -314,7 +314,7 @@ func Benchmark_PushInstance(b *testing.B) {
func Benchmark_instance_addNewTailer(b *testing.B) {
limits, err := validation.NewOverrides(validation.Limits{MaxLocalStreamsPerUser: 100000}, nil)
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
limiter := NewLimiter(limits, NilMetrics, &ringCountMock{count: 1}, 1)
ctx := context.Background()
@ -368,7 +368,7 @@ func Test_Iterator(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, nil)
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)
@ -450,7 +450,7 @@ func Test_ChunkFilter(t *testing.T) {
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(
&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
&ingesterConfig, "fake", NewLimiter(overrides, NilMetrics, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil, &testFilter{})
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)

@ -24,32 +24,45 @@ type Limiter struct {
limits *validation.Overrides
ring RingCount
replicationFactor int
metrics *ingesterMetrics
mtx sync.RWMutex
disabled bool
}
func (l *Limiter) Disable() {
func (l *Limiter) DisableForWALReplay() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = true
l.metrics.limiterEnabled.Set(0)
}
func (l *Limiter) Enable() {
l.mtx.Lock()
defer l.mtx.Unlock()
l.disabled = false
l.metrics.limiterEnabled.Set(1)
}
// NewLimiter makes a new limiter
func NewLimiter(limits *validation.Overrides, ring RingCount, replicationFactor int) *Limiter {
func NewLimiter(limits *validation.Overrides, metrics *ingesterMetrics, ring RingCount, replicationFactor int) *Limiter {
return &Limiter{
limits: limits,
ring: ring,
replicationFactor: replicationFactor,
metrics: metrics,
}
}
func (l *Limiter) UnorderedWrites(userID string) bool {
// WAL replay should not discard previously ack'd writes,
// so allow out of order writes while the limiter is disabled.
if l.disabled {
return true
}
return l.limits.UnorderedWrites(userID)
}
// AssertMaxStreamsPerUser ensures limit has not been reached compared to the current
// number of streams in input and returns an error if so.
func (l *Limiter) AssertMaxStreamsPerUser(userID string, streams int) error {

@ -108,7 +108,7 @@ func TestLimiter_AssertMaxStreamsPerUser(t *testing.T) {
}, nil)
require.NoError(t, err)
limiter := NewLimiter(limits, ring, testData.ringReplicationFactor)
limiter := NewLimiter(limits, NilMetrics, ring, testData.ringReplicationFactor)
actual := limiter.AssertMaxStreamsPerUser("test", testData.streams)
assert.Equal(t, testData.expected, actual)
@ -155,7 +155,7 @@ func TestLimiter_minNonZero(t *testing.T) {
testData := testData
t.Run(testName, func(t *testing.T) {
limiter := NewLimiter(nil, nil, 0)
limiter := NewLimiter(nil, NilMetrics, nil, 0)
assert.Equal(t, testData.expected, limiter.minNonZero(testData.first, testData.second))
})
}

@ -27,6 +27,8 @@ type ingesterMetrics struct {
recoveryBytesInUse prometheus.Gauge
recoveryIsFlushing prometheus.Gauge
limiterEnabled prometheus.Gauge
autoForgetUnhealthyIngestersTotal prometheus.Counter
}
@ -119,6 +121,10 @@ func newIngesterMetrics(r prometheus.Registerer) *ingesterMetrics {
Name: "loki_ingester_wal_replay_flushing",
Help: "Whether the wal replay is in a flushing phase due to backpressure",
}),
limiterEnabled: promauto.With(r).NewGauge(prometheus.GaugeOpts{
Name: "loki_ingester_limiter_enabled",
Help: "Whether the ingester's limiter is enabled",
}),
autoForgetUnhealthyIngestersTotal: promauto.With(r).NewCounter(prometheus.CounterOpts{
Name: "loki_ingester_autoforget_unhealthy_ingesters_total",
Help: "Total number of ingesters automatically forgotten",

@ -99,6 +99,7 @@ type ingesterRecoverer struct {
}
func newIngesterRecoverer(i *Ingester) *ingesterRecoverer {
return &ingesterRecoverer{
ing: i,
done: make(chan struct{}),
@ -127,6 +128,9 @@ func (r *ingesterRecoverer) Series(series *Series) error {
stream.lastLine.content = series.LastLine
stream.entryCt = series.EntryCt
stream.highestTs = series.HighestTs
// Always set during replay, then reset to desired value afterward.
// This allows replaying unordered WALs into ordered configurations.
stream.unorderedWrites = true
if err != nil {
return err
@ -202,14 +206,46 @@ func (r *ingesterRecoverer) Push(userID string, entries RefEntries) error {
}
func (r *ingesterRecoverer) Close() {
// reset all the incrementing stream counters after a successful WAL replay.
// Ensure this is only run once.
select {
case <-r.done:
return
default:
}
close(r.done)
// Enable the limiter here to accurately reflect tenant limits after recovery.
r.ing.limiter.Enable()
for _, inst := range r.ing.getInstances() {
inst.forAllStreams(context.Background(), func(s *stream) error {
// reset all the incrementing stream counters after a successful WAL replay.
s.resetCounter()
// If we've replayed a WAL with unordered writes, but the new
// configuration disables them, convert all streams/head blocks
// to ensure unordered writes are disabled after the replay,
// but without dropping any previously accepted data.
isAllowed := r.ing.limiter.UnorderedWrites(s.tenant)
old := s.unorderedWrites
s.unorderedWrites = isAllowed
if !isAllowed && old {
s.chunkMtx.Lock()
defer s.chunkMtx.Unlock()
if len(s.chunks) > 0 {
err := s.chunks[len(s.chunks)-1].chunk.ConvertHead(headBlockType(isAllowed))
if err != nil {
return err
}
}
}
return nil
})
}
close(r.done)
}
func (r *ingesterRecoverer) Done() <-chan struct{} {

@ -161,11 +161,7 @@ func (s *stream) setChunks(chunks []Chunk) (bytesAdded, entriesAdded int, err er
}
func (s *stream) NewChunk() *chunkenc.MemChunk {
hbType := chunkenc.OrderedHeadBlockFmt
if s.unorderedWrites {
hbType = chunkenc.UnorderedHeadBlockFmt
}
return chunkenc.NewMemChunk(s.cfg.parsedEncoding, hbType, s.cfg.BlockSize, s.cfg.TargetChunkSize)
return chunkenc.NewMemChunk(s.cfg.parsedEncoding, headBlockType(s.unorderedWrites), s.cfg.BlockSize, s.cfg.TargetChunkSize)
}
func (s *stream) Push(
@ -480,3 +476,10 @@ func (s *stream) addTailer(t *tailer) {
func (s *stream) resetCounter() {
s.entryCt = 0
}
func headBlockType(unorderedWrites bool) chunkenc.HeadBlockFmt {
if unorderedWrites {
return chunkenc.UnorderedHeadBlockFmt
}
return chunkenc.OrderedHeadBlockFmt
}

@ -30,7 +30,6 @@ func TestTransferOut(t *testing.T) {
f := newTestIngesterFactory(t)
ing := f.getIngester(time.Duration(0), t)
ing.cfg.UnorderedWrites = false // enforce ordered writes on old testware (transfers are deprecated).
// Push some data into our original ingester
ctx := user.InjectOrgID(context.Background(), "test")

Loading…
Cancel
Save