diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 2ff401bace..f12628ee1a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -10,6 +10,8 @@ import ( "os" "time" + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" + "github.com/NYTimes/gziphandler" "github.com/go-kit/log" "github.com/go-kit/log/level" @@ -242,7 +244,7 @@ func (t *Loki) initQuerier() (services.Service, error) { // Querier worker's max concurrent requests must be the same as the querier setting t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent - deleteStore, err := t.deleteRequestsClient("querier") + deleteStore, err := t.deleteRequestsClient("querier", t.overrides) if err != nil { return nil, err } @@ -775,7 +777,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) { t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort - deleteStore, err := t.deleteRequestsClient("ruler") + deleteStore, err := t.deleteRequestsClient("ruler", t.overrides) if err != nil { return nil, err } @@ -995,7 +997,7 @@ func (t *Loki) initUsageReport() (services.Service, error) { return ur, nil } -func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsClient, error) { +func (t *Loki) deleteRequestsClient(clientType string, limits retention.Limits) (deletion.DeleteRequestsClient, error) { // TODO(owen-d): enable delete request storage in tsdb if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) { return deletion.NewNoOpDeleteRequestsStore(), nil @@ -1015,7 +1017,12 @@ func (t *Loki) deleteRequestsClient(clientType string) (deletion.DeleteRequestsC return nil, err } - return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType) + client, err := deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second}, t.deleteClientMetrics, clientType) + if err != nil { + return nil, err + } + + return deletion.NewPerTenantDeleteRequestsClient(client, limits), nil } func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) { diff --git a/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go b/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go new file mode 100644 index 0000000000..41183fa38a --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client.go @@ -0,0 +1,33 @@ +package deletion + +import ( + "context" + + "github.com/grafana/loki/pkg/storage/stores/shipper/compactor/retention" +) + +type perTenantDeleteRequestsClient struct { + client DeleteRequestsClient + limits retention.Limits +} + +func NewPerTenantDeleteRequestsClient(c DeleteRequestsClient, l retention.Limits) DeleteRequestsClient { + return &perTenantDeleteRequestsClient{ + client: c, + limits: l, + } +} + +func (c *perTenantDeleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) { + allLimits := c.limits.AllByUserID() + userLimits, ok := allLimits[userID] + if ok && userLimits.CompactorDeletionEnabled || c.limits.DefaultLimits().CompactorDeletionEnabled { + return c.client.GetAllDeleteRequestsForUser(ctx, userID) + } + + return nil, nil +} + +func (c *perTenantDeleteRequestsClient) Stop() { + c.client.Stop() +} diff --git a/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client_test.go b/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client_test.go new file mode 100644 index 0000000000..9a7b3f5b55 --- /dev/null +++ b/pkg/storage/stores/shipper/compactor/deletion/tenant_delete_requests_client_test.go @@ -0,0 +1,62 @@ +package deletion + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" +) + +func TestTenantDeleteRequestsClient(t *testing.T) { + fakeClient := &fakeRequestsClient{ + reqs: []DeleteRequest{{ + RequestID: "test-request", + }}, + } + perTenantClient := NewPerTenantDeleteRequestsClient(fakeClient, limits) + + t.Run("tenant enabled", func(t *testing.T) { + reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "1") + require.Nil(t, err) + require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs) + }) + + t.Run("tenant disabled", func(t *testing.T) { + reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "2") + require.Nil(t, err) + require.Empty(t, reqs) + }) + + t.Run("default is enabled", func(t *testing.T) { + limits.defaultLimit.compactorDeletionEnabled = true + reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3") + require.Nil(t, err) + require.Equal(t, []DeleteRequest{{RequestID: "test-request"}}, reqs) + }) + + t.Run("default is disabled", func(t *testing.T) { + limits.defaultLimit.compactorDeletionEnabled = false + reqs, err := perTenantClient.GetAllDeleteRequestsForUser(context.Background(), "3") + require.Nil(t, err) + require.Empty(t, reqs) + }) +} + +type fakeRequestsClient struct { + DeleteRequestsClient + + reqs []DeleteRequest +} + +func (c *fakeRequestsClient) GetAllDeleteRequestsForUser(_ context.Context, userID string) ([]DeleteRequest, error) { + return c.reqs, nil +} + +var ( + limits = &fakeLimits{ + perTenant: map[string]retentionLimit{ + "1": {compactorDeletionEnabled: true}, + "2": {compactorDeletionEnabled: false}, + }, + } +)