diff --git a/pkg/limits/frontend/ring.go b/pkg/limits/frontend/ring.go
index 0363bddd5e..3f298fd9d0 100644
--- a/pkg/limits/frontend/ring.go
+++ b/pkg/limits/frontend/ring.go
@@ -119,23 +119,84 @@ func (g *RingStreamUsageGatherer) forGivenReplicaSet(ctx context.Context, rs rin
 	return responses, nil
 }
 
+type zonePartitionConsumersResult struct {
+	zone       string
+	partitions map[int32]string
+}
+
+// getZoneAwarePartitionConsumers returns partition consumers for each zone
+// in the replication set. If a zone has no active partition consumers, the
+// zone will still be returned but its partition consumers will be nil.
+// If ZoneAwarenessEnabled is false, it returns all partition consumers under
+// a pseudo-zone ("").
+func (g *RingStreamUsageGatherer) getZoneAwarePartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[string]map[int32]string, error) {
+	zoneDescs := make(map[string][]ring.InstanceDesc)
+	for _, instance := range instances {
+		zoneDescs[instance.Zone] = append(zoneDescs[instance.Zone], instance)
+	}
+	// Get the partition consumers for each zone.
+	resultsCh := make(chan zonePartitionConsumersResult, len(zoneDescs))
+	errg, ctx := errgroup.WithContext(ctx)
+	for zone, instances := range zoneDescs {
+		errg.Go(func() error {
+			res, err := g.getPartitionConsumers(ctx, instances)
+			if err != nil {
+				level.Error(g.logger).Log("msg", "failed to get partition consumers for zone", "zone", zone, "err", err.Error())
+			}
+			// If the partition consumers for a zone could not be fetched,
+			// res is expected to be nil.
+			resultsCh <- zonePartitionConsumersResult{
+				zone:       zone,
+				partitions: res,
+			}
+			return nil
+		})
+	}
+	errg.Wait() //nolint
+	close(resultsCh)
+	results := make(map[string]map[int32]string)
+	for result := range resultsCh {
+		results[result.zone] = result.partitions
+	}
+	return results, nil
+}
+
 type getAssignedPartitionsResponse struct {
-	Addr     string
-	Response *logproto.GetAssignedPartitionsResponse
+	addr     string
+	response *logproto.GetAssignedPartitionsResponse
 }
 
+// getPartitionConsumers returns the consumer for each partition.
+//
+// In some cases, it might not be possible to know the consumer for a
+// partition. If this happens, it returns the consumers for the subset of
+// partitions that it does know about.
+//
+// For example, if a partition does not have a consumer, then the partition
+// will be absent from the result. Likewise, if an instance does not respond,
+// the partitions it consumes will be absent from the result too. This
+// also means that if no partitions are assigned consumers, or if no instances
+// respond, the result will be empty.
+//
+// This method is not zone-aware, so if ZoneAwarenessEnabled is true, it
+// should be called once for each zone, and instances should be filtered to
+// the respective zone. Alternatively, you can pass all instances for all zones
+// to find the most up-to-date consumer for each partition across all zones.
 func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, instances []ring.InstanceDesc) (map[int32]string, error) {
 	errg, ctx := errgroup.WithContext(ctx)
-	responses := make(chan getAssignedPartitionsResponse, len(instances))
+	responseCh := make(chan getAssignedPartitionsResponse, len(instances))
 	for _, instance := range instances {
 		errg.Go(func() error {
 			// We use a cache to eliminate redundant gRPC requests for
 			// GetAssignedPartitions as the set of assigned partitions is
 			// expected to be stable outside consumer rebalances.
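+			// A cache hit skips the RPC below. A stale entry is possible, but
+			// the timestamp merge after errg.Wait() prefers the most recent
+			// assignment for each partition.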
 			if resp, ok := g.assignedPartitionsCache.Get(instance.Addr); ok {
-				responses <- getAssignedPartitionsResponse{
-					Addr:     instance.Addr,
-					Response: resp,
+				responseCh <- getAssignedPartitionsResponse{
+					addr:     instance.Addr,
+					response: resp,
 				}
 				return nil
 			}
@@ -150,9 +211,9 @@ func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, ins
 				return nil
 			}
 			g.assignedPartitionsCache.Set(instance.Addr, resp)
-			responses <- getAssignedPartitionsResponse{
-				Addr:     instance.Addr,
-				Response: resp,
+			responseCh <- getAssignedPartitionsResponse{
+				addr:     instance.Addr,
+				response: resp,
 			}
 			return nil
 		})
@@ -160,14 +221,14 @@ func (g *RingStreamUsageGatherer) getPartitionConsumers(ctx context.Context, ins
 	if err := errg.Wait(); err != nil {
 		return nil, err
 	}
-	close(responses)
+	close(responseCh)
 	highestTimestamp := make(map[int32]int64)
 	assigned := make(map[int32]string)
-	for resp := range responses {
-		for partition, assignedAt := range resp.Response.AssignedPartitions {
+	for resp := range responseCh {
+		for partition, assignedAt := range resp.response.AssignedPartitions {
 			if t := highestTimestamp[partition]; t < assignedAt {
 				highestTimestamp[partition] = assignedAt
-				assigned[partition] = resp.Addr
+				assigned[partition] = resp.addr
 			}
 		}
 	}
diff --git a/pkg/limits/frontend/ring_test.go b/pkg/limits/frontend/ring_test.go
index eced6cb858..b4e30bafb6 100644
--- a/pkg/limits/frontend/ring_test.go
+++ b/pkg/limits/frontend/ring_test.go
@@ -205,6 +205,167 @@ func TestRingStreamUsageGatherer_GetStreamUsage(t *testing.T) {
 	}
 }
 
+func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) {
+	tests := []struct {
+		name                               string
+		instances                          []ring.InstanceDesc
+		expectedAssignedPartitionsRequests []*logproto.GetAssignedPartitionsRequest
+		getAssignedPartitionsResponses     []*logproto.GetAssignedPartitionsResponse
+		getAssignedPartitionsResponseErrs  []error
+		expected                           map[string]map[int32]string
+	}{{
+		name: "single zone",
+		instances: []ring.InstanceDesc{{
+			Addr: "instance-a-0",
+			Zone: "a",
+		}},
+		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}},
+		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+			},
+		}},
+		getAssignedPartitionsResponseErrs: []error{nil},
+		expected:                          map[string]map[int32]string{"a": {0: "instance-a-0"}},
+	}, {
+		name: "two zones",
+		instances: []ring.InstanceDesc{{
+			Addr: "instance-a-0",
+			Zone: "a",
+		}, {
+			Addr: "instance-b-0",
+			Zone: "b",
+		}},
+		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
+		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+			},
+		}, {
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+			},
+		}},
+		getAssignedPartitionsResponseErrs: []error{nil, nil},
+		expected: map[string]map[int32]string{
+			"a": {0: "instance-a-0"},
+			"b": {0: "instance-b-0"},
+		},
+	}, {
+		name: "two zones, subset of partitions in zone b",
+		instances: []ring.InstanceDesc{{
+			Addr: "instance-a-0",
+			Zone: "a",
+		}, {
+			Addr: "instance-b-0",
+			Zone: "b",
+		}},
+		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
+		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+				1: time.Now().UnixNano(),
+			},
+		}, {
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+			},
+		}},
+		getAssignedPartitionsResponseErrs: []error{nil, nil},
+		expected: map[string]map[int32]string{
+			"a": {0: "instance-a-0", 1: "instance-a-0"},
+			"b": {0: "instance-b-0"},
+		},
+	}, {
+		name: "two zones, error in zone b",
+		instances: []ring.InstanceDesc{{
+			Addr: "instance-a-0",
+			Zone: "a",
+		}, {
+			Addr: "instance-b-0",
+			Zone: "b",
+		}},
+		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}},
+		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+				1: time.Now().UnixNano(),
+			},
+		}, nil},
+		getAssignedPartitionsResponseErrs: []error{nil, errors.New("an unexpected error occurred")},
+		expected: map[string]map[int32]string{
+			"a": {0: "instance-a-0", 1: "instance-a-0"},
+			"b": {},
+		},
+	}, {
+		name: "two zones, different number of instances per zone",
+		instances: []ring.InstanceDesc{{
+			Addr: "instance-a-0",
+			Zone: "a",
+		}, {
+			Addr: "instance-a-1",
+			Zone: "a",
+		}, {
+			Addr: "instance-b-0",
+			Zone: "b",
+		}},
+		expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}, {}},
+		getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+			},
+		}, {
+			AssignedPartitions: map[int32]int64{
+				1: time.Now().UnixNano(),
+			},
+		}, {
+			AssignedPartitions: map[int32]int64{
+				0: time.Now().UnixNano(),
+				1: time.Now().UnixNano(),
+			},
+		}},
+		getAssignedPartitionsResponseErrs: []error{nil, nil, nil},
+		expected: map[string]map[int32]string{
+			"a": {0: "instance-a-0", 1: "instance-a-1"},
+			"b": {0: "instance-b-0", 1: "instance-b-0"},
+		},
+	}}
+
+	for _, test := range tests {
+		t.Run(test.name, func(t *testing.T) {
+			// Set up the mock clients, one for each pair of mock RPC responses.
+			clients := make([]*mockIngestLimitsClient, len(test.instances))
+			for i := range test.instances {
+				// These test cases assume one request/response per instance.
+				expectedNumAssignedPartitionsRequests := 0
+				if test.expectedAssignedPartitionsRequests[i] != nil {
+					expectedNumAssignedPartitionsRequests = 1
+				}
+				clients[i] = &mockIngestLimitsClient{
+					t:                                     t,
+					expectedAssignedPartitionsRequest:     test.expectedAssignedPartitionsRequests[i],
+					getAssignedPartitionsResponse:         test.getAssignedPartitionsResponses[i],
+					getAssignedPartitionsResponseErr:      test.getAssignedPartitionsResponseErrs[i],
+					expectedNumAssignedPartitionsRequests: expectedNumAssignedPartitionsRequests,
+				}
+				t.Cleanup(clients[i].AssertExpectedNumRequests)
+			}
+			// Set up the mocked ring and client pool for the tests.
+			readRing, clientPool := newMockRingWithClientPool(t, "test", clients, test.instances)
+			cache := NewNopCache[string, *logproto.GetAssignedPartitionsResponse]()
+			g := NewRingStreamUsageGatherer(readRing, clientPool, 2, cache, log.NewNopLogger())
+
+			// Set an upper bound on the test execution time.
+			ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
+			defer cancel()
+
+			result, err := g.getZoneAwarePartitionConsumers(ctx, test.instances)
+			require.NoError(t, err)
+			require.Equal(t, test.expected, result)
+		})
+	}
+}
+
 func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) {
 	tests := []struct {
 		name string