|
|
|
@ -48,24 +48,24 @@ func (g *RingStreamUsageGatherer) GetStreamUsage(ctx context.Context, r GetStrea |
|
|
|
|
|
|
|
|
|
// TODO(grobinson): Need to rename this to something more accurate.
|
|
|
|
|
func (g *RingStreamUsageGatherer) forAllBackends(ctx context.Context, r GetStreamUsageRequest) ([]GetStreamUsageResponse, error) { |
|
|
|
|
replicaSet, err := g.ring.GetAllHealthy(LimitsRead) |
|
|
|
|
rs, err := g.ring.GetAllHealthy(LimitsRead) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
return g.forGivenReplicaSet(ctx, replicaSet, r) |
|
|
|
|
return g.forGivenReplicaSet(ctx, rs, r) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, replicaSet ring.ReplicationSet, r GetStreamUsageRequest) ([]GetStreamUsageResponse, error) { |
|
|
|
|
partitions, err := g.getConsumedPartitions(ctx, replicaSet) |
|
|
|
|
func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs ring.ReplicationSet, r GetStreamUsageRequest) ([]GetStreamUsageResponse, error) { |
|
|
|
|
partitions, err := g.getConsumedPartitions(ctx, rs) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
errg, ctx := errgroup.WithContext(ctx) |
|
|
|
|
responses := make([]GetStreamUsageResponse, len(replicaSet.Instances)) |
|
|
|
|
responses := make([]GetStreamUsageResponse, len(rs.Instances)) |
|
|
|
|
|
|
|
|
|
// TODO: We shouldn't query all instances since we know which instance holds which stream.
|
|
|
|
|
for i, instance := range replicaSet.Instances { |
|
|
|
|
for i, instance := range rs.Instances { |
|
|
|
|
errg.Go(func() error { |
|
|
|
|
client, err := g.pool.GetClientFor(instance.Addr) |
|
|
|
|
if err != nil { |
|
|
|
@ -93,11 +93,17 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, replic |
|
|
|
|
return responses, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) getConsumedPartitions(ctx context.Context, replicaSet ring.ReplicationSet) (map[string][]int32, error) { |
|
|
|
|
type getAssignedPartitionsResponse struct { |
|
|
|
|
Addr string |
|
|
|
|
Response *logproto.GetAssignedPartitionsResponse |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) getConsumedPartitions(ctx context.Context, rs ring.ReplicationSet) (map[string][]int32, error) { |
|
|
|
|
errg, ctx := errgroup.WithContext(ctx) |
|
|
|
|
responses := make(map[string]*logproto.GetAssignedPartitionsResponse) |
|
|
|
|
responses := make([]getAssignedPartitionsResponse, len(rs.Instances)) |
|
|
|
|
|
|
|
|
|
// Get the partitions assigned to each instance.
|
|
|
|
|
for _, instance := range replicaSet.Instances { |
|
|
|
|
for i, instance := range rs.Instances { |
|
|
|
|
errg.Go(func() error { |
|
|
|
|
client, err := g.pool.GetClientFor(instance.Addr) |
|
|
|
|
if err != nil { |
|
|
|
@ -107,7 +113,9 @@ func (g *RingStreamUsageGatherer) getConsumedPartitions(ctx context.Context, rep |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
responses[instance.Addr] = resp |
|
|
|
|
// No need for a mutex here as responses is a "Structured variable"
|
|
|
|
|
// as described in https://go.dev/ref/spec#Variables.
|
|
|
|
|
responses[i] = getAssignedPartitionsResponse{Addr: instance.Addr, Response: resp} |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
@ -122,11 +130,11 @@ func (g *RingStreamUsageGatherer) getConsumedPartitions(ctx context.Context, rep |
|
|
|
|
// with the latest timestamp.
|
|
|
|
|
highestTimestamp := make(map[int32]int64) |
|
|
|
|
assigned := make(map[int32]string) |
|
|
|
|
for addr, resp := range responses { |
|
|
|
|
for partition, assignedAt := range resp.AssignedPartitions { |
|
|
|
|
for _, resp := range responses { |
|
|
|
|
for partition, assignedAt := range resp.Response.AssignedPartitions { |
|
|
|
|
if t := highestTimestamp[partition]; t < assignedAt { |
|
|
|
|
highestTimestamp[partition] = assignedAt |
|
|
|
|
assigned[partition] = addr |
|
|
|
|
assigned[partition] = resp.Addr |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|