Extracting queue interface (#3816)

Exporting metrics entry type

Signed-off-by: Danny Kopping <danny.kopping@grafana.com>
pull/3823/head
Danny Kopping 5 years ago committed by GitHub
parent 6ba49f43ae
commit 110ff6fb9e
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      pkg/ruler/appender.go
  2. 12
      pkg/ruler/appender_test.go
  3. 18
      pkg/ruler/remote_write.go
  4. 6
      pkg/ruler/remote_write_test.go
  5. 8
      pkg/util/queue.go

@ -105,9 +105,9 @@ func (a *RemoteWriteAppendable) onEvict(userID, groupKey string) func() {
}
func (a *RemoteWriteAppender) Append(_ uint64, l labels.Labels, t int64, v float64) (uint64, error) {
a.queue.Append(queueEntry{
labels: l,
sample: cortexpb.Sample{
a.queue.Append(TimeSeriesEntry{
Labels: l,
Sample: cortexpb.Sample{
Value: v,
TimestampMs: t,
},

@ -128,9 +128,9 @@ func TestAppendSample(t *testing.T) {
ts := time.Now().Unix()
val := 91.2
sample := queueEntry{
labels: labels,
sample: cortexpb.Sample{
sample := TimeSeriesEntry{
Labels: labels,
Sample: cortexpb.Sample{
Value: val,
TimestampMs: ts,
},
@ -257,8 +257,8 @@ func TestAppenderEvictOldest(t *testing.T) {
require.Equal(t, capacity, appender.queue.Length())
// only two newest samples are kept
require.Equal(t, appender.queue.Entries()[0].(queueEntry).sample.Value, 11.3)
require.Equal(t, appender.queue.Entries()[1].(queueEntry).sample.Value, 11.4)
require.Equal(t, appender.queue.Entries()[0].(TimeSeriesEntry).Sample.Value, 11.3)
require.Equal(t, appender.queue.Entries()[1].(TimeSeriesEntry).Sample.Value, 11.4)
}
// context is created by ruler, passing along details of the rule being executed
@ -321,7 +321,7 @@ func (c *MockRemoteWriteClient) Name() string { return "" }
// Endpoint is the remote read or write endpoint for the storage client.
func (c *MockRemoteWriteClient) Endpoint() string { return "" }
func (c *MockRemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) {
func (c *MockRemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) {
args := c.Called(queue)
return args.Get(0).([]byte), args.Error(1)
}

@ -17,15 +17,15 @@ import (
var UserAgent = fmt.Sprintf("loki-remote-write/%s", build.Version)
type queueEntry struct {
labels labels.Labels
sample cortexpb.Sample
type TimeSeriesEntry struct {
Labels labels.Labels
Sample cortexpb.Sample
}
type RemoteWriter interface {
remote.WriteClient
PrepareRequest(queue *util.EvictingQueue) ([]byte, error)
PrepareRequest(queue util.Queue) ([]byte, error)
}
type RemoteWriteClient struct {
@ -62,7 +62,7 @@ func NewRemoteWriter(cfg Config, userID string) (RemoteWriter, error) {
}, nil
}
func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error {
func (r *RemoteWriteClient) prepare(queue util.Queue) error {
// reuse slices, resize if they are not big enough
if cap(r.labels) < queue.Length() {
r.labels = make([]labels.Labels, 0, queue.Length())
@ -75,13 +75,13 @@ func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error {
r.samples = r.samples[:0]
for _, entry := range queue.Entries() {
entry, ok := entry.(queueEntry)
entry, ok := entry.(TimeSeriesEntry)
if !ok {
return fmt.Errorf("queue contains invalid entry of type: %T", entry)
}
r.labels = append(r.labels, entry.labels)
r.samples = append(r.samples, entry.sample)
r.labels = append(r.labels, entry.Labels)
r.samples = append(r.samples, entry.Sample)
}
return nil
@ -89,7 +89,7 @@ func (r *RemoteWriteClient) prepare(queue *util.EvictingQueue) error {
// PrepareRequest takes the given queue and serializes it into a compressed
// proto write request that will be sent to Cortex
func (r *RemoteWriteClient) PrepareRequest(queue *util.EvictingQueue) ([]byte, error) {
func (r *RemoteWriteClient) PrepareRequest(queue util.Queue) ([]byte, error) {
// prepare labels and samples from queue
err := r.prepare(queue)
if err != nil {

@ -32,7 +32,7 @@ func TestPrepare(t *testing.T) {
// first start with 10 items
for i := 0; i < 10; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))
@ -45,7 +45,7 @@ func TestPrepare(t *testing.T) {
// then resize the internal slices to 100
for i := 0; i < 100; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))
@ -58,7 +58,7 @@ func TestPrepare(t *testing.T) {
// then reuse the existing slice (no resize necessary since 5 < 100)
for i := 0; i < 5; i++ {
queue.Append(queueEntry{labels: lbs, sample: sample})
queue.Append(TimeSeriesEntry{Labels: lbs, Sample: sample})
}
require.Nil(t, client.prepare(queue))

@ -0,0 +1,8 @@
package util
type Queue interface {
Append(entry interface{})
Entries() []interface{}
Length() int
Clear()
}
Loading…
Cancel
Save