From bd99fa552ca85cc70a36d52c73783a2f4b5a9223 Mon Sep 17 00:00:00 2001 From: George Robinson Date: Tue, 18 Nov 2025 19:53:57 +0000 Subject: [PATCH] chore: abstract failover to withRandomShuffle (#19915) --- pkg/distributor/ingest_limits.go | 30 +++++++++++++++++++++--------- 1 file changed, 21 insertions(+), 9 deletions(-) diff --git a/pkg/distributor/ingest_limits.go b/pkg/distributor/ingest_limits.go index 957a949b6d..f60aefa24b 100644 --- a/pkg/distributor/ingest_limits.go +++ b/pkg/distributor/ingest_limits.go @@ -20,7 +20,8 @@ type ingestLimitsFrontendClient interface { ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) } -// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends. +// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend +// instances and proxy requests to them. type ingestLimitsFrontendRingClient struct { ring ring.ReadRing pool *ring_client.Pool @@ -35,21 +36,33 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo // Implements the ingestLimitsFrontendClient interface. func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { + var resp *proto.ExceedsLimitsResponse + err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error { + var clientErr error + resp, clientErr = client.ExceedsLimits(ctx, req) + return clientErr + }) + return resp, err +} + +// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles +// them, and then calls f. +func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error { rs, err := c.ring.GetAllHealthy(limits_frontend_client.LimitsRead) if err != nil { - return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err) + return fmt.Errorf("failed to get limits-frontend instances from ring: %w", err) } // Randomly shuffle instances to evenly distribute requests. rand.Shuffle(len(rs.Instances), func(i, j int) { rs.Instances[i], rs.Instances[j] = rs.Instances[j], rs.Instances[i] }) var lastErr error - // Send the request to the limits-frontend to see if it exceeds the tenant - // limits. If the RPC fails, failover to the next instance in the ring. + // Pass the instance to f. If it fails, failover to the next instance. + // Repeat until there are no more instances. for _, instance := range rs.Instances { select { case <-ctx.Done(): - return nil, ctx.Err() + return ctx.Err() default: } c, err := c.pool.GetClientFor(instance.Addr) @@ -58,14 +71,13 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req continue } client := c.(proto.IngestLimitsFrontendClient) - resp, err := client.ExceedsLimits(ctx, req) - if err != nil { + if err = f(ctx, client); err != nil { lastErr = err continue } - return resp, nil + return nil } - return nil, lastErr + return lastErr } type ingestLimits struct {