|
|
|
@ -316,102 +316,6 @@ func TestSampleDelivery(t *testing.T) { |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type perRequestWriteClient struct { |
|
|
|
|
*TestWriteClient |
|
|
|
|
|
|
|
|
|
expectUnorderedRequests bool |
|
|
|
|
|
|
|
|
|
mtx sync.Mutex |
|
|
|
|
|
|
|
|
|
i int |
|
|
|
|
requests []*TestWriteClient |
|
|
|
|
expectedSeries []record.RefSeries |
|
|
|
|
expectedRequestSamples [][]record.RefSample |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func newPerRequestWriteClient(expectUnorderedRequests bool) *perRequestWriteClient { |
|
|
|
|
return &perRequestWriteClient{ |
|
|
|
|
expectUnorderedRequests: expectUnorderedRequests, |
|
|
|
|
TestWriteClient: NewTestWriteClient(Version2), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *perRequestWriteClient) expectRequestSamples(ss []record.RefSample, series []record.RefSeries) { |
|
|
|
|
tc := NewTestWriteClient(Version2) |
|
|
|
|
c.requests = append(c.requests, tc) |
|
|
|
|
|
|
|
|
|
c.expectedSeries = series |
|
|
|
|
c.expectedRequestSamples = append(c.expectedRequestSamples, ss) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *perRequestWriteClient) expectedData(t testing.TB) { |
|
|
|
|
t.Helper() |
|
|
|
|
|
|
|
|
|
c.mtx.Lock() |
|
|
|
|
defer c.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
c.TestWriteClient.mtx.Lock() |
|
|
|
|
exp := 0 |
|
|
|
|
for _, ss := range c.expectedRequestSamples { |
|
|
|
|
exp += len(ss) |
|
|
|
|
} |
|
|
|
|
got := deepLen(c.TestWriteClient.receivedSamples) |
|
|
|
|
c.TestWriteClient.mtx.Unlock() |
|
|
|
|
|
|
|
|
|
if got < exp { |
|
|
|
|
t.Errorf("totally expected %v samples, got %v", exp, got) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i, cl := range c.requests { |
|
|
|
|
cl.waitForExpectedData(t, 0*time.Second) // We already waited.
|
|
|
|
|
t.Log("client", i, "checked") |
|
|
|
|
} |
|
|
|
|
if c.i != len(c.requests) { |
|
|
|
|
t.Errorf("expected %v calls, got %v", len(c.requests), c.i) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (c *perRequestWriteClient) Store(ctx context.Context, req []byte, r int, rwFormat config.RemoteWriteFormat, compression string) error { |
|
|
|
|
c.mtx.Lock() |
|
|
|
|
defer c.mtx.Unlock() |
|
|
|
|
defer func() { c.i++ }() |
|
|
|
|
if c.i >= len(c.requests) { |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if err := c.TestWriteClient.Store(ctx, req, r, rwFormat, compression); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
expReqSampleToUse := 0 |
|
|
|
|
if c.expectUnorderedRequests { |
|
|
|
|
// expectUnorderedRequests tells us that multiple shards were used by queue manager,
|
|
|
|
|
// so we can't trust that incoming requests will match order of c.expectedRequestSamples
|
|
|
|
|
// slice. However, for successful test case we can assume that first sample value will
|
|
|
|
|
// match, so find such expected request if any.
|
|
|
|
|
// NOTE: This assumes sample values have unique values in our tests.
|
|
|
|
|
for i, es := range c.expectedRequestSamples { |
|
|
|
|
if len(es) == 0 { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
for _, rs := range c.TestWriteClient.receivedSamples { |
|
|
|
|
if len(rs) == 0 { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
if es[0].V != rs[0].GetValue() { |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
expReqSampleToUse = i |
|
|
|
|
break |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
// We tried our best, use normal flow otherwise.
|
|
|
|
|
} |
|
|
|
|
c.requests[c.i].expectSamples(c.expectedRequestSamples[expReqSampleToUse], c.expectedSeries) |
|
|
|
|
c.expectedRequestSamples = append(c.expectedRequestSamples[:expReqSampleToUse], c.expectedRequestSamples[expReqSampleToUse+1:]...) |
|
|
|
|
return c.requests[c.i].Store(ctx, req, r, rwFormat, compression) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func testDefaultQueueConfig() config.QueueConfig { |
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
|
// For faster unit tests we don't wait default 5 seconds.
|
|
|
|
@ -419,78 +323,6 @@ func testDefaultQueueConfig() config.QueueConfig { |
|
|
|
|
return cfg |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// TestHistogramSampleBatching tests current way of how classic histogram series
|
|
|
|
|
// are grouped in queue manager.
|
|
|
|
|
// This is a first step of exploring PRW 2.0 self-contained classic histograms.
|
|
|
|
|
func TestHistogramSampleBatching(t *testing.T) { |
|
|
|
|
t.Parallel() |
|
|
|
|
|
|
|
|
|
series, samples := createTestClassicHistogram(10) |
|
|
|
|
|
|
|
|
|
for _, tc := range []struct { |
|
|
|
|
name string |
|
|
|
|
queueConfig config.QueueConfig |
|
|
|
|
expRequestSamples [][]record.RefSample |
|
|
|
|
}{ |
|
|
|
|
{ |
|
|
|
|
name: "OneShardDefaultBatch", |
|
|
|
|
queueConfig: func() config.QueueConfig { |
|
|
|
|
cfg := testDefaultQueueConfig() |
|
|
|
|
cfg.MaxShards = 1 |
|
|
|
|
cfg.MinShards = 1 |
|
|
|
|
return cfg |
|
|
|
|
}(), |
|
|
|
|
expRequestSamples: [][]record.RefSample{samples}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
name: "OneShardLimitedBatch", |
|
|
|
|
queueConfig: func() config.QueueConfig { |
|
|
|
|
cfg := testDefaultQueueConfig() |
|
|
|
|
cfg.MaxShards = 1 |
|
|
|
|
cfg.MinShards = 1 |
|
|
|
|
cfg.MaxSamplesPerSend = 5 |
|
|
|
|
return cfg |
|
|
|
|
}(), |
|
|
|
|
expRequestSamples: [][]record.RefSample{ |
|
|
|
|
samples[:5], samples[5:10], samples[10:], |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
{ |
|
|
|
|
name: "TwoShards", |
|
|
|
|
queueConfig: func() config.QueueConfig { |
|
|
|
|
cfg := testDefaultQueueConfig() |
|
|
|
|
cfg.MaxShards = 2 |
|
|
|
|
cfg.MinShards = 2 |
|
|
|
|
return cfg |
|
|
|
|
}(), |
|
|
|
|
expRequestSamples: [][]record.RefSample{ |
|
|
|
|
{samples[0], samples[2], samples[4], samples[6], samples[8], samples[10]}, |
|
|
|
|
{samples[1], samples[3], samples[5], samples[7], samples[9], samples[11]}, |
|
|
|
|
}, |
|
|
|
|
}, |
|
|
|
|
} { |
|
|
|
|
t.Run(tc.name, func(t *testing.T) { |
|
|
|
|
c := newPerRequestWriteClient(tc.queueConfig.MaxShards > 1) |
|
|
|
|
|
|
|
|
|
for _, s := range tc.expRequestSamples { |
|
|
|
|
c.expectRequestSamples(s, series) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
dir := t.TempDir() |
|
|
|
|
mcfg := config.DefaultMetadataConfig |
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), tc.queueConfig, mcfg, labels.EmptyLabels(), nil, c, defaultFlushDeadline, newPool(), newHighestTimestampMetric(), nil, false, false, Version2) |
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
m.Start() |
|
|
|
|
m.Append(samples) |
|
|
|
|
m.Stop() |
|
|
|
|
c.expectedData(t) |
|
|
|
|
}) |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func TestMetadataDelivery(t *testing.T) { |
|
|
|
|
c := NewTestWriteClient(Version1) |
|
|
|
|
|
|
|
|
@ -1118,41 +950,6 @@ func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata { |
|
|
|
|
return metas |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func createTestClassicHistogram(buckets int) ([]record.RefSeries, []record.RefSample) { |
|
|
|
|
samples := make([]record.RefSample, buckets+2) |
|
|
|
|
series := make([]record.RefSeries, buckets+2) |
|
|
|
|
|
|
|
|
|
for i := range samples { |
|
|
|
|
samples[i] = record.RefSample{ |
|
|
|
|
Ref: chunks.HeadSeriesRef(i), T: int64(i), V: float64(i), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for i := 0; i < buckets; i++ { |
|
|
|
|
le := fmt.Sprintf("%v", i) |
|
|
|
|
if i == 0 { |
|
|
|
|
le = "+Inf" |
|
|
|
|
} |
|
|
|
|
series[i] = record.RefSeries{ |
|
|
|
|
Ref: chunks.HeadSeriesRef(i), |
|
|
|
|
Labels: labels.FromStrings( |
|
|
|
|
"__name__", "http_request_duration_seconds_bucket", |
|
|
|
|
"le", le, |
|
|
|
|
), |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
series[buckets] = record.RefSeries{ |
|
|
|
|
Ref: chunks.HeadSeriesRef(buckets), |
|
|
|
|
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_sum"), |
|
|
|
|
} |
|
|
|
|
series[buckets+1] = record.RefSeries{ |
|
|
|
|
Ref: chunks.HeadSeriesRef(buckets + 1), |
|
|
|
|
Labels: labels.FromStrings("__name__", "http_request_duration_seconds_count"), |
|
|
|
|
} |
|
|
|
|
return series, samples |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getSeriesNameFromRef(r record.RefSeries) string { |
|
|
|
|
return r.Labels.Get("__name__") |
|
|
|
|
} |
|
|
|
|