|
|
|
|
@ -277,7 +277,7 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) { |
|
|
|
|
"b": {0: "instance-b-0"}, |
|
|
|
|
}, |
|
|
|
|
}, { |
|
|
|
|
name: "two zones, error in zone b", |
|
|
|
|
name: "two zones, instance in zone b returns an error", |
|
|
|
|
instances: []ring.InstanceDesc{{ |
|
|
|
|
Addr: "instance-a-0", |
|
|
|
|
Zone: "a", |
|
|
|
|
@ -297,6 +297,19 @@ func TestRingStreamUsageGatherer_GetZoneAwarePartitionConsumers(t *testing.T) { |
|
|
|
|
"a": {0: "instance-a-0", 1: "instance-a-0"}, |
|
|
|
|
"b": {}, |
|
|
|
|
}, |
|
|
|
|
}, { |
|
|
|
|
name: "two zones, all instances return an error", |
|
|
|
|
instances: []ring.InstanceDesc{{ |
|
|
|
|
Addr: "instance-a-0", |
|
|
|
|
Zone: "a", |
|
|
|
|
}, { |
|
|
|
|
Addr: "instance-b-0", |
|
|
|
|
Zone: "b", |
|
|
|
|
}}, |
|
|
|
|
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}}, |
|
|
|
|
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{{}, {}}, |
|
|
|
|
getAssignedPartitionsResponseErrs: []error{nil, nil, nil}, |
|
|
|
|
expected: map[string]map[int32]string{"a": {}, "b": {}}, |
|
|
|
|
}, { |
|
|
|
|
name: "two zones, different number of instances per zone", |
|
|
|
|
instances: []ring.InstanceDesc{{ |
|
|
|
|
@ -441,8 +454,8 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { |
|
|
|
|
0: "instance-1", |
|
|
|
|
}, |
|
|
|
|
}, { |
|
|
|
|
// This test asserts that even when one instance returns an error,
|
|
|
|
|
// we can still get the assigned partitions for all remaining instances.
|
|
|
|
|
// Even when one instance returns an error it should still return the
|
|
|
|
|
// partitions for all remaining instances.
|
|
|
|
|
name: "two instances, one returns error", |
|
|
|
|
instances: []ring.InstanceDesc{{ |
|
|
|
|
Addr: "instance-0", |
|
|
|
|
@ -454,9 +467,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { |
|
|
|
|
AssignedPartitions: map[int32]int64{ |
|
|
|
|
0: time.Now().Add(-time.Second).UnixNano(), |
|
|
|
|
}, |
|
|
|
|
}, { |
|
|
|
|
nil, |
|
|
|
|
}}, |
|
|
|
|
}, nil}, |
|
|
|
|
getAssignedPartitionsResponseErrs: []error{ |
|
|
|
|
nil, |
|
|
|
|
errors.New("an unexpected error occurred"), |
|
|
|
|
@ -464,6 +475,22 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { |
|
|
|
|
expected: map[int32]string{ |
|
|
|
|
0: "instance-0", |
|
|
|
|
}, |
|
|
|
|
}, { |
|
|
|
|
// Even when all instances return an error, it should not return an
|
|
|
|
|
// error.
|
|
|
|
|
name: "all instances return error", |
|
|
|
|
instances: []ring.InstanceDesc{{ |
|
|
|
|
Addr: "instance-0", |
|
|
|
|
}, { |
|
|
|
|
Addr: "instance-1", |
|
|
|
|
}}, |
|
|
|
|
expectedAssignedPartitionsRequests: []*logproto.GetAssignedPartitionsRequest{{}, {}}, |
|
|
|
|
getAssignedPartitionsResponses: []*logproto.GetAssignedPartitionsResponse{nil, nil}, |
|
|
|
|
getAssignedPartitionsResponseErrs: []error{ |
|
|
|
|
errors.New("an unexpected error occurred"), |
|
|
|
|
errors.New("an unexpected error occurred"), |
|
|
|
|
}, |
|
|
|
|
expected: map[int32]string{}, |
|
|
|
|
}} |
|
|
|
|
|
|
|
|
|
for _, test := range tests { |
|
|
|
|
@ -488,7 +515,6 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { |
|
|
|
|
// Set up the mocked ring and client pool for the tests.
|
|
|
|
|
readRing, clientPool := newMockRingWithClientPool(t, "test", mockClients, test.instances) |
|
|
|
|
cache := NewNopCache[string, *logproto.GetAssignedPartitionsResponse]() |
|
|
|
|
|
|
|
|
|
g := NewRingStreamUsageGatherer(readRing, clientPool, 1, cache, log.NewNopLogger()) |
|
|
|
|
|
|
|
|
|
// Set a maximum upper bound on the test execution time.
|
|
|
|
|
@ -502,7 +528,7 @@ func TestRingStreamUsageGatherer_GetPartitionConsumers(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestRingStreamUsageGatherer_GetPartitionConsumers_CacheHitsAndMisses(t *testing.T) { |
|
|
|
|
func TestRingStreamUsageGatherer_GetPartitionConsumers_IsCached(t *testing.T) { |
|
|
|
|
// Set up the mock clients, one for each pair of mock RPC responses.
|
|
|
|
|
client0 := mockIngestLimitsClient{ |
|
|
|
|
t: t, |
|
|
|
|
|