Fix delete updates (#6194)

* Fix delete updates

* Update pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>

* Add compactor address to the storage config

* review comments

* Review feedback

Co-authored-by: Michel Hollands <42814411+MichelHollands@users.noreply.github.com>
pull/6267/head
Travis Patterson 3 years ago committed by GitHub
parent 3b3fcf6c87
commit 9b2786bde3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 8
      docs/sources/configuration/_index.md
  2. 7
      pkg/loki/common/common.go
  3. 10
      pkg/loki/delete_store_listener.go
  4. 40
      pkg/loki/modules.go
  5. 2
      pkg/lokifrontend/config.go
  6. 174
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_client.go
  7. 68
      pkg/storage/stores/shipper/compactor/deletion/delete_requests_client_test.go
  8. 4
      production/ksonnet/loki/config.libsonnet

@ -379,10 +379,6 @@ The `frontend` block configures the Loki query-frontend.
# CLI flag: -frontend.downstream-url
[downstream_url: <string> | default = ""]
# Address, including port, where the compactor api is served
# CLI flag: -frontend.compactor-address
[compactor_address: <string> | default = ""]
# Log queries that are slower than the specified duration. Set to 0 to disable.
# Set to < 0 to enable on all queries.
# CLI flag: -frontend.log-queries-longer-than
@ -2613,6 +2609,10 @@ This way, one doesn't have to replicate configuration in multiple places.
# to be used by the distributor's ring, but only if the distributor's ring itself
# doesn't have a `heartbeat_period` set.
[ring: <ring>]
# Address, including port, where the compactor api is served
# CLI flag: -common.compactor-address
[compactor_address: <string> | default = ""]
```
## analytics

@ -40,9 +40,12 @@ type Config struct {
// You can check this during Loki execution under ring status pages (ex: `/ring` will output the address of the different ingester
// instances).
InstanceAddr string `yaml:"instance_addr"`
// CompactorAddress is the http address of the compactor in the form http://host:port
CompactorAddress string `yaml:"compactor_address"`
}
func (c *Config) RegisterFlags(_ *flag.FlagSet) {
func (c *Config) RegisterFlags(f *flag.FlagSet) {
throwaway := flag.NewFlagSet("throwaway", flag.PanicOnError)
throwaway.IntVar(&c.ReplicationFactor, "common.replication-factor", 3, "How many ingesters incoming data should be replicated to.")
c.Storage.RegisterFlagsWithPrefix("common.storage", throwaway)
@ -52,6 +55,8 @@ func (c *Config) RegisterFlags(_ *flag.FlagSet) {
c.InstanceInterfaceNames = netutil.PrivateNetworkInterfacesWithFallback([]string{"eth0", "en0"}, util_log.Logger)
throwaway.StringVar(&c.InstanceAddr, "common.instance-addr", "", "Default advertised address to be used by Loki components.")
throwaway.Var((*flagext.StringSlice)(&c.InstanceInterfaceNames), "common.instance-interface-names", "List of network interfaces to read address from.")
f.StringVar(&c.CompactorAddress, "common.compactor-address", "", "the http address of the compactor in the form http://host:port")
}
type Storage struct {

@ -6,12 +6,12 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor/deletion"
)
func deleteRequestsStoreListener(d deletion.DeleteRequestsStore) *listener {
func deleteRequestsStoreListener(d deletion.DeleteRequestsClient) *listener {
return &listener{d}
}
type listener struct {
deleteRequestsStore deletion.DeleteRequestsStore
deleteRequestsClient deletion.DeleteRequestsClient
}
// Starting is called when the service transitions from NEW to STARTING.
@ -26,7 +26,7 @@ func (l *listener) Stopping(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
// Terminated is called when the service transitions to the TERMINATED state.
@ -35,7 +35,7 @@ func (l *listener) Terminated(from services.State) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}
// Failed is called when the service transitions to the FAILED state.
@ -44,5 +44,5 @@ func (l *listener) Failed(from services.State, failure error) {
// no need to do anything
return
}
l.deleteRequestsStore.Stop()
l.deleteRequestsClient.Stop()
}

@ -241,7 +241,7 @@ func (t *Loki) initQuerier() (services.Service, error) {
// Querier worker's max concurrent requests must be the same as the querier setting
t.Cfg.Worker.MaxConcurrentRequests = t.Cfg.Querier.MaxConcurrent
deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@ -575,17 +575,25 @@ func (t *Loki) cacheGenClient() (generationnumber.CacheGenClient, error) {
return deletion.NewNoOpDeleteRequestsStore(), nil
}
compactorAddress := t.Cfg.Frontend.CompactorAddress
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}
func (t *Loki) compactorAddress() (string, error) {
if t.Cfg.isModuleEnabled(All) || t.Cfg.isModuleEnabled(Read) {
// In single binary or read modes, this module depends on Server
compactorAddress = fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort)
return fmt.Sprintf("http://127.0.0.1:%d", t.Cfg.Server.HTTPListenPort), nil
}
if compactorAddress == "" {
return nil, errors.New("query filtering for deletes requires 'compactor_address' to be configured")
if t.Cfg.Common.CompactorAddress == "" {
return "", errors.New("query filtering for deletes requires 'compactor_address' to be configured")
}
return generationnumber.NewGenNumberClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
return t.Cfg.Common.CompactorAddress, nil
}
func (t *Loki) initQueryFrontend() (_ services.Service, err error) {
@ -742,7 +750,7 @@ func (t *Loki) initRuler() (_ services.Service, err error) {
t.Cfg.Ruler.Ring.ListenPort = t.Cfg.Server.GRPCListenPort
deleteStore, err := t.deleteRequestsStore()
deleteStore, err := t.deleteRequestsClient()
if err != nil {
return nil, err
}
@ -949,7 +957,7 @@ func (t *Loki) initUsageReport() (services.Service, error) {
return ur, nil
}
func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
func (t *Loki) deleteRequestsClient() (deletion.DeleteRequestsClient, error) {
// TODO(owen-d): enable delete request storage in tsdb
if config.UsingTSDB(t.Cfg.SchemaConfig.Configs) {
return deletion.NewNoOpDeleteRequestsStore(), nil
@ -960,16 +968,16 @@ func (t *Loki) deleteRequestsStore() (deletion.DeleteRequestsStore, error) {
return nil, err
}
deleteStore := deletion.NewNoOpDeleteRequestsStore()
if config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) && filteringEnabled {
indexClient, err := storage.NewIndexClient(config.BoltDBShipperType, t.Cfg.StorageConfig, t.Cfg.SchemaConfig, t.overrides, t.clientMetrics, nil, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
if !config.UsingBoltdbShipper(t.Cfg.SchemaConfig.Configs) || !filteringEnabled {
return deletion.NewNoOpDeleteRequestsStore(), nil
}
deleteStore = deletion.NewDeleteStoreFromIndexClient(indexClient)
compactorAddress, err := t.compactorAddress()
if err != nil {
return nil, err
}
return deleteStore, nil
return deletion.NewDeleteRequestsClient(compactorAddress, &http.Client{Timeout: 5 * time.Second})
}
func calculateMaxLookBack(pc config.PeriodConfig, maxLookBackConfig, minDuration time.Duration) (time.Duration, error) {

@ -15,7 +15,6 @@ type Config struct {
CompressResponses bool `yaml:"compress_responses"`
DownstreamURL string `yaml:"downstream_url"`
CompactorAddress string `yaml:"compactor_address"`
TailProxyURL string `yaml:"tail_proxy_url"`
}
@ -28,6 +27,5 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.BoolVar(&cfg.CompressResponses, "querier.compress-http-responses", false, "Compress HTTP responses.")
f.StringVar(&cfg.DownstreamURL, "frontend.downstream-url", "", "URL of downstream Prometheus.")
f.StringVar(&cfg.CompactorAddress, "frontend.compactor-address", "", "host and port where the compactor API is listening")
f.StringVar(&cfg.TailProxyURL, "frontend.tail-proxy-url", "", "URL of querier for tail proxy.")
}

@ -0,0 +1,174 @@
package deletion
import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/url"
"sync"
"time"
"github.com/go-kit/log/level"
"github.com/grafana/loki/pkg/util/log"
)
const (
orgHeaderKey = "X-Scope-OrgID"
getDeletePath = "/loki/api/v1/delete"
)
type DeleteRequestsClient interface {
GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error)
Stop()
}
type deleteRequestsClient struct {
url string
httpClient httpClient
mu sync.RWMutex
cache map[string][]DeleteRequest
cacheDuration time.Duration
stopChan chan struct{}
}
type httpClient interface {
Do(*http.Request) (*http.Response, error)
}
type DeleteRequestsStoreOption func(c *deleteRequestsClient)
func WithRequestClientCacheDuration(d time.Duration) DeleteRequestsStoreOption {
return func(c *deleteRequestsClient) {
c.cacheDuration = d
}
}
func NewDeleteRequestsClient(addr string, c httpClient, opts ...DeleteRequestsStoreOption) (DeleteRequestsClient, error) {
u, err := url.Parse(addr)
if err != nil {
level.Error(log.Logger).Log("msg", "error parsing url", "err", err)
return nil, err
}
u.Path = getDeletePath
client := &deleteRequestsClient{
url: u.String(),
httpClient: c,
cacheDuration: 5 * time.Minute,
cache: make(map[string][]DeleteRequest),
stopChan: make(chan struct{}),
}
for _, o := range opts {
o(client)
}
go client.updateLoop()
return client, nil
}
func (c *deleteRequestsClient) GetAllDeleteRequestsForUser(ctx context.Context, userID string) ([]DeleteRequest, error) {
if cachedRequests, ok := c.getCachedRequests(userID); ok {
return cachedRequests, nil
}
requests, err := c.getRequestsFromServer(ctx, userID)
if err != nil {
return nil, err
}
c.mu.Lock()
defer c.mu.Unlock()
c.cache[userID] = requests
return requests, nil
}
func (c *deleteRequestsClient) getCachedRequests(userID string) ([]DeleteRequest, bool) {
c.mu.RLock()
defer c.mu.RUnlock()
res, ok := c.cache[userID]
return res, ok
}
func (c *deleteRequestsClient) Stop() {
close(c.stopChan)
}
func (c *deleteRequestsClient) updateLoop() {
t := time.NewTicker(c.cacheDuration)
for {
select {
case <-t.C:
c.updateCache()
case <-c.stopChan:
return
}
}
}
func (c *deleteRequestsClient) updateCache() {
userIDs := c.currentUserIDs()
newCache := make(map[string][]DeleteRequest)
for _, userID := range userIDs {
deleteReq, err := c.getRequestsFromServer(context.Background(), userID)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
continue
}
newCache[userID] = deleteReq
}
c.mu.Lock()
defer c.mu.Unlock()
c.cache = newCache
}
func (c *deleteRequestsClient) currentUserIDs() []string {
c.mu.RLock()
defer c.mu.RUnlock()
userIDs := make([]string, 0, len(c.cache))
for userID := range c.cache {
userIDs = append(userIDs, userID)
}
return userIDs
}
func (c *deleteRequestsClient) getRequestsFromServer(ctx context.Context, userID string) ([]DeleteRequest, error) {
req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.url, nil)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
req.Header.Set(orgHeaderKey, userID)
resp, err := c.httpClient.Do(req)
if err != nil {
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
defer resp.Body.Close()
if resp.StatusCode/100 != 2 {
err := fmt.Errorf("unexpected status code: %d", resp.StatusCode)
level.Error(log.Logger).Log("msg", "error getting delete requests from the store", "err", err)
return nil, err
}
var deleteRequests []DeleteRequest
if err := json.NewDecoder(resp.Body).Decode(&deleteRequests); err != nil {
level.Error(log.Logger).Log("msg", "error marshalling response", "err", err)
return nil, err
}
return deleteRequests, nil
}

@ -0,0 +1,68 @@
package deletion
import (
"context"
"io"
"net/http"
"strings"
"testing"
"time"
"github.com/stretchr/testify/require"
)
func TestGetCacheGenNumberForUser(t *testing.T) {
t.Run("it requests results from the api", func(t *testing.T) {
httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`}
client, err := NewDeleteRequestsClient("http://test-server", httpClient)
require.Nil(t, err)
deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID")
require.Nil(t, err)
require.Len(t, deleteRequests, 1)
require.Equal(t, "test-request", deleteRequests[0].RequestID)
require.Equal(t, "http://test-server/loki/api/v1/delete", httpClient.req.URL.String())
require.Equal(t, http.MethodGet, httpClient.req.Method)
require.Equal(t, "userID", httpClient.req.Header.Get("X-Scope-OrgID"))
})
t.Run("it caches the results", func(t *testing.T) {
httpClient := &mockHTTPClient{ret: `[{"request_id":"test-request"}]`}
client, err := NewDeleteRequestsClient("http://test-server", httpClient, WithRequestClientCacheDuration(100*time.Millisecond))
require.Nil(t, err)
deleteRequests, err := client.GetAllDeleteRequestsForUser(context.Background(), "userID")
require.Nil(t, err)
require.Equal(t, "test-request", deleteRequests[0].RequestID)
httpClient.ret = `[{"request_id":"different"}]`
deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID")
require.Nil(t, err)
require.Equal(t, "test-request", deleteRequests[0].RequestID)
time.Sleep(200 * time.Millisecond)
deleteRequests, err = client.GetAllDeleteRequestsForUser(context.Background(), "userID")
require.Nil(t, err)
require.Equal(t, "different", deleteRequests[0].RequestID)
client.Stop()
})
}
type mockHTTPClient struct {
ret string
req *http.Request
}
func (c *mockHTTPClient) Do(req *http.Request) (*http.Response, error) {
c.req = req
return &http.Response{
StatusCode: 200,
Body: io.NopCloser(strings.NewReader(c.ret)),
}, nil
}

@ -144,6 +144,9 @@
commonEnvs: [],
loki: {
common: {
compactor_address: 'http://compactor.%s.svc.cluster.local.:%d' % [$._config.namespace, $._config.http_listen_port],
},
server: {
graceful_shutdown_timeout: '5s',
http_server_idle_timeout: '120s',
@ -158,7 +161,6 @@
frontend: {
compress_responses: true,
log_queries_longer_than: '5s',
compactor_address: 'http://compactor.%s.svc.cluster.local:%d' % [$._config.namespace, $._config.http_listen_port],
},
frontend_worker: {
match_max_concurrent: true,

Loading…
Cancel
Save