From b3b96568ed9ee7faf3a8fee61579ff077cb9753b Mon Sep 17 00:00:00 2001
From: Dylan Guedes
Date: Wed, 13 Apr 2022 13:36:02 -0300
Subject: [PATCH] Loki: Flush chunks one at a time (#5894)

* Flush chunks one at a time.

- Calls `store.Put` (which flushes a chunk to the store) for each chunk
  individually, stopping at the first failure, instead of calling it once
  for all chunks.

* Refactor chunk flushing into smaller functions.

* Rearrange flushing locks.

- Updates our locking strategy to release the chunk lock more frequently.
  The lock is now released after closing a chunk and again after marking it
  as flushed.

* Change where we report the `chunksFlushedPerReason` metric.
---
 pkg/ingester/flush.go  | 177 +++++++++++++++++++++++++----------------
 pkg/ingester/stream.go |   1 +
 2 files changed, 111 insertions(+), 67 deletions(-)

diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go
index 85a689ed49..f5ef5b1519 100644
--- a/pkg/ingester/flush.go
+++ b/pkg/ingester/flush.go
@@ -276,11 +276,12 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
 			}
 			// Flush this chunk if it hasn't already been successfully flushed.
 			if stream.chunks[j].flushed.IsZero() {
-				result = append(result, &stream.chunks[j])
 				if immediate {
 					reason = flushReasonForced
 				}
-				chunksFlushedPerReason.WithLabelValues(reason).Add(1)
+				stream.chunks[j].reason = reason
+
+				result = append(result, &stream.chunks[j])
 			}
 		}
 	}
@@ -342,6 +343,11 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
 	}
 }
 
+// flushChunks iterates over the given chunkDescs, derives a chunk.Chunk from each of them and flushes them to the store, one at a time.
+//
+// If a chunk fails to be flushed, this operation is reinserted in the queue. Since previously flushed chunks
+// are marked as flushed, they shouldn't be flushed again.
+// It has to close the given chunks to have the head block included.
 func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error {
 	userID, err := tenant.TenantID(ctx)
 	if err != nil {
@@ -352,87 +358,124 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
 	labelsBuilder.Set(nameLabel, logsValue)
 	metric := labelsBuilder.Labels()
 
-	wireChunks := make([]chunk.Chunk, len(cs))
+	sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
+	countPerTenant := chunksPerTenant.WithLabelValues(userID)
 
-	// use anonymous function to make lock releasing simpler.
-	err = func() error {
-		chunkMtx.Lock()
-		defer chunkMtx.Unlock()
+	for j, c := range cs {
+		if err := i.closeChunk(c, chunkMtx); err != nil {
+			return fmt.Errorf("chunk close for flushing: %w", err)
+		}
 
-		for j, c := range cs {
-			// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
-			if err := c.chunk.Close(); err != nil {
-				return err
-			}
-			firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
-			ch := chunk.NewChunk(
-				userID, fp, metric,
-				chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
-				firstTime,
-				lastTime,
-			)
-
-			chunkSize := c.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
-			start := time.Now()
-			if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkSize))); err != nil {
-				return err
-			}
-			chunkEncodeTime.Observe(time.Since(start).Seconds())
-			wireChunks[j] = ch
+		firstTime, lastTime := loki_util.RoundToMilliseconds(c.chunk.Bounds())
+		ch := chunk.NewChunk(
+			userID, fp, metric,
+			chunkenc.NewFacade(c.chunk, i.cfg.BlockSize, i.cfg.TargetChunkSize),
+			firstTime,
+			lastTime,
+		)
+
+		if err := i.encodeChunk(ctx, ch, c); err != nil {
+			return err
 		}
-		return nil
-	}()
-	if err != nil {
-		return err
-	}
+		if err := i.flushChunk(ctx, ch); err != nil {
+			return err
+		}
 
-	if err := i.store.Put(ctx, wireChunks); err != nil {
-		return err
+		i.markChunkAsFlushed(cs[j], chunkMtx)
+
+		reason := func() string {
+			chunkMtx.Lock()
+			defer chunkMtx.Unlock()
+
+			return c.reason
+		}()
+
+		i.reportFlushedChunkStatistics(ch, c, sizePerTenant, countPerTenant, reason)
 	}
-	flushedChunksStats.Inc(int64(len(wireChunks)))
 
-	// Record statistics only when actual put request did not return error.
-	sizePerTenant := chunkSizePerTenant.WithLabelValues(userID)
-	countPerTenant := chunksPerTenant.WithLabelValues(userID)
+	return nil
+}
 
+// markChunkAsFlushed marks a chunk as flushed to make sure it won't be flushed again if this operation fails.
+func (i *Ingester) markChunkAsFlushed(desc *chunkDesc, chunkMtx sync.Locker) {
 	chunkMtx.Lock()
 	defer chunkMtx.Unlock()
+	desc.flushed = time.Now()
+}
 
-	for i, wc := range wireChunks {
+// closeChunk closes the given chunk while locking it to ensure that new blocks are cut before flushing.
+//
+// If the chunk isn't closed, data in the head block isn't included.
+func (i *Ingester) closeChunk(desc *chunkDesc, chunkMtx sync.Locker) error {
+	chunkMtx.Lock()
+	defer chunkMtx.Unlock()
 
-		// flush successful, write while we have lock
-		cs[i].flushed = time.Now()
+	return desc.chunk.Close()
+}
 
-		numEntries := cs[i].chunk.Size()
-		byt, err := wc.Encoded()
-		if err != nil {
-			continue
-		}
+// encodeChunk encodes a chunk.Chunk based on the given chunkDesc.
+//
+// If the encoding is unsuccessful, the flush operation is reinserted in the queue, which will cause
+// the encoding for a given chunk to be evaluated again.
+func (i *Ingester) encodeChunk(ctx context.Context, ch chunk.Chunk, desc *chunkDesc) error {
+	if err := ctx.Err(); err != nil {
+		return err
+	}
+	start := time.Now()
+	chunkBytesSize := desc.chunk.BytesSize() + 4*1024 // size + 4kB should be enough room for cortex header
+	if err := ch.EncodeTo(bytes.NewBuffer(make([]byte, 0, chunkBytesSize))); err != nil {
+		return fmt.Errorf("chunk encoding: %w", err)
+	}
+	chunkEncodeTime.Observe(time.Since(start).Seconds())
+	return nil
+}
 
-		compressedSize := float64(len(byt))
-		uncompressedSize, ok := chunkenc.UncompressedSize(wc.Data)
+// flushChunk flushes the given chunk to the store.
+//
+// If the flush is successful, metrics for this flush are reported.
+// If the flush isn't successful, the operation for this userID is requeued, allowing this and all other unflushed
+// chunks to have another opportunity to be flushed.
+func (i *Ingester) flushChunk(ctx context.Context, ch chunk.Chunk) error {
+	if err := i.store.Put(ctx, []chunk.Chunk{ch}); err != nil {
+		return fmt.Errorf("store put chunk: %w", err)
+	}
+	flushedChunksStats.Inc(1)
+	return nil
+}
 
-		if ok && compressedSize > 0 {
-			chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
-		}
+// reportFlushedChunkStatistics calculates overall statistics of flushed chunks without compromising the flush process.
+func (i *Ingester) reportFlushedChunkStatistics(ch chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
+	byt, err := ch.Encoded()
+	if err != nil {
+		level.Error(util_log.Logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
+		return
+	}
+
+	chunksFlushedPerReason.WithLabelValues(reason).Add(1)
 
-		utilization := wc.Data.Utilization()
-		chunkUtilization.Observe(utilization)
-		chunkEntries.Observe(float64(numEntries))
-		chunkSize.Observe(compressedSize)
-		sizePerTenant.Add(compressedSize)
-		countPerTenant.Inc()
-		firstTime, lastTime := cs[i].chunk.Bounds()
-		chunkAge.Observe(time.Since(firstTime).Seconds())
-		chunkLifespan.Observe(lastTime.Sub(firstTime).Hours())
-
-		flushedChunksBytesStats.Record(compressedSize)
-		flushedChunksLinesStats.Record(float64(numEntries))
-		flushedChunksUtilizationStats.Record(utilization)
-		flushedChunksAgeStats.Record(time.Since(firstTime).Seconds())
-		flushedChunksLifespanStats.Record(lastTime.Sub(firstTime).Hours())
+	compressedSize := float64(len(byt))
+	uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)
+
+	if ok && compressedSize > 0 {
+		chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
 	}
-	return nil
+
+	utilization := ch.Data.Utilization()
+	chunkUtilization.Observe(utilization)
+	numEntries := desc.chunk.Size()
+	chunkEntries.Observe(float64(numEntries))
+	chunkSize.Observe(compressedSize)
+	sizePerTenant.Add(compressedSize)
+	countPerTenant.Inc()
+
+	boundsFrom, boundsTo := desc.chunk.Bounds()
+	chunkAge.Observe(time.Since(boundsFrom).Seconds())
+	chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())
+
+	flushedChunksBytesStats.Record(compressedSize)
+	flushedChunksLinesStats.Record(float64(numEntries))
+	flushedChunksUtilizationStats.Record(utilization)
+	flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
+	flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
 }
diff --git a/pkg/ingester/stream.go b/pkg/ingester/stream.go
index fdf9a9b3c7..c470954c04 100644
--- a/pkg/ingester/stream.go
+++ b/pkg/ingester/stream.go
@@ -107,6 +107,7 @@ type chunkDesc struct {
 	closed  bool
 	synced  bool
 	flushed time.Time
+	reason  string
 
 	lastUpdated time.Time
 }
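Illustration (not part of the patch): a minimal, self-contained Go sketch of the flush strategy this change adopts. The chunkDesc, store, and flakyStore names below are simplified stand-ins rather than the real ingester types, and the "skip already flushed" check that Loki performs in collectChunksToFlush is folded directly into the loop here. The point is the ordering the patch relies on: flush each chunk individually, stop at the first failure so the whole operation can be requeued, and mark flushed chunks so a retry does not send them to the store again.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// chunkDesc is a simplified stand-in for the ingester's chunk descriptor:
// a zero flushed timestamp means the chunk has not reached the store yet.
type chunkDesc struct {
	data    string
	flushed time.Time
}

// store is a stand-in for the chunk store; Put may fail transiently.
type store interface {
	Put(ctx context.Context, data string) error
}

// flushChunks flushes the given chunks one at a time, stopping at the first
// failure. Chunks flushed by an earlier attempt are skipped, so the caller
// can simply requeue the whole operation on error.
func flushChunks(ctx context.Context, s store, cs []*chunkDesc) error {
	for _, c := range cs {
		if !c.flushed.IsZero() {
			continue // already flushed on a previous attempt
		}
		if err := s.Put(ctx, c.data); err != nil {
			return fmt.Errorf("store put chunk: %w", err) // caller requeues
		}
		c.flushed = time.Now() // mark so a retry won't flush it again
	}
	return nil
}

// flakyStore fails on its second call only, to exercise the requeue path.
type flakyStore struct{ calls int }

func (f *flakyStore) Put(_ context.Context, _ string) error {
	f.calls++
	if f.calls == 2 {
		return errors.New("temporary store outage")
	}
	return nil
}

func main() {
	cs := []*chunkDesc{{data: "a"}, {data: "b"}, {data: "c"}}
	s := &flakyStore{}

	// First attempt flushes "a", then fails on "b"; the operation would be requeued.
	if err := flushChunks(context.Background(), s, cs); err != nil {
		fmt.Println("first attempt:", err)
	}
	// The retry skips "a" and flushes the remaining chunks.
	if err := flushChunks(context.Background(), s, cs); err != nil {
		fmt.Println("retry:", err)
	}
	for _, c := range cs {
		fmt.Printf("chunk %s flushed: %v\n", c.data, !c.flushed.IsZero())
	}
}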