|
|
|
@ -56,17 +56,17 @@ func (g *RingStreamUsageGatherer) forAllBackends(ctx context.Context, r GetStrea |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, replicaSet ring.ReplicationSet, r GetStreamUsageRequest) ([]GetStreamUsageResponse, error) { |
|
|
|
|
partitions, err := g.perReplicaSetPartitions(ctx, replicaSet) |
|
|
|
|
partitions, err := g.getConsumedPartitions(ctx, replicaSet) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
gr, ctx := errgroup.WithContext(ctx) |
|
|
|
|
errg, ctx := errgroup.WithContext(ctx) |
|
|
|
|
responses := make([]GetStreamUsageResponse, len(replicaSet.Instances)) |
|
|
|
|
|
|
|
|
|
// TODO: We shouldn't query all instances since we know which instance holds which stream.
|
|
|
|
|
for i, instance := range replicaSet.Instances { |
|
|
|
|
gr.Go(func() error { |
|
|
|
|
errg.Go(func() error { |
|
|
|
|
client, err := g.pool.GetClientFor(instance.Addr) |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
@ -86,22 +86,18 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, replic |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := gr.Wait(); err != nil { |
|
|
|
|
if err := errg.Wait(); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return responses, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) perReplicaSetPartitions(ctx context.Context, replicaSet ring.ReplicationSet) (map[string][]int32, error) { |
|
|
|
|
type getAssignedPartitionsResponse struct { |
|
|
|
|
Addr string |
|
|
|
|
Response *logproto.GetAssignedPartitionsResponse |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (g *RingStreamUsageGatherer) getConsumedPartitions(ctx context.Context, replicaSet ring.ReplicationSet) (map[string][]int32, error) { |
|
|
|
|
errg, ctx := errgroup.WithContext(ctx) |
|
|
|
|
responses := make([]getAssignedPartitionsResponse, len(replicaSet.Instances)) |
|
|
|
|
for i, instance := range replicaSet.Instances { |
|
|
|
|
responses := make(map[string]*logproto.GetAssignedPartitionsResponse) |
|
|
|
|
// Get the partitions assigned to each instance.
|
|
|
|
|
for _, instance := range replicaSet.Instances { |
|
|
|
|
errg.Go(func() error { |
|
|
|
|
client, err := g.pool.GetClientFor(instance.Addr) |
|
|
|
|
if err != nil { |
|
|
|
@ -111,40 +107,40 @@ func (g *RingStreamUsageGatherer) perReplicaSetPartitions(ctx context.Context, r |
|
|
|
|
if err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
responses[i] = getAssignedPartitionsResponse{Addr: instance.Addr, Response: resp} |
|
|
|
|
responses[instance.Addr] = resp |
|
|
|
|
return nil |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := errg.Wait(); err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
partitions := make(map[string][]int32) |
|
|
|
|
// Track highest value seen for each partition
|
|
|
|
|
highestValues := make(map[int32]int64) |
|
|
|
|
// Track which addr has the highest value for each partition
|
|
|
|
|
highestAddr := make(map[int32]string) |
|
|
|
|
|
|
|
|
|
// First pass - find highest values for each partition
|
|
|
|
|
for _, resp := range responses { |
|
|
|
|
for partition, value := range resp.Response.AssignedPartitions { |
|
|
|
|
if currentHighest, exists := highestValues[partition]; !exists || value > currentHighest { |
|
|
|
|
highestValues[partition] = value |
|
|
|
|
highestAddr[partition] = resp.Addr |
|
|
|
|
// Deduplicate the partitions. This can happen if the call to
|
|
|
|
|
// GetAssignedPartitions is interleaved with a partition rebalance, such
|
|
|
|
|
// that two or more instances claim to be the consumer of the same
|
|
|
|
|
// partition at the same time. In case of conflicts, choose the instance
|
|
|
|
|
// with the latest timestamp.
|
|
|
|
|
highestTimestamp := make(map[int32]int64) |
|
|
|
|
assigned := make(map[int32]string) |
|
|
|
|
for addr, resp := range responses { |
|
|
|
|
for partition, assignedAt := range resp.AssignedPartitions { |
|
|
|
|
if t := highestTimestamp[partition]; t < assignedAt { |
|
|
|
|
highestTimestamp[partition] = assignedAt |
|
|
|
|
assigned[partition] = addr |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Second pass - assign partitions to addrs that have the highest values
|
|
|
|
|
for partition, addr := range highestAddr { |
|
|
|
|
partitions[addr] = append(partitions[addr], partition) |
|
|
|
|
// Return a slice of partition IDs for each instance.
|
|
|
|
|
result := make(map[string][]int32) |
|
|
|
|
for partition, addr := range assigned { |
|
|
|
|
result[addr] = append(result[addr], partition) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// Sort partition IDs for each address for consistent ordering
|
|
|
|
|
for addr := range partitions { |
|
|
|
|
slices.Sort(partitions[addr]) |
|
|
|
|
// Sort the partition IDs.
|
|
|
|
|
for instance := range result { |
|
|
|
|
slices.Sort(result[instance]) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return partitions, nil |
|
|
|
|
return result, nil |
|
|
|
|
} |
|
|
|
|