Expose tenant ID through relabel in push promtail targets (#7250)

**What this PR does / why we need it**:

Some Promtail targets that support pushing logs through HTTP, allow
using a specific header for writing logs for multiple tenants. This log
is propagated through a Promtail internal label, that is then consumed
by the remote write client.

If Promtail is used in a multi-tenant manner, it'd be nice to somehow be
able to track how much is each tenant's share from the logs received.
This PR exposes the received tenant id value through relabeling.

This allows for example, keeping it as a label and exposing a metric
though the `metrics` pipeline stage, which can be segregated by tenant.

**Which issue(s) this PR fixes**:
Related to https://github.com/grafana/cloud-onboarding/issues/2067
pull/7277/head
Pablo 4 years ago committed by GitHub
parent 73bea7e4fa
commit 64fe0af9dd
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 12
      clients/pkg/promtail/targets/gcplog/push_target_test.go
  2. 2
      clients/pkg/promtail/targets/gcplog/push_translation.go
  3. 10
      clients/pkg/promtail/targets/heroku/target.go
  4. 12
      clients/pkg/promtail/targets/heroku/target_test.go

@ -275,7 +275,16 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer)
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, nil, "test_job", config)
tenantIDRelabelConfig := []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
Regex: relabel.MustNewRegexp("(.*)"),
Replacement: "$1",
TargetLabel: "tenant_id",
Action: relabel.Replace,
},
}
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tenantIDRelabelConfig, "test_job", config)
require.NoError(t, err)
defer func() {
_ = pt.Stop()
@ -297,6 +306,7 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
require.Equal(t, 1, len(eh.Received()))
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[lokiClient.ReservedLabelTenantID])
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels["tenant_id"])
}
func TestPushTarget_ErroneousPayloadsAreRejected(t *testing.T) {

@ -57,6 +57,8 @@ func translate(m PushMessage, other model.LabelSet, useIncomingTimestamp bool, r
// If the incoming request carries the tenant id, inject it as the reserved label, so it's used by the
// remote write client.
if xScopeOrgID != "" {
// Expose tenant ID through relabel to use as logs or metrics label
lbs.Set(lokiClient.ReservedLabelTenantID, xScopeOrgID)
fixedLabels[lokiClient.ReservedLabelTenantID] = model.LabelValue(xScopeOrgID)
}

@ -117,10 +117,9 @@ func (h *Target) drain(w http.ResponseWriter, r *http.Request) {
ts = message.Timestamp
}
// If the incoming request carries the tenant id, inject it as the reserved label so it's used by the
// remote write client.
tenantIDHeaderValue := r.Header.Get("X-Scope-OrgID")
if tenantIDHeaderValue != "" {
// If present, first inject the tenant ID in, so it can be relabeled if necessary
lb.Set(lokiClient.ReservedLabelTenantID, tenantIDHeaderValue)
}
@ -129,12 +128,17 @@ func (h *Target) drain(w http.ResponseWriter, r *http.Request) {
// Start with the set of labels fixed in the configuration
filtered := h.Labels().Clone()
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") && lbl.Name != lokiClient.ReservedLabelTenantID {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
// Then, inject it as the reserved label, so it's used by the remote write client
if tenantIDHeaderValue != "" {
filtered[lokiClient.ReservedLabelTenantID] = model.LabelValue(tenantIDHeaderValue)
}
entries <- api.Entry{
Labels: filtered,
Entry: logproto.Entry{

@ -297,7 +297,16 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := NewMetrics(prometheus.DefaultRegisterer)
pt, err := NewTarget(metrics, logger, eh, "test_job", config, nil)
tenantIDRelabelConfig := []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
TargetLabel: "tenant_id",
Replacement: "$1",
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
}
pt, err := NewTarget(metrics, logger, eh, "test_job", config, tenantIDRelabelConfig)
require.NoError(t, err)
defer func() {
_ = pt.Stop()
@ -319,6 +328,7 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
require.Equal(t, 1, len(eh.Received()))
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels[lokiClient.ReservedLabelTenantID])
require.Equal(t, model.LabelValue("42"), eh.Received()[0].Labels["tenant_id"])
}
func waitForMessages(eh *fake.Client) {

Loading…
Cancel
Save