|
|
|
|
@ -20,7 +20,8 @@ type ingestLimitsFrontendClient interface { |
|
|
|
|
ExceedsLimits(context.Context, *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// ingestLimitsFrontendRingClient uses the ring to query ingest-limits frontends.
|
|
|
|
|
// ingestLimitsFrontendRingClient uses the ring to discover ingest-limits-frontend
|
|
|
|
|
// instances and proxy requests to them.
|
|
|
|
|
type ingestLimitsFrontendRingClient struct { |
|
|
|
|
ring ring.ReadRing |
|
|
|
|
pool *ring_client.Pool |
|
|
|
|
@ -35,21 +36,33 @@ func newIngestLimitsFrontendRingClient(ring ring.ReadRing, pool *ring_client.Poo |
|
|
|
|
|
|
|
|
|
// Implements the ingestLimitsFrontendClient interface.
|
|
|
|
|
func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req *proto.ExceedsLimitsRequest) (*proto.ExceedsLimitsResponse, error) { |
|
|
|
|
var resp *proto.ExceedsLimitsResponse |
|
|
|
|
err := c.withRandomShuffle(ctx, func(ctx context.Context, client proto.IngestLimitsFrontendClient) error { |
|
|
|
|
var clientErr error |
|
|
|
|
resp, clientErr = client.ExceedsLimits(ctx, req) |
|
|
|
|
return clientErr |
|
|
|
|
}) |
|
|
|
|
return resp, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// withRandomShuffle gets all healthy frontends in the ring, randomly shuffles
|
|
|
|
|
// them, and then calls f.
|
|
|
|
|
func (c *ingestLimitsFrontendRingClient) withRandomShuffle(ctx context.Context, f func(ctx context.Context, client proto.IngestLimitsFrontendClient) error) error { |
|
|
|
|
rs, err := c.ring.GetAllHealthy(limits_frontend_client.LimitsRead) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, fmt.Errorf("failed to get limits-frontend instances from ring: %w", err) |
|
|
|
|
return fmt.Errorf("failed to get limits-frontend instances from ring: %w", err) |
|
|
|
|
} |
|
|
|
|
// Randomly shuffle instances to evenly distribute requests.
|
|
|
|
|
rand.Shuffle(len(rs.Instances), func(i, j int) { |
|
|
|
|
rs.Instances[i], rs.Instances[j] = rs.Instances[j], rs.Instances[i] |
|
|
|
|
}) |
|
|
|
|
var lastErr error |
|
|
|
|
// Send the request to the limits-frontend to see if it exceeds the tenant
|
|
|
|
|
// limits. If the RPC fails, failover to the next instance in the ring.
|
|
|
|
|
// Pass the instance to f. If it fails, failover to the next instance.
|
|
|
|
|
// Repeat until there are no more instances.
|
|
|
|
|
for _, instance := range rs.Instances { |
|
|
|
|
select { |
|
|
|
|
case <-ctx.Done(): |
|
|
|
|
return nil, ctx.Err() |
|
|
|
|
return ctx.Err() |
|
|
|
|
default: |
|
|
|
|
} |
|
|
|
|
c, err := c.pool.GetClientFor(instance.Addr) |
|
|
|
|
@ -58,14 +71,13 @@ func (c *ingestLimitsFrontendRingClient) ExceedsLimits(ctx context.Context, req |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
client := c.(proto.IngestLimitsFrontendClient) |
|
|
|
|
resp, err := client.ExceedsLimits(ctx, req) |
|
|
|
|
if err != nil { |
|
|
|
|
if err = f(ctx, client); err != nil { |
|
|
|
|
lastErr = err |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
return resp, nil |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
return nil, lastErr |
|
|
|
|
return lastErr |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type ingestLimits struct { |
|
|
|
|
|