fix(promtail): validate relabel config (#19996)

promtail client panics when using file target because the relabel configs are not validated there: https://github.com/grafana/loki/blob/main/clients/pkg/promtail/targets/file/filetargetmanager.go#L321

This issue was already addressed for different targets separately, here I attempt to move it out of the specific targets.
pull/20001/head^2
Ivan Kalita 1 month ago committed by GitHub
parent 5bb6b54052
commit 1bce8ecea8
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 15
      clients/pkg/promtail/config/config.go
  2. 4
      clients/pkg/promtail/promtail.go
  3. 17
      clients/pkg/promtail/targets/azureeventhubs/parser.go
  4. 8
      clients/pkg/promtail/targets/azureeventhubs/parser_test.go
  5. 18
      clients/pkg/promtail/targets/gcplog/formatter.go
  6. 6
      clients/pkg/promtail/targets/gcplog/formatter_test.go
  7. 13
      clients/pkg/promtail/targets/gcplog/push_target_test.go
  8. 16
      clients/pkg/promtail/targets/gelf/gelftarget.go
  9. 6
      clients/pkg/promtail/targets/gelf/gelftarget_test.go
  10. 16
      clients/pkg/promtail/targets/heroku/target.go
  11. 8
      clients/pkg/promtail/targets/heroku/target_test.go
  12. 16
      clients/pkg/promtail/targets/kafka/formatter.go
  13. 6
      clients/pkg/promtail/targets/kafka/target_syncer_test.go
  14. 4
      clients/pkg/promtail/targets/kafka/target_test.go
  15. 32
      clients/pkg/promtail/targets/syslog/syslogtarget.go
  16. 4
      clients/pkg/promtail/targets/syslog/syslogtarget_test.go
  17. 15
      clients/pkg/promtail/targets/testutils/testutils.go

@ -6,6 +6,8 @@ import (
"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/multierror"
"github.com/prometheus/common/model"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/v3/clients/pkg/promtail/client"
@ -86,6 +88,19 @@ func (c Config) String() string {
return string(b)
}
func (c *Config) Validate() error {
var errors []error
for i := range c.ScrapeConfig {
for j := range c.ScrapeConfig[i].RelabelConfigs {
err := c.ScrapeConfig[i].RelabelConfigs[j].Validate(model.UTF8Validation)
if err != nil {
errors = append(errors, fmt.Errorf("ScrapeConfig[%d].RelabelConfigs[%d]: %w", i, j, err))
}
}
}
return multierror.New(errors...).Err()
}
func (c *Config) Setup(l log.Logger) {
if c.ClientConfig.URL.URL != nil {
level.Warn(l).Log("msg", "use of CLI client.* and config file Client block are both deprecated in favour of the config file Clients block and will be removed in a future release")

@ -140,6 +140,10 @@ func (p *Promtail) reloadConfig(cfg *config.Config) error {
p.client.Stop()
}
if err := cfg.Validate(); err != nil {
return fmt.Errorf("error validating config: %w", err)
}
cfg.Setup(p.logger)
if cfg.LimitsConfig.ReadlineRateEnabled {
stages.SetReadLineRateLimiter(cfg.LimitsConfig.ReadlineRate, cfg.LimitsConfig.ReadlineBurst, cfg.LimitsConfig.ReadlineRateDrop)

@ -201,23 +201,8 @@ func (e *messageParser) getLabels(logRecord *azureMonitorResourceLog, relabelCon
})
var processed labels.Labels
// apply relabeling
if len(relabelConfig) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range relabelConfig {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lbs, relabelConfig...)
} else {
processed = lbs
}
processed, _ = relabel.Process(lbs, relabelConfig...)
} else {
processed = lbs
}

@ -9,6 +9,8 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
)
func Test_parseMessage_function_app(t *testing.T) {
@ -195,7 +197,7 @@ func Test_parseMessage_relable_config(t *testing.T) {
Value: readFile(t, "testdata/function_app_logs_message.txt"),
}
relableConfigs := []*relabel.Config{
relabelConfigs := testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__azure_event_hubs_category"},
Regex: relabel.MustNewRegexp("(.*)"),
@ -203,9 +205,9 @@ func Test_parseMessage_relable_config(t *testing.T) {
Replacement: "$1",
Action: "replace",
},
}
})
entries, err := messageParser.Parse(message, nil, relableConfigs, true)
entries, err := messageParser.Parse(message, nil, relabelConfigs, true)
assert.NoError(t, err)
assert.Len(t, entries, 2)

