From b830d65bdde8a56076100253e55b34e7b6cc01fa Mon Sep 17 00:00:00 2001 From: Nick Pillitteri <56quarters@users.noreply.github.com> Date: Thu, 8 Jul 2021 15:29:31 -0400 Subject: [PATCH] Use the Cortex wrapper for getting tenant ID from a context (#3973) This change replaces all uses of the weaveworks/common method for getting tenant/org ID with the Cortex wrapper which performs some extra validation. Signed-off-by: Nick Pillitteri --- .../promtail/targets/lokipush/pushtarget.go | 4 +-- cmd/migrate/main.go | 3 ++- pkg/distributor/distributor.go | 3 ++- pkg/distributor/http.go | 4 +-- pkg/ingester/flush.go | 10 ++++---- pkg/ingester/flush_test.go | 3 ++- pkg/ingester/ingester.go | 25 ++++++++----------- pkg/ingester/ingester_test.go | 3 ++- pkg/logql/engine.go | 4 +-- pkg/querier/http.go | 4 +-- pkg/querier/querier.go | 12 ++++----- pkg/querier/queryrange/limits.go | 3 ++- pkg/querier/queryrange/roundtrip.go | 4 +-- pkg/querier/queryrange/split_by_interval.go | 4 +-- pkg/storage/store.go | 4 +-- 15 files changed, 45 insertions(+), 45 deletions(-) diff --git a/clients/pkg/promtail/targets/lokipush/pushtarget.go b/clients/pkg/promtail/targets/lokipush/pushtarget.go index 42eabf94f3..d1008cf7f2 100644 --- a/clients/pkg/promtail/targets/lokipush/pushtarget.go +++ b/clients/pkg/promtail/targets/lokipush/pushtarget.go @@ -7,6 +7,7 @@ import ( "strings" "time" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" @@ -16,7 +17,6 @@ import ( "github.com/prometheus/prometheus/pkg/relabel" promql_parser "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/server" - "github.com/weaveworks/common/user" "github.com/grafana/loki/clients/pkg/promtail/api" "github.com/grafana/loki/clients/pkg/promtail/scrapeconfig" @@ -106,7 +106,7 @@ func (t *PushTarget) run() error { func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) - userID, _ := user.ExtractOrgID(r.Context()) + userID, _ := tenant.TenantID(r.Context()) req, err := push.ParseRequest(logger, userID, r, nil) if err != nil { level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error()) diff --git a/cmd/migrate/main.go b/cmd/migrate/main.go index 2147bd0a81..7b3efebd8b 100644 --- a/cmd/migrate/main.go +++ b/cmd/migrate/main.go @@ -13,6 +13,7 @@ import ( "time" cortex_storage "github.com/cortexproject/cortex/pkg/chunk/storage" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" @@ -134,7 +135,7 @@ func main() { ctx := context.Background() // This is a little weird but it was the easiest way to guarantee the userID is in the right format ctx = user.InjectOrgID(ctx, *source) - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { panic(err) } diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 80b1f51350..6fc896ac0f 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -9,6 +9,7 @@ import ( cortex_distributor "github.com/cortexproject/cortex/pkg/distributor" "github.com/cortexproject/cortex/pkg/ring" ring_client "github.com/cortexproject/cortex/pkg/ring/client" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/limiter" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -191,7 +192,7 @@ type pushTracker struct { // Push a set of streams. func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } diff --git a/pkg/distributor/http.go b/pkg/distributor/http.go index 880dce1067..70a87de5e0 100644 --- a/pkg/distributor/http.go +++ b/pkg/distributor/http.go @@ -4,10 +4,10 @@ import ( "net/http" "strings" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/loghttp/push" ) @@ -15,7 +15,7 @@ import ( // PushHandler reads a snappy-compressed proto from the HTTP body. func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { logger := util_log.WithContext(r.Context(), util_log.Logger) - userID, _ := user.ExtractOrgID(r.Context()) + userID, _ := tenant.TenantID(r.Context()) req, err := push.ParseRequest(logger, userID, r, d.tenantsRetention) if err != nil { if d.tenantConfigs.LogPushRequest(userID) { diff --git a/pkg/ingester/flush.go b/pkg/ingester/flush.go index 883690cbcc..d46727aa1d 100644 --- a/pkg/ingester/flush.go +++ b/pkg/ingester/flush.go @@ -9,6 +9,10 @@ import ( "golang.org/x/net/context" + "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/tenant" + "github.com/cortexproject/cortex/pkg/util" + util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" @@ -16,10 +20,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/user" - "github.com/cortexproject/cortex/pkg/chunk" - "github.com/cortexproject/cortex/pkg/util" - util_log "github.com/cortexproject/cortex/pkg/util/log" - "github.com/grafana/loki/pkg/chunkenc" loki_util "github.com/grafana/loki/pkg/util" ) @@ -335,7 +335,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe } func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return err } diff --git a/pkg/ingester/flush_test.go b/pkg/ingester/flush_test.go index 3719e99884..9daf77b651 100644 --- a/pkg/ingester/flush_test.go +++ b/pkg/ingester/flush_test.go @@ -19,6 +19,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" "github.com/cortexproject/cortex/pkg/ring/kv" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" @@ -297,7 +298,7 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error { if s.onPut != nil { return s.onPut(ctx, chunks) } - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return err } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index ff80d96731..f612da626a 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -11,6 +11,7 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/ring" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/cortexproject/cortex/pkg/util/services" @@ -20,7 +21,6 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/loki/pkg/chunkenc" @@ -387,7 +387,7 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) { // Push implements logproto.Pusher. func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) { - instanceID, err := user.ExtractOrgID(ctx) + instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err } else if i.readonly { @@ -421,7 +421,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie ctx := stats.NewContext(queryServer.Context()) defer stats.SendAsTrailer(ctx, queryServer) - instanceID, err := user.ExtractOrgID(ctx) + instanceID, err := tenant.TenantID(ctx) if err != nil { return err } @@ -462,7 +462,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log ctx := stats.NewContext(queryServer.Context()) defer stats.SendAsTrailer(ctx, queryServer) - instanceID, err := user.ExtractOrgID(ctx) + instanceID, err := tenant.TenantID(ctx) if err != nil { return err } @@ -516,7 +516,7 @@ func (i *Ingester) boltdbShipperMaxLookBack() time.Duration { // GetChunkIDs is meant to be used only when using an async store like boltdb-shipper. func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) { - orgID, err := user.ExtractOrgID(ctx) + orgID, err := tenant.TenantID(ctx) if err != nil { return nil, err } @@ -555,12 +555,12 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq // Label returns the set of labels for the stream this ingester knows about. func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { - instanceID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } - instance := i.getOrCreateInstance(instanceID) + instance := i.getOrCreateInstance(userID) resp, err := instance.Label(ctx, req) if err != nil { return nil, err @@ -579,11 +579,6 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp return resp, nil } - userID, err := user.ExtractOrgID(ctx) - if err != nil { - return nil, err - } - maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod if boltdbShipperMaxLookBack != 0 { maxLookBackPeriod = boltdbShipperMaxLookBack @@ -615,7 +610,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp // Series queries the ingester for log stream identifiers (label sets) matching a set of matchers func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { - instanceID, err := user.ExtractOrgID(ctx) + instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err } @@ -671,7 +666,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ default: } - instanceID, err := user.ExtractOrgID(queryServer.Context()) + instanceID, err := tenant.TenantID(queryServer.Context()) if err != nil { return err } @@ -691,7 +686,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_ // TailersCount returns count of active tail requests from a user func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) { - instanceID, err := user.ExtractOrgID(ctx) + instanceID, err := tenant.TenantID(ctx) if err != nil { return nil, err } diff --git a/pkg/ingester/ingester_test.go b/pkg/ingester/ingester_test.go index 860eb1ae89..0a8a5d9c67 100644 --- a/pkg/ingester/ingester_test.go +++ b/pkg/ingester/ingester_test.go @@ -11,6 +11,7 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/cortexproject/cortex/pkg/chunk" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/cortexproject/cortex/pkg/util/services" "github.com/stretchr/testify/require" @@ -260,7 +261,7 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error { s.mtx.Lock() defer s.mtx.Unlock() - userid, err := user.ExtractOrgID(ctx) + userid, err := tenant.TenantID(ctx) if err != nil { return err } diff --git a/pkg/logql/engine.go b/pkg/logql/engine.go index e36d551ba8..ddf04d0154 100644 --- a/pkg/logql/engine.go +++ b/pkg/logql/engine.go @@ -8,6 +8,7 @@ import ( "sort" "time" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/spanlogger" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" @@ -15,7 +16,6 @@ import ( "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql" promql_parser "github.com/prometheus/prometheus/promql/parser" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -177,7 +177,7 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser. return q.evalLiteral(ctx, lit) } - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } diff --git a/pkg/querier/http.go b/pkg/querier/http.go index 73ed9f2642..ccb572ea96 100644 --- a/pkg/querier/http.go +++ b/pkg/querier/http.go @@ -5,13 +5,13 @@ import ( "net/http" "time" + "github.com/cortexproject/cortex/pkg/tenant" util_log "github.com/cortexproject/cortex/pkg/util/log" "github.com/go-kit/kit/log/level" "github.com/gorilla/websocket" "github.com/prometheus/prometheus/pkg/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/loghttp" loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy" @@ -347,7 +347,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) { } func (q *Querier) validateEntriesLimits(ctx context.Context, query string, limit uint32) error { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 2c6945b25c..f3edaa9cf7 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -6,14 +6,14 @@ import ( "net/http" "time" - "github.com/go-kit/kit/log/level" "github.com/prometheus/common/model" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "google.golang.org/grpc/health/grpc_health_v1" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/spanlogger" cortex_validation "github.com/cortexproject/cortex/pkg/util/validation" + "github.com/go-kit/kit/log/level" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/loghttp" @@ -254,7 +254,7 @@ func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval // Label does the heavy lifting for a Label query. func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } @@ -358,7 +358,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer, // Series fetches any matching series for a list of matcher sets func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err } @@ -484,7 +484,7 @@ func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time, } func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return time.Time{}, time.Time{}, err } @@ -532,7 +532,7 @@ func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits tim } func (q *Querier) checkTailRequestLimit(ctx context.Context) error { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return err } diff --git a/pkg/querier/queryrange/limits.go b/pkg/querier/queryrange/limits.go index 91f4bdf73b..afb52c29a5 100644 --- a/pkg/querier/queryrange/limits.go +++ b/pkg/querier/queryrange/limits.go @@ -9,6 +9,7 @@ import ( "github.com/cortexproject/cortex/pkg/cortexpb" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/opentracing/opentracing-go" "github.com/weaveworks/common/httpgrpc" "github.com/weaveworks/common/user" @@ -200,7 +201,7 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error) if span := opentracing.SpanFromContext(ctx); span != nil { request.LogToSpan(span) } - userid, err := user.ExtractOrgID(ctx) + userid, err := tenant.TenantID(ctx) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } diff --git a/pkg/querier/queryrange/roundtrip.go b/pkg/querier/queryrange/roundtrip.go index bfe6e41a7f..758c576c7b 100644 --- a/pkg/querier/queryrange/roundtrip.go +++ b/pkg/querier/queryrange/roundtrip.go @@ -10,11 +10,11 @@ import ( "github.com/cortexproject/cortex/pkg/chunk" "github.com/cortexproject/cortex/pkg/chunk/cache" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/go-kit/kit/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/prometheus/pkg/labels" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logql" @@ -176,7 +176,7 @@ func transformRegexQuery(req *http.Request, expr logql.LogSelectorExpr) (logql.L // validates log entries limits func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error { - userID, err := user.ExtractOrgID(req.Context()) + userID, err := tenant.TenantID(req.Context()) if err != nil { return httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } diff --git a/pkg/querier/queryrange/split_by_interval.go b/pkg/querier/queryrange/split_by_interval.go index 18947d3673..33919ace9f 100644 --- a/pkg/querier/queryrange/split_by_interval.go +++ b/pkg/querier/queryrange/split_by_interval.go @@ -6,12 +6,12 @@ import ( "time" "github.com/cortexproject/cortex/pkg/querier/queryrange" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/opentracing/opentracing-go" otlog "github.com/opentracing/opentracing-go/log" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/httpgrpc" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/logproto" ) @@ -159,7 +159,7 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next } func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) { - userid, err := user.ExtractOrgID(ctx) + userid, err := tenant.TenantID(ctx) if err != nil { return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error()) } diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 42b8395868..4c12e83de4 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -11,13 +11,13 @@ import ( cortex_local "github.com/cortexproject/cortex/pkg/chunk/local" "github.com/cortexproject/cortex/pkg/chunk/storage" "github.com/cortexproject/cortex/pkg/querier/astmapper" + "github.com/cortexproject/cortex/pkg/tenant" "github.com/cortexproject/cortex/pkg/util/flagext" "github.com/go-kit/kit/log" "github.com/go-kit/kit/log/level" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/pkg/labels" - "github.com/weaveworks/common/user" "github.com/grafana/loki/pkg/iter" "github.com/grafana/loki/pkg/logproto" @@ -196,7 +196,7 @@ func (s *store) SetChunkFilterer(chunkFilterer RequestChunkFilterer) { // lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries` func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) { - userID, err := user.ExtractOrgID(ctx) + userID, err := tenant.TenantID(ctx) if err != nil { return nil, err }