From 65e3148bc97159a2efaefdc0bcc5e2fa88c40870 Mon Sep 17 00:00:00 2001 From: Michel Hollands <42814411+MichelHollands@users.noreply.github.com> Date: Tue, 7 Jun 2022 14:03:47 +0100 Subject: [PATCH] Update delete integration test (#6192) * Add wait until processed Signed-off-by: Michel Hollands * Add query lines Signed-off-by: Michel Hollands * Remove Cortex reference Signed-off-by: Michel Hollands * Add deleted lines metric and check in test Signed-off-by: Michel Hollands * Set negative cancel period so default 1 minute delay is compensated for Signed-off-by: Michel Hollands * Reduce timeout Signed-off-by: Michel Hollands * Add helper function Signed-off-by: Michel Hollands * Comment out assertions that only work with flush Signed-off-by: Michel Hollands * Remove unused metric type param Signed-off-by: Michel Hollands * Check counter in unit test Signed-off-by: Michel Hollands * Rename ClientOption to Option as per the linter Signed-off-by: Michel Hollands * Remove redundant check Signed-off-by: Michel Hollands * fill in MsgAndArgs field of test assertion Signed-off-by: Michel Hollands --- integration/client/client.go | 4 +- integration/cluster/cluster.go | 3 +- .../loki_micro_services_delete_test.go | 54 +++++++++++++++++-- integration/parse_metrics.go | 44 +++++++++++++++ integration/parse_metrics_test.go | 41 ++++++++++++++ .../compactor/deletion/delete_request.go | 14 +++-- .../compactor/deletion/delete_request_test.go | 19 +++++-- .../deletion/delete_requests_manager.go | 1 + .../shipper/compactor/deletion/metrics.go | 6 +++ 9 files changed, 172 insertions(+), 14 deletions(-) create mode 100644 integration/parse_metrics.go create mode 100644 integration/parse_metrics_test.go diff --git a/integration/client/client.go b/integration/client/client.go index e74762a721..a6b5d8837f 100644 --- a/integration/client/client.go +++ b/integration/client/client.go @@ -37,7 +37,7 @@ func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) { return r.next.RoundTrip(req) } -type CortexClientOption interface { +type Option interface { Type() string } @@ -57,7 +57,7 @@ type Client struct { } // NewLogsClient creates a new client -func New(instanceID, token, baseURL string, opts ...CortexClientOption) *Client { +func New(instanceID, token, baseURL string, opts ...Option) *Client { rt := &roundTripper{ instanceID: instanceID, token: token, diff --git a/integration/cluster/cluster.go b/integration/cluster/cluster.go index 787aefb068..66ba702235 100644 --- a/integration/cluster/cluster.go +++ b/integration/cluster/cluster.go @@ -301,7 +301,8 @@ func (c *Component) run() error { defer c.cluster.waitGroup.Done() err := c.loki.Run(loki.RunOpts{}) if err != nil { - errCh <- err + newErr := fmt.Errorf("error starting component %v: %w", c.name, err) + errCh <- newErr } }() diff --git a/integration/loki_micro_services_delete_test.go b/integration/loki_micro_services_delete_test.go index 7d9b8e5073..dbd1b97c8d 100644 --- a/integration/loki_micro_services_delete_test.go +++ b/integration/loki_micro_services_delete_test.go @@ -21,10 +21,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) { tCompactor = clu.AddComponent( "compactor", "-target=compactor", - "-boltdb.shipper.compactor.compaction-interval=10s", - "-boltdb.shipper.compactor.retention-delete-delay=10s", + "-boltdb.shipper.compactor.compaction-interval=1s", + "-boltdb.shipper.compactor.retention-delete-delay=1s", "-boltdb.shipper.compactor.deletion-mode=filter-and-delete", - "-boltdb.shipper.compactor.delete-request-cancel-period=0s", + // By default a minute is added to the delete request start time. This compensates for that. + "-boltdb.shipper.compactor.delete-request-cancel-period=-60s", ) tIndexGateway = clu.AddComponent( "index-gateway", @@ -48,6 +49,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) { "query-frontend", "-target=query-frontend", "-frontend.scheduler-address="+tQueryScheduler.GRPCURL().Host, + "-frontend.default-validity=0s", "-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL().Host, ) _ = clu.AddComponent( @@ -79,7 +81,7 @@ func TestMicroServicesDeleteRequest(t *testing.T) { // TODO: Flushing is currently causing a panic, as the boltdb shipper is shared using a global variable in: // https://github.com/grafana/loki/blob/66a4692423582ed17cce9bd86b69d55663dc7721/pkg/storage/factory.go#L32-L35 - //require.NoError(t, cliIngester.Flush()) + // require.NoError(t, cliIngester.Flush()) }) t.Run("ingest-logs-ingester", func(t *testing.T) { @@ -115,4 +117,48 @@ func TestMicroServicesDeleteRequest(t *testing.T) { require.Equal(t, `{job="fake"} |= "lineB"`, deleteRequests[0].Query) require.Equal(t, "received", deleteRequests[0].Status) }) + + // Wait until delete request is finished + t.Run("wait-until-delete-request-processed", func(t *testing.T) { + require.Eventually(t, func() bool { + deleteRequests, err := cliCompactor.GetDeleteRequests() + require.NoError(t, err) + require.Len(t, deleteRequests, 1) + return deleteRequests[0].Status == "processed" + }, 10*time.Second, 1*time.Second) + + // Check metrics + metrics, err := cliCompactor.Metrics() + require.NoError(t, err) + checkLabelValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, 1) + // Re-enable this once flush works + // checkLabelValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1) + }) + + // Query lines + t.Run("query", func(t *testing.T) { + resp, err := cliQueryFrontend.RunRangeQuery(`{job="fake"}`) + require.NoError(t, err) + assert.Equal(t, "streams", resp.Data.ResultType) + + var lines []string + for _, stream := range resp.Data.Stream { + for _, val := range stream.Values { + lines = append(lines, val[1]) + } + } + // Remove lineB once flush works + assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines, "lineB should not be there") + }) +} + +func checkLabelValue(t *testing.T, metricName, metrics, tenantID string, expectedValue float64) { + t.Helper() + val, labels, err := extractMetric(metricName, metrics) + require.NoError(t, err) + require.NotNil(t, labels) + require.Len(t, labels, 1) + require.Contains(t, labels, "user") + require.Equal(t, labels["user"], tenantID) + require.Equal(t, expectedValue, val) } diff --git a/integration/parse_metrics.go b/integration/parse_metrics.go new file mode 100644 index 0000000000..46ea424978 --- /dev/null +++ b/integration/parse_metrics.go @@ -0,0 +1,44 @@ +package integration + +import ( + "fmt" + "strings" + + io_prometheus_client "github.com/prometheus/client_model/go" + "github.com/prometheus/common/expfmt" +) + +var ( + ErrNoMetricFound = fmt.Errorf("metric not found") + ErrInvalidMetricType = fmt.Errorf("invalid metric type") +) + +func extractMetric(metricName, metrics string) (float64, map[string]string, error) { + var parser expfmt.TextParser + mfs, err := parser.TextToMetricFamilies(strings.NewReader(metrics)) + if err != nil { + return 0, nil, err + } + + mf, found := mfs[metricName] + if !found { + return 0, nil, ErrNoMetricFound + } + + var val float64 + switch mf.GetType() { + case io_prometheus_client.MetricType_COUNTER: + val = *mf.Metric[0].Counter.Value + case io_prometheus_client.MetricType_GAUGE: + val = *mf.Metric[0].Gauge.Value + default: + return 0, nil, ErrInvalidMetricType + } + + labels := make(map[string]string) + for _, l := range mf.Metric[0].Label { + labels[*l.Name] = *l.Value + } + + return val, labels, nil +} diff --git a/integration/parse_metrics_test.go b/integration/parse_metrics_test.go new file mode 100644 index 0000000000..94c19b7584 --- /dev/null +++ b/integration/parse_metrics_test.go @@ -0,0 +1,41 @@ +package integration + +import ( + "testing" + + "github.com/stretchr/testify/require" +) + +var exampleMetricOutput = ` +# HELP loki_compactor_delete_requests_processed_total Number of delete requests processed per user +# TYPE loki_compactor_delete_requests_processed_total counter +loki_compactor_delete_requests_processed_total{user="eEWxEcgwRQcf"} 1 +# HELP loki_compactor_delete_requests_received_total Number of delete requests received per user +# TYPE loki_compactor_delete_requests_received_total counter +loki_compactor_delete_requests_received_total{user="eEWxEcgwRQcf"} 1 +# HELP loki_compactor_load_pending_requests_attempts_total Number of attempts that were made to load pending requests with status +# TYPE loki_compactor_load_pending_requests_attempts_total counter +loki_compactor_load_pending_requests_attempts_total{status="success"} 57 +# HELP loki_compactor_oldest_pending_delete_request_age_seconds Age of oldest pending delete request in seconds, since they are over their cancellation period +# TYPE loki_compactor_oldest_pending_delete_request_age_seconds gauge +loki_compactor_oldest_pending_delete_request_age_seconds 0 +# HELP loki_compactor_pending_delete_requests_count Count of delete requests which are over their cancellation period and have not finished processing yet +# TYPE loki_compactor_pending_delete_requests_count gauge +loki_compactor_pending_delete_requests_count 0 +` + +func TestExtractCounterMetric(t *testing.T) { + val, labels, err := extractMetric("loki_compactor_oldest_pending_delete_request_age_seconds", exampleMetricOutput) + require.NoError(t, err) + require.NotNil(t, labels) + require.Len(t, labels, 0) + require.Equal(t, float64(0), val) + + val, labels, err = extractMetric("loki_compactor_delete_requests_processed_total", exampleMetricOutput) + require.NoError(t, err) + require.NotNil(t, labels) + require.Len(t, labels, 1) + require.Contains(t, labels, "user") + require.Equal(t, labels["user"], "eEWxEcgwRQcf") + require.Equal(t, float64(1), val) +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go index d6e26d472e..5e4dc1a118 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request.go @@ -2,6 +2,7 @@ package deletion import ( "github.com/go-kit/log/level" + "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" @@ -19,9 +20,10 @@ type DeleteRequest struct { Status DeleteRequestStatus `json:"status"` CreatedAt model.Time `json:"created_at"` - UserID string `json:"-"` - matchers []*labels.Matcher `json:"-"` - logSelectorExpr syntax.LogSelectorExpr `json:"-"` + UserID string `json:"-"` + matchers []*labels.Matcher `json:"-"` + logSelectorExpr syntax.LogSelectorExpr `json:"-"` + deletedLinesTotal prometheus.Counter `json:"-"` } func (d *DeleteRequest) SetQuery(logQL string) error { @@ -57,7 +59,11 @@ func (d *DeleteRequest) FilterFunction(labels labels.Labels) (filter.Func, error f := p.ForStream(labels).ProcessString return func(s string) bool { result, _, skip := f(0, s) - return len(result) != 0 || skip + if len(result) != 0 || skip { + d.deletedLinesTotal.Inc() + return true + } + return false }, nil } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go index e609d10555..21c857a0ea 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_request_test.go @@ -4,6 +4,8 @@ import ( "testing" "time" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/stretchr/testify/require" @@ -242,8 +244,16 @@ func mustParseLabel(input string) labels.Labels { } func TestDeleteRequest_FilterFunction(t *testing.T) { + counter := prometheus.NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "compactor_deleted_lines", + Help: "Number of deleted lines per user", + }, []string{"user"}).WithLabelValues("userID") + prometheus.MustRegister(counter) + dr := DeleteRequest{ - Query: `{foo="bar"} |= "some"`, + Query: `{foo="bar"} |= "some"`, + deletedLinesTotal: counter, } lblStr := `{foo="bar"}` @@ -255,6 +265,7 @@ func TestDeleteRequest_FilterFunction(t *testing.T) { require.True(t, f(`some line`)) require.False(t, f("")) require.False(t, f("other line")) + require.Equal(t, float64(1), testutil.ToFloat64(counter)) lblStr = `{foo2="buzz"}` lbls = mustParseLabel(lblStr) @@ -265,9 +276,11 @@ func TestDeleteRequest_FilterFunction(t *testing.T) { require.False(t, f("")) require.False(t, f("other line")) require.False(t, f("some line")) + require.Equal(t, float64(1), testutil.ToFloat64(counter)) dr = DeleteRequest{ - Query: `{namespace="default"}`, + Query: `{namespace="default"}`, + deletedLinesTotal: counter, } lblStr = `{namespace="default"}` @@ -279,5 +292,5 @@ func TestDeleteRequest_FilterFunction(t *testing.T) { require.True(t, f(`some line`)) require.True(t, f("")) require.True(t, f("other line")) - + require.Equal(t, float64(4), testutil.ToFloat64(counter)) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go index 7d3467371b..b2f7ef18a9 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go +++ b/pkg/storage/stores/shipper/compactor/deletion/delete_requests_manager.go @@ -119,6 +119,7 @@ func (d *DeleteRequestsManager) loadDeleteRequestsToProcess() error { if deleteRequest.CreatedAt.Add(d.deleteRequestCancelPeriod).Add(time.Minute).After(model.Now()) { continue } + deleteRequest.deletedLinesTotal = d.metrics.deleteRequestsProcessedTotal.WithLabelValues(deleteRequest.UserID) d.deleteRequestsToProcess = append(d.deleteRequestsToProcess, deleteRequest) } diff --git a/pkg/storage/stores/shipper/compactor/deletion/metrics.go b/pkg/storage/stores/shipper/compactor/deletion/metrics.go index 90b275e9bb..f7d473b685 100644 --- a/pkg/storage/stores/shipper/compactor/deletion/metrics.go +++ b/pkg/storage/stores/shipper/compactor/deletion/metrics.go @@ -27,6 +27,7 @@ type deleteRequestsManagerMetrics struct { loadPendingRequestsAttemptsTotal *prometheus.CounterVec oldestPendingDeleteRequestAgeSeconds prometheus.Gauge pendingDeleteRequestsCount prometheus.Gauge + deletedLinesTotal *prometheus.CounterVec } func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsManagerMetrics { @@ -57,6 +58,11 @@ func newDeleteRequestsManagerMetrics(r prometheus.Registerer) *deleteRequestsMan Name: "compactor_pending_delete_requests_count", Help: "Count of delete requests which are over their cancellation period and have not finished processing yet", }) + m.deletedLinesTotal = promauto.With(r).NewCounterVec(prometheus.CounterOpts{ + Namespace: "loki", + Name: "compactor_deleted_lines", + Help: "Number of deleted lines per user", + }, []string{"user"}) return &m }