From c8d2c3200288e60d6ef7d0f0967ecfa76d593734 Mon Sep 17 00:00:00 2001 From: Travis Patterson Date: Mon, 25 Mar 2024 12:50:36 -0600 Subject: [PATCH] feat: Introduce special header that tells Loki not to modify query results (#12327) --- integration/loki_micro_services_test.go | 1 + integration/loki_rule_eval_test.go | 1 + integration/loki_simple_scalable_test.go | 1 + integration/loki_single_binary_test.go | 1 + integration/multi_tenant_queries_test.go | 1 + integration/parse_metrics.go | 1 + integration/parse_metrics_test.go | 1 + integration/per_request_limits_test.go | 1 + pkg/ingester/client/client.go | 2 + pkg/ingester/instance.go | 6 +- pkg/ingester/instance_test.go | 78 +++++++++++++++++ pkg/loki/loki.go | 4 +- pkg/loki/modules.go | 101 ++++++++++++----------- pkg/querier/queryrange/codec.go | 5 ++ pkg/querier/queryrange/marshal.go | 11 +++ pkg/storage/store.go | 6 +- pkg/storage/store_test.go | 63 ++++++++++++++ pkg/util/httpreq/headers.go | 7 +- pkg/util/server/grpc_headers.go | 61 ++++++++++++++ pkg/util/server/grpc_headers_test.go | 81 ++++++++++++++++++ 20 files changed, 380 insertions(+), 53 deletions(-) create mode 100644 pkg/util/server/grpc_headers.go create mode 100644 pkg/util/server/grpc_headers_test.go diff --git a/integration/loki_micro_services_test.go b/integration/loki_micro_services_test.go index 67f888d41a..3480757f07 100644 --- a/integration/loki_micro_services_test.go +++ b/integration/loki_micro_services_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/loki_rule_eval_test.go b/integration/loki_rule_eval_test.go index 025b74df5a..5ee9bf97ac 100644 --- a/integration/loki_rule_eval_test.go +++ b/integration/loki_rule_eval_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/loki_simple_scalable_test.go b/integration/loki_simple_scalable_test.go index ccbf839b6a..f831dcc406 100644 --- a/integration/loki_simple_scalable_test.go +++ b/integration/loki_simple_scalable_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/loki_single_binary_test.go b/integration/loki_single_binary_test.go index 31b9990d3a..7e26f9c4ca 100644 --- a/integration/loki_single_binary_test.go +++ b/integration/loki_single_binary_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/multi_tenant_queries_test.go b/integration/multi_tenant_queries_test.go index cec967fd13..4c13d6f9e6 100644 --- a/integration/multi_tenant_queries_test.go +++ b/integration/multi_tenant_queries_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/parse_metrics.go b/integration/parse_metrics.go index d2896de6e2..530ed44802 100644 --- a/integration/parse_metrics.go +++ b/integration/parse_metrics.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/parse_metrics_test.go b/integration/parse_metrics_test.go index 7af3289e36..f6c9a01f27 100644 --- a/integration/parse_metrics_test.go +++ b/integration/parse_metrics_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/integration/per_request_limits_test.go b/integration/per_request_limits_test.go index 93e2c44086..34d9c7e99f 100644 --- a/integration/per_request_limits_test.go +++ b/integration/per_request_limits_test.go @@ -1,4 +1,5 @@ //go:build integration + package integration import ( diff --git a/pkg/ingester/client/client.go b/pkg/ingester/client/client.go index 3d0c48e8d0..861a925d6b 100644 --- a/pkg/ingester/client/client.go +++ b/pkg/ingester/client/client.go @@ -91,6 +91,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC var unaryInterceptors []grpc.UnaryClientInterceptor unaryInterceptors = append(unaryInterceptors, cfg.GRPCUnaryClientInterceptors...) unaryInterceptors = append(unaryInterceptors, server.UnaryClientQueryTagsInterceptor) + unaryInterceptors = append(unaryInterceptors, server.UnaryClientHTTPHeadersInterceptor) unaryInterceptors = append(unaryInterceptors, otgrpc.OpenTracingClientInterceptor(opentracing.GlobalTracer())) if !cfg.Internal { unaryInterceptors = append(unaryInterceptors, middleware.ClientUserHeaderInterceptor) @@ -100,6 +101,7 @@ func instrumentation(cfg *Config) ([]grpc.UnaryClientInterceptor, []grpc.StreamC var streamInterceptors []grpc.StreamClientInterceptor streamInterceptors = append(streamInterceptors, cfg.GRCPStreamClientInterceptors...) streamInterceptors = append(streamInterceptors, server.StreamClientQueryTagsInterceptor) + streamInterceptors = append(streamInterceptors, server.StreamClientHTTPHeadersInterceptor) streamInterceptors = append(streamInterceptors, otgrpc.OpenTracingStreamClientInterceptor(opentracing.GlobalTracer())) if !cfg.Internal { streamInterceptors = append(streamInterceptors, middleware.StreamClientUserHeaderInterceptor) diff --git a/pkg/ingester/instance.go b/pkg/ingester/instance.go index 9ecc84e70a..64678da85a 100644 --- a/pkg/ingester/instance.go +++ b/pkg/ingester/instance.go @@ -10,6 +10,8 @@ import ( "syscall" "time" + "github.com/grafana/loki/pkg/util/httpreq" + "github.com/go-kit/log/level" "github.com/grafana/dskit/httpgrpc" "github.com/opentracing/opentracing-go" @@ -436,7 +438,7 @@ func (i *instance) Query(ctx context.Context, req logql.SelectLogParams) (iter.E return nil, err } - if i.pipelineWrapper != nil { + if i.pipelineWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -490,7 +492,7 @@ func (i *instance) QuerySample(ctx context.Context, req logql.SelectSampleParams return nil, err } - if i.extractorWrapper != nil { + if i.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err diff --git a/pkg/ingester/instance_test.go b/pkg/ingester/instance_test.go index 0f574cfdc1..2547795114 100644 --- a/pkg/ingester/instance_test.go +++ b/pkg/ingester/instance_test.go @@ -10,6 +10,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/util/httpreq" + "github.com/grafana/dskit/tenant" "github.com/grafana/dskit/user" @@ -717,6 +719,47 @@ func Test_PipelineWrapper(t *testing.T) { require.Equal(t, 10, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper } +func Test_PipelineWrapper_disabled(t *testing.T) { + instance := defaultInstance(t) + + wrapper := &testPipelineWrapper{ + pipeline: newMockPipeline(), + } + instance.pipelineWrapper = wrapper + + ctx := user.InjectOrgID(context.Background(), "test-user") + ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") + _, err := tenant.TenantID(ctx) + require.NoError(t, err) + + it, err := instance.Query(ctx, + logql.SelectLogParams{ + QueryRequest: &logproto.QueryRequest{ + Selector: `{job="3"}`, + Limit: uint32(2), + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Direction: logproto.BACKWARD, + Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()}, + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`{job="3"}`), + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + for it.Next() { + // Consume the iterator + require.NoError(t, it.Error()) + } + + require.Equal(t, "", wrapper.tenant) + require.Equal(t, ``, wrapper.query) + require.Equal(t, 0, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper +} + type testPipelineWrapper struct { query string tenant string @@ -807,6 +850,41 @@ func Test_ExtractorWrapper(t *testing.T) { require.Equal(t, 10, wrapper.extractor.sp.called) // we've passed every log line through the wrapper } +func Test_ExtractorWrapper_disabled(t *testing.T) { + instance := defaultInstance(t) + + wrapper := &testExtractorWrapper{ + extractor: newMockExtractor(), + } + instance.extractorWrapper = wrapper + + ctx := user.InjectOrgID(context.Background(), "test-user") + ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") + it, err := instance.QuerySample(ctx, + logql.SelectSampleParams{ + SampleQueryRequest: &logproto.SampleQueryRequest{ + Selector: `sum(count_over_time({job="3"}[1m]))`, + Start: time.Unix(0, 0), + End: time.Unix(0, 100000000), + Shards: []string{astmapper.ShardAnnotation{Shard: 0, Of: 1}.String()}, + Plan: &plan.QueryPlan{ + AST: syntax.MustParseExpr(`sum(count_over_time({job="3"}[1m]))`), + }, + }, + }, + ) + require.NoError(t, err) + defer it.Close() + + for it.Next() { + // Consume the iterator + require.NoError(t, it.Error()) + } + + require.Equal(t, ``, wrapper.query) + require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper +} + type testExtractorWrapper struct { query string tenant string diff --git a/pkg/loki/loki.go b/pkg/loki/loki.go index 90bb134a56..eb513910f1 100644 --- a/pkg/loki/loki.go +++ b/pkg/loki/loki.go @@ -607,7 +607,7 @@ func (t *Loki) setupModuleManager() error { mm.RegisterModule(Querier, t.initQuerier) mm.RegisterModule(Ingester, t.initIngester) mm.RegisterModule(IngesterQuerier, t.initIngesterQuerier) - mm.RegisterModule(IngesterQueryTagsInterceptors, t.initIngesterQueryTagsInterceptors, modules.UserInvisibleModule) + mm.RegisterModule(IngesterGRPCInterceptors, t.initIngesterGRPCInterceptors, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontendTripperware, t.initQueryFrontendMiddleware, modules.UserInvisibleModule) mm.RegisterModule(QueryFrontend, t.initQueryFrontend) mm.RegisterModule(RulerStorage, t.initRulerStorage, modules.UserInvisibleModule) @@ -714,7 +714,7 @@ func (t *Loki) setupModuleManager() error { // Initialise query tags interceptors on targets running ingester if t.Cfg.isModuleEnabled(Ingester) || t.Cfg.isModuleEnabled(Write) || t.Cfg.isModuleEnabled(All) { - deps[Server] = append(deps[Server], IngesterQueryTagsInterceptors) + deps[Server] = append(deps[Server], IngesterGRPCInterceptors) } // Add bloom gateway ring in client mode to IndexGateway service dependencies if bloom filtering is enabled. diff --git a/pkg/loki/modules.go b/pkg/loki/modules.go index 6172bbccf0..d3a9a4842a 100644 --- a/pkg/loki/modules.go +++ b/pkg/loki/modules.go @@ -86,47 +86,47 @@ const maxChunkAgeForTableManager = 12 * time.Hour // The various modules that make up Loki. const ( - Ring string = "ring" - RuntimeConfig string = "runtime-config" - Overrides string = "overrides" - OverridesExporter string = "overrides-exporter" - TenantConfigs string = "tenant-configs" - Server string = "server" - InternalServer string = "internal-server" - Distributor string = "distributor" - Querier string = "querier" - CacheGenerationLoader string = "cache-generation-loader" - Ingester string = "ingester" - IngesterQuerier string = "ingester-querier" - IngesterQueryTagsInterceptors string = "ingester-query-tags-interceptors" - QueryFrontend string = "query-frontend" - QueryFrontendTripperware string = "query-frontend-tripperware" - QueryLimiter string = "query-limiter" - QueryLimitsInterceptors string = "query-limits-interceptors" - QueryLimitsTripperware string = "query-limits-tripper" - RulerStorage string = "ruler-storage" - Ruler string = "ruler" - RuleEvaluator string = "rule-evaluator" - Store string = "store" - TableManager string = "table-manager" - MemberlistKV string = "memberlist-kv" - Compactor string = "compactor" - BloomGateway string = "bloom-gateway" - BloomGatewayRing string = "bloom-gateway-ring" - IndexGateway string = "index-gateway" - IndexGatewayRing string = "index-gateway-ring" - IndexGatewayInterceptors string = "index-gateway-interceptors" - QueryScheduler string = "query-scheduler" - QuerySchedulerRing string = "query-scheduler-ring" - BloomCompactor string = "bloom-compactor" - BloomCompactorRing string = "bloom-compactor-ring" - BloomStore string = "bloom-store" - All string = "all" - Read string = "read" - Write string = "write" - Backend string = "backend" - Analytics string = "analytics" - InitCodec string = "init-codec" + Ring string = "ring" + RuntimeConfig string = "runtime-config" + Overrides string = "overrides" + OverridesExporter string = "overrides-exporter" + TenantConfigs string = "tenant-configs" + Server string = "server" + InternalServer string = "internal-server" + Distributor string = "distributor" + Querier string = "querier" + CacheGenerationLoader string = "cache-generation-loader" + Ingester string = "ingester" + IngesterQuerier string = "ingester-querier" + IngesterGRPCInterceptors string = "ingester-query-tags-interceptors" + QueryFrontend string = "query-frontend" + QueryFrontendTripperware string = "query-frontend-tripperware" + QueryLimiter string = "query-limiter" + QueryLimitsInterceptors string = "query-limits-interceptors" + QueryLimitsTripperware string = "query-limits-tripper" + RulerStorage string = "ruler-storage" + Ruler string = "ruler" + RuleEvaluator string = "rule-evaluator" + Store string = "store" + TableManager string = "table-manager" + MemberlistKV string = "memberlist-kv" + Compactor string = "compactor" + BloomGateway string = "bloom-gateway" + BloomGatewayRing string = "bloom-gateway-ring" + IndexGateway string = "index-gateway" + IndexGatewayRing string = "index-gateway-ring" + IndexGatewayInterceptors string = "index-gateway-interceptors" + QueryScheduler string = "query-scheduler" + QuerySchedulerRing string = "query-scheduler-ring" + BloomCompactor string = "bloom-compactor" + BloomCompactorRing string = "bloom-compactor-ring" + BloomStore string = "bloom-store" + All string = "all" + Read string = "read" + Write string = "write" + Backend string = "backend" + Analytics string = "analytics" + InitCodec string = "init-codec" ) const ( @@ -404,7 +404,7 @@ func (t *Loki) initQuerier() (services.Service, error) { toMerge := []middleware.Interface{ httpreq.ExtractQueryMetricsMiddleware(), httpreq.ExtractQueryTagsMiddleware(), - httpreq.PropagateHeadersMiddleware(httpreq.LokiEncodingFlagsHeader), + httpreq.PropagateHeadersMiddleware(httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader), serverutil.RecoveryHTTPMiddleware, t.HTTPAuthMiddleware, serverutil.NewPrepopulateMiddleware(), @@ -983,7 +983,7 @@ func (t *Loki) initQueryFrontend() (_ services.Service, err error) { toMerge := []middleware.Interface{ httpreq.ExtractQueryTagsMiddleware(), - httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader), + httpreq.PropagateHeadersMiddleware(httpreq.LokiActorPathHeader, httpreq.LokiEncodingFlagsHeader, httpreq.LokiDisablePipelineWrappersHeader), serverutil.RecoveryHTTPMiddleware, t.HTTPAuthMiddleware, queryrange.StatsHTTPMiddleware, @@ -1573,10 +1573,19 @@ func (t *Loki) initQueryLimitsInterceptors() (services.Service, error) { return nil, nil } -func (t *Loki) initIngesterQueryTagsInterceptors() (services.Service, error) { +func (t *Loki) initIngesterGRPCInterceptors() (services.Service, error) { _ = level.Debug(util_log.Logger).Log("msg", "initializing ingester query tags interceptors") - t.Cfg.Server.GRPCStreamMiddleware = append(t.Cfg.Server.GRPCStreamMiddleware, serverutil.StreamServerQueryTagsInterceptor) - t.Cfg.Server.GRPCMiddleware = append(t.Cfg.Server.GRPCMiddleware, serverutil.UnaryServerQueryTagsInterceptor) + t.Cfg.Server.GRPCStreamMiddleware = append( + t.Cfg.Server.GRPCStreamMiddleware, + serverutil.StreamServerQueryTagsInterceptor, + serverutil.StreamServerHTTPHeadersInterceptor, + ) + + t.Cfg.Server.GRPCMiddleware = append( + t.Cfg.Server.GRPCMiddleware, + serverutil.UnaryServerQueryTagsInterceptor, + serverutil.UnaryServerHTTPHeadersnIterceptor, + ) return nil, nil } diff --git a/pkg/querier/queryrange/codec.go b/pkg/querier/queryrange/codec.go index 205cae586e..44de02408b 100644 --- a/pkg/querier/queryrange/codec.go +++ b/pkg/querier/queryrange/codec.go @@ -612,6 +612,11 @@ func (c Codec) EncodeRequest(ctx context.Context, r queryrangebase.Request) (*ht header.Set(httpreq.LokiActorPathHeader, actor) } + // Add disable wrappers + if disableWrappers := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader); disableWrappers != "" { + header.Set(httpreq.LokiDisablePipelineWrappersHeader, disableWrappers) + } + // Add limits if limits := querylimits.ExtractQueryLimitsContext(ctx); limits != nil { err := querylimits.InjectQueryLimitsHeader(&header, limits) diff --git a/pkg/querier/queryrange/marshal.go b/pkg/querier/queryrange/marshal.go index 994b8d682c..4480b06adc 100644 --- a/pkg/querier/queryrange/marshal.go +++ b/pkg/querier/queryrange/marshal.go @@ -272,6 +272,11 @@ func (Codec) QueryRequestUnwrap(ctx context.Context, req *QueryRequest) (queryra ctx = httpreq.InjectActorPath(ctx, actor) } + // Add disable wrappers + if disableWrappers, ok := req.Metadata[httpreq.LokiDisablePipelineWrappersHeader]; ok { + ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, disableWrappers) + } + // Add limits if encodedLimits, ok := req.Metadata[querylimits.HTTPHeaderQueryLimitsKey]; ok { limits, err := querylimits.UnmarshalQueryLimits([]byte(encodedLimits)) @@ -364,6 +369,12 @@ func (Codec) QueryRequestWrap(ctx context.Context, r queryrangebase.Request) (*Q result.Metadata[httpreq.LokiActorPathHeader] = actor } + // Keep disable wrappers + disableWrappers := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) + if disableWrappers != "" { + result.Metadata[httpreq.LokiDisablePipelineWrappersHeader] = disableWrappers + } + // Add limits limits := querylimits.ExtractQueryLimitsContext(ctx) if limits != nil { diff --git a/pkg/storage/store.go b/pkg/storage/store.go index 35ddba3893..706f630931 100644 --- a/pkg/storage/store.go +++ b/pkg/storage/store.go @@ -6,6 +6,8 @@ import ( "math" "time" + "github.com/grafana/loki/pkg/util/httpreq" + lokilog "github.com/grafana/loki/pkg/logql/log" "github.com/go-kit/log" @@ -507,7 +509,7 @@ func (s *LokiStore) SelectLogs(ctx context.Context, req logql.SelectLogParams) ( return nil, err } - if s.pipelineWrapper != nil { + if s.pipelineWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err @@ -554,7 +556,7 @@ func (s *LokiStore) SelectSamples(ctx context.Context, req logql.SelectSamplePar return nil, err } - if s.extractorWrapper != nil { + if s.extractorWrapper != nil && httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) != "true" { userID, err := tenant.TenantID(ctx) if err != nil { return nil, err diff --git a/pkg/storage/store_test.go b/pkg/storage/store_test.go index 2cc32bf7ec..52df29c079 100644 --- a/pkg/storage/store_test.go +++ b/pkg/storage/store_test.go @@ -13,6 +13,8 @@ import ( "testing" "time" + "github.com/grafana/loki/pkg/util/httpreq" + "github.com/cespare/xxhash/v2" "github.com/go-kit/log" "github.com/grafana/dskit/flagext" @@ -932,6 +934,37 @@ func Test_PipelineWrapper(t *testing.T) { require.Equal(t, 28, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper } +func Test_PipelineWrapper_disabled(t *testing.T) { + s := &LokiStore{ + Store: storeFixture, + cfg: Config{ + MaxChunkBatchSize: 10, + }, + chunkMetrics: NilMetrics, + } + wrapper := &testPipelineWrapper{ + pipeline: newMockPipeline(), + } + + s.SetPipelineWrapper(wrapper) + ctx = user.InjectOrgID(context.Background(), "test-user") + ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") + logit, err := s.SelectLogs(ctx, logql.SelectLogParams{QueryRequest: newQuery("{foo=~\"ba.*\"}", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 5}}, nil)}) + + if err != nil { + t.Errorf("store.SelectLogs() error = %v", err) + return + } + defer logit.Close() + for logit.Next() { + require.NoError(t, logit.Error()) // consume the iterator + } + + require.Equal(t, "", wrapper.tenant) + require.Equal(t, "", wrapper.query) + require.Equal(t, 0, wrapper.pipeline.sp.called) // we've passed every log line through the wrapper +} + type testPipelineWrapper struct { query string pipeline *mockPipeline @@ -1017,6 +1050,36 @@ func Test_SampleWrapper(t *testing.T) { require.Equal(t, 28, wrapper.extractor.sp.called) // we've passed every log line through the wrapper } +func Test_SampleWrapper_disabled(t *testing.T) { + s := &LokiStore{ + Store: storeFixture, + cfg: Config{ + MaxChunkBatchSize: 10, + }, + chunkMetrics: NilMetrics, + } + wrapper := &testExtractorWrapper{ + extractor: newMockExtractor(), + } + s.SetExtractorWrapper(wrapper) + + ctx = user.InjectOrgID(context.Background(), "test-user") + ctx = httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, "true") + it, err := s.SelectSamples(ctx, logql.SelectSampleParams{SampleQueryRequest: newSampleQuery("count_over_time({foo=~\"ba.*\"}[1s])", from, from.Add(1*time.Hour), []astmapper.ShardAnnotation{{Shard: 1, Of: 3}}, nil)}) + if err != nil { + t.Errorf("store.SelectSamples() error = %v", err) + return + } + defer it.Close() + for it.Next() { + require.NoError(t, it.Error()) // consume the iterator + } + + require.Equal(t, "", wrapper.tenant) + require.Equal(t, "", wrapper.query) + require.Equal(t, 0, wrapper.extractor.sp.called) // we've passed every log line through the wrapper +} + type testExtractorWrapper struct { query string tenant string diff --git a/pkg/util/httpreq/headers.go b/pkg/util/httpreq/headers.go index 0b751f78ed..fef401243c 100644 --- a/pkg/util/httpreq/headers.go +++ b/pkg/util/httpreq/headers.go @@ -12,7 +12,8 @@ type headerContextKey string var ( // LokiActorPathHeader is the name of the header e.g. used to enqueue requests in hierarchical queues. - LokiActorPathHeader = "X-Loki-Actor-Path" + LokiActorPathHeader = "X-Loki-Actor-Path" + LokiDisablePipelineWrappersHeader = "X-Loki-Disable-Pipeline-Wrappers" // LokiActorPathDelimiter is the delimiter used to serialise the hierarchy of the actor. LokiActorPathDelimiter = "|" @@ -50,3 +51,7 @@ func ExtractActorPath(ctx context.Context) []string { func InjectActorPath(ctx context.Context, value string) context.Context { return context.WithValue(ctx, headerContextKey(LokiActorPathHeader), value) } + +func InjectHeader(ctx context.Context, key, value string) context.Context { + return context.WithValue(ctx, headerContextKey(key), value) +} diff --git a/pkg/util/server/grpc_headers.go b/pkg/util/server/grpc_headers.go new file mode 100644 index 0000000000..3b205a73d1 --- /dev/null +++ b/pkg/util/server/grpc_headers.go @@ -0,0 +1,61 @@ +package server + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" + + "github.com/grafana/loki/pkg/util/httpreq" +) + +func injectHTTPHeadersIntoGRPCRequest(ctx context.Context) context.Context { + header := httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader) + if header == "" { + return ctx + } + + // inject into GRPC metadata + md, ok := metadata.FromOutgoingContext(ctx) + if !ok { + md = metadata.New(map[string]string{}) + } + md = md.Copy() + md.Set(httpreq.LokiDisablePipelineWrappersHeader, header) + + return metadata.NewOutgoingContext(ctx, md) +} + +func extractHTTPHeadersFromGRPCRequest(ctx context.Context) context.Context { + md, ok := metadata.FromIncomingContext(ctx) + if !ok { + // No metadata, just return as is + return ctx + } + + headerValues := md.Get(httpreq.LokiDisablePipelineWrappersHeader) + if len(headerValues) == 0 { + return ctx + } + + return httpreq.InjectHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader, headerValues[0]) +} + +func UnaryClientHTTPHeadersInterceptor(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { + return invoker(injectHTTPHeadersIntoGRPCRequest(ctx), method, req, reply, cc, opts...) +} + +func StreamClientHTTPHeadersInterceptor(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) { + return streamer(injectHTTPHeadersIntoGRPCRequest(ctx), desc, cc, method, opts...) +} + +func UnaryServerHTTPHeadersnIterceptor(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + return handler(extractHTTPHeadersFromGRPCRequest(ctx), req) +} + +func StreamServerHTTPHeadersInterceptor(srv interface{}, ss grpc.ServerStream, _ *grpc.StreamServerInfo, handler grpc.StreamHandler) error { + return handler(srv, serverStream{ + ctx: extractHTTPHeadersFromGRPCRequest(ss.Context()), + ServerStream: ss, + }) +} diff --git a/pkg/util/server/grpc_headers_test.go b/pkg/util/server/grpc_headers_test.go new file mode 100644 index 0000000000..db222451f4 --- /dev/null +++ b/pkg/util/server/grpc_headers_test.go @@ -0,0 +1,81 @@ +package server + +import ( + "context" + "testing" + + "github.com/stretchr/testify/require" + "google.golang.org/grpc/metadata" + + "github.com/grafana/loki/pkg/util/httpreq" +) + +func TestInjectHTTPHeaderIntoGRPCRequest(t *testing.T) { + for _, tt := range []struct { + name, header string + md, expectMetadata metadata.MD + }{ + { + name: "creates new metadata and sets header", + header: "true", + expectMetadata: metadata.New(map[string]string{httpreq.LokiDisablePipelineWrappersHeader: "true"}), + }, + { + name: "sets header on existing metadata", + header: "true", + md: metadata.New(map[string]string{"x-foo": "bar"}), + expectMetadata: metadata.New(map[string]string{"x-foo": "bar", httpreq.LokiDisablePipelineWrappersHeader: "true"}), + }, + { + name: "no header, leave metadata untouched", + md: metadata.New(map[string]string{"x-foo": "bar"}), + expectMetadata: metadata.New(map[string]string{"x-foo": "bar"}), + }, + { + name: "no header", + expectMetadata: nil, + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := context.Background() + if tt.header != "" { + ctx = httpreq.InjectHeader(context.Background(), httpreq.LokiDisablePipelineWrappersHeader, tt.header) + } + + if tt.md != nil { + ctx = metadata.NewOutgoingContext(ctx, tt.md) + } + + ctx = injectHTTPHeadersIntoGRPCRequest(ctx) + md, _ := metadata.FromOutgoingContext(ctx) + require.EqualValues(t, tt.expectMetadata, md) + }) + } +} + +func TestExtractHTTPHeaderFromGRPCRequest(t *testing.T) { + for _, tt := range []struct { + name string + md metadata.MD + expectedResp string + }{ + { + name: "extracts header from metadata", + md: metadata.New(map[string]string{httpreq.LokiDisablePipelineWrappersHeader: "true"}), + expectedResp: "true", + }, + { + name: "non-nil metadata without header", + md: metadata.New(map[string]string{"x-foo": "bar"}), + }, + { + name: "nil metadata", + }, + } { + t.Run(tt.name, func(t *testing.T) { + ctx := metadata.NewIncomingContext(context.Background(), tt.md) + ctx = extractHTTPHeadersFromGRPCRequest(ctx) + require.Equal(t, tt.expectedResp, httpreq.ExtractHeader(ctx, httpreq.LokiDisablePipelineWrappersHeader)) + }) + } +}