promtail/targets/syslog: Enable best effort parsing for Syslog messages (#5409)

* promtail/targets/syslog: enable best effort parsing

* promtail/targets/syslog: fmt
k86
Leo 3 years ago committed by GitHub
parent d563d2c28b
commit 6c84304430
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 4
      clients/pkg/promtail/targets/syslog/syslogparser/syslogparser.go
  2. 17
      clients/pkg/promtail/targets/syslog/syslogparser/syslogparser_test.go
  3. 13
      clients/pkg/promtail/targets/syslog/syslogtarget_test.go

@ -24,9 +24,9 @@ func ParseStream(r io.Reader, callback func(res *syslog.Result), maxMessageLengt
b := firstByte[0]
if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength)).Parse(buf)
nontransparent.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength)).Parse(buf)
octetcounting.NewParser(syslog.WithListener(callback), syslog.WithMaxMessageLength(maxMessageLength), syslog.WithBestEffort()).Parse(buf)
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte)
}

@ -34,6 +34,23 @@ func TestParseStream_OctetCounting(t *testing.T) {
require.Equal(t, "Second", *results[1].Message.(*rfc5424.SyslogMessage).Message)
}
func TestParseStream_ValidParseError(t *testing.T) {
// This message can not parse fully but is valid when using the BestEffort Parser Option.
r := strings.NewReader("17 <13>1 First")
results := make([]*syslog.Result, 0)
cb := func(res *syslog.Result) {
results = append(results, res)
}
err := syslogparser.ParseStream(r, cb, defaultMaxMessageLength)
require.NoError(t, err)
require.Equal(t, 1, len(results))
require.EqualError(t, results[0].Error, "expecting a RFC3339MICRO timestamp or a nil value [col 6]")
require.True(t, results[0].Message.(*rfc5424.SyslogMessage).Valid())
}
func TestParseStream_OctetCounting_LongMessage(t *testing.T) {
r := strings.NewReader("8198 <13>1 - - - - - - First")

@ -424,19 +424,26 @@ func testSyslogTargetWithTLS(t *testing.T, octetCounting bool) {
c, err := tls.Dial("tcp", addr, &tlsConfig)
require.NoError(t, err)
messages := []string{
validMessages := []string{
`<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`,
}
// Messages that are malformed but still valid.
// This causes error messages being written, but the parser does not stop and close the connection.
malformeddMessages := []string{
`<165>1 - An application event log entry...`,
`<165>1 2018-10-11T22:14:15.007Z host5 e - An application event log entry...`,
}
messages := append(malformeddMessages, validMessages...)
err = writeMessagesToStream(c, messages, octetCounting)
require.NoError(t, err)
require.NoError(t, c.Close())
require.Eventuallyf(t, func() bool {
return len(client.Received()) == len(messages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Received()))
return len(client.Received()) == len(validMessages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(validMessages), len(client.Received()))
require.Equal(t, model.LabelSet{
"test": "syslog_target",

Loading…
Cancel
Save