@ -71,24 +71,8 @@ func parseGCPLogsEntry(data []byte, other model.LabelSet, otherInternal labels.L
}
var processed labels.Labels
// apply relabeling
if len(relabelConfig) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range relabelConfig {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lbs.Labels(), relabelConfig...)
} else {
processed = lbs.Labels()
}
processed, _ = relabel.Process(lbs.Labels(), relabelConfig...)
} else {
processed = lbs.Labels()
}

@ -11,6 +11,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/pkg/logproto"
@ -34,7 +36,7 @@ func TestFormat(t *testing.T) {
labels: model.LabelSet{
"jobname": "pubsub-test",
},
relabel: []*relabel.Config{
relabel: testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__gcp_resource_labels_backend_service_name"},
Separator: ";",
@ -67,7 +69,7 @@ func TestFormat(t *testing.T) {
Action: "replace",
Replacement: "$1",
},
},
}),
useIncomingTimestamp: true,
expected: api.Entry{
Labels: model.LabelSet{

@ -10,6 +10,7 @@ import (
"time"
"github.com/grafana/loki/v3/clients/pkg/promtail/api"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/go-kit/log"
"github.com/prometheus/client_golang/prometheus"
@ -90,7 +91,7 @@ func TestPushTarget(t *testing.T) {
Labels: model.LabelSet{
"job": "some_job_name",
},
RelabelConfigs: []*relabel.Config{
RelabelConfigs: testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__gcp_attributes_logging_googleapis_com_timestamp"},
Regex: relabel.MustNewRegexp("(.*)"),
@ -134,7 +135,7 @@ func TestPushTarget(t *testing.T) {
TargetLabel: "cluster",
Action: relabel.Replace,
},
},
}),
},
expectedEntries: []expectedEntry{
{
@ -276,7 +277,7 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer)
tenantIDRelabelConfig := []*relabel.Config{
tenantIDRelabelConfig := testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
Regex: relabel.MustNewRegexp("(.*)"),
@ -284,7 +285,7 @@ func TestPushTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
TargetLabel: "tenant_id",
Action: relabel.Replace,
},
}
})
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tenantIDRelabelConfig, t.Name()+"_test_job", config)
require.NoError(t, err)
defer func() {
@ -410,7 +411,7 @@ func TestPushTarget_UsePushTimeout(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := gcplog.NewMetrics(prometheus.DefaultRegisterer)
tenantIDRelabelConfig := []*relabel.Config{
tenantIDRelabelConfig := testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
Regex: relabel.MustNewRegexp("(.*)"),
@ -418,7 +419,7 @@ func TestPushTarget_UsePushTimeout(t *testing.T) {
TargetLabel: "tenant_id",
Action: relabel.Replace,
},
}
})
pt, err := gcplog.NewGCPLogTarget(metrics, logger, eh, tenantIDRelabelConfig, t.Name()+"_test_job", config)
require.NoError(t, err)
defer func() {

@ -124,21 +124,7 @@ func (t *Target) handleMessage(msg *gelf.Message) {
var processed labels.Labels
if len(t.relabelConfig) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range t.relabelConfig {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}

@ -15,6 +15,8 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/grafana/loki/v3/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
)
@ -29,7 +31,7 @@ func Test_Gelf(t *testing.T) {
UseIncomingTimestamp: true,
Labels: model.LabelSet{"cfg": "true"},
},
RelabelConfigs: []*relabel.Config{
RelabelConfigs: testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__gelf_message_level"},
TargetLabel: "level",
@ -51,7 +53,7 @@ func Test_Gelf(t *testing.T) {
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
},
}),
},
})
require.NoError(t, err)

@ -130,21 +130,7 @@ func (h *Target) drain(w http.ResponseWriter, r *http.Request) {
var processed labels.Labels
if len(h.relabelConfigs) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range h.relabelConfigs {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lb.Labels(), h.relabelConfigs...)
} else {
processed = lb.Labels()
}
processed, _ = relabel.Process(lb.Labels(), h.relabelConfigs...)
} else {
processed = lb.Labels()
}

