|
|
|
|
@ -34,6 +34,7 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
client_testutil "github.com/prometheus/client_golang/prometheus/testutil" |
|
|
|
|
common_config "github.com/prometheus/common/config" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
"github.com/prometheus/prometheus/config" |
|
|
|
|
"github.com/prometheus/prometheus/pkg/labels" |
|
|
|
|
@ -41,6 +42,7 @@ import ( |
|
|
|
|
"github.com/prometheus/prometheus/prompb" |
|
|
|
|
"github.com/prometheus/prometheus/tsdb/record" |
|
|
|
|
"github.com/prometheus/prometheus/util/testutil" |
|
|
|
|
"net/url" |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
const defaultFlushDeadline = 1 * time.Minute |
|
|
|
|
@ -54,9 +56,11 @@ func TestSampleDelivery(t *testing.T) { |
|
|
|
|
c := NewTestWriteClient() |
|
|
|
|
c.expectSamples(samples[:len(samples)/2], series) |
|
|
|
|
|
|
|
|
|
cfg := config.DefaultQueueConfig |
|
|
|
|
cfg.BatchSendDeadline = model.Duration(100 * time.Millisecond) |
|
|
|
|
cfg.MaxShards = 1 |
|
|
|
|
queueConfig := config.DefaultQueueConfig |
|
|
|
|
queueConfig.BatchSendDeadline = model.Duration(100 * time.Millisecond) |
|
|
|
|
queueConfig.MaxShards = 1 |
|
|
|
|
queueConfig.Capacity = len(samples) |
|
|
|
|
queueConfig.MaxSamplesPerSend = len(samples) / 2 |
|
|
|
|
|
|
|
|
|
dir, err := ioutil.TempDir("", "TestSampleDeliver") |
|
|
|
|
testutil.Ok(t, err) |
|
|
|
|
@ -64,18 +68,35 @@ func TestSampleDelivery(t *testing.T) { |
|
|
|
|
testutil.Ok(t, os.RemoveAll(dir)) |
|
|
|
|
}() |
|
|
|
|
|
|
|
|
|
metrics := newQueueManagerMetrics(nil, "", "") |
|
|
|
|
m := NewQueueManager(metrics, nil, nil, nil, dir, newEWMARate(ewmaWeight, shardUpdateDuration), cfg, nil, nil, c, defaultFlushDeadline) |
|
|
|
|
m.StoreSeries(series, 0) |
|
|
|
|
s := NewStorage(nil, nil, nil, dir, defaultFlushDeadline) |
|
|
|
|
defer s.Close() |
|
|
|
|
|
|
|
|
|
// These should be received by the client.
|
|
|
|
|
m.Start() |
|
|
|
|
m.Append(samples[:len(samples)/2]) |
|
|
|
|
defer m.Stop() |
|
|
|
|
writeConfig := config.DefaultRemoteWriteConfig |
|
|
|
|
conf := &config.Config{ |
|
|
|
|
GlobalConfig: config.DefaultGlobalConfig, |
|
|
|
|
RemoteWriteConfigs: []*config.RemoteWriteConfig{ |
|
|
|
|
&writeConfig, |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
// We need to set URL's so that metric creation doesn't panic.
|
|
|
|
|
writeConfig.URL = &common_config.URL{ |
|
|
|
|
URL: &url.URL{ |
|
|
|
|
Host: "http://test-storage.com", |
|
|
|
|
}, |
|
|
|
|
} |
|
|
|
|
writeConfig.QueueConfig = queueConfig |
|
|
|
|
testutil.Ok(t, s.ApplyConfig(conf)) |
|
|
|
|
hash, err := toHash(writeConfig) |
|
|
|
|
testutil.Ok(t, err) |
|
|
|
|
qm := s.rws.queues[hash] |
|
|
|
|
qm.SetClient(c) |
|
|
|
|
|
|
|
|
|
qm.StoreSeries(series, 0) |
|
|
|
|
|
|
|
|
|
qm.Append(samples[:len(samples)/2]) |
|
|
|
|
c.waitForExpectedSamples(t) |
|
|
|
|
c.expectSamples(samples[len(samples)/2:], series) |
|
|
|
|
m.Append(samples[len(samples)/2:]) |
|
|
|
|
qm.Append(samples[len(samples)/2:]) |
|
|
|
|
c.waitForExpectedSamples(t) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
|