From 8012362674568379a3871ff8c4a2bfd1ddba7ad1 Mon Sep 17 00:00:00 2001 From: Ed Welch Date: Tue, 9 Mar 2021 21:43:50 -0500 Subject: [PATCH] Loki: Per Tenant Runtime Configs (#3460) * Add per tenant configs mechanism * Add per tenant logging of stream creation * fix tests * enable per tenant configs on push request logging. * fixing up the stream log, changing log levels to debug --- cmd/loki/loki-local-config.yaml | 1 + pkg/distributor/distributor.go | 5 +- pkg/distributor/distributor_test.go | 3 +- pkg/distributor/http.go | 50 ++++++++++++++++-- pkg/distributor/http_test.go | 3 +- pkg/ingester/checkpoint_test.go | 23 +++++---- pkg/ingester/flush_test.go | 3 +- pkg/ingester/ingester.go | 11 ++-- pkg/ingester/ingester_test.go | 5 +- pkg/ingester/instance.go | 37 +++++++++++--- pkg/ingester/instance_test.go | 15 +++--- pkg/ingester/recovery_test.go | 5 +- pkg/loki/loki.go | 14 ++++-- pkg/loki/modules.go | 12 ++++- pkg/loki/runtime_config.go | 15 ++++++ pkg/promtail/targets/lokipush/pushtarget.go | 5 +- pkg/util/runtime/config.go | 56 +++++++++++++++++++++ 17 files changed, 212 insertions(+), 51 deletions(-) create mode 100644 pkg/util/runtime/config.go diff --git a/cmd/loki/loki-local-config.yaml b/cmd/loki/loki-local-config.yaml index 0b45802732..6d39f98ae2 100644 --- a/cmd/loki/loki-local-config.yaml +++ b/cmd/loki/loki-local-config.yaml @@ -2,6 +2,7 @@ auth_enabled: false server: http_listen_port: 3100 + grpc_listen_port: 9096 ingester: wal: diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 7d916954ef..e20ec7a233 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -27,6 +27,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -65,6 +66,7 @@ type Distributor struct { cfg Config clientCfg client.Config + tenantConfigs *runtime.TenantConfigs ingestersRing ring.ReadRing validator *Validator pool *ring_client.Pool @@ -82,7 +84,7 @@ type Distributor struct { } // New a distributor creates. 
-func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) { +func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) { factory := cfg.factory if factory == nil { factory = func(addr string) (ring_client.PoolClient, error) { @@ -121,6 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr d := Distributor{ cfg: cfg, clientCfg: clientCfg, + tenantConfigs: configs, ingestersRing: ingestersRing, distributorsRing: distributorsRing, validator: validator, diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 5b4ac06dd5..9dfab1dfc4 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -31,6 +31,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" fe "github.com/grafana/loki/pkg/util/flagext" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -313,7 +314,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory } } - d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil) + d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), d)) diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index c663488a4c..8928ca066b 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -5,11 +5,13 @@ import ( "fmt" "math" "net/http" + "strings" "time" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/dustin/go-humanize" + gokit "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -43,29 +45,67 @@ const applicationJSON = "application/json" // PushHandler reads a snappy-compressed proto from the HTTP body. 
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { - req, err := ParseRequest(r) + logger := util_log.WithContext(r.Context(), util_log.Logger) + userID, _ := user.ExtractOrgID(r.Context()) + req, err := ParseRequest(logger, userID, r) if err != nil { + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( + "msg", "push request failed", + "code", http.StatusBadRequest, + "err", err, + ) + } http.Error(w, err.Error(), http.StatusBadRequest) return } + if d.tenantConfigs.LogPushRequestStreams(userID) { + var sb strings.Builder + for _, s := range req.Streams { + sb.WriteString(s.Labels) + } + level.Debug(logger).Log( + "msg", "push request streams", + "streams", sb.String(), + ) + } + _, err = d.Push(r.Context(), req) if err == nil { + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( + "msg", "push request successful", + ) + } w.WriteHeader(http.StatusNoContent) return } resp, ok := httpgrpc.HTTPResponseFromError(err) if ok { - http.Error(w, string(resp.Body), int(resp.Code)) + body := string(resp.Body) + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( + "msg", "push request failed", + "code", resp.Code, + "err", body, + ) + } + http.Error(w, body, int(resp.Code)) } else { + if d.tenantConfigs.LogPushRequest(userID) { + level.Debug(logger).Log( + "msg", "push request failed", + "code", http.StatusInternalServerError, + "err", err.Error(), + ) + } http.Error(w, err.Error(), http.StatusInternalServerError) } } -func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { - userID, _ := user.ExtractOrgID(r.Context()) - logger := util_log.WithContext(r.Context(), util_log.Logger) +func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) { var body lokiutil.SizeReader diff --git a/pkg/distributor/http_test.go b/pkg/distributor/http_test.go index a17ebffcb7..0f68eb5e08 100644 --- a/pkg/distributor/http_test.go +++ b/pkg/distributor/http_test.go @@ -8,6 +8,7 @@ import ( "strings" "testing" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/stretchr/testify/assert" ) @@ -75,7 +76,7 @@ func TestParseRequest(t *testing.T) { if len(test.contentEncoding) > 0 { request.Header.Add("Content-Encoding", test.contentEncoding) } - data, err := ParseRequest(request) + data, err := ParseRequest(util_log.Logger, "", request) if test.valid { assert.Nil(t, err, "Should not give error for %d", index) assert.NotNil(t, data, "Should give data for %d", index) diff --git a/pkg/ingester/checkpoint_test.go b/pkg/ingester/checkpoint_test.go index 5aadc50290..e0063067e2 100644 --- a/pkg/ingester/checkpoint_test.go +++ b/pkg/ingester/checkpoint_test.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -70,7 +71,7 @@ func TestIngesterWAL(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -113,7 +114,7 @@ func TestIngesterWAL(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester - i, err = New(ingesterConfig, 
client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -127,7 +128,7 @@ func TestIngesterWAL(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -152,7 +153,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -198,7 +199,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) { require.NoError(t, err) // restart the ingester - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -258,7 +259,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -279,7 +280,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) { expectCheckpoint(t, walDir, false, time.Second) // restart the ingester, ensuring we replayed from WAL. - i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -303,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { } } - i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -324,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) { require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i)) // restart the ingester, ensuring we can replay from the checkpoint as well. 
- i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil) + i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck require.Nil(t, services.StartAndAwaitRunning(context.Background(), i)) @@ -455,7 +456,7 @@ func Test_SeriesIterator(t *testing.T) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := 0; i < 3; i++ { - inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}})) require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}})) instances = append(instances, inst) @@ -505,7 +506,7 @@ func Benchmark_SeriesIterator(b *testing.B) { limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) for i := range instances { - inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil) + inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil) require.NoError(b, inst.Push(context.Background(), &logproto.PushRequest{ diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 982d4bce1f..f59871ed97 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -12,6 +12,7 @@ import ( "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/log" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" @@ -257,7 +258,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore, limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil) require.NoError(t, err) - ing, err := New(cfg, client.Config{}, store, limits, nil) + ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing)) diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 9a05f4186b..81cf22dada 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -34,6 +34,7 @@ import ( "github.com/grafana/loki/pkg/storage/stores/shipper" errUtil "github.com/grafana/loki/pkg/util" listutil "github.com/grafana/loki/pkg/util" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -121,8 +122,9 @@ func (cfg *Config) Validate() error { type Ingester struct { services.Service - cfg Config - clientConfig client.Config + cfg Config + clientConfig client.Config + tenantConfigs *runtime.TenantConfigs shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown instancesMtx sync.RWMutex @@ -168,7 +170,7 @@ type ChunkStore interface { } // New makes a new Ingester. 
-func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) { +func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) { if cfg.ingesterClientFactory == nil { cfg.ingesterClientFactory = client.New } @@ -178,6 +180,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid i := &Ingester{ cfg: cfg, clientConfig: clientConfig, + tenantConfigs: configs, instances: map[string]*instance{}, store: store, periodicConfigs: store.GetSchemaConfigs(), @@ -401,7 +404,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance { defer i.instancesMtx.Unlock() inst, ok = i.instances[instanceID] if !ok { - inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch) + inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch) i.instances[instanceID] = inst } return inst diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 98b8bba0b6..36743044af 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -37,7 +38,7 @@ func TestIngester(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck @@ -219,7 +220,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, overrides, nil) + i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 268e002d79..c98ebb8cf9 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -28,6 +28,7 @@ import ( "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" "github.com/grafana/loki/pkg/logql/stats" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -79,6 +80,7 @@ type instance struct { tailerMtx sync.RWMutex limiter *Limiter + configs *runtime.TenantConfigs wal WAL @@ -89,14 +91,7 @@ type instance struct { metrics *ingesterMetrics } -func newInstance( - cfg *Config, - instanceID string, - limiter *Limiter, - wal WAL, - metrics *ingesterMetrics, - flushOnShutdownSwitch *OnceSwitch, -) *instance { +func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance { i := &instance{ cfg: cfg, streams: map[string]*stream{}, @@ -110,6 +105,7 @@ func newInstance( tailers: map[uint32]*tailer{}, limiter: limiter, + configs: configs, wal: wal, metrics: metrics, @@ -209,6 +205,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r } if err != nil { + if 
i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "failed to create stream, exceeded limit", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } + validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries))) bytes := 0 for _, e := range pushReqStream.Entries { @@ -220,6 +225,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r labels, err := logql.ParseLabels(pushReqStream.Labels) if err != nil { + if i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "failed to create stream, failed to parse labels", + "org_id", i.instanceID, + "err", err, + "stream", pushReqStream.Labels, + ) + } return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } fp := i.getHashForLabels(labels) @@ -244,6 +257,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r i.streamsCreatedTotal.Inc() i.addTailersToNewStream(stream) + if i.configs.LogStreamCreation(i.instanceID) { + level.Debug(util_log.Logger).Log( + "msg", "successfully created stream", + "org_id", i.instanceID, + "stream", pushReqStream.Labels, + ) + } + return stream, nil } diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 7ce0bcf294..c7e04fc255 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" + loki_runtime "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -38,7 +39,7 @@ func TestLabelsCollisions(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{}) + i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{}) // avoid entries from the future. 
tt := time.Now().Add(-5 * time.Minute) @@ -65,7 +66,7 @@ func TestConcurrentPushes(t *testing.T) { require.NoError(t, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) const ( concurrent = 10 @@ -123,7 +124,7 @@ func TestSyncPeriod(t *testing.T) { minUtil = 0.20 ) - inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) lbls := makeRandomLabels() tt := time.Now() @@ -163,7 +164,7 @@ func Test_SeriesQuery(t *testing.T) { cfg.SyncPeriod = 1 * time.Minute cfg.SyncMinUtilization = 0.20 - instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) currentTime := time.Now() @@ -273,7 +274,7 @@ func Benchmark_PushInstance(b *testing.B) { require.NoError(b, err) limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1) - i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) ctx := context.Background() for n := 0; n < b.N; n++ { @@ -315,7 +316,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) { ctx := context.Background() - inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{}) + inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{}) t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil) require.NoError(b, err) for i := 0; i < 10000; i++ { @@ -365,7 +366,7 @@ func Test_Iterator(t *testing.T) { defaultLimits := defaultLimitsTestConfig() overrides, err := validation.NewOverrides(defaultLimits, nil) require.NoError(t, err) - instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), noopWAL{}, NilMetrics, nil) + instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil) ctx := context.TODO() direction := logproto.BACKWARD limit := uint32(2) diff --git a/pkg/ingester/recovery_test.go b/pkg/ingester/recovery_test.go index 39a3f9149e..a26533f633 100644 --- a/pkg/ingester/recovery_test.go +++ b/pkg/ingester/recovery_test.go @@ -17,6 +17,7 @@ import ( "github.com/grafana/loki/pkg/ingester/client" "github.com/grafana/loki/pkg/logproto" + loki_runtime "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -205,7 +206,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { chunks: map[string][]chunk.Chunk{}, } - i, err := New(ingesterConfig, client.Config{}, store, limits, nil) + i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) mkSample := func(i int) *logproto.PushRequest { @@ -239,7 +240,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) { require.Equal(t, false, iter.Next()) // create a new ingester now - i, err = New(ingesterConfig, client.Config{}, store, limits, nil) + i, err = New(ingesterConfig, client.Config{}, 
store, limits, loki_runtime.DefaultTenantConfigs(), nil) require.NoError(t, err) // recover the checkpointed series diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 2e2d384a9a..12ba1d4f83 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -12,6 +12,7 @@ import ( "github.com/felixge/fgprof" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/modules" @@ -145,6 +146,7 @@ type Loki struct { Server *server.Server ring *ring.Ring overrides *validation.Overrides + tenantConfigs *runtime.TenantConfigs distributor *distributor.Distributor ingester *ingester.Ingester querier *querier.Querier @@ -344,6 +346,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(MemberlistKV, t.initMemberlistKV) mm.RegisterModule(Ring, t.initRing) mm.RegisterModule(Overrides, t.initOverrides) + mm.RegisterModule(TenantConfigs, t.initTenantConfigs) mm.RegisterModule(Distributor, t.initDistributor) mm.RegisterModule(Store, t.initStore) mm.RegisterModule(Ingester, t.initIngester) @@ -360,12 +363,13 @@ func (t *Loki) setupModuleManager() error { deps := map[string][]string{ Ring: {RuntimeConfig, Server, MemberlistKV}, Overrides: {RuntimeConfig}, - Distributor: {Ring, Server, Overrides}, + TenantConfigs: {RuntimeConfig}, + Distributor: {Ring, Server, Overrides, TenantConfigs}, Store: {Overrides}, - Ingester: {Store, Server, MemberlistKV}, - Querier: {Store, Ring, Server, IngesterQuerier}, - QueryFrontend: {Server, Overrides}, - Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides}, + Ingester: {Store, Server, MemberlistKV, TenantConfigs}, + Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs}, + QueryFrontend: {Server, Overrides, TenantConfigs}, + Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs}, TableManager: {Server}, Compactor: {Server}, IngesterQuerier: {Ring}, diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 52f522ac1a..51b241bdf1 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -16,6 +16,7 @@ import ( "github.com/grafana/loki/pkg/ruler/manager" "github.com/grafana/loki/pkg/storage/stores/shipper/compactor" + "github.com/grafana/loki/pkg/util/runtime" "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" @@ -60,6 +61,7 @@ const ( Ring string = "ring" RuntimeConfig string = "runtime-config" Overrides string = "overrides" + TenantConfigs string = "tenant-configs" Server string = "server" Distributor string = "distributor" Ingester string = "ingester" @@ -140,11 +142,17 @@ func (t *Loki) initOverrides() (_ services.Service, err error) { return nil, err } +func (t *Loki) initTenantConfigs() (_ services.Service, err error) { + t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig)) + // tenantConfigs are not a service, since they don't have any operational state. 
+ return nil, err +} + func (t *Loki) initDistributor() (services.Service, error) { t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig) t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV var err error - t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides, prometheus.DefaultRegisterer) + t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer) if err != nil { return nil, err } @@ -218,7 +226,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) { t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort - t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer) + t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer) if err != nil { return } diff --git a/pkg/loki/runtime_config.go b/pkg/loki/runtime_config.go index 99ea284f34..2f753df774 100644 --- a/pkg/loki/runtime_config.go +++ b/pkg/loki/runtime_config.go @@ -7,6 +7,7 @@ import ( "github.com/cortexproject/cortex/pkg/util/runtimeconfig" "gopkg.in/yaml.v2" + "github.com/grafana/loki/pkg/util/runtime" "github.com/grafana/loki/pkg/util/validation" ) @@ -15,6 +16,7 @@ import ( // These values are then pushed to the components that are interested in them. type runtimeConfigValues struct { TenantLimits map[string]*validation.Limits `yaml:"overrides"` + TenantConfig map[string]*runtime.Config `yaml:"configs"` Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"` } @@ -45,6 +47,19 @@ func tenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLi } } +func tenantConfigFromRuntimeConfig(c *runtimeconfig.Manager) runtime.TenantConfig { + if c == nil { + return nil + } + return func(userID string) *runtime.Config { + cfg, ok := c.GetConfig().(*runtimeConfigValues) + if !ok || cfg == nil { + return nil + } + return cfg.TenantConfig[userID] + } +} + func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-chan kv.MultiRuntimeConfig { if manager == nil { return nil diff --git a/pkg/promtail/targets/lokipush/pushtarget.go b/pkg/promtail/targets/lokipush/pushtarget.go index 9c3ea6d936..a2d488dbee 100644 --- a/pkg/promtail/targets/lokipush/pushtarget.go +++ b/pkg/promtail/targets/lokipush/pushtarget.go @@ -14,6 +14,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/pkg/relabel" "github.com/weaveworks/common/server" + "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/distributor" "github.com/grafana/loki/pkg/logproto" @@ -102,7 +103,9 @@ func (t *PushTarget) run() error { } func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) { - req, err := distributor.ParseRequest(r) + logger := util_log.WithContext(r.Context(), util_log.Logger) + userID, _ := user.ExtractOrgID(r.Context()) + req, err := distributor.ParseRequest(logger, userID, r) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) http.Error(w, err.Error(), http.StatusBadRequest) diff --git a/pkg/util/runtime/config.go b/pkg/util/runtime/config.go new file mode 100644 index 0000000000..110a9fce2a --- /dev/null +++ b/pkg/util/runtime/config.go 
@@ -0,0 +1,56 @@
+package runtime
+
+type Config struct {
+	LogStreamCreation     bool `yaml:"log_stream_creation"`
+	LogPushRequest        bool `yaml:"log_push_request"`
+	LogPushRequestStreams bool `yaml:"log_push_request_streams"`
+}
+
+// TenantConfig is a function that returns configs for a given tenant, or
+// nil if there are no tenant-specific configs.
+type TenantConfig func(userID string) *Config
+
+// TenantConfigs periodically fetches a set of per-user configs, and provides convenience
+// functions for fetching the correct value.
+type TenantConfigs struct {
+	defaultConfig *Config
+	tenantConfig  TenantConfig
+}
+
+// DefaultTenantConfigs creates and returns a new TenantConfigs with the defaults populated.
+func DefaultTenantConfigs() *TenantConfigs {
+	return &TenantConfigs{
+		defaultConfig: &Config{},
+		tenantConfig:  nil,
+	}
+}
+
+// NewTenantConfigs makes a new TenantConfigs.
+func NewTenantConfigs(tenantConfig TenantConfig) (*TenantConfigs, error) {
+	return &TenantConfigs{
+		defaultConfig: DefaultTenantConfigs().defaultConfig,
+		tenantConfig:  tenantConfig,
+	}, nil
+}
+
+func (o *TenantConfigs) getOverridesForUser(userID string) *Config {
+	if o.tenantConfig != nil {
+		l := o.tenantConfig(userID)
+		if l != nil {
+			return l
+		}
+	}
+	return o.defaultConfig
+}
+
+func (o *TenantConfigs) LogStreamCreation(userID string) bool {
+	return o.getOverridesForUser(userID).LogStreamCreation
+}
+
+func (o *TenantConfigs) LogPushRequest(userID string) bool {
+	return o.getOverridesForUser(userID).LogPushRequest
+}
+
+func (o *TenantConfigs) LogPushRequestStreams(userID string) bool {
+	return o.getOverridesForUser(userID).LogPushRequestStreams
+}
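
A note on usage: the new per-tenant flags are read from the same runtime config file that already carries the per-tenant limits (the "overrides" block), under the new "configs" key parsed by runtimeConfigValues in pkg/loki/runtime_config.go. A minimal sketch, assuming the standard runtime-config file mechanism; the tenant ID "tenant-a" is illustrative:

  configs:
    tenant-a:
      log_stream_creation: true
      log_push_request: true
      log_push_request_streams: true

Tenants without an entry fall back to the zero-value Config via getOverridesForUser, so all three debug-logging options stay off unless explicitly enabled for a tenant.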