feat: add method to get zone-aware partition consumers (#17377)

Ref: pull/17075/head^2
Author: George Robinson, 9 months ago (committed by GitHub)
Parent: 6aed2ba812
Commit: 4a733fcf7d
Changed files:
  1. pkg/limits/frontend/ring.go (84 changed lines)
  2. pkg/limits/frontend/ring_test.go (161 changed lines)

@@ -119,23 +119,81 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs rin
return responses, nil
}
type zonePartitionConsumersResult struct {
zone string
partitions map[int32]string
}
// getZoneAwarePartitionConsumers returns partition consumers for each zone
// in the replication set. If a zone has no active partition consumers, the
// zone will still be returned but its partition consumers will be nil.
// If ZoneAwarenessEnabled is false, it returns all partition consumers under
// a pseudo-zone ("").
func (g *RingStreamUsageGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
zoneDescs := make(map[string][]ring.InstanceDesc)
for _, instance := range instances {
zoneDescs[instance.Zone] = append(zoneDescs[instance.Zone], instance)
}
// Get the partition consumers for each zone.
resultsCh := make(chan zonePartitionConsumersResult, len(zoneDescs))
errg, ctx := errgroup.WithContext(ctx)
for zone, instances := range zoneDescs {
errg.Go(func() error {
res, err := g.getPartitionConsumers(ctx, instances)
if err != nil {
level.Error(g.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
}
// If the consumers could not be fetched for a zone, res is expected
// to be nil, and the zone is returned with nil partition consumers.
resultsCh <- zonePartitionConsumersResult{
zone: zone,
partitions: res,
}
return nil
})
}
// The goroutines above never return an error; per-zone failures are
// logged instead, so the result from Wait can be ignored.
errg.Wait() //nolint
close(resultsCh)
results := make(map[string]map[int32]string)
for result := range resultsCh {
results[result.zone] = result.partitions
}
return results, nil
}
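// Example (not part of this commit): a sketch of how a caller might flatten the
// per-zone result above into a single partition-to-consumer mapping, preferring
// zones in a caller-supplied order. The helper name and the preference order are
// illustrative assumptions, not the frontend's actual logic.
//
// Usage: flattenZonePartitionConsumers(results, []string{"a", "b", "c"})
func flattenZonePartitionConsumers(byZone map[string]map[int32]string, zoneOrder []string) map[int32]string {
    merged := make(map[int32]string)
    for _, zone := range zoneOrder {
        // Ranging over a nil map (a zone with no consumers) is a no-op.
        for partition, addr := range byZone[zone] {
            // Keep the consumer from the most-preferred zone that reported it.
            if _, ok := merged[partition]; !ok {
                merged[partition] = addr
            }
        }
    }
    return merged
}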
type getAssignedPartitionsResponse struct {
-Addr string
-Response *logproto.GetAssignedPartitionsResponse
+addr string
+response *logproto.GetAssignedPartitionsResponse
}
// getPartitionConsumers returns the consumer for each partition.
// In some cases, it might not be possible to know the consumer for a
// partition. If this happens, it returns the consumers for a subset of
// partitions that it does know about.
//
// For example, if a partition does not have a consumer, then the partition
// will be absent from the result. Likewise, if an instance does not respond,
// the partitions that it consumes will be absent from the result too. This
// also means that if no partitions are assigned consumers, or if no instances
// respond, the result will be empty.
//
// This method is not zone-aware, so if ZoneAwarenessEnabled is true, it
// should be called once for each zone, and instances should be filtered to
// the respective zone. Alternatively, you can pass all instances for all zones
// to find the most up-to-date consumer for each partition across all zones.
func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
errg, ctx := errgroup.WithContext(ctx)
-responses := make(chan getAssignedPartitionsResponse, len(instances))
+responseCh := make(chan getAssignedPartitionsResponse, len(instances))
for _, instance := range instances {
errg.Go(func() error {
// We use a cache to eliminate redundant gRPC requests for
// GetAssignedPartitions as the set of assigned partitions is
// expected to be stable outside consumer rebalances.
if resp, ok := g.assignedPartitionsCache.Get(instance.Addr); ok {
-responses <- getAssignedPartitionsResponse{
-Addr: instance.Addr,
-Response: resp,
+responseCh <- getAssignedPartitionsResponse{
+addr: instance.Addr,
+response: resp,
}
return nil
}
@@ -150,9 +208,9 @@ func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, ins
return nil
}
g.assignedPartitionsCache.Set(instance.Addr, resp)
-responses <- getAssignedPartitionsResponse{
-Addr: instance.Addr,
-Response: resp,
+responseCh <- getAssignedPartitionsResponse{
+addr: instance.Addr,
+response: resp,
}
return nil
})
@@ -160,14 +218,14 @@ func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, ins
if err := errg.Wait(); err != nil {
return nil, err
}
-close(responses)
+close(responseCh)
highestTimestamp := make(map[int32]int64)
assigned := make(map[int32]string)
-for resp := range responses {
-for partition, assignedAt := range resp.Response.AssignedPartitions {
+for resp := range responseCh {
+for partition, assignedAt := range resp.response.AssignedPartitions {
if t := highestTimestamp[partition]; t < assignedAt {
highestTimestamp[partition] = assignedAt
-assigned[partition] = resp.Addr
+assigned[partition] = resp.addr
}
}
}
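
// Example (not part of this commit): the resolution rule used above, extracted
// into a standalone helper for illustration. For each partition, the instance
// reporting the most recent assignedAt timestamp is treated as its consumer.
// The helper name is an assumption of this sketch.
func resolveLatestConsumers(responses []getAssignedPartitionsResponse) map[int32]string {
    highestTimestamp := make(map[int32]int64)
    assigned := make(map[int32]string)
    for _, resp := range responses {
        for partition, assignedAt := range resp.response.AssignedPartitions {
            // A later assignment wins over an earlier one for the same partition,
            // so calling this across zones yields the most up-to-date consumer.
            if t := highestTimestamp[partition]; t < assignedAt {
                highestTimestamp[partition] = assignedAt
                assigned[partition] = resp.addr
            }
        }
    }
    return assigned
}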

@@ -205,6 +205,167 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
}
}
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
tests := []struct {
name string
instances []ring.InstanceDesc
expectedAssignedPartitionsRequests []*logproto.GetAssignedPartitionsRequest
getAssignedPartitionsResponses []*logproto.GetAssignedPartitionsResponse
getAssignedPartitionsResponseErrs []error
expected map[string]map[int32]string
}{{
name: "single zone",
instances: []ring.InstanceDesc{{
Addr: "instance-a-0",
Zone: "a",
}},
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}},
getAssignedPartitionsResponseErrs: []error{nil},
expected: map[string]map[int32]string{"a": {0: "instance-a-0"}},
}, {
name: "two zones",
instances: []ring.InstanceDesc{{
Addr: "instance-a-0",
Zone: "a",
}, {
Addr: "instance-b-0",
Zone: "b",
}},
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}, {
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}},
getAssignedPartitionsResponseErrs: []error{nil, nil},
expected: map[string]map[int32]string{
"a": {0: "instance-a-0"},
"b": {0: "instance-b-0"},
},
}, {
name: "two zones, subset of partitions in zone b",
instances: []ring.InstanceDesc{{
Addr: "instance-a-0",
Zone: "a",
}, {
Addr: "instance-b-0",
Zone: "b",
}},
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
1: time.Now().UnixNano(),
},
}, {
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}},
getAssignedPartitionsResponseErrs: []error{nil, nil},
expected: map[string]map[int32]string{
"a": {0: "instance-a-0", 1: "instance-a-0"},
"b": {0: "instance-b-0"},
},
}, {
name: "two zones, error in zone b",
instances: []ring.InstanceDesc{{
Addr: "instance-a-0",
Zone: "a",
}, {
Addr: "instance-b-0",
Zone: "b",
}},
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
1: time.Now().UnixNano(),
},
}, nil},
getAssignedPartitionsResponseErrs: []error{nil, errors.New("an unexpected error occurred")},
expected: map[string]map[int32]string{
"a": {0: "instance-a-0", 1: "instance-a-0"},
"b": {},
},
}, {
name: "two zones, different number of instances per zone",
instances: []ring.InstanceDesc{{
Addr: "instance-a-0",
Zone: "a",
}, {
Addr: "instance-a-1",
Zone: "a",
}, {
Addr: "instance-b-0",
Zone: "b",
}},
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}, {}},
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
},
}, {
AssignedPartitions: map[int32]int64{
1: time.Now().UnixNano(),
},
}, {
AssignedPartitions: map[int32]int64{
0: time.Now().UnixNano(),
1: time.Now().UnixNano(),
},
}},
getAssignedPartitionsResponseErrs: []error{nil, nil, nil},
expected: map[string]map[int32]string{
"a": {0: "instance-a-0", 1: "instance-a-1"},
"b": {0: "instance-b-0", 1: "instance-b-0"},
},
}}
for _, test := range tests {
t.Run(test.name, func(t *testing.T) {
// Set up the mock clients, one per instance, each with its expected request and mock RPC response.
clients := make([]*mockIngestLimitsClient, len(test.instances))
for i := range test.instances {
// These test cases assume one request/response per instance.
expectedNumAssignedPartitionsRequests := 0
if test.expectedAssignedPartitionsRequests[i] != nil {
expectedNumAssignedPartitionsRequests = 1
}
clients[i] = &mockIngestLimitsClient{
t: t,
expectedAssignedPartitionsRequest: test.expectedAssignedPartitionsRequests[i],
getAssignedPartitionsResponse: test.getAssignedPartitionsResponses[i],
getAssignedPartitionsResponseErr: test.getAssignedPartitionsResponseErrs[i],
expectedNumAssignedPartitionsRequests: expectedNumAssignedPartitionsRequests,
}
t.Cleanup(clients[i].AssertExpectedNumRequests)
}
// Set up the mocked ring and client pool for the tests.
readRing, clientPool := newMockRingWithClientPool(t, "test", clients, test.instances)
cache := NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
g := NewRingStreamUsageGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())
// Set an upper bound on the test execution time.
ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
defer cancel()
result, err := g.getZoneAwarePartitionConsumers(ctx, test.instances)
require.NoError(t, err)
require.Equal(t, test.expected, result)
})
}
}
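// Example (not part of this commit): a hedged sketch of an extra test for the
// pseudo-zone ("") path described in the doc comment for
// getZoneAwarePartitionConsumers, i.e. instances with no Zone set. It reuses the
// mock helpers from the tests above; the test name and the zone-less scenario
// are assumptions of this sketch rather than part of the commit.
func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers_PseudoZone(t *testing.T) {
    instances := []ring.InstanceDesc{{Addr: "instance-0"}, {Addr: "instance-1"}}
    clients := []*mockIngestLimitsClient{{
        t:                                 t,
        expectedAssignedPartitionsRequest: &logproto.GetAssignedPartitionsRequest{},
        getAssignedPartitionsResponse: &logproto.GetAssignedPartitionsResponse{
            AssignedPartitions: map[int32]int64{0: time.Now().UnixNano()},
        },
        expectedNumAssignedPartitionsRequests: 1,
    }, {
        t:                                 t,
        expectedAssignedPartitionsRequest: &logproto.GetAssignedPartitionsRequest{},
        getAssignedPartitionsResponse: &logproto.GetAssignedPartitionsResponse{
            AssignedPartitions: map[int32]int64{1: time.Now().UnixNano()},
        },
        expectedNumAssignedPartitionsRequests: 1,
    }}
    for _, client := range clients {
        t.Cleanup(client.AssertExpectedNumRequests)
    }
    readRing, clientPool := newMockRingWithClientPool(t, "test", clients, instances)
    cache := NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
    g := NewRingStreamUsageGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())
    ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
    defer cancel()
    // With no zones set, both instances are grouped under the pseudo-zone ""
    // and their partitions are merged into a single result.
    result, err := g.getZoneAwarePartitionConsumers(ctx, instances)
    require.NoError(t, err)
    require.Equal(t, map[string]map[int32]string{"": {0: "instance-0", 1: "instance-1"}}, result)
}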
func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
tests := []struct {
name string
