diff --git a/go.mod b/go.mod index d4b9dc86e1..3545b7532f 100644 --- a/go.mod +++ b/go.mod @@ -151,7 +151,7 @@ require ( github.com/shirou/gopsutil/v4 v4.25.10 github.com/thanos-io/objstore v0.0.0-20250115091151-a54d0f04b42a github.com/tjhop/slog-gokit v0.1.4 - github.com/twmb/franz-go v1.20.2 + github.com/twmb/franz-go v1.20.3 github.com/twmb/franz-go/pkg/kadm v1.17.1 github.com/twmb/franz-go/pkg/kfake v0.0.0-20250603004440-37eecbb8927f github.com/twmb/franz-go/pkg/kmsg v1.12.0 diff --git a/go.sum b/go.sum index 05feb21995..34ed87b052 100644 --- a/go.sum +++ b/go.sum @@ -1107,8 +1107,8 @@ github.com/tklauser/numcpus v0.10.0 h1:18njr6LDBk1zuna922MgdjQuJFjrdppsZG60sHGfj github.com/tklauser/numcpus v0.10.0/go.mod h1:BiTKazU708GQTYF4mB+cmlpT2Is1gLk7XVuEeem8LsQ= github.com/ttacon/chalk v0.0.0-20160626202418-22c06c80ed31/go.mod h1:onvgF043R+lC5RZ8IT9rBXDaEDnpnw/Cl+HFiw+v/7Q= github.com/tv42/httpunix v0.0.0-20150427012821-b75d8614f926/go.mod h1:9ESjWnEqriFuLhtthL60Sar/7RFoluCcXsuvEwTV5KM= -github.com/twmb/franz-go v1.20.2 h1:CiwhyKZHW6vqSHJkh+RTxFAJkio0jBjM/JQhx/HZ72A= -github.com/twmb/franz-go v1.20.2/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= +github.com/twmb/franz-go v1.20.3 h1:gjwZwZmmvo/t7mxyj6frxDORVxsqrycXPnDrpkXldfY= +github.com/twmb/franz-go v1.20.3/go.mod h1:YCnepDd4gl6vdzG03I5Wa57RnCTIC6DVEyMpDX/J8UA= github.com/twmb/franz-go/pkg/kadm v1.17.1 h1:Bt02Y/RLgnFO2NP2HVP1kd2TFtGRiJZx+fSArjZDtpw= github.com/twmb/franz-go/pkg/kadm v1.17.1/go.mod h1:s4duQmrDbloVW9QTMXhs6mViTepze7JLG43xwPcAeTg= github.com/twmb/franz-go/pkg/kfake v0.0.0-20250603004440-37eecbb8927f h1:69/xwCyhBOKyMaPISOxdmfhxVZZ/WEwurPZKUw3yRrc= diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/atomic_maybe_work.go b/vendor/github.com/twmb/franz-go/pkg/kgo/atomic_maybe_work.go index bfdd3c1deb..1cf93ad68e 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/atomic_maybe_work.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/atomic_maybe_work.go @@ -8,7 +8,7 @@ const ( stateContinueWorking ) -type workLoop struct{ state atomicU32 } +type workLoop struct{ state atomic.Uint32 } // maybeBegin returns whether a work loop should begin. func (l *workLoop) maybeBegin() bool { diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/broker.go b/vendor/github.com/twmb/franz-go/pkg/kgo/broker.go index 6781f8063d..175c839a73 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/broker.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/broker.go @@ -162,7 +162,7 @@ type broker struct { // reqs manages incoming message requests. reqs ring[promisedReq] // dead is an atomic so a backed up reqs cannot block broker stoppage. - dead atomicBool + dead atomic.Bool } // brokerVersions is loaded once (and potentially a few times concurrently if @@ -701,7 +701,7 @@ func (b *broker) connect(ctx context.Context) (net.Conn, error) { // brokerCxn manages an actual connection to a Kafka broker. This is separate // the broker struct to allow lazy connection (re)creation. type brokerCxn struct { - throttleUntil atomicI64 // atomic nanosec + throttleUntil atomic.Int64 // atomic nanosec conn net.Conn @@ -718,17 +718,17 @@ type brokerCxn struct { // The following four fields are used for connection reaping. // Write is only updated in one location; read is updated in three // due to readConn, readConnAsync, and discard. - lastWrite atomicI64 - lastRead atomicI64 - writing atomicBool - reading atomicBool + lastWrite atomic.Int64 + lastRead atomic.Int64 + writing atomic.Bool + reading atomic.Bool successes uint64 // resps manages reading kafka responses. resps ring[promisedResp] // dead is an atomic so that a backed up resps cannot block cxn death. - dead atomicBool + dead atomic.Bool // closed in cloneConn; allows throttle waiting to quit deadCh chan struct{} } diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go index ee6c2ad855..f3d69cf4d8 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/client.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/client.go @@ -1442,6 +1442,17 @@ start: } } + log := func(backoff time.Duration) { + r.cl.cfg.logger.Log(LogLevelDebug, "retrying request", + "request", kmsg.NameForKey(req.Key()), + "tries", tries, + "backoff", backoff, + "time_since_start", time.Since(tryStart), + "request_error", err, + "response_error", retryErr, + ) + } + if err != nil || retryErr != nil { if r.limitRetries == 0 || tries <= r.limitRetries { backoff := r.cl.cfg.retryBackoff(tries) @@ -1451,19 +1462,13 @@ start: // is a broker-specific network error, and the next // broker is different than the current, we also retry. if r.cl.shouldRetry(tries, err) || r.cl.shouldRetry(tries, retryErr) { - r.cl.cfg.logger.Log(LogLevelDebug, "retrying request", - "request", kmsg.NameForKey(req.Key()), - "tries", tries, - "backoff", backoff, - "time_since_start", time.Since(tryStart), - "request_error", err, - "response_error", retryErr, - ) + log(backoff) if r.cl.waitTries(ctx, backoff) { next, nextErr = r.br() goto start } } else if r.cl.shouldRetryNext(tries, err) { + log(backoff) next, nextErr = r.br() if next != br && r.cl.waitTries(ctx, backoff) { goto start @@ -2430,7 +2435,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res retryTimeout = cl.cfg.retryTimeout(req.Key()) wg sync.WaitGroup - issue func(reqTry) + issue func(reqTry, int32) ) l := cl.cfg.logger @@ -2441,7 +2446,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // // This recursively calls itself if a request fails and can be retried. // We avoid stack problems because this calls itself in a goroutine. - issue = func(try reqTry) { + issue = func(try reqTry, avoidBroker int32) { issues, reshardable, err := sharder.shard(ctx, try.req, try.lastErr) if err != nil { l.Log(LogLevelDebug, "unable to shard request", "req", kmsg.Key(try.req.Key()).Name(), "previous_tries", try.tries, "err", err) @@ -2496,17 +2501,21 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res start: tries++ - broker := cl.broker() + br := cl.broker() var err error if !myIssue.any { - broker, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker) + br, err = cl.brokerOrErr(ctx, myIssue.broker, errUnknownBroker) + } else if avoidBroker != -1 { + for i := 0; i < 3 && br.meta.NodeID == avoidBroker; i++ { + br = cl.broker() + } } if err != nil { addShard(shard(nil, myIssue.req, nil, err)) // failure to load a broker is a failure to issue a request return } - resp, err := broker.waitResp(ctx, myIssue.req) + resp, err := br.waitResp(ctx, myIssue.req) var errIsFromResp bool if err == nil { err = sharder.onResp(myIssue.req, resp) // perform some potential cleanup, and potentially receive an error to retry @@ -2523,10 +2532,36 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res // immediately. The request was not even issued. However, as a // safety, we only do this 3 times to avoid some super weird // pathological spin loop. - backoff := cl.cfg.retryBackoff(tries) + // + // We do retry on pinnedOld even if noRetries==true because + // the request was not issued; the sharder may handle + // errBrokerTooOld by pinning / splitting differently next try. + var ( + backoff = cl.cfg.retryBackoff(tries) + pinnedOld = reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3 + notTimedOut = retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout + shouldRetry = cl.shouldRetry(tries, err) + shouldRetryNext = myIssue.any && cl.shouldRetryNext(tries, err) + ) + + // If we retried on a "next" broker, but we randomly chose + // that same broker 3x, then we avoid retrying again on a + // "next" broker. + // + // If we retry at all, we need to clear `avoidBroker` in + // case it's already set. however, if we *do* need to retry + // on a different broker, then we set it. + if avoidBroker != -1 && br.meta.NodeID == avoidBroker { + shouldRetryNext = false + } + avoidBroker = -1 + if shouldRetryNext { + avoidBroker = br.meta.NodeID + } + if err != nil && - (reshardable && isPinned && errors.Is(err, errBrokerTooOld) && tries <= 3) || - (retryTimeout == 0 || time.Now().Add(backoff).Sub(start) <= retryTimeout) && cl.shouldRetry(tries, err) && cl.waitTries(ctx, backoff) && !noRetries { + (pinnedOld || + !noRetries && notTimedOut && (shouldRetry || shouldRetryNext) && cl.waitTries(ctx, backoff)) { // Non-reshardable re-requests just jump back to the // top where the broker is loaded. This is the case on // requests where the original request is split to @@ -2536,7 +2571,7 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res goto start } l.Log(LogLevelDebug, "sharded request failed, resharding and reissuing", "req", kmsg.Key(myIssue.req.Key()).Name(), "time_since_start", time.Since(start), "tries", tries, "err", err) - issue(reqTry{tries, myIssue.req, err}) + issue(reqTry{tries, myIssue.req, err}, avoidBroker) return } @@ -2548,12 +2583,12 @@ func (cl *Client) handleShardedReq(ctx context.Context, req kmsg.Request) ([]Res if errIsFromResp { err = nil } - addShard(shard(broker, myIssue.req, resp, err)) // the error was not retryable + addShard(shard(br, myIssue.req, resp, err)) // the error was not retryable }() } } - issue(reqTry{0, req, nil}) + issue(reqTry{0, req, nil}, -1) wg.Wait() return shards, sharder.merge diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go index 1e09dadcb0..31a71a3d41 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/config.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/config.go @@ -663,7 +663,7 @@ func RequestTimeoutOverhead(overhead time.Duration) Opt { } // ConnIdleTimeout is a rough amount of time to allow connections to idle -// before they are closed, overriding the default 20. +// before they are closed, overriding the default 20s. // // In the worst case, a connection can be allowed to idle for up to 2x this // time, while the average is expected to be 1.5x (essentially, a uniform @@ -773,7 +773,7 @@ func RetryBackoffFn(backoff func(int) time.Duration) Opt { } // RequestRetries sets the number of tries that retryable requests are allowed, -// overriding the default of 20s. +// overriding the default of 20. // // This option does not apply to produce requests; to limit produce request // retries / record retries, see RecordRetries. diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/consumer.go b/vendor/github.com/twmb/franz-go/pkg/kgo/consumer.go index cb48bee33c..e73f5c20cd 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/consumer.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/consumer.go @@ -179,8 +179,8 @@ func (o Offset) At(at int64) Offset { } type consumer struct { - bufferedRecords atomicI64 - bufferedBytes atomicI64 + bufferedRecords atomic.Int64 + bufferedBytes atomic.Int64 cl *Client @@ -1503,7 +1503,7 @@ type consumerSession struct { desireFetchCh chan chan chan struct{} cancelFetchCh chan chan chan struct{} allowedFetches int - fetchManagerStarted atomicBool // atomic, once true, we start the fetch manager + fetchManagerStarted atomic.Bool // atomic, once true, we start the fetch manager // Workers signify the number of fetch and list / epoch goroutines that // are currently running within the context of this consumer session. diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go b/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go index 16dfe5b680..04e0b19511 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/consumer_group.go @@ -26,7 +26,7 @@ type groupConsumer struct { cancel func() manageDone chan struct{} // closed once when the manage goroutine quits - cooperative atomicBool // true if the group balancer chosen during Join is cooperative + cooperative atomic.Bool // true if the group balancer chosen during Join is cooperative // The data for topics that the user assigned. Metadata updates the // atomic.Value in each pointer atomically. @@ -93,7 +93,7 @@ type groupConsumer struct { // - set to false at the beginning of a join group session // - set to true if join group response indicates we are leader // - read on metadata updates in findNewAssignments - leader atomicBool + leader atomic.Bool // Set to true when ending a transaction committing transaction // offsets, and then set to false immediately after before calling @@ -1382,7 +1382,7 @@ func (s strptr) String() string { // the rejoin status. type groupExternal struct { tps atomic.Value // map[string]int32 - rejoin atomicBool + rejoin atomic.Bool } func (g *groupConsumer) loadExternal() *groupExternal { diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/go118.go b/vendor/github.com/twmb/franz-go/pkg/kgo/go118.go deleted file mode 100644 index 483c3e9127..0000000000 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/go118.go +++ /dev/null @@ -1,57 +0,0 @@ -//go:build !go1.19 -// +build !go1.19 - -package kgo - -import "sync/atomic" - -type atomicBool uint32 - -func (b *atomicBool) Store(v bool) { - if v { - atomic.StoreUint32((*uint32)(b), 1) - } else { - atomic.StoreUint32((*uint32)(b), 0) - } -} - -func (b *atomicBool) Load() bool { return atomic.LoadUint32((*uint32)(b)) == 1 } - -func (b *atomicBool) Swap(v bool) bool { - var swap uint32 - if v { - swap = 1 - } - return atomic.SwapUint32((*uint32)(b), swap) == 1 -} - -type atomicI32 int32 - -func (v *atomicI32) Add(s int32) int32 { return atomic.AddInt32((*int32)(v), s) } -func (v *atomicI32) Store(s int32) { atomic.StoreInt32((*int32)(v), s) } -func (v *atomicI32) Load() int32 { return atomic.LoadInt32((*int32)(v)) } -func (v *atomicI32) Swap(s int32) int32 { return atomic.SwapInt32((*int32)(v), s) } - -type atomicU32 uint32 - -func (v *atomicU32) Add(s uint32) uint32 { return atomic.AddUint32((*uint32)(v), s) } -func (v *atomicU32) Store(s uint32) { atomic.StoreUint32((*uint32)(v), s) } -func (v *atomicU32) Load() uint32 { return atomic.LoadUint32((*uint32)(v)) } -func (v *atomicU32) Swap(s uint32) uint32 { return atomic.SwapUint32((*uint32)(v), s) } -func (v *atomicU32) CompareAndSwap(old, new uint32) bool { - return atomic.CompareAndSwapUint32((*uint32)(v), old, new) -} - -type atomicI64 int64 - -func (v *atomicI64) Add(s int64) int64 { return atomic.AddInt64((*int64)(v), s) } -func (v *atomicI64) Store(s int64) { atomic.StoreInt64((*int64)(v), s) } -func (v *atomicI64) Load() int64 { return atomic.LoadInt64((*int64)(v)) } -func (v *atomicI64) Swap(s int64) int64 { return atomic.SwapInt64((*int64)(v), s) } - -type atomicU64 uint64 - -func (v *atomicU64) Add(s uint64) uint64 { return atomic.AddUint64((*uint64)(v), s) } -func (v *atomicU64) Store(s uint64) { atomic.StoreUint64((*uint64)(v), s) } -func (v *atomicU64) Load() uint64 { return atomic.LoadUint64((*uint64)(v)) } -func (v *atomicU64) Swap(s uint64) uint64 { return atomic.SwapUint64((*uint64)(v), s) } diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/go119.go b/vendor/github.com/twmb/franz-go/pkg/kgo/go119.go deleted file mode 100644 index 7c8ade5e13..0000000000 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/go119.go +++ /dev/null @@ -1,14 +0,0 @@ -//go:build go1.19 -// +build go1.19 - -package kgo - -import "sync/atomic" - -type ( - atomicBool struct{ atomic.Bool } - atomicI32 struct{ atomic.Int32 } - atomicU32 struct{ atomic.Uint32 } - atomicI64 struct{ atomic.Int64 } - atomicU64 struct{ atomic.Uint64 } -) diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/go121.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/go121.go deleted file mode 100644 index 3cf972b6ed..0000000000 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/go121.go +++ /dev/null @@ -1,28 +0,0 @@ -//go:build go1.21 -// +build go1.21 - -package sticky - -import "slices" - -func sortPartNums(ps memberPartitions) { - slices.Sort(ps) -} - -func (b *balancer) sortMemberByLiteralPartNum(memberNum int) { - partNums := b.plan[memberNum] - slices.SortFunc(partNums, func(lpNum, rpNum int32) int { - ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] - li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] - lt, rt := li.topic, ri.topic - lp, rp := lpNum-li.partNum, rpNum-ri.partNum - if lp < rp { - return -1 - } else if lp > rp { - return 1 - } else if lt < rt { - return -1 - } - return 1 - }) -} diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/goold.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/goold.go deleted file mode 100644 index addd2bbc19..0000000000 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/goold.go +++ /dev/null @@ -1,22 +0,0 @@ -//go:build !go1.21 -// +build !go1.21 - -package sticky - -import "sort" - -func sortPartNums(partNums memberPartitions) { - sort.Slice(partNums, func(i, j int) bool { return partNums[i] < partNums[j] }) -} - -func (b *balancer) sortMemberByLiteralPartNum(memberNum int) { - partNums := b.plan[memberNum] - sort.Slice(partNums, func(i, j int) bool { - lpNum, rpNum := partNums[i], partNums[j] - ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] - li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] - lt, rt := li.topic, ri.topic - lp, rp := lpNum-li.partNum, rpNum-ri.partNum - return lp < rp || (lp == rp && lt < rt) - }) -} diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/sticky.go b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/sticky.go index 47ae4fede7..e18ccb473a 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/sticky.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/internal/sticky/sticky.go @@ -138,7 +138,7 @@ func (b *balancer) into() Plan { // partOwners is created by topic, and partNums refers to // indices in partOwners. If we sort by partNum, we have sorted // topics and partitions. - sortPartNums(partNums) + slices.Sort(partNums) // We can reuse partNums for our topic partitions. topicParts := partNums[:0] @@ -391,6 +391,24 @@ func deserializeUserData(userdata []byte, base []topicPartition) (memberPlan []t return memberPlan, generation } +func (b *balancer) sortMemberByLiteralPartNum(memberNum int) { + partNums := b.plan[memberNum] + slices.SortFunc(partNums, func(lpNum, rpNum int32) int { + ltNum, rtNum := b.partOwners[lpNum], b.partOwners[rpNum] + li, ri := b.topicInfos[ltNum], b.topicInfos[rtNum] + lt, rt := li.topic, ri.topic + lp, rp := lpNum-li.partNum, rpNum-ri.partNum + if lp < rp { + return -1 + } else if lp > rp { + return 1 + } else if lt < rt { + return -1 + } + return 1 + }) +} + // assignUnassignedAndInitGraph is a long function that assigns unassigned // partitions to the least loaded members and initializes our steal graph. // diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go b/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go index dc7b535b14..08826dcb81 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/producer.go @@ -44,16 +44,16 @@ type producer struct { unknownTopics map[string]*unknownTopicProduces id atomic.Value - producingTxn atomicBool + producingTxn atomic.Bool // We must have a producer field for flushing; we cannot just have a // field on recBufs that is toggled on flush. If we did, then a new // recBuf could be created and records sent to while we are flushing. - flushing atomicI32 // >0 if flushing, can Flush many times concurrently - blocked atomicI32 // >0 if over max recs or bytes + flushing atomic.Int32 // >0 if flushing, can Flush many times concurrently + blocked atomic.Int32 // >0 if over max recs or bytes blockedBytes int64 - aborting atomicI32 // >0 if aborting, can abort many times concurrently + aborting atomic.Int32 // >0 if aborting, can abort many times concurrently idMu sync.Mutex idVersion int16 @@ -344,7 +344,7 @@ func (cl *Client) ProduceSync(ctx context.Context, rs ...*Record) ProduceResults // This is similar to using ProduceResult's FirstErr function. type FirstErrPromise struct { wg sync.WaitGroup - once atomicBool + once atomic.Bool err error cl *Client } diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/record_formatter.go b/vendor/github.com/twmb/franz-go/pkg/kgo/record_formatter.go index 9cef47e081..81d0eaba9c 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/record_formatter.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/record_formatter.go @@ -13,6 +13,7 @@ import ( "regexp" "strconv" "strings" + "sync/atomic" "time" "unicode/utf8" @@ -25,7 +26,7 @@ import ( // RecordFormatter formats records. type RecordFormatter struct { - calls atomicI64 + calls atomic.Int64 fns []func([]byte, *FetchPartition, *Record) []byte } diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/sink.go b/vendor/github.com/twmb/franz-go/pkg/kgo/sink.go index 4416459a33..212574340d 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/sink.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/sink.go @@ -26,7 +26,7 @@ type sink struct { // response, we check what version was set in the request. If it is at // least 4, which 1.0 introduced, we upgrade the sem size. inflightSem atomic.Value - produceVersion atomicI32 // negative is unset, positive is version + produceVersion atomic.Int32 // negative is unset, positive is version drainState workLoop @@ -43,7 +43,7 @@ type sink struct { // successful response. For simplicity, if we have a good response // following an error response before the error response's backoff // occurs, the backoff is not cleared. - consecutiveFailures atomicU32 + consecutiveFailures atomic.Uint32 recBufsMu sync.Mutex // guards the following recBufs []*recBuf // contains all partition records for batch building @@ -1252,11 +1252,11 @@ type recBuf struct { // addedToTxn, for transactions only, signifies whether this partition // has been added to the transaction yet or not. - addedToTxn atomicBool + addedToTxn atomic.Bool // For LoadTopicPartitioner partitioning; atomically tracks the number // of records buffered in total on this recBuf. - buffered atomicI64 + buffered atomic.Int64 mu sync.Mutex // guards r/w access to all fields below diff --git a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go index 1cf86ed8a0..058e1a551f 100644 --- a/vendor/github.com/twmb/franz-go/pkg/kgo/source.go +++ b/vendor/github.com/twmb/franz-go/pkg/kgo/source.go @@ -9,6 +9,7 @@ import ( "sort" "strings" "sync" + "sync/atomic" "time" "github.com/twmb/franz-go/pkg/kbin" @@ -98,7 +99,7 @@ type cursor struct { topicID [16]byte partition int32 - unknownIDFails atomicI32 + unknownIDFails atomic.Int32 keepControl bool // whether to keep control records @@ -133,7 +134,7 @@ type cursor struct { // // The used state is exclusively updated by either building a fetch // request or when the source is stopped. - useState atomicBool + useState atomic.Bool topicPartitionData // updated in metadata when session is stopped @@ -316,19 +317,19 @@ func (cs cursorPreferreds) String() string { for j, p := range ps { if j < len(ps)-1 { if p.ooor { - fmt.Fprintf(sb, "%d=>%d[ooor], ", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d[ooor], ", p.p, p.next) } else if p.recheck { - fmt.Fprintf(sb, "%d=>%d[recheck], ", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d[recheck], ", p.p, p.next) } else { - fmt.Fprintf(sb, "%d=>%d, ", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d, ", p.p, p.next) } } else { if p.ooor { - fmt.Fprintf(sb, "%d=>%d[ooor]", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d[ooor]", p.p, p.next) } else if p.recheck { - fmt.Fprintf(sb, "%d=>%d[recheck]", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d[recheck]", p.p, p.next) } else { - fmt.Fprintf(sb, "%d=>%d", p.p, p.next) + fmt.Fprintf(sb, "p%d=>b%d", p.p, p.next) } } } @@ -655,6 +656,25 @@ func (s *source) createReq() *fetchRequest { paused := s.cl.consumer.loadPaused() + // While building this request, if any cursor is follow fetching and it + // has been more than the recheck-if-we-should-still-follow interval, + // we skip the cursor and move it back to the leader. + // + // This is safe w.r.t. metadata updates because `createReq` is running + // in the context of a live consumer session. + var rechecks cursorPreferreds + defer func() { + if len(rechecks) > 0 { + s.cl.cfg.logger.Log(LogLevelInfo, "redirecting follower fetchers back to their leader to re-check if a new follower should be chosen", + "from_broker", s.nodeID, + "moves", rechecks.String(), + ) + for _, c := range rechecks { + c.move() + } + } + }() + s.cursorsMu.Lock() defer s.cursorsMu.Unlock() @@ -662,7 +682,18 @@ func (s *source) createReq() *fetchRequest { for range s.cursors { c := s.cursors[cursorIdx] cursorIdx = (cursorIdx + 1) % len(s.cursors) - if !c.usable() || paused.has(c.topic, c.partition) { + if !c.usable() { + continue + } + if s.nodeID != c.leader && c.moveAt > 0 && time.Since(time.Unix(0, c.moveAt)) > s.cl.cfg.recheckPreferredReplicaInterval { + rechecks = append(rechecks, cursorOffsetPreferred{ + cursorOffsetNext: *c.use(), + preferredReplica: c.leader, + recheck: true, + }) + continue + } + if paused.has(c.topic, c.partition) { continue } req.addCursor(c) @@ -899,20 +930,16 @@ func (s *source) fetch(consumerSession *consumerSession, doneFetch chan<- struct // Before updating the source, we move all cursors that have new // preferred replicas and remove them from being tracked in our req // offsets. We also remove the reload offsets from our req offsets. - // - // These two removals transition responsibility for finishing using the - // cursor from the request's used offsets to the new source or the - // reloading. if len(preferreds) > 0 { s.cl.cfg.logger.Log(LogLevelInfo, "fetch partitions returned preferred replicas", "from_broker", s.nodeID, "moves", preferreds.String(), ) + preferreds.eachPreferred(func(c cursorOffsetPreferred) { + c.move() + deleteReqUsedOffset(c.from.topic, c.from.partition) + }) } - preferreds.eachPreferred(func(c cursorOffsetPreferred) { - c.move() - deleteReqUsedOffset(c.from.topic, c.from.partition) - }) reloadOffsets.each(deleteReqUsedOffset) // The session on the request was updated; we keep those updates. @@ -1262,16 +1289,6 @@ func (s *source) handleReqResp(br *broker, req *fetchRequest, resp *kmsg.FetchRe if keep { fetchTopic.Partitions = append(fetchTopic.Partitions, fp) } - - if s.nodeID != c.leader && c.moveAt > 0 && time.Since(time.Unix(0, c.moveAt)) > s.cl.cfg.recheckPreferredReplicaInterval { - if len(preferreds) == 0 || preferreds[len(preferreds)-1].cursorOffsetNext != *partOffset { - preferreds = append(preferreds, cursorOffsetPreferred{ - cursorOffsetNext: *partOffset, - preferredReplica: c.leader, - recheck: true, - }) - } - } } if len(fetchTopic.Partitions) > 0 { diff --git a/vendor/modules.txt b/vendor/modules.txt index 4bd25380b1..67c5ed0ee3 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -1914,7 +1914,7 @@ github.com/tklauser/go-sysconf # github.com/tklauser/numcpus v0.10.0 ## explicit; go 1.23.0 github.com/tklauser/numcpus -# github.com/twmb/franz-go v1.20.2 +# github.com/twmb/franz-go v1.20.3 ## explicit; go 1.24.0 github.com/twmb/franz-go/pkg/kbin github.com/twmb/franz-go/pkg/kerr