@@ -2,6 +2,7 @@ package partition
import (
"context"
"fmt"
"sync"
"testing"
"time"
@@ -13,6 +14,7 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"github.com/stretchr/testify/require"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/grafana/loki/v3/pkg/kafka"
"github.com/grafana/loki/v3/pkg/kafka/testkafka"
@@ -161,3 +163,71 @@ func TestPartitionReader_ProcessCatchUpAtStartup(t *testing.T) {
	err = services.StopAndAwaitTerminated(context.Background(), partitionReader)
	require.NoError(t, err)
}

func TestPartitionReader_ProcessCommits(t *testing.T) {
	_, kafkaCfg := testkafka.CreateCluster(t, 1, "test-topic")
	consumer := newMockConsumer()
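
	// The reader builds its consumer through this factory; returning the shared mock
	// lets the test inspect every record that was handed to it.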
	consumerFactory := func(_ Committer) (Consumer, error) {
		return consumer, nil
	}

	partitionID := int32(0)
	partitionReader, err := NewReader(kafkaCfg, partitionID, "test-consumer-group", consumerFactory, log.NewNopLogger(), prometheus.NewRegistry())
	require.NoError(t, err)
	producer, err := kafka.NewWriterClient(kafkaCfg, 100, log.NewNopLogger(), prometheus.NewRegistry())
	require.NoError(t, err)

	// Init the client: This usually happens in "start" but we want to manage our own lifecycle for this test.
	partitionReader.client, err = kafka.NewReaderClient(kafkaCfg, nil, log.NewNopLogger(),
		kgo.ConsumePartitions(map[string]map[int32]kgo.Offset{
			kafkaCfg.Topic: {partitionID: kgo.NewOffset().AtStart()},
		}),
	)
	require.NoError(t, err)
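
	// A single-entry stream is encoded once; the same records are produced up front and again on every iteration below.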
	stream := logproto.Stream{
		Labels:  labels.FromStrings("foo", "bar").String(),
		Entries: []logproto.Entry{{Timestamp: time.Now(), Line: "test"}},
	}

	records, err := kafka.Encode(partitionID, "test-tenant", stream, 10<<20)
	require.NoError(t, err)
	require.Len(t, records, 1)
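
	// Deadline guard: if processing deadlocks, the context is cancelled with a descriptive cause instead of the test hanging.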
	ctx, cancel := context.WithDeadlineCause(context.Background(), time.Now().Add(10*time.Second), fmt.Errorf("test unexpectedly deadlocked"))
	recordsChan := make(chan []Record)
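	// Start the mock consumer in the background; the returned wait func is called later so it can finish before we count records.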
	wait := consumer.Start(ctx, recordsChan)

	targetLag := time.Second

	i := -1
	iterations := 5
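	// Produce one record up front so the first fetch has something to return.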
	producer.ProduceSync(context.Background(), records...)

	// timeSince acts as a hook for when we check if we've honoured the lag or not. We modify it to respond "no"
	// initially, to force a re-loop, and then "yes" after `iterations`. We also inject a new kafka record each
	// time so there is more to consume.
	timeSince := func(time.Time) time.Duration {
		i++
		if i < iterations {
			producer.ProduceSync(context.Background(), records...)
			return targetLag + 1
		}
		return targetLag - 1
	}
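
	// Drive the reader: it keeps fetching and forwarding records until timeSince reports the lag is within targetLag.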
	_, err = partitionReader.processNextFetchesUntilLagHonored(ctx, targetLag, log.NewNopLogger(), recordsChan, timeSince)
	assert.NoError(t, err)

	// Wait to process all the records
	cancel()
	wait()

	close(recordsChan)
	close(consumer.recordsChan)
	recordsCount := 0
	for receivedRecords := range consumer.recordsChan {
		recordsCount += len(receivedRecords)
	}
	// We expect to have processed all the records, including initial + one per iteration.
	assert.Equal(t, iterations+1, recordsCount)
}