@ -19,6 +19,8 @@ import (
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
lokiClient "github.com/grafana/loki/v3/clients/pkg/promtail/client"
"github.com/grafana/loki/v3/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
@ -281,7 +283,7 @@ func TestHerokuDrainTarget(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := NewMetrics(prometheus.DefaultRegisterer)
pt, err := NewTarget(metrics, logger, eh, "test_job", config, tc.args.RelabelConfigs)
pt, err := NewTarget(metrics, logger, eh, "test_job", config, testutils.ValidateRelabelConfig(t, tc.args.RelabelConfigs))
require.NoError(t, err)
defer func() {
_ = pt.Stop()
@ -411,7 +413,7 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
prometheus.DefaultRegisterer = prometheus.NewRegistry()
metrics := NewMetrics(prometheus.DefaultRegisterer)
tenantIDRelabelConfig := []*relabel.Config{
tenantIDRelabelConfig := testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__tenant_id__"},
TargetLabel: "tenant_id",
@ -419,7 +421,7 @@ func TestHerokuDrainTarget_UseTenantIDHeaderIfPresent(t *testing.T) {
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
}
})
pt, err := NewTarget(metrics, logger, eh, "test_job", config, tenantIDRelabelConfig)
require.NoError(t, err)
defer func() {

@ -16,21 +16,7 @@ func format(lbs labels.Labels, cfg []*relabel.Config) model.LabelSet {
}
var processed labels.Labels
if len(cfg) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range cfg {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lbs, cfg...)
} else {
processed = lbs
}
processed, _ = relabel.Process(lbs, cfg...)
} else {
processed = lbs
}

@ -18,6 +18,8 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/grafana/loki/v3/clients/pkg/logentry/stages"
"github.com/grafana/loki/v3/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
@ -87,7 +89,7 @@ func Test_NewTarget(t *testing.T) {
client: fake.New(func() {}),
cfg: &TargetSyncerConfig{
GroupID: "group_1",
RelabelConfigs: []*relabel.Config{
RelabelConfigs: testutils.ValidateRelabelConfig(t, []*relabel.Config{
{
SourceLabels: model.LabelNames{"__meta_kafka_topic"},
TargetLabel: "topic",
@ -95,7 +97,7 @@ func Test_NewTarget(t *testing.T) {
Action: relabel.Replace,
Regex: relabel.MustNewRegexp("(.*)"),
},
},
}),
Labels: model.LabelSet{"static": "static1"},
},
}

@ -13,6 +13,8 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/atomic"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/grafana/loki/v3/clients/pkg/promtail/client/fake"
)
@ -159,7 +161,7 @@ func Test_TargetRun(t *testing.T) {
closed = true
},
)
tg := NewTarget(nil, session, claim, tt.inDiscoveredLS, tt.inLS, tt.relabels, fc, true, messageParser{})
tg := NewTarget(nil, session, claim, tt.inDiscoveredLS, tt.inLS, testutils.ValidateRelabelConfig(t, tt.relabels), fc, true, messageParser{})
var wg sync.WaitGroup
wg.Add(1)

@ -151,21 +151,7 @@ func (t *SyslogTarget) handleMessageRFC5424(connLabels labels.Labels, msg syslog
var processed labels.Labels
if len(t.relabelConfig) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range t.relabelConfig {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}
@ -227,21 +213,7 @@ func (t *SyslogTarget) handleMessageRFC3164(connLabels labels.Labels, msg syslog
var processed labels.Labels
if len(t.relabelConfig) > 0 {
// Validate relabel configs to set the validation scheme properly
valid := true
for _, rc := range t.relabelConfig {
if err := rc.Validate(model.UTF8Validation); err != nil {
// If validation fails, skip relabeling and use original labels
valid = false
break
}
}
// Only process if all configs were validated successfully
if valid {
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}
processed, _ = relabel.Process(lb.Labels(), t.relabelConfig...)
} else {
processed = lb.Labels()
}

@ -19,6 +19,8 @@ import (
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/testutils"
"github.com/grafana/loki/v3/clients/pkg/promtail/client/fake"
"github.com/grafana/loki/v3/clients/pkg/promtail/scrapeconfig"
"github.com/grafana/loki/v3/clients/pkg/promtail/targets/syslog/syslogparser"
@ -457,7 +459,7 @@ func relabelConfig(t *testing.T) []*relabel.Config {
err := yaml.Unmarshal([]byte(relabelCfg), &relabels)
require.NoError(t, err)
return relabels
return testutils.ValidateRelabelConfig(t, relabels)
}
func writeMessagesToStream(w io.Writer, messages []string, formatter formatFunc) error {

@ -2,7 +2,12 @@ package testutils
import (
"math/rand"
"testing"
"time"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/relabel"
"github.com/stretchr/testify/require"
)
var randomGenerator *rand.Rand
@ -20,3 +25,13 @@ func RandName() string {
}
return string(b)
}
func ValidateRelabelConfig(t *testing.T, configs []*relabel.Config) []*relabel.Config {
t.Helper()
for _, c := range configs {
require.NoError(t, c.Validate(model.UTF8Validation))
}
return configs
}

Loading…
Cancel
Save