fix logs deletion integration test to properly validate deleted data (#7751)

**What this PR does / why we need it**:
Currently, the integration test for deletion does not validate that a
delete request actually removed the requested data; it only checks that
the delete request was processed. This was due to the singleton
`boltdb-shipper` client causing problems when flushing chunks.
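
For context, the failure mode looks roughly like the sketch below. This is a minimal illustration, not the actual code: the real singleton lives in `pkg/storage/factory.go`, and everything here other than the name `ResetBoltDBIndexClientWithShipper` is a placeholder.

```go
package storage

// IndexClient stands in for Loki's index client interface (placeholder type).
type IndexClient interface {
	Stop()
}

// A single package-level client is created on first use and shared by every
// component running in the same process. In a single-binary integration test,
// a restarted component keeps using the client that was stopped during
// shutdown, which is what broke chunk flushing.
var boltDBIndexClientWithShipper IndexClient

// ResetBoltDBIndexClientWithShipper drops the shared client so the next
// component start constructs a fresh one.
func ResetBoltDBIndexClientWithShipper() {
	boltDBIndexClientWithShipper = nil
}
```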

I have used `ResetBoltDBIndexClientWithShipper` to reset the
boltdb-shipper client, which gets chunk flushing from the ingester
working. The test now verifies that the compactor deleted exactly the
requested data, which should help us catch bugs earlier in any future
changes to the deletion code.
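
Condensed from the diff below, the new flush-and-verify sequence in the test looks like this (`checkMetricValue` is a helper added in this change):

```go
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
	// restarting the ingester flushes its chunks (-ingester.flush-on-shutdown=true)
	require.NoError(t, tIngester.Restart())

	// confirm the flush happened via the ingester's metrics
	cliIngester = client.New(tenantID, "", tIngester.HTTPURL())
	cliIngester.Now = now
	metrics, err := cliIngester.Metrics()
	require.NoError(t, err)
	checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 1)

	// reset the singleton boltdb-shipper client, then restart the querier
	// so it serves the flushed data from storage instead of the ingester
	storage.ResetBoltDBIndexClientWithShipper()
	require.NoError(t, tQuerier.Restart())
})
```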

**Checklist**
- [x] Tests updated
Sandeep Sukhani committed 97a7ed0b61 (parent 72e6fcc9d2)
Files changed:
- integration/cluster/cluster.go (26 changes)
- integration/loki_micro_services_delete_test.go (129 changes)

@@ -169,7 +169,20 @@ func (c *Cluster) Run() error {
}
return nil
}
func (c *Cluster) Restart() error {
if err := c.stop(false); err != nil {
return err
}
return c.Run()
}
func (c *Cluster) Cleanup() error {
return c.stop(true)
}
func (c *Cluster) stop(cleanupFiles bool) error {
_, cancelFunc := context.WithTimeout(context.Background(), time.Second*3)
defer cancelFunc()
@@ -195,6 +208,7 @@ func (c *Cluster) Cleanup() error {
// wait for all process to close
c.waitGroup.Wait()
if cleanupFiles {
// cleanup dirs/files
for _, d := range dirs {
errs.Add(os.RemoveAll(d))
@@ -202,6 +216,7 @@ func (c *Cluster) Cleanup() error {
for _, f := range files {
errs.Add(os.Remove(f))
}
}
return errs.Err()
}
@@ -231,6 +246,7 @@ type Component struct {
RulesTenant string
running bool
wg sync.WaitGroup
RemoteWriteUrls []string
}
@@ -362,8 +378,11 @@ func (c *Component) run() error {
}()
c.cluster.waitGroup.Add(1)
c.wg.Add(1)
go func() {
defer c.cluster.waitGroup.Done()
defer c.wg.Done()
err := c.loki.Run(loki.RunOpts{})
if err != nil {
newErr := fmt.Errorf("error starting component %v: %w", c.name, err)
@@ -385,6 +404,7 @@ func (c *Component) run() error {
func (c *Component) cleanup() (files []string, dirs []string) {
if c.loki != nil && c.loki.SignalHandler != nil {
c.loki.SignalHandler.Stop()
c.running = false
}
if c.configFile != "" {
files = append(files, c.configFile)
@@ -402,6 +422,12 @@ func (c *Component) cleanup() (files []string, dirs []string) {
return files, dirs
}
func (c *Component) Restart() error {
c.cleanup()
c.wg.Wait()
return c.run()
}
func NewRemoteWriteServer(handler *http.HandlerFunc) *httptest.Server {
l, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {

@@ -2,7 +2,6 @@ package integration
import (
"context"
"net/http"
"testing"
"time"
@@ -11,12 +10,14 @@ import (
"github.com/grafana/loki/integration/client"
"github.com/grafana/loki/integration/cluster"
"github.com/grafana/loki/pkg/storage"
)
func TestMicroServicesDeleteRequest(t *testing.T) {
clu := cluster.New()
defer func() {
assert.NoError(t, clu.Cleanup())
storage.ResetBoltDBIndexClientWithShipper()
}()
// initially, run only compactor, index-gateway and distributor.
@@ -30,10 +31,6 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-boltdb.shipper.compactor.delete-request-cancel-period=-60s",
"-compactor.deletion-mode=filter-and-delete",
)
tIndexGateway = clu.AddComponent(
"index-gateway",
"-target=index-gateway",
)
tDistributor = clu.AddComponent(
"distributor",
"-target=distributor",
@@ -46,13 +43,12 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
tIngester = clu.AddComponent(
"ingester",
"-target=ingester",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-ingester.flush-on-shutdown=true",
)
tQueryScheduler = clu.AddComponent(
"query-scheduler",
"-target=query-scheduler",
"-query-scheduler.use-scheduler-ring=false",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
)
)
require.NoError(t, clu.Run())
@@ -64,58 +60,17 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
"-target=query-frontend",
"-frontend.scheduler-address="+tQueryScheduler.GRPCURL(),
"-frontend.default-validity=0s",
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
_ = clu.AddComponent(
tQuerier = clu.AddComponent(
"querier",
"-target=querier",
"-querier.scheduler-address="+tQueryScheduler.GRPCURL(),
"-boltdb.shipper.index-gateway-client.server-address="+tIndexGateway.GRPCURL(),
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
require.NoError(t, clu.Run())
remoteCalled := []bool{false, false}
var (
tRuler = clu.AddComponent(
"ruler",
"-target=ruler",
"-common.compactor-address="+tCompactor.HTTPURL(),
)
)
handler1 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v1/write" {
t.Errorf("Expected to request '/api/v1/write', got: %s", r.URL.Path)
}
remoteCalled[0] = true
w.WriteHeader(http.StatusOK)
})
server1 := cluster.NewRemoteWriteServer(&handler1)
defer server1.Close()
handler2 := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
if r.URL.Path != "/api/v1/write" {
t.Errorf("Expected to request '/api/v1/write', got: %s", r.URL.Path)
}
remoteCalled[1] = true
w.WriteHeader(http.StatusOK)
})
server2 := cluster.NewRemoteWriteServer(&handler2)
defer server2.Close()
tRuler.RemoteWriteUrls = []string{
server1.URL,
server2.URL,
}
// initialize only the ruler now.
require.NoError(t, clu.Run())
tenantID := randStringRunes()
now := time.Now()
@@ -127,17 +82,11 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
cliQueryFrontend.Now = now
cliCompactor := client.New(tenantID, "", tCompactor.HTTPURL())
cliCompactor.Now = now
cliRuler := client.New(tRuler.RulesTenant, "", tRuler.HTTPURL())
cliRuler.Now = now
t.Run("ingest-logs-store", func(t *testing.T) {
// ingest some log lines
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineA", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
require.NoError(t, cliDistributor.PushLogLineWithTimestamp("lineB", now.Add(-45*time.Minute), map[string]string{"job": "fake"}))
// TODO: Flushing is currently causing a panic, as the boltdb shipper is shared using a global variable in:
// https://github.com/grafana/loki/blob/66a4692423582ed17cce9bd86b69d55663dc7721/pkg/storage/factory.go#L32-L35
// require.NoError(t, cliIngester.Flush())
})
t.Run("ingest-logs-ingester", func(t *testing.T) {
@@ -160,6 +109,37 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("flush-logs-and-restart-ingester-querier", func(t *testing.T) {
// restart ingester which should flush the chunks
require.NoError(t, tIngester.Restart())
// ensure that ingester has 0 chunks in memory
cliIngester = client.New(tenantID, "", tIngester.HTTPURL())
cliIngester.Now = now
metrics, err := cliIngester.Metrics()
require.NoError(t, err)
checkMetricValue(t, "loki_ingester_chunks_flushed_total", metrics, 1)
// reset boltdb-shipper client and restart querier
storage.ResetBoltDBIndexClientWithShipper()
require.NoError(t, tQuerier.Restart())
})
// Query lines
t.Run("query again to verify logs being served from storage", func(t *testing.T) {
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
var lines []string
for _, stream := range resp.Data.Stream {
for _, val := range stream.Values {
lines = append(lines, val[1])
}
}
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines)
})
t.Run("add-delete-request", func(t *testing.T) {
params := client.DeleteRequestParams{Start: "0000000000", Query: `{job="fake"} |= "lineB"`}
require.NoError(t, cliCompactor.AddDeleteRequest(params))
@@ -186,13 +166,16 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
// Check metrics
metrics, err := cliCompactor.Metrics()
require.NoError(t, err)
checkLabelValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, 1)
// Re-enable this once flush works
// checkLabelValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)
checkUserLabelAndMetricValue(t, "loki_compactor_delete_requests_processed_total", metrics, tenantID, 1)
checkUserLabelAndMetricValue(t, "loki_compactor_deleted_lines", metrics, tenantID, 1)
})
// Query lines
t.Run("query", func(t *testing.T) {
// restart querier to make it sync the index
storage.ResetBoltDBIndexClientWithShipper()
require.NoError(t, tQuerier.Restart())
resp, err := cliQueryFrontend.RunRangeQuery(context.Background(), `{job="fake"}`)
require.NoError(t, err)
assert.Equal(t, "streams", resp.Data.ResultType)
@@ -203,31 +186,12 @@ func TestMicroServicesDeleteRequest(t *testing.T) {
lines = append(lines, val[1])
}
}
// Remove lineB once flush works
assert.ElementsMatch(t, []string{"lineA", "lineB", "lineC", "lineD"}, lines, "lineB should not be there")
})
t.Run("ruler", func(t *testing.T) {
// Check rules are read correctly.
resp, err := cliRuler.GetRules(context.Background())
require.NoError(t, err)
require.NotNil(t, resp)
require.Equal(t, "success", resp.Status)
require.Len(t, resp.Data.Groups, 1)
require.Len(t, resp.Data.Groups[0].Rules, 1)
// Wait for remote write to be called.
time.Sleep(5 * time.Second)
// Check remote write was successful.
require.EqualValues(t, []bool{true, true}, remoteCalled, "one or both of the remote write target were not called")
assert.ElementsMatch(t, []string{"lineA", "lineC", "lineD"}, lines, "lineB should not be there")
})
}
func checkLabelValue(t *testing.T, metricName, metrics, tenantID string, expectedValue float64) {
func checkUserLabelAndMetricValue(t *testing.T, metricName, metrics, tenantID string, expectedValue float64) {
t.Helper()
val, labels, err := extractMetric(metricName, metrics)
require.NoError(t, err)
@@ -237,3 +201,10 @@ func checkLabelValue(t *testing.T, metricName, metrics, tenantID string, expecte
require.Equal(t, labels["user"], tenantID)
require.Equal(t, expectedValue, val)
}
func checkMetricValue(t *testing.T, metricName, metrics string, expectedValue float64) {
t.Helper()
val, _, err := extractMetric(metricName, metrics)
require.NoError(t, err)
require.Equal(t, expectedValue, val)
}
