Use the Cortex wrapper for getting tenant ID from a context (#3973)

This change replaces all uses of the weaveworks/common method
for getting tenant/org ID with the Cortex wrapper which performs
some extra validation.

Signed-off-by: Nick Pillitteri <nick.pillitteri@grafana.com>
pull/3976/head
Nick Pillitteri 5 years ago committed by GitHub
parent 23ad7f2e61
commit b830d65bdd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/pkg/promtail/targets/lokipush/pushtarget.go
  2. 3
      cmd/migrate/main.go
  3. 3
      pkg/distributor/distributor.go
  4. 4
      pkg/distributor/http.go
  5. 10
      pkg/ingester/flush.go
  6. 3
      pkg/ingester/flush_test.go
  7. 25
      pkg/ingester/ingester.go
  8. 3
      pkg/ingester/ingester_test.go
  9. 4
      pkg/logql/engine.go
  10. 4
      pkg/querier/http.go
  11. 12
      pkg/querier/querier.go
  12. 3
      pkg/querier/queryrange/limits.go
  13. 4
      pkg/querier/queryrange/roundtrip.go
  14. 4
      pkg/querier/queryrange/split_by_interval.go
  15. 4
      pkg/storage/store.go

@ -7,6 +7,7 @@ import (
"strings"
"time"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
@ -16,7 +17,6 @@ import (
"github.com/prometheus/prometheus/pkg/relabel"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/server"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/clients/pkg/promtail/api"
"github.com/grafana/loki/clients/pkg/promtail/scrapeconfig"
@ -106,7 +106,7 @@ func (t *PushTarget) run() error {
func (t *PushTarget) handle(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, nil)
if err != nil {
level.Warn(t.logger).Log("msg", "failed to parse incoming push request", "err", err.Error())

@ -13,6 +13,7 @@ import (
"time"
cortex_storage "github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
@ -134,7 +135,7 @@ func main() {
ctx := context.Background()
// This is a little weird but it was the easiest way to guarantee the userID is in the right format
ctx = user.InjectOrgID(ctx, *source)
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
panic(err)
}

@ -9,6 +9,7 @@ import (
cortex_distributor "github.com/cortexproject/cortex/pkg/distributor"
"github.com/cortexproject/cortex/pkg/ring"
ring_client "github.com/cortexproject/cortex/pkg/ring/client"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/limiter"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
@ -191,7 +192,7 @@ type pushTracker struct {
// Push a set of streams.
func (d *Distributor) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

@ -4,10 +4,10 @@ import (
"net/http"
"strings"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/loghttp/push"
)
@ -15,7 +15,7 @@ import (
// PushHandler reads a snappy-compressed proto from the HTTP body.
func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) {
logger := util_log.WithContext(r.Context(), util_log.Logger)
userID, _ := user.ExtractOrgID(r.Context())
userID, _ := tenant.TenantID(r.Context())
req, err := push.ParseRequest(logger, userID, r, d.tenantsRetention)
if err != nil {
if d.tenantConfigs.LogPushRequest(userID) {

@ -9,6 +9,10 @@ import (
"golang.org/x/net/context"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
@ -16,10 +20,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/grafana/loki/pkg/chunkenc"
loki_util "github.com/grafana/loki/pkg/util"
)
@ -335,7 +335,7 @@ func (i *Ingester) removeFlushedChunks(instance *instance, stream *stream, mayRe
}
func (i *Ingester) flushChunks(ctx context.Context, fp model.Fingerprint, labelPairs labels.Labels, cs []*chunkDesc, chunkMtx sync.Locker) error {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

@ -19,6 +19,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/ring/kv"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
@ -297,7 +298,7 @@ func (s *testStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
if s.onPut != nil {
return s.onPut(ctx, chunks)
}
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

@ -11,6 +11,7 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/ring"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/cortexproject/cortex/pkg/util/services"
@ -20,7 +21,6 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/grafana/loki/pkg/chunkenc"
@ -387,7 +387,7 @@ func (i *Ingester) ShutdownHandler(w http.ResponseWriter, r *http.Request) {
// Push implements logproto.Pusher.
func (i *Ingester) Push(ctx context.Context, req *logproto.PushRequest) (*logproto.PushResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
} else if i.readonly {
@ -421,7 +421,7 @@ func (i *Ingester) Query(req *logproto.QueryRequest, queryServer logproto.Querie
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)
instanceID, err := user.ExtractOrgID(ctx)
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
@ -462,7 +462,7 @@ func (i *Ingester) QuerySample(req *logproto.SampleQueryRequest, queryServer log
ctx := stats.NewContext(queryServer.Context())
defer stats.SendAsTrailer(ctx, queryServer)
instanceID, err := user.ExtractOrgID(ctx)
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return err
}
@ -516,7 +516,7 @@ func (i *Ingester) boltdbShipperMaxLookBack() time.Duration {
// GetChunkIDs is meant to be used only when using an async store like boltdb-shipper.
func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsRequest) (*logproto.GetChunkIDsResponse, error) {
orgID, err := user.ExtractOrgID(ctx)
orgID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
@ -555,12 +555,12 @@ func (i *Ingester) GetChunkIDs(ctx context.Context, req *logproto.GetChunkIDsReq
// Label returns the set of labels for the stream this ingester knows about.
func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
instance := i.getOrCreateInstance(instanceID)
instance := i.getOrCreateInstance(userID)
resp, err := instance.Label(ctx, req)
if err != nil {
return nil, err
@ -579,11 +579,6 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
return resp, nil
}
userID, err := user.ExtractOrgID(ctx)
if err != nil {
return nil, err
}
maxLookBackPeriod := i.cfg.QueryStoreMaxLookBackPeriod
if boltdbShipperMaxLookBack != 0 {
maxLookBackPeriod = boltdbShipperMaxLookBack
@ -615,7 +610,7 @@ func (i *Ingester) Label(ctx context.Context, req *logproto.LabelRequest) (*logp
// Series queries the ingester for log stream identifiers (label sets) matching a set of matchers
func (i *Ingester) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
@ -671,7 +666,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
default:
}
instanceID, err := user.ExtractOrgID(queryServer.Context())
instanceID, err := tenant.TenantID(queryServer.Context())
if err != nil {
return err
}
@ -691,7 +686,7 @@ func (i *Ingester) Tail(req *logproto.TailRequest, queryServer logproto.Querier_
// TailersCount returns count of active tail requests from a user
func (i *Ingester) TailersCount(ctx context.Context, in *logproto.TailersCountRequest) (*logproto.TailersCountResponse, error) {
instanceID, err := user.ExtractOrgID(ctx)
instanceID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

@ -11,6 +11,7 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/cortexproject/cortex/pkg/util/services"
"github.com/stretchr/testify/require"
@ -260,7 +261,7 @@ func (s *mockStore) Put(ctx context.Context, chunks []chunk.Chunk) error {
s.mtx.Lock()
defer s.mtx.Unlock()
userid, err := user.ExtractOrgID(ctx)
userid, err := tenant.TenantID(ctx)
if err != nil {
return err
}

@ -8,6 +8,7 @@ import (
"sort"
"time"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
@ -15,7 +16,6 @@ import (
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql"
promql_parser "github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -177,7 +177,7 @@ func (q *query) evalSample(ctx context.Context, expr SampleExpr) (promql_parser.
return q.evalLiteral(ctx, lit)
}
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

@ -5,13 +5,13 @@ import (
"net/http"
"time"
"github.com/cortexproject/cortex/pkg/tenant"
util_log "github.com/cortexproject/cortex/pkg/util/log"
"github.com/go-kit/kit/log/level"
"github.com/gorilla/websocket"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/loghttp"
loghttp_legacy "github.com/grafana/loki/pkg/loghttp/legacy"
@ -347,7 +347,7 @@ func parseRegexQuery(httpRequest *http.Request) (string, error) {
}
func (q *Querier) validateEntriesLimits(ctx context.Context, query string, limit uint32) error {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

@ -6,14 +6,14 @@ import (
"net/http"
"time"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"google.golang.org/grpc/health/grpc_health_v1"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/spanlogger"
cortex_validation "github.com/cortexproject/cortex/pkg/util/validation"
"github.com/go-kit/kit/log/level"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
@ -254,7 +254,7 @@ func (q *Querier) buildQueryIntervals(queryStart, queryEnd time.Time) (*interval
// Label does the heavy lifting for a Label query.
func (q *Querier) Label(ctx context.Context, req *logproto.LabelRequest) (*logproto.LabelResponse, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
@ -358,7 +358,7 @@ func (q *Querier) Tail(ctx context.Context, req *logproto.TailRequest) (*Tailer,
// Series fetches any matching series for a list of matcher sets
func (q *Querier) Series(ctx context.Context, req *logproto.SeriesRequest) (*logproto.SeriesResponse, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}
@ -484,7 +484,7 @@ func (q *Querier) seriesForMatcher(ctx context.Context, from, through time.Time,
}
func (q *Querier) validateQueryRequest(ctx context.Context, req logql.QueryParams) (time.Time, time.Time, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return time.Time{}, time.Time{}, err
}
@ -532,7 +532,7 @@ func validateQueryTimeRangeLimits(ctx context.Context, userID string, limits tim
}
func (q *Querier) checkTailRequestLimit(ctx context.Context) error {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

@ -9,6 +9,7 @@ import (
"github.com/cortexproject/cortex/pkg/cortexpb"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/opentracing/opentracing-go"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
@ -200,7 +201,7 @@ func (rt limitedRoundTripper) RoundTrip(r *http.Request) (*http.Response, error)
if span := opentracing.SpanFromContext(ctx); span != nil {
request.LogToSpan(span)
}
userid, err := user.ExtractOrgID(ctx)
userid, err := tenant.TenantID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

@ -10,11 +10,11 @@ import (
"github.com/cortexproject/cortex/pkg/chunk"
"github.com/cortexproject/cortex/pkg/chunk/cache"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/go-kit/kit/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logql"
@ -176,7 +176,7 @@ func transformRegexQuery(req *http.Request, expr logql.LogSelectorExpr) (logql.L
// validates log entries limits
func validateLimits(req *http.Request, reqLimit uint32, limits Limits) error {
userID, err := user.ExtractOrgID(req.Context())
userID, err := tenant.TenantID(req.Context())
if err != nil {
return httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

@ -6,12 +6,12 @@ import (
"time"
"github.com/cortexproject/cortex/pkg/querier/queryrange"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/weaveworks/common/httpgrpc"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/logproto"
)
@ -159,7 +159,7 @@ func (h *splitByInterval) loop(ctx context.Context, ch <-chan *lokiResult, next
}
func (h *splitByInterval) Do(ctx context.Context, r queryrange.Request) (queryrange.Response, error) {
userid, err := user.ExtractOrgID(ctx)
userid, err := tenant.TenantID(ctx)
if err != nil {
return nil, httpgrpc.Errorf(http.StatusBadRequest, err.Error())
}

@ -11,13 +11,13 @@ import (
cortex_local "github.com/cortexproject/cortex/pkg/chunk/local"
"github.com/cortexproject/cortex/pkg/chunk/storage"
"github.com/cortexproject/cortex/pkg/querier/astmapper"
"github.com/cortexproject/cortex/pkg/tenant"
"github.com/cortexproject/cortex/pkg/util/flagext"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logproto"
@ -196,7 +196,7 @@ func (s *store) SetChunkFilterer(chunkFilterer RequestChunkFilterer) {
// lazyChunks is an internal function used to resolve a set of lazy chunks from the store without actually loading them. It's used internally by `LazyQuery` and `GetSeries`
func (s *store) lazyChunks(ctx context.Context, matchers []*labels.Matcher, from, through model.Time) ([]*LazyChunk, error) {
userID, err := user.ExtractOrgID(ctx)
userID, err := tenant.TenantID(ctx)
if err != nil {
return nil, err
}

Loading…
Cancel
Save