Update delete integration test (#6192)

* Add wait until processed

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add query lines

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove Cortex reference

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add deleted lines metric and check in test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Set negative cancel period so default 1 minute delay is compensated for

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Reduce timeout

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Add helper function

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Comment out assertions that only work with flush

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove unused metric type param

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Check counter in unit test

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Rename ClientOption to Option as per the linter

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Remove redundant check

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* fill in MsgAndArgs field of test assertion

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
pull/6330/head
Michel Hollands 4 years ago committed by GitHub
parent a877febf75
commit 65e3148bc9
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      integration/client/client.go
  2. 3
      integration/cluster/cluster.go
  3. 54
      integration/loki_micro_services_delete_test.go
  4. 44
      integration/parse_metrics.go
  5. 41
      integration/parse_metrics_test.go
  6. 14
      pkg/storage/stores/shipper/compactor/deletion/delete_request.go
  7. 19
      pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go
  8. 1
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go
  9. 6
      pkg/storage/stores/shipper/compactor/deletion/metrics.go

@ -37,7 +37,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
return r.next.RoundTrip(req)
}
type CortexClientOption interface {
type Option interface {
Type() string
}
@ -57,7 +57,7 @@ type Client struct {
}
// NewLogsClient creates a new client
func New(instanceID, token, baseURL string, opts ...CortexClientOption) *Client {
func New(instanceID, token, baseURL string, opts ...Option) *Client {
rt := &roundTripper{
instanceID: instanceID,
token: token,

@ -301,7 +301,8 @@ func (c *Component) run() error {
defer c.cluster.waitGroup.Done()
err := c.loki.Run(loki.RunOpts{})
if err != nil {
errCh <- err
newErr := fmt.Errorf("error starting component %v: %w", c.name, err)
errCh <- newErr
}
}()

@ -21,10 +21,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
tCompactor = clu.AddComponent(
"compactor",
"-target=compactor",
"-boltdb.shipper.compactor.compaction-interval=10s",
"-boltdb.shipper.compactor.retention-delete-delay=10s",
"-boltdb.shipper.compactor.compaction-interval=1s",
"-boltdb.shipper.compactor.retention-delete-delay=1s",
"-boltdb.shipper.compactor.deletion-mode=filter-and-delete",
"-boltdb.shipper.compactor.delete-request-cancel-period=0s",
// By default a minute is added to the delete request start time. This compensates for that.
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
@ -48,6 +49,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"query-frontend",
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host,
"-frontend.default-validity=0s",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host,
)
_ = clu.AddComponent(
@ -79,7 +81,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// TODO: Flushing is currently causing a panic, as the boltdb shipper is shared using a global variable in:
// https://github.com/grafana/loki/blob/66a4692423582ed17cce9bd86b69d55663dc7721/pkg/storage/factory.go#L32-L35
//require.NoError(t, cliIngester.Flush())
// require.NoError(t, cliIngester.Flush())
})
t.Run("ingest-logs-ingester", func(t *testing.T) {
@ -115,4 +117,48 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
require.Equal(t, `{job="fake"} |= "lineB"`, deleteRequests[0].Query)
require.Equal(t, "received", deleteRequests[0].Status)
})
// Wait until delete request is finished
t.Run("wait-until-delete-request-processed", func(t *testing.T) {
require.Eventually(t, func() bool {
deleteRequests, err := cliCompactor.GetDeleteRequests()
require.NoError(t, err)
require.Len(t, deleteRequests, 1)
return deleteRequests[0].Status == "processed"
}, 10*time.Second, 1*time.Second)
// Check metrics
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkLabelValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, 1)
// Re-enable this once flush works
// checkLabelValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)
})
// Query lines
t.Run("query", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(`{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
// Remove lineB once flush works
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines, "lineB should not be there")
})
}
func checkLabelValue(t *testing.T, metricName, metrics, tenantID string, expectedValue float64) {
t.Helper()
val, labels, err := extractMetric(metricName, metrics)
require.NoError(t, err)
require.NotNil(t, labels)
require.Len(t, labels, 1)
require.Contains(t, labels, "user")
require.Equal(t, labels["user"], tenantID)
require.Equal(t, expectedValue, val)
}

@ -0,0 +1,44 @@
package integration
import (
"fmt"
"strings"
io_prometheus_client "github.com/prometheus/client_model/go"
"github.com/prometheus/common/expfmt"
)
var (
ErrNoMetricFound = fmt.Errorf("metric not found")
ErrInvalidMetricType = fmt.Errorf("invalid metric type")
)
func extractMetric(metricName, metrics string) (float64, map[string]string, error) {
var parser expfmt.TextParser
mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics))
if err != nil {
return 0, nil, err
}
mf, found := mfs[metricName]
if !found {
return 0, nil, ErrNoMetricFound
}
var val float64
switch mf.GetType() {
case io_prometheus_client.MetricType_COUNTER:
val = *mf.Metric[0].Counter.Value
case io_prometheus_client.MetricType_GAUGE:
val = *mf.Metric[0].Gauge.Value
default:
return 0, nil, ErrInvalidMetricType
}
labels := make(map[string]string)
for _, l := range mf.Metric[0].Label {
labels[*l.Name] = *l.Value
}
return val, labels, nil
}

@ -0,0 +1,41 @@
package integration
import (
"testing"
"github.com/stretchr/testify/require"
)
var exampleMetricOutput = `
# HELP loki_compactor_delete_requests_processed_total Number of delete requests processed per user
# TYPE loki_compactor_delete_requests_processed_total counter
loki_compactor_delete_requests_processed_total{user="eEWxEcgwRQcf"} 1
# HELP loki_compactor_delete_requests_received_total Number of delete requests received per user
# TYPE loki_compactor_delete_requests_received_total counter
loki_compactor_delete_requests_received_total{user="eEWxEcgwRQcf"} 1
# HELP loki_compactor_load_pending_requests_attempts_total Number of attempts that were made to load pending requests with status
# TYPE loki_compactor_load_pending_requests_attempts_total counter
loki_compactor_load_pending_requests_attempts_total{status="success"} 57
# HELP loki_compactor_oldest_pending_delete_request_age_seconds Age of oldest pending delete request in seconds, since they are over their cancellation period
# TYPE loki_compactor_oldest_pending_delete_request_age_seconds gauge
loki_compactor_oldest_pending_delete_request_age_seconds 0
# HELP loki_compactor_pending_delete_requests_count Count of delete requests which are over their cancellation period and have not finished processing yet
# TYPE loki_compactor_pending_delete_requests_count gauge
loki_compactor_pending_delete_requests_count 0
`
func TestExtractCounterMetric(t *testing.T) {
val, labels, err := extractMetric("loki_compactor_oldest_pending_delete_request_age_seconds", exampleMetricOutput)
require.NoError(t, err)
require.NotNil(t, labels)
require.Len(t, labels, 0)
require.Equal(t, float64(0), val)
val, labels, err = extractMetric("loki_compactor_delete_requests_processed_total", exampleMetricOutput)
require.NoError(t, err)
require.NotNil(t, labels)
require.Len(t, labels, 1)
require.Contains(t, labels, "user")
require.Equal(t, labels["user"], "eEWxEcgwRQcf")
require.Equal(t, float64(1), val)
}

@ -2,6 +2,7 @@ package deletion
import (
"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
@ -19,9 +20,10 @@ type DeleteRequest struct {
Status DeleteRequestStatus `json:"status"`
CreatedAt model.Time `json:"created_at"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
logSelectorExpr syntax.LogSelectorExpr `json:"-"`
UserID string `json:"-"`
matchers []*labels.Matcher `json:"-"`
logSelectorExpr syntax.LogSelectorExpr `json:"-"`
deletedLinesTotal prometheus.Counter `json:"-"`
}
func (d *DeleteRequest) SetQuery(logQL string) error {
@ -57,7 +59,11 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error
f := p.ForStream(labels).ProcessString
return func(s string) bool {
result, _, skip := f(0, s)
return len(result) != 0 || skip
if len(result) != 0 || skip {
d.deletedLinesTotal.Inc()
return true
}
return false
}, nil
}

@ -4,6 +4,8 @@ import (
"testing"
"time"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/stretchr/testify/require"
@ -242,8 +244,16 @@ func mustParseLabel(input string) labels.Labels {
}
func TestDeleteRequest_FilterFunction(t *testing.T) {
counter := prometheus.NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "compactor_deleted_lines",
Help: "Number of deleted lines per user",
}, []string{"user"}).WithLabelValues("userID")
prometheus.MustRegister(counter)
dr := DeleteRequest{
Query: `{foo="bar"} |= "some"`,
Query: `{foo="bar"} |= "some"`,
deletedLinesTotal: counter,
}
lblStr := `{foo="bar"}`
@ -255,6 +265,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.True(t, f(`some line`))
require.False(t, f(""))
require.False(t, f("other line"))
require.Equal(t, float64(1), testutil.ToFloat64(counter))
lblStr = `{foo2="buzz"}`
lbls = mustParseLabel(lblStr)
@ -265,9 +276,11 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.False(t, f(""))
require.False(t, f("other line"))
require.False(t, f("some line"))
require.Equal(t, float64(1), testutil.ToFloat64(counter))
dr = DeleteRequest{
Query: `{namespace="default"}`,
Query: `{namespace="default"}`,
deletedLinesTotal: counter,
}
lblStr = `{namespace="default"}`
@ -279,5 +292,5 @@ func TestDeleteRequest_FilterFunction(t *testing.T) {
require.True(t, f(`some line`))
require.True(t, f(""))
require.True(t, f("other line"))
require.Equal(t, float64(4), testutil.ToFloat64(counter))
}

@ -119,6 +119,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error {
if deleteRequest.CreatedAt.Add(d.deleteRequestCancelPeriod).Add(time.Minute).After(model.Now()) {
continue
}
deleteRequest.deletedLinesTotal = d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID)
d.deleteRequestsToProcess = append(d.deleteRequestsToProcess, deleteRequest)
}

@ -27,6 +27,7 @@ type deleteRequestsManagerMetrics struct {
loadPendingRequestsAttemptsTotal *prometheus.CounterVec
oldestPendingDeleteRequestAgeSeconds prometheus.Gauge
pendingDeleteRequestsCount prometheus.Gauge
deletedLinesTotal *prometheus.CounterVec
}
func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsManagerMetrics {
@ -57,6 +58,11 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan
Name: "compactor_pending_delete_requests_count",
Help: "Count of delete requests which are over their cancellation period and have not finished processing yet",
})
m.deletedLinesTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{
Namespace: "loki",
Name: "compactor_deleted_lines",
Help: "Number of deleted lines per user",
}, []string{"user"})
return &m
}

Loading…
Cancel
Save