@ -276,11 +276,12 @@ func (i *Ingester) collectChunksToFlush(instance *instance, fp model.Fingerprint
}
// Flush this chunk if it hasn't already been successfully flushed.
if stream . chunks [ j ] . flushed . IsZero ( ) {
result = append ( result , & stream . chunks [ j ] )
if immediate {
reason = flushReasonForced
}
chunksFlushedPerReason . WithLabelValues ( reason ) . Add ( 1 )
stream . chunks [ j ] . reason = reason
result = append ( result , & stream . chunks [ j ] )
}
}
}
@ -342,6 +343,11 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
}
}
// flushChunks iterates over given chunkDescs, derives chunk.Chunk from them and flush them to the store, one at a time.
//
// If a chunk fails to be flushed, this operation is reinserted in the queue. Since previously flushed chunks
// are marked as flushed, they shouldn't be flushed again.
// It has to close given chunks to have have the head block included.
func ( i * Ingester ) flushChunks ( ctx context . Context , fp model . Fingerprint , labelPairs labels . Labels , cs [ ] * chunkDesc , chunkMtx sync . Locker ) error {
userID , err := tenant . TenantID ( ctx )
if err != nil {
@ -352,87 +358,124 @@ func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelP
labelsBuilder . Set ( nameLabel , logsValue )
metric := labelsBuilder . Labels ( )
wireChunks := make ( [ ] chunk . Chunk , len ( cs ) )
sizePerTenant := chunkSizePerTenant . WithLabelValues ( userID )
countPerTenant := chunksPerTenant . WithLabelValues ( userID )
// use anonymous function to make lock releasing simpler.
err = func ( ) error {
chunkMtx . Lock ( )
defer chunkMtx . Unlock ( )
for j , c := range cs {
if err := i . closeChunk ( c , chunkMtx ) ; err != nil {
return fmt . Errorf ( "chunk close for flushing: %w" , err )
}
for j , c := range cs {
// Ensure that new blocks are cut before flushing as data in the head block is not included otherwise.
if err := c . chunk . Close ( ) ; err != nil {
return err
}
firstTime , lastTime := loki_util . RoundToMilliseconds ( c . chunk . Bounds ( ) )
ch := chunk . NewChunk (
userID , fp , metric ,
chunkenc . NewFacade ( c . chunk , i . cfg . BlockSize , i . cfg . TargetChunkSize ) ,
firstTime ,
lastTime ,
)
chunkSize := c . chunk . BytesSize ( ) + 4 * 1024 // size + 4kB should be enough room for cortex header
start := time . Now ( )
if err := ch . EncodeTo ( bytes . NewBuffer ( make ( [ ] byte , 0 , chunkSize ) ) ) ; err != nil {
return err
}
chunkEncodeTime . Observe ( time . Since ( start ) . Seconds ( ) )
wireChunks [ j ] = ch
firstTime , lastTime := loki_util . RoundToMilliseconds ( c . chunk . Bounds ( ) )
ch := chunk . NewChunk (
userID , fp , metric ,
chunkenc . NewFacade ( c . chunk , i . cfg . BlockSize , i . cfg . TargetChunkSize ) ,
firstTime ,
lastTime ,
)
if err := i . encodeChunk ( ctx , ch , c ) ; err != nil {
return err
}
return nil
} ( )
if err != nil {
return err
}
if err := i . flushChunk ( ctx , ch ) ; err != nil {
return err
}
if err := i . store . Put ( ctx , wireChunks ) ; err != nil {
return err
i . markChunkAsFlushed ( cs [ j ] , chunkMtx )
reason := func ( ) string {
chunkMtx . Lock ( )
defer chunkMtx . Unlock ( )
return c . reason
} ( )
i . reportFlushedChunkStatistics ( ch , c , sizePerTenant , countPerTenant , reason )
}
flushedChunksStats . Inc ( int64 ( len ( wireChunks ) ) )
// Record statistics only when actual put request did not return error.
sizePerTenant := chunkSizePerTenant . WithLabelValues ( userID )
countPerTenant := chunksPerTenant . WithLabelValues ( userID )
return nil
}
// markChunkAsFlushed marks a chunk as flushed to make sure it won't be flushed again if this operation fails
// and is requeued.
func (i *Ingester) markChunkAsFlushed(desc *chunkDesc, chunkMtx sync.Locker) {
	chunkMtx.Lock()
	defer chunkMtx.Unlock()
	desc.flushed = time.Now()
}
for i , wc := range wireChunks {
// closeChunk closes the given chunk while locking it to ensure that new blocks are cut before flushing.
//
// If the chunk isn't closed, data in the head block isn't included.
func ( i * Ingester ) closeChunk ( desc * chunkDesc , chunkMtx sync . Locker ) error {
chunkMtx . Lock ( )
defer chunkMtx . Unlock ( )
// flush successful, write while we have lock
cs [ i ] . flushed = time . Now ( )
return desc . chunk . Close ( )
}
numEntries := cs [ i ] . chunk . Size ( )
byt , err := wc . Encoded ( )
if err != nil {
continue
}
// encodeChunk encodes a chunk.Chunk based on the given chunkDesc.
//
// If the encoding is unsuccessful the flush operation is reinserted in the queue which will cause
// the encoding for a given chunk to be evaluated again.
func ( i * Ingester ) encodeChunk ( ctx context . Context , ch chunk . Chunk , desc * chunkDesc ) error {
if err := ctx . Err ( ) ; err != nil {
return err
}
start := time . Now ( )
chunkBytesSize := desc . chunk . BytesSize ( ) + 4 * 1024 // size + 4kB should be enough room for cortex header
if err := ch . EncodeTo ( bytes . NewBuffer ( make ( [ ] byte , 0 , chunkBytesSize ) ) ) ; err != nil {
return fmt . Errorf ( "chunk encoding: %w" , err )
}
chunkEncodeTime . Observe ( time . Since ( start ) . Seconds ( ) )
return nil
}
compressedSize := float64 ( len ( byt ) )
uncompressedSize , ok := chunkenc . UncompressedSize ( wc . Data )
// flushChunk flushes the given chunk to the store.
//
// If the flush is successful, metrics for this flush are to be reported.
// If the flush isn't successful, the operation for this userID is requeued allowing this and all other unflushed
// chunk to have another opportunity to be flushed.
func ( i * Ingester ) flushChunk ( ctx context . Context , ch chunk . Chunk ) error {
if err := i . store . Put ( ctx , [ ] chunk . Chunk { ch } ) ; err != nil {
return fmt . Errorf ( "store put chunk: %w" , err )
}
flushedChunksStats . Inc ( 1 )
return nil
}
if ok && compressedSize > 0 {
chunkCompressionRatio . Observe ( float64 ( uncompressedSize ) / compressedSize )
}
// reportFlushedChunkStatistics calculates overall statistics of flushed chunks without compromising the flush process.
func (i *Ingester) reportFlushedChunkStatistics(ch chunk.Chunk, desc *chunkDesc, sizePerTenant prometheus.Counter, countPerTenant prometheus.Counter, reason string) {
	byt, err := ch.Encoded()
	if err != nil {
		// Statistics must never fail the flush: log and give up on reporting.
		level.Error(util_log.Logger).Log("msg", "failed to encode flushed wire chunk", "err", err)
		return
	}

	chunksFlushedPerReason.WithLabelValues(reason).Add(1)

	compressedSize := float64(len(byt))
	uncompressedSize, ok := chunkenc.UncompressedSize(ch.Data)

	if ok && compressedSize > 0 {
		chunkCompressionRatio.Observe(float64(uncompressedSize) / compressedSize)
	}

	utilization := ch.Data.Utilization()
	chunkUtilization.Observe(utilization)
	numEntries := desc.chunk.Size()
	chunkEntries.Observe(float64(numEntries))
	chunkSize.Observe(compressedSize)
	sizePerTenant.Add(compressedSize)
	countPerTenant.Inc()

	boundsFrom, boundsTo := desc.chunk.Bounds()
	chunkAge.Observe(time.Since(boundsFrom).Seconds())
	chunkLifespan.Observe(boundsTo.Sub(boundsFrom).Hours())

	flushedChunksBytesStats.Record(compressedSize)
	flushedChunksLinesStats.Record(float64(numEntries))
	flushedChunksUtilizationStats.Record(utilization)
	flushedChunksAgeStats.Record(time.Since(boundsFrom).Seconds())
	flushedChunksLifespanStats.Record(boundsTo.Sub(boundsFrom).Hours())
}