tests: reinforce and optimize queue_manager_test createTimeSeries (#18179)

* tests: fix createTimeSeries so it does not create unnecessary load

16M samples 4k series

Signed-off-by: bwplotka <bwplotka@gmail.com>

* addressed comments

Signed-off-by: bwplotka <bwplotka@gmail.com>

* Apply suggestions from code review

Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>

---------

Signed-off-by: bwplotka <bwplotka@gmail.com>
Signed-off-by: Bartlomiej Plotka <bwplotka@gmail.com>
Co-authored-by: George Krajcsovits <krajorama@users.noreply.github.com>
pull/18195/head^2
Bartlomiej Plotka 2 months ago committed by GitHub
parent c317f9254e
commit 333e0dc188
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 674
      storage/remote/queue_manager_test.go

@ -27,7 +27,6 @@ import (
"time"
"github.com/gogo/protobuf/proto"
"github.com/google/go-cmp/cmp"
remoteapi "github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/client_golang/prometheus"
client_testutil "github.com/prometheus/client_golang/prometheus/testutil"
@ -69,6 +68,7 @@ func newHighestTimestampMetric() *maxTimestamp {
func TestBasicContentNegotiation(t *testing.T) {
t.Parallel()
queueConfig := config.DefaultQueueConfig
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond)
queueConfig.MaxShards = 1
@ -139,20 +139,8 @@ func TestBasicContentNegotiation(t *testing.T) {
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
var (
series []record.RefSeries
metadata []record.RefMetadata
samples []record.RefSample
)
// Generates same series in both cases.
samples, series = createTimeseries(1, 1)
metadata = createSeriesMetadata(series)
recs := generateRecords(recCase{series: 1, samplesPerSeries: 1})
// Apply new config.
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples)
// For now we only ever have a single rw config in this test.
conf.RemoteWriteConfigs[0].ProtobufMessage = tc.senderProtoMsg
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
@ -163,18 +151,18 @@ func TestBasicContentNegotiation(t *testing.T) {
c.injectErrors(tc.injectErrs)
qm.SetClient(c)
qm.StoreSeries(series, 0)
qm.StoreMetadata(metadata)
qm.StoreSeries(recs.series, 0)
qm.StoreMetadata(recs.metadata)
// Do we expect some data back?
if !tc.expectFail {
c.expectSamples(samples, series)
c.expectSamples(recs.samples, recs.series)
} else {
c.expectSamples(nil, nil)
}
// Schedule send.
qm.Append(samples)
qm.Append(recs.samples)
if !tc.expectFail {
// No error expected, so wait for data.
@ -201,8 +189,6 @@ func TestBasicContentNegotiation(t *testing.T) {
func TestSampleDelivery(t *testing.T) {
t.Parallel()
// Let's create an even number of send batches, so we don't run into the
// batch timeout case.
n := 3
queueConfig := config.DefaultQueueConfig
@ -221,100 +207,75 @@ func TestSampleDelivery(t *testing.T) {
writeConfig,
},
}
for _, tc := range []struct {
protoMsg remoteapi.WriteMessageType
name string
samples bool
exemplars bool
histograms bool
floatHistograms bool
}{
{protoMsg: remoteapi.WriteV1MessageType, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{protoMsg: remoteapi.WriteV1MessageType, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{protoMsg: remoteapi.WriteV1MessageType, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
{protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: false, histograms: false, floatHistograms: false, name: "samples only"},
{protoMsg: remoteapi.WriteV2MessageType, samples: true, exemplars: true, histograms: true, floatHistograms: true, name: "samples, exemplars, and histograms"},
{protoMsg: remoteapi.WriteV2MessageType, samples: false, exemplars: true, histograms: false, floatHistograms: false, name: "exemplars only"},
{protoMsg: remoteapi.WriteV2MessageType, samples: false, exemplars: false, histograms: true, floatHistograms: false, name: "histograms only"},
{protoMsg: remoteapi.WriteV2MessageType, samples: false, exemplars: false, histograms: false, floatHistograms: true, name: "float histograms only"},
} {
t.Run(fmt.Sprintf("%s-%s", tc.protoMsg, tc.name), func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
var (
series []record.RefSeries
metadata []record.RefMetadata
samples []record.RefSample
exemplars []record.RefExemplar
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
)
// Generates same series in both cases.
if tc.samples {
samples, series = createTimeseries(n, n)
}
if tc.exemplars {
exemplars, series = createExemplars(n, n)
}
if tc.histograms {
histograms, _, series = createHistograms(n, n, false)
}
if tc.floatHistograms {
_, floatHistograms, series = createHistograms(n, n, true)
}
metadata = createSeriesMetadata(series)
// Apply new config.
queueConfig.Capacity = len(samples)
queueConfig.MaxSamplesPerSend = len(samples) / 2
// For now we only ever have a single rw config in this test.
conf.RemoteWriteConfigs[0].ProtobufMessage = tc.protoMsg
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient(tc.protoMsg)
qm.SetClient(c)
qm.StoreSeries(series, 0)
qm.StoreMetadata(metadata)
// Send first half of data.
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
if tc.protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 {
c.expectMetadataForBatch(metadata, series, samples[:len(samples)/2], exemplars[:len(exemplars)/2], histograms[:len(histograms)/2], floatHistograms[:len(floatHistograms)/2])
}
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t, 30*time.Second)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
if tc.protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 {
c.expectMetadataForBatch(metadata, series, samples[len(samples)/2:], exemplars[len(exemplars)/2:], histograms[len(histograms)/2:], floatHistograms[len(floatHistograms)/2:])
}
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
c.waitForExpectedData(t, 30*time.Second)
})
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
for _, rc := range []recCase{
{series: n, samplesPerSeries: n, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "samples only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: n, floatHistogramsPerSeries: 0, exemplarsPerSeries: 0, name: "histograms only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: n, exemplarsPerSeries: 0, name: "float histograms only"},
{series: n, samplesPerSeries: 0, histogramsPerSeries: 0, floatHistogramsPerSeries: 0, exemplarsPerSeries: n, name: "exemplars only"},
{series: n, samplesPerSeries: n, histogramsPerSeries: n, floatHistogramsPerSeries: n, exemplarsPerSeries: n, name: "all"},
} {
t.Run(fmt.Sprintf("proto=%s/case=%s", protoMsg, rc.name), func(t *testing.T) {
dir := t.TempDir()
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline, nil, false)
defer s.Close()
recs := generateRecords(rc)
var (
series = recs.series
metadata = recs.metadata
samples = recs.samples
exemplars = recs.exemplars
histograms = recs.histograms
floatHistograms = recs.floatHistograms
)
// Apply new config.
queueConfig.Capacity = n
queueConfig.MaxSamplesPerSend = n / 2
conf.RemoteWriteConfigs[0].ProtobufMessage = protoMsg
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient(protoMsg)
qm.SetClient(c)
qm.StoreSeries(series, 0)
qm.StoreMetadata(metadata)
// Send first half of data.
c.expectSamples(samples[:len(samples)/2], series)
c.expectExemplars(exemplars[:len(exemplars)/2], series)
c.expectHistograms(histograms[:len(histograms)/2], series)
c.expectFloatHistograms(floatHistograms[:len(floatHistograms)/2], series)
if protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 {
c.expectMetadataForBatch(metadata, series, samples[:len(samples)/2], exemplars[:len(exemplars)/2], histograms[:len(histograms)/2], floatHistograms[:len(floatHistograms)/2])
}
qm.Append(samples[:len(samples)/2])
qm.AppendExemplars(exemplars[:len(exemplars)/2])
qm.AppendHistograms(histograms[:len(histograms)/2])
qm.AppendFloatHistograms(floatHistograms[:len(floatHistograms)/2])
c.waitForExpectedData(t, 30*time.Second)
// Send second half of data.
c.expectSamples(samples[len(samples)/2:], series)
c.expectExemplars(exemplars[len(exemplars)/2:], series)
c.expectHistograms(histograms[len(histograms)/2:], series)
c.expectFloatHistograms(floatHistograms[len(floatHistograms)/2:], series)
if protoMsg == remoteapi.WriteV2MessageType && len(metadata) > 0 {
c.expectMetadataForBatch(metadata, series, samples[len(samples)/2:], exemplars[len(exemplars)/2:], histograms[len(histograms)/2:], floatHistograms[len(floatHistograms)/2:])
}
qm.Append(samples[len(samples)/2:])
qm.AppendExemplars(exemplars[len(exemplars)/2:])
qm.AppendHistograms(histograms[len(histograms)/2:])
qm.AppendFloatHistograms(floatHistograms[len(floatHistograms)/2:])
c.waitForExpectedData(t, 30*time.Second)
})
}
}
}
@ -387,24 +348,26 @@ func TestWALMetadataDelivery(t *testing.T) {
},
}
num := 3
_, series := createTimeseries(0, num)
metadata := createSeriesMetadata(series)
n := 3
recs := generateRecords(recCase{series: n, samplesPerSeries: n})
require.NoError(t, s.ApplyConfig(conf))
hash, err := toHash(writeConfig)
require.NoError(t, err)
qm := s.rws.queues[hash]
c := NewTestWriteClient(remoteapi.WriteV1MessageType)
c := NewTestWriteClient(remoteapi.WriteV2MessageType)
qm.SetClient(c)
qm.StoreSeries(series, 0)
qm.StoreMetadata(metadata)
qm.StoreSeries(recs.series, 0)
qm.StoreMetadata(recs.metadata)
require.Len(t, qm.seriesLabels, num)
require.Len(t, qm.seriesMetadata, num)
require.Len(t, qm.seriesLabels, n)
require.Len(t, qm.seriesMetadata, n)
c.expectSamples(recs.samples, recs.series)
c.expectMetadataForBatch(recs.metadata, recs.series, recs.samples, nil, nil, nil)
qm.Append(recs.samples)
c.waitForExpectedData(t, 30*time.Second)
}
@ -412,26 +375,24 @@ func TestSampleDeliveryTimeout(t *testing.T) {
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
// Let's send one less sample than batch size, and wait the timeout duration
n := 9
samples, series := createTimeseries(n, n)
recs := generateRecords(recCase{series: 10, samplesPerSeries: 10})
cfg := testDefaultQueueConfig()
mcfg := config.DefaultMetadataConfig
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, mcfg, defaultFlushDeadline, c, protoMsg)
m.StoreSeries(series, 0)
m.StoreSeries(recs.series, 0)
m.Start()
defer m.Stop()
// Send the samples twice, waiting for the samples in the meantime.
c.expectSamples(samples, series)
m.Append(samples)
c.expectSamples(recs.samples, recs.series)
m.Append(recs.samples)
c.waitForExpectedData(t, 30*time.Second)
c.expectSamples(samples, series)
m.Append(samples)
c.expectSamples(recs.samples, recs.series)
m.Append(recs.samples)
c.waitForExpectedData(t, 30*time.Second)
})
}
@ -443,29 +404,16 @@ func TestSampleDeliveryOrder(t *testing.T) {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
ts := 10
n := config.DefaultQueueConfig.MaxSamplesPerSend * ts
samples := make([]record.RefSample, 0, n)
series := make([]record.RefSeries, 0, n)
for i := range n {
name := fmt.Sprintf("test_metric_%d", i%ts)
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(i),
V: float64(i),
})
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
})
}
recs := generateRecords(recCase{series: n, samplesPerSeries: 1})
c, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
c.expectSamples(recs.samples, recs.series)
m.StoreSeries(recs.series, 0)
m.Start()
defer m.Stop()
// These should be received by the client.
m.Append(samples)
m.Append(recs.samples)
c.waitForExpectedData(t, 30*time.Second)
})
}
@ -483,14 +431,15 @@ func TestShutdown(t *testing.T) {
mcfg := config.DefaultMetadataConfig
m := newTestQueueManager(t, cfg, mcfg, deadline, c, protoMsg)
// Send 2x batch size, so we know it will need at least two sends.
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
samples, series := createTimeseries(n, n)
m.StoreSeries(series, 0)
recs := generateRecords(recCase{series: n / 1000, samplesPerSeries: 1000})
m.StoreSeries(recs.series, 0)
m.Start()
// Append blocks to guarantee delivery, so we do it in the background.
go func() {
m.Append(samples)
m.Append(recs.samples)
}()
synctest.Wait()
@ -547,33 +496,35 @@ func TestSeriesReset(t *testing.T) {
func TestReshard(t *testing.T) {
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
size := 10 // Make bigger to find more races.
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, series := createTimeseries(nSamples, nSeries)
samplesPerSeries := config.DefaultQueueConfig.Capacity * size
recs := generateRecords(recCase{series: nSeries, samplesPerSeries: samplesPerSeries})
t.Logf("about to send %v samples", len(recs.samples))
cfg := config.DefaultQueueConfig
cfg.MaxShards = 1
c := NewTestWriteClient(protoMsg)
m := newTestQueueManager(t, cfg, config.DefaultMetadataConfig, defaultFlushDeadline, c, protoMsg)
c.expectSamples(samples, series)
m.StoreSeries(series, 0)
c.expectSamples(recs.samples, recs.series)
m.StoreSeries(recs.series, 0)
m.Start()
defer m.Stop()
go func() {
for i := 0; i < len(samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(samples[i : i+config.DefaultQueueConfig.Capacity])
for i := 0; i < len(recs.samples); i += config.DefaultQueueConfig.Capacity {
sent := m.Append(recs.samples[i : i+config.DefaultQueueConfig.Capacity])
require.True(t, sent, "samples not sent")
time.Sleep(100 * time.Millisecond)
}
}()
for i := 1; i < len(samples)/config.DefaultQueueConfig.Capacity; i++ {
for i := 1; i < len(recs.samples)/config.DefaultQueueConfig.Capacity; i++ {
m.shards.stop()
m.shards.start(i)
time.Sleep(100 * time.Millisecond)
@ -627,7 +578,7 @@ func TestReshardPartialBatch(t *testing.T) {
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
samples, series := createTimeseries(1, 10)
recs := generateRecords(recCase{series: 1, samplesPerSeries: 10})
c := NewTestBlockedWriteClient()
@ -639,14 +590,14 @@ func TestReshardPartialBatch(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m.StoreSeries(series, 0)
m.StoreSeries(recs.series, 0)
m.Start()
for range 100 {
done := make(chan struct{})
go func() {
m.Append(samples)
m.Append(recs.samples)
time.Sleep(batchSendDeadline)
m.shards.stop()
m.shards.start(1)
@ -672,7 +623,7 @@ func TestReshardPartialBatch(t *testing.T) {
func TestQueueFilledDeadlock(t *testing.T) {
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
samples, series := createTimeseries(50, 1)
recs := generateRecords(recCase{series: 50, samplesPerSeries: 1})
c := NewNopWriteClient()
@ -686,7 +637,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
cfg.BatchSendDeadline = model.Duration(batchSendDeadline)
m := newTestQueueManager(t, cfg, mcfg, flushDeadline, c, protoMsg)
m.StoreSeries(series, 0)
m.StoreSeries(recs.series, 0)
m.Start()
defer m.Stop()
@ -694,7 +645,7 @@ func TestQueueFilledDeadlock(t *testing.T) {
done := make(chan struct{})
go func() {
time.Sleep(batchSendDeadline)
m.Append(samples)
m.Append(recs.samples)
done <- struct{}{}
}()
select {
@ -784,7 +735,7 @@ func TestDisableReshardOnRetry(t *testing.T) {
defer onStoreCalled()
var (
fakeSamples, fakeSeries = createTimeseries(100, 100)
recs = generateRecords(recCase{series: 100, samplesPerSeries: 100})
cfg = config.DefaultQueueConfig
mcfg = config.DefaultMetadataConfig
@ -807,14 +758,14 @@ func TestDisableReshardOnRetry(t *testing.T) {
)
m := NewQueueManager(metrics, nil, nil, nil, "", newEWMARate(ewmaWeight, shardUpdateDuration), cfg, mcfg, labels.EmptyLabels(), nil, client, 0, newPool(), newHighestTimestampMetric(), nil, false, false, false, remoteapi.WriteV1MessageType)
m.StoreSeries(fakeSeries, 0)
m.StoreSeries(recs.series, 0)
// Attempt to samples while the manager is running. We immediately stop the
// manager after the recoverable error is generated to prevent the manager
// from resharding itself.
m.Start()
{
m.Append(fakeSamples)
m.Append(recs.samples)
select {
case <-onStoredContext.Done():
@ -840,35 +791,132 @@ func TestDisableReshardOnRetry(t *testing.T) {
}, time.Minute, retryAfter, "shouldReshard should have been re-enabled")
}
func createTimeseries(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSeries) {
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.NewScratchBuilder(1 + len(extraLabels))
for i := range numSeries {
name := fmt.Sprintf("test_metric_%d", i)
for j := range numSamples {
samples = append(samples, record.RefSample{
type recCase struct {
name string
series int
samplesPerSeries int
histogramsPerSeries int
floatHistogramsPerSeries int
exemplarsPerSeries int
extraLabels []labels.Label
labelsFn func(lb *labels.ScratchBuilder, i int) labels.Labels
tsFn func(i, j int) int64
}
type records struct {
series []record.RefSeries
samples []record.RefSample
histograms []record.RefHistogramSample
floatHistograms []record.RefFloatHistogramSample
exemplars []record.RefExemplar
metadata []record.RefMetadata
}
func newTestHist(i int) *histogram.Histogram {
return &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
}
}
func generateRecords(c recCase) (ret records) {
ret.series = make([]record.RefSeries, c.series)
ret.metadata = make([]record.RefMetadata, c.series)
ret.samples = make([]record.RefSample, c.series*c.samplesPerSeries)
ret.histograms = make([]record.RefHistogramSample, c.series*c.histogramsPerSeries)
ret.floatHistograms = make([]record.RefFloatHistogramSample, c.series*c.floatHistogramsPerSeries)
ret.exemplars = make([]record.RefExemplar, c.series*c.exemplarsPerSeries)
if c.labelsFn == nil {
c.labelsFn = func(lb *labels.ScratchBuilder, i int) labels.Labels {
// Create series with labels that contains name of series plus any extra labels supplied.
name := fmt.Sprintf("test_metric_%d", i)
lb.Reset()
lb.Add(model.MetricNameLabel, name)
for _, l := range c.extraLabels {
lb.Add(l.Name, l.Value)
}
lb.Sort()
return lb.Labels()
}
}
if c.tsFn == nil {
c.tsFn = func(_, j int) int64 { return int64(j) }
}
lb := labels.NewScratchBuilder(1 + len(c.extraLabels))
for i := range ret.series {
ret.series[i] = record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: c.labelsFn(&lb, i),
}
ret.metadata[i] = record.RefMetadata{
Ref: chunks.HeadSeriesRef(i),
Type: uint8(record.Counter),
Unit: "unit text",
Help: "help text",
}
for j := range c.samplesPerSeries {
ret.samples[i*c.samplesPerSeries+j] = record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
T: c.tsFn(i, j),
V: float64(i),
})
}
}
// Create Labels that is name of series plus any extra labels supplied.
lb.Reset()
lb.Add(labels.MetricName, name)
rand.Shuffle(len(extraLabels), func(i, j int) {
extraLabels[i], extraLabels[j] = extraLabels[j], extraLabels[i]
})
for _, l := range extraLabels {
lb.Add(l.Name, l.Value)
h := newTestHist(i)
for j := range c.histogramsPerSeries {
ret.histograms[i*c.histogramsPerSeries+j] = record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
H: h,
}
}
for j := range c.floatHistogramsPerSeries {
ret.floatHistograms[i*c.floatHistogramsPerSeries+j] = record.RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
FH: h.ToFloat(nil),
}
}
for j := range c.exemplarsPerSeries {
ret.exemplars[i*c.exemplarsPerSeries+j] = record.RefExemplar{
Ref: chunks.HeadSeriesRef(i),
T: c.tsFn(i, j),
V: float64(i),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
}
}
lb.Sort()
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
})
}
return samples, series
return ret
}
// BenchmarkGenerateRecords checks data generator performance.
// Recommended CLI:
/*
export bench=genRecs && go test ./storage/remote/... \
-run '^$' -bench '^BenchmarkGenerateRecords' \
-benchtime 1s -count 6 -cpu 2 -timeout 999m -benchmem \
| tee ${bench}.txt
*/
func BenchmarkGenerateRecords(b *testing.B) {
n := 2 * config.DefaultQueueConfig.MaxSamplesPerSend
b.ReportAllocs()
b.ResetTimer()
for b.Loop() {
// This will generate 16M samples and 4k series.
generateRecords(recCase{series: n, samplesPerSeries: n})
}
}
func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries {
@ -895,88 +943,6 @@ func createProtoTimeseriesWithOld(numSamples, baseTs int64) []prompb.TimeSeries
return samples
}
func createExemplars(numExemplars, numSeries int) ([]record.RefExemplar, []record.RefSeries) {
exemplars := make([]record.RefExemplar, 0, numExemplars)
series := make([]record.RefSeries, 0, numSeries)
for i := range numSeries {
name := fmt.Sprintf("test_metric_%d", i)
for j := range numExemplars {
e := record.RefExemplar{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
V: float64(i),
Labels: labels.FromStrings("trace_id", fmt.Sprintf("trace-%d", i)),
}
exemplars = append(exemplars, e)
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
})
}
return exemplars, series
}
func createHistograms(numSamples, numSeries int, floatHistogram bool) ([]record.RefHistogramSample, []record.RefFloatHistogramSample, []record.RefSeries) {
histograms := make([]record.RefHistogramSample, 0, numSamples)
floatHistograms := make([]record.RefFloatHistogramSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
for i := range numSeries {
name := fmt.Sprintf("test_metric_%d", i)
for j := range numSamples {
hist := &histogram.Histogram{
Schema: 2,
ZeroThreshold: 1e-128,
ZeroCount: 0,
Count: 2,
Sum: 0,
PositiveSpans: []histogram.Span{{Offset: 0, Length: 1}},
PositiveBuckets: []int64{int64(i) + 1},
NegativeSpans: []histogram.Span{{Offset: 0, Length: 1}},
NegativeBuckets: []int64{int64(-i) - 1},
}
if floatHistogram {
fh := record.RefFloatHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
FH: hist.ToFloat(nil),
}
floatHistograms = append(floatHistograms, fh)
} else {
h := record.RefHistogramSample{
Ref: chunks.HeadSeriesRef(i),
T: int64(j),
H: hist,
}
histograms = append(histograms, h)
}
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: labels.FromStrings("__name__", name),
})
}
if floatHistogram {
return nil, floatHistograms, series
}
return histograms, nil, series
}
func createSeriesMetadata(series []record.RefSeries) []record.RefMetadata {
metas := make([]record.RefMetadata, 0, len(series))
for _, s := range series {
metas = append(metas, record.RefMetadata{
Ref: s.Ref,
Type: uint8(record.Counter),
Unit: "unit text",
Help: "help text",
})
}
return metas
}
func getSeriesIDFromRef(r record.RefSeries) string {
return r.Labels.String()
}
@ -1419,7 +1385,7 @@ func BenchmarkSampleSend(b *testing.B) {
const numSamples = 1
const numSeries = 10000
samples, series := createTimeseries(numSamples, numSeries, extraLabels...)
recs := generateRecords(recCase{series: numSeries, samplesPerSeries: numSamples, extraLabels: extraLabels})
c := NewNopWriteClient()
@ -1433,7 +1399,7 @@ func BenchmarkSampleSend(b *testing.B) {
for _, format := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
b.Run(string(format), func(b *testing.B) {
m := newTestQueueManager(b, cfg, mcfg, defaultFlushDeadline, c, format)
m.StoreSeries(series, 0)
m.StoreSeries(recs.series, 0)
// These should be received by the client.
m.Start()
@ -1441,8 +1407,8 @@ func BenchmarkSampleSend(b *testing.B) {
b.ResetTimer()
for i := 0; b.Loop(); i++ {
m.Append(samples)
m.UpdateSeriesSegment(series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.Append(recs.samples)
m.UpdateSeriesSegment(recs.series, i+1) // simulate what wlog.Watcher.garbageCollectSeries does
m.SeriesReset(i + 1)
}
// Do not include shutdown
@ -1484,7 +1450,7 @@ func BenchmarkStoreSeries(b *testing.B) {
// numSeries chosen to be big enough that StoreSeries dominates creating a new queue manager.
const numSeries = 1000
_, series := createTimeseries(0, numSeries, extraLabels...)
recs := generateRecords(recCase{series: numSeries, samplesPerSeries: 0, extraLabels: extraLabels})
for _, tc := range testCases {
b.Run(tc.name, func(b *testing.B) {
@ -1499,7 +1465,7 @@ func BenchmarkStoreSeries(b *testing.B) {
m.externalLabels = tc.externalLabels
m.relabelConfigs = tc.relabelConfigs
m.StoreSeries(series, 0)
m.StoreSeries(recs.series, 0)
}
})
}
@ -2009,7 +1975,25 @@ func TestDropOldTimeSeries(t *testing.T) {
size := 10
nSeries := 6
nSamples := config.DefaultQueueConfig.Capacity * size
samples, newSamples, series := createTimeseriesWithOldSamples(nSamples, nSeries)
pastRecs := generateRecords(recCase{
series: nSeries,
samplesPerSeries: (nSamples / nSeries) / 2, // Half data is past.
tsFn: func(_, j int) int64 {
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
return past + int64(j)
},
})
newRecs := generateRecords(recCase{
series: nSeries,
samplesPerSeries: (nSamples / nSeries) / 2, // Half data is new (current timestamps).
tsFn: func(_, j int) int64 {
return time.Now().UnixMilli() + int64(j)
},
})
series := pastRecs.series // Series is the same for both old and new.
newSamples := newRecs.samples
samples := append(pastRecs.samples, newRecs.samples...)
c := NewTestWriteClient(protoMsg)
c.expectSamples(newSamples, series)
@ -2038,10 +2022,14 @@ func TestIsSampleOld(t *testing.T) {
// Simulates scenario in which remote write endpoint is down and a subset of samples is dropped due to age limit while backoffing.
func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
const (
maxSamplesPerSend = 10
maxLabels = 9
)
t.Parallel()
for _, protoMsg := range []remoteapi.WriteMessageType{remoteapi.WriteV1MessageType, remoteapi.WriteV2MessageType} {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
maxSamplesPerSend := 10
sampleAgeLimit := time.Second * 2
cfg := config.DefaultQueueConfig
@ -2063,18 +2051,38 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
m.Start()
batchID := 0
expectedSamples := map[string][]prompb.Sample{}
appendData := func(numberOfSeries int, timeAdd time.Duration, shouldBeDropped bool) {
t.Log(">>>> Appending series ", numberOfSeries, " as batch ID ", batchID, " with timeAdd ", timeAdd, " and should be dropped ", shouldBeDropped)
samples, series := createTimeseriesWithRandomLabelCount(strconv.Itoa(batchID), numberOfSeries, timeAdd, 9)
m.StoreSeries(series, batchID)
sent := m.Append(samples)
// Use a fixed rand source so tests are consistent.
r := rand.New(rand.NewSource(99))
recs := generateRecords(recCase{
series: numberOfSeries,
samplesPerSeries: 1,
tsFn: func(_, _ int) int64 {
return time.Now().Add(timeAdd).UnixMilli()
},
labelsFn: func(lb *labels.ScratchBuilder, i int) labels.Labels {
lb.Reset()
labelsCount := r.Intn(maxLabels)
lb.Add("__name__", "batch_"+strconv.Itoa(batchID)+"_id_"+strconv.Itoa(i))
for j := 1; j < labelsCount+1; j++ {
// Same string is used for both the label name and its value.
label := "batch_" + strconv.Itoa(batchID) + "_label_" + strconv.Itoa(j)
lb.Add(label, label)
}
return lb.Labels()
},
})
m.StoreSeries(recs.series, batchID)
sent := m.Append(recs.samples)
require.True(t, sent, "samples not sent")
if !shouldBeDropped {
for _, s := range samples {
tsID := getSeriesIDFromRef(series[s.Ref])
expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
for _, s := range recs.samples {
tsID := getSeriesIDFromRef(recs.series[s.Ref])
c.expectedSamples[tsID] = append(c.expectedSamples[tsID], prompb.Sample{
Timestamp: s.T,
Value: s.V,
})
@ -2084,93 +2092,28 @@ func TestSendSamplesWithBackoffWithSampleAgeLimit(t *testing.T) {
}
timeShift := -time.Millisecond * 5
// Inject RW error.
c.SetReturnError(RecoverableError{context.DeadlineExceeded, defaultBackoff})
// Send current samples in various intervals.
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(sampleAgeLimit)
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(sampleAgeLimit / 10)
appendData(maxSamplesPerSend/2, timeShift, true)
time.Sleep(2 * sampleAgeLimit)
// Eventually all the above data must be ignored as 2x sampleAgeLimit passed.
// Now send, quickly re-enable RW target and send another batch.
// We should expect all the data from those two below batches.
appendData(2*maxSamplesPerSend, timeShift, false)
time.Sleep(sampleAgeLimit / 2)
c.SetReturnError(nil)
appendData(5, timeShift, false)
m.Stop()
if diff := cmp.Diff(expectedSamples, c.receivedSamples); diff != "" {
t.Errorf("mismatch (-want +got):\n%s", diff)
}
})
}
}
func createTimeseriesWithRandomLabelCount(id string, seriesCount int, timeAdd time.Duration, maxLabels int) ([]record.RefSample, []record.RefSeries) {
samples := []record.RefSample{}
series := []record.RefSeries{}
// use a fixed rand source so tests are consistent
r := rand.New(rand.NewSource(99))
for i := range seriesCount {
s := record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: time.Now().Add(timeAdd).UnixMilli(),
V: r.Float64(),
}
samples = append(samples, s)
labelsCount := r.Intn(maxLabels)
lb := labels.NewScratchBuilder(1 + labelsCount)
lb.Add("__name__", "batch_"+id+"_id_"+strconv.Itoa(i))
for j := 1; j < labelsCount+1; j++ {
// same for both name and value
label := "batch_" + id + "_label_" + strconv.Itoa(j)
lb.Add(label, label)
}
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
})
}
return samples, series
}
func createTimeseriesWithOldSamples(numSamples, numSeries int, extraLabels ...labels.Label) ([]record.RefSample, []record.RefSample, []record.RefSeries) {
newSamples := make([]record.RefSample, 0, numSamples)
samples := make([]record.RefSample, 0, numSamples)
series := make([]record.RefSeries, 0, numSeries)
lb := labels.NewScratchBuilder(1 + len(extraLabels))
for i := range numSeries {
name := fmt.Sprintf("test_metric_%d", i)
// We create half of the samples in the past.
past := timestamp.FromTime(time.Now().Add(-5 * time.Minute))
for j := 0; j < numSamples/2; j++ {
samples = append(samples, record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: past + int64(j),
V: float64(i),
})
}
for j := 0; j < numSamples/2; j++ {
sample := record.RefSample{
Ref: chunks.HeadSeriesRef(i),
T: time.Now().UnixMilli() + int64(j),
V: float64(i),
}
samples = append(samples, sample)
newSamples = append(newSamples, sample)
}
// Create Labels that is name of series plus any extra labels supplied.
lb.Reset()
lb.Add(labels.MetricName, name)
for _, l := range extraLabels {
lb.Add(l.Name, l.Value)
}
lb.Sort()
series = append(series, record.RefSeries{
Ref: chunks.HeadSeriesRef(i),
Labels: lb.Labels(),
require.Equal(t, c.expectedSamples, c.receivedSamples)
})
}
return samples, newSamples, series
}
func filterTsLimit(limit int64, ts prompb.TimeSeries) bool {
@ -2662,7 +2605,7 @@ func TestHighestTimestampOnAppend(t *testing.T) {
t.Run(fmt.Sprint(protoMsg), func(t *testing.T) {
nSamples := 11 * config.DefaultQueueConfig.Capacity
nSeries := 3
samples, series := createTimeseries(nSamples, nSeries)
recs := generateRecords(recCase{series: nSeries, samplesPerSeries: nSamples / nSeries})
_, m := newTestClientAndQueueManager(t, defaultFlushDeadline, protoMsg)
m.Start()
@ -2670,13 +2613,14 @@ func TestHighestTimestampOnAppend(t *testing.T) {
require.Equal(t, 0.0, m.metrics.highestTimestamp.Get())
m.StoreSeries(series, 0)
require.True(t, m.Append(samples))
m.StoreSeries(recs.series, 0)
require.True(t, m.Append(recs.samples))
// Check that Append sets the highest timestamp correctly.
highestTs := float64((nSamples - 1) / 1000)
require.Greater(t, highestTs, 0.0)
require.Equal(t, highestTs, m.metrics.highestTimestamp.Get())
// NOTE: generateRecords yields nSamples/nSeries samples (36666), with <i for samplesPerSeries> timestamp.
// This gives the highest timestamp of 36666/1000 (seconds).
const expectedHighestTsSeconds = 36.0
require.Equal(t, expectedHighestTsSeconds, m.metrics.highestTimestamp.Get())
})
}
}

Loading…
Cancel
Save