Loki: Per Tenant Runtime Configs (#3460)

* Add per-tenant configs mechanism

* Add per-tenant logging of stream creation

* Fix tests

* Enable per-tenant configs on push request logging

* Fix up the stream log, changing log levels to debug
Ed Welch authored 4 years ago, committed by GitHub
parent 93f2b2d89b
commit 8012362674
17 changed files:

  1. cmd/loki/loki-local-config.yaml (1 changed line)
  2. pkg/distributor/distributor.go (5 changed lines)
  3. pkg/distributor/distributor_test.go (3 changed lines)
  4. pkg/distributor/http.go (50 changed lines)
  5. pkg/distributor/http_test.go (3 changed lines)
  6. pkg/ingester/checkpoint_test.go (23 changed lines)
  7. pkg/ingester/flush_test.go (3 changed lines)
  8. pkg/ingester/ingester.go (11 changed lines)
  9. pkg/ingester/ingester_test.go (5 changed lines)
  10. pkg/ingester/instance.go (37 changed lines)
  11. pkg/ingester/instance_test.go (15 changed lines)
  12. pkg/ingester/recovery_test.go (5 changed lines)
  13. pkg/loki/loki.go (14 changed lines)
  14. pkg/loki/modules.go (12 changed lines)
  15. pkg/loki/runtime_config.go (15 changed lines)
  16. pkg/promtail/targets/lokipush/pushtarget.go (5 changed lines)
  17. pkg/util/runtime/config.go (56 changed lines)
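
For context, a minimal sketch of what this change enables in the runtime config file (the same file the runtime-config module already watches for per-tenant limits). The `configs` key and the three flags are taken from pkg/loki/runtime_config.go and pkg/util/runtime/config.go below; the tenant ID `tenant-a` is a placeholder:

```yaml
# Runtime config file, periodically reloaded by the runtime-config module.
overrides: {}                      # existing per-tenant limits section
configs:
  tenant-a:                        # org/tenant ID (placeholder)
    log_stream_creation: false     # debug-log stream creation in the ingester
    log_push_request: true         # debug-log push requests in the distributor
    log_push_request_streams: true # also log the stream labels of each push
```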

@@ -2,6 +2,7 @@ auth_enabled: false
server:
http_listen_port: 3100
grpc_listen_port: 9096
ingester:
wal:

@@ -27,6 +27,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -65,6 +66,7 @@ type Distributor struct {
cfg Config
clientCfg client.Config
tenantConfigs *runtime.TenantConfigs
ingestersRing ring.ReadRing
validator *Validator
pool *ring_client.Pool
@@ -82,7 +84,7 @@ type Distributor struct {
}
// New creates a distributor.
func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) {
func New(cfg Config, clientCfg client.Config, configs *runtime.TenantConfigs, ingestersRing ring.ReadRing, overrides *validation.Overrides, registerer prometheus.Registerer) (*Distributor, error) {
factory := cfg.factory
if factory == nil {
factory = func(addr string) (ring_client.PoolClient, error) {
@@ -121,6 +123,7 @@ func New(cfg Config, clientCfg client.Config, ingestersRing ring.ReadRing, overr
d := Distributor{
cfg: cfg,
clientCfg: clientCfg,
tenantConfigs: configs,
ingestersRing: ingestersRing,
distributorsRing: distributorsRing,
validator: validator,

@@ -31,6 +31,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
fe "github.com/grafana/loki/pkg/util/flagext"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -313,7 +314,7 @@ func prepare(t *testing.T, limits *validation.Limits, kvStore kv.Client, factory
}
}
d, err := New(distributorConfig, clientConfig, ingestersRing, overrides, nil)
d, err := New(distributorConfig, clientConfig, runtime.DefaultTenantConfigs(), ingestersRing, overrides, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), d))

@@ -5,11 +5,13 @@ import (
"fmt"
"math"
"net/http"
"strings"
"time"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/dustin/go-humanize"
gokit "github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@@ -43,29 +45,67 @@ const applicationJSON = "application/json"
// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
req, err := ParseRequest(r)
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := ParseRequest(logger, userID, r)
if err != nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusBadRequest,
"err", err,
)
}
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if d.tenantConfigs.LogPushRequestStreams(userID) {
var sb strings.Builder
for _, s := range req.Streams {
sb.WriteString(s.Labels)
}
level.Debug(logger).Log(
"msg", "push request streams",
"streams", sb.String(),
)
}
_, err = d.Push(r.Context(), req)
if err == nil {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request successful",
)
}
w.WriteHeader(http.StatusNoContent)
return
}
resp, ok := httpgrpc.HTTPResponseFromError(err)
if ok {
http.Error(w, string(resp.Body), int(resp.Code))
body := string(resp.Body)
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", resp.Code,
"err", body,
)
}
http.Error(w, body, int(resp.Code))
} else {
if d.tenantConfigs.LogPushRequest(userID) {
level.Debug(logger).Log(
"msg", "push request failed",
"code", http.StatusInternalServerError,
"err", err.Error(),
)
}
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}
func ParseRequest(r *http.Request) (*logproto.PushRequest, error) {
userID, _ := user.ExtractOrgID(r.Context())
logger := util_log.WithContext(r.Context(), util_log.Logger)
func ParseRequest(logger gokit.Logger, userID string, r *http.Request) (*logproto.PushRequest, error) {
var body lokiutil.SizeReader

@@ -8,6 +8,7 @@ import (
"strings"
"testing"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/stretchr/testify/assert"
)
@@ -75,7 +76,7 @@ func TestParseRequest(t *testing.T) {
if len(test.contentEncoding) > 0 {
request.Header.Add("Content-Encoding", test.contentEncoding)
}
data, err := ParseRequest(request)
data, err := ParseRequest(util_log.Logger, "", request)
if test.valid {
assert.Nil(t, err, "Should not give error for %d", index)
assert.NotNil(t, data, "Should give data for %d", index)

@@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -70,7 +71,7 @@ func TestIngesterWAL(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -113,7 +114,7 @@ func TestIngesterWAL(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -127,7 +128,7 @@ func TestIngesterWAL(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -152,7 +153,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -198,7 +199,7 @@ func TestIngesterWALIgnoresStreamLimits(t *testing.T) {
require.NoError(t, err)
// restart the ingester
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -258,7 +259,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -279,7 +280,7 @@ func TestIngesterWALBackpressureSegments(t *testing.T) {
expectCheckpoint(t, walDir, false, time.Second)
// restart the ingester, ensuring we replayed from WAL.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -303,7 +304,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
}
}
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err := New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -324,7 +325,7 @@ func TestIngesterWALBackpressureCheckpoint(t *testing.T) {
require.Nil(t, services.StopAndAwaitTerminated(context.Background(), i))
// restart the ingester, ensuring we can replay from the checkpoint as well.
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, nil)
i, err = New(ingesterConfig, client.Config{}, newStore(), limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
require.Nil(t, services.StartAndAwaitRunning(context.Background(), i))
@@ -455,7 +456,7 @@ func Test_SeriesIterator(t *testing.T) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
for i := 0; i < 3; i++ {
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("%d", i), limiter, runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil)
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream1}}))
require.NoError(t, inst.Push(context.Background(), &logproto.PushRequest{Streams: []logproto.Stream{stream2}}))
instances = append(instances, inst)
@@ -505,7 +506,7 @@ func Benchmark_SeriesIterator(b *testing.B) {
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
for i := range instances {
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, noopWAL{}, NilMetrics, nil)
inst := newInstance(defaultConfig(), fmt.Sprintf("instance %d", i), limiter, nil, noopWAL{}, NilMetrics, nil)
require.NoError(b,
inst.Push(context.Background(), &logproto.PushRequest{

@@ -12,6 +12,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
@@ -257,7 +258,7 @@ func newTestStore(t require.TestingT, cfg Config, walOverride WAL) (*testStore,
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ing, err := New(cfg, client.Config{}, store, limits, nil)
ing, err := New(cfg, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))

@@ -34,6 +34,7 @@ import (
"github.com/grafana/loki/pkg/storage/stores/shipper"
errUtil "github.com/grafana/loki/pkg/util"
listutil "github.com/grafana/loki/pkg/util"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -121,8 +122,9 @@ func (cfg *Config) Validate() error {
type Ingester struct {
services.Service
cfg Config
clientConfig client.Config
cfg Config
clientConfig client.Config
tenantConfigs *runtime.TenantConfigs
shutdownMtx sync.Mutex // Allows processes to grab a lock and prevent a shutdown
instancesMtx sync.RWMutex
@@ -168,7 +170,7 @@ type ChunkStore interface {
}
// New makes a new Ingester.
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, registerer prometheus.Registerer) (*Ingester, error) {
func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *validation.Overrides, configs *runtime.TenantConfigs, registerer prometheus.Registerer) (*Ingester, error) {
if cfg.ingesterClientFactory == nil {
cfg.ingesterClientFactory = client.New
}
@@ -178,6 +180,7 @@ func New(cfg Config, clientConfig client.Config, store ChunkStore, limits *valid
i := &Ingester{
cfg: cfg,
clientConfig: clientConfig,
tenantConfigs: configs,
instances: map[string]*instance{},
store: store,
periodicConfigs: store.GetSchemaConfigs(),
@@ -401,7 +404,7 @@ func (i *Ingester) getOrCreateInstance(instanceID string) *instance {
defer i.instancesMtx.Unlock()
inst, ok = i.instances[instanceID]
if !ok {
inst = newInstance(&i.cfg, instanceID, i.limiter, i.wal, i.metrics, i.flushOnShutdownSwitch)
inst = newInstance(&i.cfg, instanceID, i.limiter, i.tenantConfigs, i.wal, i.metrics, i.flushOnShutdownSwitch)
i.instances[instanceID] = inst
}
return inst

@@ -25,6 +25,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -37,7 +38,7 @@ func TestIngester(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck
@@ -219,7 +220,7 @@ func TestIngesterStreamLimitExceeded(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, overrides, nil)
i, err := New(ingesterConfig, client.Config{}, store, overrides, runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
defer services.StopAndAwaitTerminated(context.Background(), i) //nolint:errcheck

@@ -28,6 +28,7 @@ import (
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -79,6 +80,7 @@ type instance struct {
tailerMtx sync.RWMutex
limiter *Limiter
configs *runtime.TenantConfigs
wal WAL
@@ -89,14 +91,7 @@ type instance struct {
metrics *ingesterMetrics
}
func newInstance(
cfg *Config,
instanceID string,
limiter *Limiter,
wal WAL,
metrics *ingesterMetrics,
flushOnShutdownSwitch *OnceSwitch,
) *instance {
func newInstance(cfg *Config, instanceID string, limiter *Limiter, configs *runtime.TenantConfigs, wal WAL, metrics *ingesterMetrics, flushOnShutdownSwitch *OnceSwitch) *instance {
i := &instance{
cfg: cfg,
streams: map[string]*stream{},
@@ -110,6 +105,7 @@ func newInstance(
tailers: map[uint32]*tailer{},
limiter: limiter,
configs: configs,
wal: wal,
metrics: metrics,
@@ -209,6 +205,15 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
}
if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, exceeded limit",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}
validation.DiscardedSamples.WithLabelValues(validation.StreamLimit, i.instanceID).Add(float64(len(pushReqStream.Entries)))
bytes := 0
for _, e := range pushReqStream.Entries {
@@ -220,6 +225,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
labels, err := logql.ParseLabels(pushReqStream.Labels)
if err != nil {
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "failed to create stream, failed to parse labels",
"org_id", i.instanceID,
"err", err,
"stream", pushReqStream.Labels,
)
}
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}
fp := i.getHashForLabels(labels)
@@ -244,6 +257,14 @@ func (i *instance) getOrCreateStream(pushReqStream logproto.Stream, lock bool, r
i.streamsCreatedTotal.Inc()
i.addTailersToNewStream(stream)
if i.configs.LogStreamCreation(i.instanceID) {
level.Debug(util_log.Logger).Log(
"msg", "successfully created stream",
"org_id", i.instanceID,
"stream", pushReqStream.Labels,
)
}
return stream, nil
}

@@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
loki_runtime "github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -38,7 +39,7 @@ func TestLabelsCollisions(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(defaultConfig(), "test", limiter, noopWAL{}, nil, &OnceSwitch{})
i := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, nil, &OnceSwitch{})
// avoid entries from the future.
tt := time.Now().Add(-5 * time.Minute)
@@ -65,7 +66,7 @@ func TestConcurrentPushes(t *testing.T) {
require.NoError(t, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{})
const (
concurrent = 10
@@ -123,7 +124,7 @@ func TestSyncPeriod(t *testing.T) {
minUtil = 0.20
)
inst := newInstance(defaultConfig(), "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
inst := newInstance(defaultConfig(), "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{})
lbls := makeRandomLabels()
tt := time.Now()
@@ -163,7 +164,7 @@ func Test_SeriesQuery(t *testing.T) {
cfg.SyncPeriod = 1 * time.Minute
cfg.SyncMinUtilization = 0.20
instance := newInstance(cfg, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
instance := newInstance(cfg, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{})
currentTime := time.Now()
@@ -273,7 +274,7 @@ func Benchmark_PushInstance(b *testing.B) {
require.NoError(b, err)
limiter := NewLimiter(limits, &ringCountMock{count: 1}, 1)
i := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
i := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{})
ctx := context.Background()
for n := 0; n < b.N; n++ {
@@ -315,7 +316,7 @@ func Benchmark_instance_addNewTailer(b *testing.B) {
ctx := context.Background()
inst := newInstance(&Config{}, "test", limiter, noopWAL{}, NilMetrics, &OnceSwitch{})
inst := newInstance(&Config{}, "test", limiter, loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, &OnceSwitch{})
t, err := newTailer("foo", `{namespace="foo",pod="bar",instance=~"10.*"}`, nil)
require.NoError(b, err)
for i := 0; i < 10000; i++ {
@@ -365,7 +366,7 @@ func Test_Iterator(t *testing.T) {
defaultLimits := defaultLimitsTestConfig()
overrides, err := validation.NewOverrides(defaultLimits, nil)
require.NoError(t, err)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), noopWAL{}, NilMetrics, nil)
instance := newInstance(&ingesterConfig, "fake", NewLimiter(overrides, &ringCountMock{count: 1}, 1), loki_runtime.DefaultTenantConfigs(), noopWAL{}, NilMetrics, nil)
ctx := context.TODO()
direction := logproto.BACKWARD
limit := uint32(2)

@@ -17,6 +17,7 @@ import (
"github.com/grafana/loki/pkg/ingester/client"
"github.com/grafana/loki/pkg/logproto"
loki_runtime "github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -205,7 +206,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
chunks: map[string][]chunk.Chunk{},
}
i, err := New(ingesterConfig, client.Config{}, store, limits, nil)
i, err := New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
mkSample := func(i int) *logproto.PushRequest {
@@ -239,7 +240,7 @@ func TestSeriesRecoveryNoDuplicates(t *testing.T) {
require.Equal(t, false, iter.Next())
// create a new ingester now
i, err = New(ingesterConfig, client.Config{}, store, limits, nil)
i, err = New(ingesterConfig, client.Config{}, store, limits, loki_runtime.DefaultTenantConfigs(), nil)
require.NoError(t, err)
// recover the checkpointed series

@@ -12,6 +12,7 @@ import (
"github.com/felixge/fgprof"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/modules"
@@ -145,6 +146,7 @@ type Loki struct {
Server *server.Server
ring *ring.Ring
overrides *validation.Overrides
tenantConfigs *runtime.TenantConfigs
distributor *distributor.Distributor
ingester *ingester.Ingester
querier *querier.Querier
@@ -344,6 +346,7 @@ func (t *Loki) setupModuleManager() error {
mm.RegisterModule(MemberlistKV, t.initMemberlistKV)
mm.RegisterModule(Ring, t.initRing)
mm.RegisterModule(Overrides, t.initOverrides)
mm.RegisterModule(TenantConfigs, t.initTenantConfigs)
mm.RegisterModule(Distributor, t.initDistributor)
mm.RegisterModule(Store, t.initStore)
mm.RegisterModule(Ingester, t.initIngester)
@@ -360,12 +363,13 @@ func (t *Loki) setupModuleManager() error {
deps := map[string][]string{
Ring: {RuntimeConfig, Server, MemberlistKV},
Overrides: {RuntimeConfig},
Distributor: {Ring, Server, Overrides},
TenantConfigs: {RuntimeConfig},
Distributor: {Ring, Server, Overrides, TenantConfigs},
Store: {Overrides},
Ingester: {Store, Server, MemberlistKV},
Querier: {Store, Ring, Server, IngesterQuerier},
QueryFrontend: {Server, Overrides},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides},
Ingester: {Store, Server, MemberlistKV, TenantConfigs},
Querier: {Store, Ring, Server, IngesterQuerier, TenantConfigs},
QueryFrontend: {Server, Overrides, TenantConfigs},
Ruler: {Ring, Server, Store, RulerStorage, IngesterQuerier, Overrides, TenantConfigs},
TableManager: {Server},
Compactor: {Server},
IngesterQuerier: {Ring},

@@ -16,6 +16,7 @@ import (
"github.com/grafana/loki/pkg/ruler/manager"
"github.com/grafana/loki/pkg/storage/stores/shipper/compactor"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
@@ -60,6 +61,7 @@ const (
Ring string = "ring"
RuntimeConfig string = "runtime-config"
Overrides string = "overrides"
TenantConfigs string = "tenant-configs"
Server string = "server"
Distributor string = "distributor"
Ingester string = "ingester"
@@ -140,11 +142,17 @@ func (t *Loki) initOverrides() (_ services.Service, err error) {
return nil, err
}
func (t *Loki) initTenantConfigs() (_ services.Service, err error) {
t.tenantConfigs, err = runtime.NewTenantConfigs(tenantConfigFromRuntimeConfig(t.runtimeConfig))
// tenantConfigs are not a service, since they don't have any operational state.
return nil, err
}
func (t *Loki) initDistributor() (services.Service, error) {
t.cfg.Distributor.DistributorRing.KVStore.Multi.ConfigProvider = multiClientRuntimeConfigChannel(t.runtimeConfig)
t.cfg.Distributor.DistributorRing.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
var err error
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.ring, t.overrides, prometheus.DefaultRegisterer)
t.distributor, err = distributor.New(t.cfg.Distributor, t.cfg.IngesterClient, t.tenantConfigs, t.ring, t.overrides, prometheus.DefaultRegisterer)
if err != nil {
return nil, err
}
@@ -218,7 +226,7 @@ func (t *Loki) initIngester() (_ services.Service, err error) {
t.cfg.Ingester.LifecyclerConfig.RingConfig.KVStore.MemberlistKV = t.memberlistKV.GetMemberlistKV
t.cfg.Ingester.LifecyclerConfig.ListenPort = t.cfg.Server.GRPCListenPort
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, prometheus.DefaultRegisterer)
t.ingester, err = ingester.New(t.cfg.Ingester, t.cfg.IngesterClient, t.store, t.overrides, t.tenantConfigs, prometheus.DefaultRegisterer)
if err != nil {
return
}

@@ -7,6 +7,7 @@ import (
"github.com/cortexproject/cortex/pkg/util/runtimeconfig"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/util/runtime"
"github.com/grafana/loki/pkg/util/validation"
)
@@ -15,6 +16,7 @@ import (
// These values are then pushed to the components that are interested in them.
type runtimeConfigValues struct {
TenantLimits map[string]*validation.Limits `yaml:"overrides"`
TenantConfig map[string]*runtime.Config `yaml:"configs"`
Multi kv.MultiRuntimeConfig `yaml:"multi_kv_config"`
}
@@ -45,6 +47,19 @@ func tenantLimitsFromRuntimeConfig(c *runtimeconfig.Manager) validation.TenantLi
}
}
func tenantConfigFromRuntimeConfig(c *runtimeconfig.Manager) runtime.TenantConfig {
if c == nil {
return nil
}
return func(userID string) *runtime.Config {
cfg, ok := c.GetConfig().(*runtimeConfigValues)
if !ok || cfg == nil {
return nil
}
return cfg.TenantConfig[userID]
}
}
func multiClientRuntimeConfigChannel(manager *runtimeconfig.Manager) func() <-chan kv.MultiRuntimeConfig {
if manager == nil {
return nil

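To see the lookup path end to end, here is a standalone sketch (using trimmed-down copies of the types above rather than the real Loki packages) of how the `configs` block decodes into per-tenant Config values, with unknown tenants falling back to nil and hence the defaults:

```go
package main

import (
	"fmt"

	"gopkg.in/yaml.v2"
)

// Config mirrors pkg/util/runtime.Config from this commit (one field shown).
type Config struct {
	LogPushRequest bool `yaml:"log_push_request"`
}

// runtimeConfigValues mirrors the relevant part of the struct above.
type runtimeConfigValues struct {
	TenantConfig map[string]*Config `yaml:"configs"`
}

func main() {
	raw := []byte("configs:\n  tenant-a:\n    log_push_request: true\n")
	var v runtimeConfigValues
	if err := yaml.Unmarshal(raw, &v); err != nil {
		panic(err)
	}
	fmt.Println(v.TenantConfig["tenant-a"].LogPushRequest) // true
	fmt.Println(v.TenantConfig["tenant-b"] == nil)         // true: falls back to defaults
}
```
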
@@ -14,6 +14,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/distributor"
"github.com/grafana/loki/pkg/logproto"
@@ -102,7 +103,9 @@ func (t *PushTarget) run() error {
}
func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
req, err := distributor.ParseRequest(r)
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
req, err := distributor.ParseRequest(logger, userID, r)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())
http.Error(w, err.Error(), http.StatusBadRequest)

@@ -0,0 +1,56 @@
package runtime
type Config struct {
LogStreamCreation bool `yaml:"log_stream_creation"`
LogPushRequest bool `yaml:"log_push_request"`
LogPushRequestStreams bool `yaml:"log_push_request_streams"`
}
// TenantConfig is a function that returns the config for a given tenant, or
// nil if there are no tenant-specific configs.
type TenantConfig func(userID string) *Config
// TenantConfigs periodically fetches a set of per-tenant configs and provides
// convenience functions for fetching the correct value.
type TenantConfigs struct {
defaultConfig *Config
tenantConfig TenantConfig
}
// DefaultTenantConfigs creates and returns a new TenantConfigs with the defaults populated.
func DefaultTenantConfigs() *TenantConfigs {
return &TenantConfigs{
defaultConfig: &Config{},
tenantConfig: nil,
}
}
// NewTenantConfigs makes a new TenantConfigs.
func NewTenantConfigs(tenantConfig TenantConfig) (*TenantConfigs, error) {
return &TenantConfigs{
defaultConfig: DefaultTenantConfigs().defaultConfig,
tenantConfig: tenantConfig,
}, nil
}
func (o *TenantConfigs) getOverridesForUser(userID string) *Config {
if o.tenantConfig != nil {
l := o.tenantConfig(userID)
if l != nil {
return l
}
}
return o.defaultConfig
}
func (o *TenantConfigs) LogStreamCreation(userID string) bool {
return o.getOverridesForUser(userID).LogStreamCreation
}
func (o *TenantConfigs) LogPushRequest(userID string) bool {
return o.getOverridesForUser(userID).LogPushRequest
}
func (o *TenantConfigs) LogPushRequestStreams(userID string) bool {
return o.getOverridesForUser(userID).LogPushRequestStreams
}
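
And a quick usage sketch of the new type, wiring a hand-rolled static source in place of the runtime-config manager (tenant IDs here are hypothetical; the constructor and accessors are exactly those defined above):

```go
package main

import (
	"fmt"

	"github.com/grafana/loki/pkg/util/runtime"
)

func main() {
	// Static per-tenant source; Loki itself wires this to the runtime-config
	// manager via tenantConfigFromRuntimeConfig.
	perTenant := map[string]*runtime.Config{
		"tenant-a": {LogPushRequest: true},
	}
	tc, err := runtime.NewTenantConfigs(func(userID string) *runtime.Config {
		return perTenant[userID] // nil for unknown tenants -> defaults
	})
	if err != nil {
		panic(err)
	}
	fmt.Println(tc.LogPushRequest("tenant-a")) // true
	fmt.Println(tc.LogPushRequest("tenant-b")) // false: zero-value default Config
}
```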