pkg/promtail: IETF Syslog (RFC5424) Support (#1275)

* IETF syslog (rfc5254) support

* Upgrade go-syslog to correctly released v2.0.1

* Incorporate feedback from new linter

* Update documentation with relevant RFC sections.

* Incorporate feedback from review

* Use context to shutdown server.
* Use util.Backoff from cortex
* Do not embed mutex into TestLabeledClient
* Use strings.HasPrefix
* Better naming of connectionsClosed -> openConnections

* Incorporate further feedback from review

* Callback instead of channel in ParseStream.
  Removes one running Goroutine per connection.
* Improve parse error log level.
* Move mutex above field it protects.
* Use backoff.Ongoing()

* Switch to "nontransparent" parser

* Further improvements to the documentation
pull/1416/head
Sebastian Widmer 6 years ago committed by Cyril Tovena
parent 9420fb10fd
commit f0f6f24926
  1. 5
      docs/clients/promtail/README.md
  2. 82
      docs/clients/promtail/configuration.md
  3. 56
      docs/clients/promtail/scraping.md
  4. 2
      go.mod
  5. 4
      go.sum
  6. 19
      pkg/promtail/scrape/scrape.go
  7. 14
      pkg/promtail/targets/manager.go
  8. 35
      pkg/promtail/targets/syslogparser/syslogparser.go
  9. 61
      pkg/promtail/targets/syslogparser/syslogparser_test.go
  10. 327
      pkg/promtail/targets/syslogtarget.go
  11. 215
      pkg/promtail/targets/syslogtarget_test.go
  12. 83
      pkg/promtail/targets/syslogtargetmanager.go
  13. 3
      pkg/promtail/targets/target.go
  14. 8
      vendor/github.com/influxdata/go-syslog/v2/.gitignore
  15. 21
      vendor/github.com/influxdata/go-syslog/v2/LICENSE
  16. 232
      vendor/github.com/influxdata/go-syslog/v2/README.md
  17. 81
      vendor/github.com/influxdata/go-syslog/v2/common/common.rl
  18. 42
      vendor/github.com/influxdata/go-syslog/v2/common/functions.go
  19. 8
      vendor/github.com/influxdata/go-syslog/v2/go.mod
  20. 10
      vendor/github.com/influxdata/go-syslog/v2/go.sum
  21. 115
      vendor/github.com/influxdata/go-syslog/v2/makefile
  22. 269
      vendor/github.com/influxdata/go-syslog/v2/nontransparent/parser.go
  23. 163
      vendor/github.com/influxdata/go-syslog/v2/nontransparent/parser.go.rl
  24. 76
      vendor/github.com/influxdata/go-syslog/v2/nontransparent/trailer_type.go
  25. 160
      vendor/github.com/influxdata/go-syslog/v2/octetcounting/parser.go
  26. 150
      vendor/github.com/influxdata/go-syslog/v2/octetcounting/scanner.go
  27. 45
      vendor/github.com/influxdata/go-syslog/v2/octetcounting/tokens.go
  28. 19
      vendor/github.com/influxdata/go-syslog/v2/options.go
  29. 9606
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/builder.go
  30. 312
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/builder.go.rl
  31. 12011
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/machine.go
  32. 383
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/machine.go.rl
  33. 13
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/options.go
  34. 45
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/parser.go
  35. 291
      vendor/github.com/influxdata/go-syslog/v2/rfc5424/syslog_message.go
  36. 63
      vendor/github.com/influxdata/go-syslog/v2/syslog.go
  37. 21
      vendor/github.com/leodido/ragel-machinery/LICENSE
  38. 9
      vendor/github.com/leodido/ragel-machinery/README.md
  39. 22
      vendor/github.com/leodido/ragel-machinery/errors.go
  40. 3
      vendor/github.com/leodido/ragel-machinery/go.mod
  41. 1
      vendor/github.com/leodido/ragel-machinery/go.sum
  42. 122
      vendor/github.com/leodido/ragel-machinery/parser/arbitrary_reader.go
  43. 13
      vendor/github.com/leodido/ragel-machinery/parser/fsm.go
  44. 84
      vendor/github.com/leodido/ragel-machinery/parser/parser.go
  45. 14
      vendor/github.com/leodido/ragel-machinery/parser/reader.go
  46. 23
      vendor/github.com/leodido/ragel-machinery/parser/state.go
  47. 9
      vendor/modules.txt

@ -32,6 +32,11 @@ Just like Prometheus, `promtail` is configured using a `scrape_configs` stanza.
drop, and the final metadata to attach to the log line. Refer to the docs for
[configuring Promtail](configuration.md) for more details.
## Receiving Logs From Syslog
When the [Syslog Target](./scraping.md#syslog-target) is being used, logs
can be written with the syslog protocol to the configured port.
## Labeling and Parsing
During service discovery, metadata is determined (pod name, filename, etc.) that

@ -23,6 +23,7 @@ and how to scrape logs from files.
* [metric_histogram](#metric_histogram)
* [tenant_stage](#tenant_stage)
* [journal_config](#journal_config)
* [syslog_config](#syslog_config)
* [relabel_config](#relabel_config)
* [static_config](#static_config)
* [file_sd_config](#file_sd_config)
@ -30,6 +31,7 @@ and how to scrape logs from files.
* [target_config](#target_config)
* [Example Docker Config](#example-docker-config)
* [Example Journal Config](#example-journal-config)
* [Example Syslog Config](#example-syslog-config)
## Configuration File Reference
@ -244,6 +246,9 @@ job_name: <string>
# Describes how to scrape logs from the journal.
[journal: <journal_config>]
# Describes how to receive logs from syslog.
[syslog: <syslog_config>]
# Describes how to relabel targets to determine if they should
# be processed.
relabel_configs:
@ -580,6 +585,59 @@ labels:
[path: <string>]
```
### syslog_config
The `syslog_config` block configures a syslog listener allowing users to push
logs to promtail with the syslog protocol.
Currently supported is [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424)
with and without octet counting.
The recommended deployment is to have a dedicated syslog forwarder like **syslog-ng** or **rsyslog**
in front of promtail. The forwarder can take care of the various specifications
and transports that exist (UDP, BSD syslog, ...).
[Octet counting](https://tools.ietf.org/html/rfc6587#section-3.4.1) is recommended as the
message framing method. In a stream with [non-transparent framing](https://tools.ietf.org/html/rfc6587#section-3.4.2),
promtail needs to wait for the next message to catch multi-line messages,
therefore delays between messages can occur.
See recommended output configurations for
[syslog-ng](scraping.md#syslog-ng-output-configuration) and
[rsyslog](scraping.md#rsyslog-output-configuration). Both configurations enable
IETF Syslog with octet-counting.
You may need to increase the open files limit for the promtail process
if many clients are connected. (`ulimit -Sn`)
```yaml
# TCP address to listen on. Has the format of "host:port".
listen_address: <string>
# The idle timeout for tcp syslog connections, default is 120 seconds.
idle_timeout: <duration>
# Whether to convert syslog structured data to labels.
# A structured data entry of [example@99999 test="yes"] would become
# the label "__syslog_message_sd_example_99999_test" with the value "yes".
label_structured_data: <bool>
# Label map to add to every log message.
labels:
[ <labelname>: <labelvalue> ... ]
```
#### Available Labels
* `__syslog_connection_ip_address`: The remote IP address.
* `__syslog_connection_hostname`: The remote hostname.
* `__syslog_message_severity`: The [syslog severity](https://tools.ietf.org/html/rfc5424#section-6.2.1) parsed from the message. Symbolic name as per [syslog_message.go](https://github.com/influxdata/go-syslog/blob/v2.0.1/rfc5424/syslog_message.go#L184).
* `__syslog_message_facility`: The [syslog facility](https://tools.ietf.org/html/rfc5424#section-6.2.1) parsed from the message. Symbolic name as per [syslog_message.go](https://github.com/influxdata/go-syslog/blob/v2.0.1/rfc5424/syslog_message.go#L235) and `syslog(3)`.
* `__syslog_message_hostname`: The [hostname](https://tools.ietf.org/html/rfc5424#section-6.2.4) parsed from the message.
* `__syslog_message_app_name`: The [app-name field](https://tools.ietf.org/html/rfc5424#section-6.2.5) parsed from the message.
* `__syslog_message_proc_id`: The [procid field](https://tools.ietf.org/html/rfc5424#section-6.2.6) parsed from the message.
* `__syslog_message_msg_id`: The [msgid field](https://tools.ietf.org/html/rfc5424#section-6.2.7) parsed from the message.
* `__syslog_message_sd_<sd_id>[_<iana_enterprise_id>]_<sd_name>`: The [structured-data field](https://tools.ietf.org/html/rfc5424#section-6.3) parsed from the message. The data field `[custom@99770 example="1"]` becomes `__syslog_message_sd_custom_99770_example`.
### relabel_config
Relabeling is a powerful tool to dynamically rewrite the label set of a target
@ -970,3 +1028,27 @@ scrape_configs:
- source_labels: ['__journal__systemd_unit']
target_label: 'unit'
```
## Example Syslog Config
```yaml
server:
http_listen_port: 9080
grpc_listen_port: 0
positions:
filename: /tmp/positions.yaml
clients:
- url: http://loki_addr:3100/loki/api/v1/push
scrape_configs:
- job_name: syslog
syslog:
listen_address: 0.0.0.0:1514
labels:
job: "syslog"
relabel_configs:
- source_labels: ['__syslog_message_hostname']
target_label: 'host'
```

@ -122,6 +122,62 @@ When Promtail reads from the journal, it brings in all fields prefixed with
field from the journal was transformed into a label called `unit` through
`relabel_configs`. See [Relabeling](#relabeling) for more information.
## Syslog Receiver
Promtail supports receiving [IETF Syslog (RFC5424)](https://tools.ietf.org/html/rfc5424)
messages from a tcp stream. Receiving syslog messages is defined in a `syslog`
stanza:
```yaml
scrape_configs:
- job_name: syslog
syslog:
listen_address: 0.0.0.0:1514
idle_timeout: 60s
label_structured_data: yes
labels:
job: "syslog"
relabel_configs:
- source_labels: ['__syslog_message_hostname']
target_label: 'host'
```
The only required field in the syslog section is the `listen_address` field,
where a valid network address should be provided. The `idle_timeout` can help
with cleaning up stale syslog connections. If `label_structured_data` is set,
[structured data](https://tools.ietf.org/html/rfc5424#section-6.3) in the
syslog header will be translated to internal labels in the form of
`__syslog_message_sd_<ID>_<KEY>`.
The labels map defines a constant list of labels to add to every journal entry
that Promtail reads.
Note that it is recommended to deploy a dedicated syslog forwarder
like **syslog-ng** or **rsyslog** in front of Promtail.
The forwarder can take care of the various specifications
and transports that exist (UDP, BSD syslog, ...). See recommended output
configurations for [syslog-ng](#syslog-ng-output-configuration) and
[rsyslog](#rsyslog-output-configuration).
When Promtail receives syslog messages, it brings in all header fields,
parsed from the received message, prefixed with `__syslog_` as internal labels.
Like in the example above, the `__syslog_message_hostname`
field from the journal was transformed into a label called `host` through
`relabel_configs`. See [Relabeling](#relabeling) for more information.
### Syslog-NG Output Configuration
```
destination d_loki {
syslog("localhost" transport("tcp") port(<promtail_port>));
};
```
### Rsyslog Output Configuration
```
action(type="omfwd" protocol="tcp" port="<promtail_port>" Template="RSYSLOG_SyslogProtocol23Format" TCP_Framing="octet-counted")
```
## Relabeling
Each `scrape_configs` entry can contain a `relabel_configs` stanza.

@ -28,12 +28,14 @@ require (
github.com/grpc-ecosystem/grpc-opentracing v0.0.0-20180507213350-8e809c8a8645
github.com/hashicorp/golang-lru v0.5.3
github.com/hpcloud/tail v1.0.0
github.com/influxdata/go-syslog/v2 v2.0.1
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/json-iterator/go v1.1.7
github.com/klauspost/compress v1.7.4
github.com/klauspost/cpuid v1.2.1 // indirect
github.com/mitchellh/mapstructure v1.1.2
github.com/morikuni/aec v0.0.0-20170113033406-39771216ff4c // indirect
github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f
github.com/opencontainers/go-digest v1.0.0-rc1 // indirect
github.com/opencontainers/image-spec v1.0.1 // indirect
github.com/opentracing/opentracing-go v1.1.0

@ -371,6 +371,8 @@ github.com/hashicorp/serf v0.8.3 h1:MWYcmct5EtKz0efYooPcL0yNkem+7kWxqXDi/UIh+8k=
github.com/hashicorp/serf v0.8.3/go.mod h1:UpNcs7fFbpKIyZaUuSW6EPiH+eZC7OuyFD+wc1oal+k=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/inconshreveable/mousetrap v1.0.0/go.mod h1:PxqpIevigyE2G7u3NXJIT2ANytuPF1OarO4DADm73n8=
github.com/influxdata/go-syslog/v2 v2.0.1 h1:l44S4l4Q8MhGQcoOxJpbo+QQYxJqp0vdgIVHh4+DO0s=
github.com/influxdata/go-syslog/v2 v2.0.1/go.mod h1:hjvie1UTaD5E1fTnDmxaCw8RRDrT4Ve+XHr5O2dKSCo=
github.com/influxdata/influxdb v1.7.7/go.mod h1:qZna6X/4elxqT3yI9iZYdZrWWdeFOOprn86kgg4+IzY=
github.com/jessevdk/go-flags v0.0.0-20180331124232-1c38ed7ad0cc/go.mod h1:4FA24M0QyGHXBuZZK/XkWh8h0e1EYbRYJSGM75WSRxI=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
@ -413,6 +415,8 @@ github.com/kylelemons/godebug v0.0.0-20170820004349-d65d576e9348/go.mod h1:B69LE
github.com/lann/builder v0.0.0-20150808151131-f22ce00fd939/go.mod h1:dXGbAdH5GtBTC4WfIxhKZfyBF/HBFgRZSWwZ9g/He9o=
github.com/lann/ps v0.0.0-20150810152359-62de8c46ede0/go.mod h1:vmVJ0l/dxyfGW6FmdpVm2joNMFikkuWg0EoCKLGUMNw=
github.com/leanovate/gopter v0.2.4/go.mod h1:gNcbPWNEWRe4lm+bycKqxUYoH5uoVje5SkOJ3uoLer8=
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 h1:bCiVCRCs1Heq84lurVinUPy19keqGEe4jh5vtK37jcg=
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg=
github.com/lib/pq v1.0.0/go.mod h1:5WUZQaWbwv1U+lTReE5YruASi9Al49XbQIvNi/34Woo=
github.com/lovoo/gcloud-opentracing v0.3.0/go.mod h1:ZFqk2y38kMDDikZPAK7ynTTGuyt17nSPdS3K5e+ZTBY=
github.com/mailru/easyjson v0.0.0-20160728113105-d5b7844b561a/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=

@ -3,6 +3,7 @@ package scrape
import (
"fmt"
"reflect"
"time"
"github.com/prometheus/common/model"
@ -19,6 +20,7 @@ type Config struct {
EntryParser api.EntryParser `yaml:"entry_parser"`
PipelineStages stages.PipelineStages `yaml:"pipeline_stages,omitempty"`
JournalConfig *JournalTargetConfig `yaml:"journal,omitempty"`
SyslogConfig *SyslogTargetConfig `yaml:"syslog,omitempty"`
RelabelConfigs []*relabel.Config `yaml:"relabel_configs,omitempty"`
ServiceDiscoveryConfig sd_config.ServiceDiscoveryConfig `yaml:",inline"`
}
@ -42,6 +44,23 @@ type JournalTargetConfig struct {
Path string `yaml:"path"`
}
// SyslogTargetConfig describes a scrape config that listens for log lines over syslog.
type SyslogTargetConfig struct {
// ListenAddress is the address to listen on for syslog messages.
ListenAddress string `yaml:"listen_address"`
// IdleTimeout is the idle timeout for tcp connections.
IdleTimeout time.Duration `yaml:"idle_timeout"`
// LabelStructuredData sets if the structured data part of a syslog message
// is translated to a label.
// [example@99999 test="yes"] => {__syslog_message_sd_example_99999_test="yes"}
LabelStructuredData bool `yaml:"label_structured_data"`
// Labels optionally holds labels to associate with each record read from syslog.
Labels model.LabelSet `yaml:"labels"`
}
// DefaultScrapeConfig is the default Config.
var DefaultScrapeConfig = Config{
EntryParser: api.Docker,

@ -32,6 +32,7 @@ func NewTargetManagers(
var targetManagers []targetManager
var fileScrapeConfigs []scrape.Config
var journalScrapeConfigs []scrape.Config
var syslogScrapeConfigs []scrape.Config
for _, cfg := range scrapeConfigs {
if cfg.HasServiceDiscoveryConfig() {
@ -70,6 +71,19 @@ func NewTargetManagers(
targetManagers = append(targetManagers, journalTargetManager)
}
for _, cfg := range scrapeConfigs {
if cfg.SyslogConfig != nil {
syslogScrapeConfigs = append(syslogScrapeConfigs, cfg)
}
}
if len(syslogScrapeConfigs) > 0 {
syslogTargetManager, err := NewSyslogTargetManager(logger, client, syslogScrapeConfigs)
if err != nil {
return nil, errors.Wrap(err, "failed to make syslog target manager")
}
targetManagers = append(targetManagers, syslogTargetManager)
}
return &TargetManagers{targetManagers: targetManagers}, nil
}

@ -0,0 +1,35 @@
package syslogparser
import (
"bufio"
"fmt"
"io"
"github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/nontransparent"
"github.com/influxdata/go-syslog/v2/octetcounting"
)
// ParseStream parses a rfc5424 syslog stream from the given Reader, calling
// the callback function with the parsed messages. The parser automatically
// detects octet counting.
// The function returns on EOF or unrecoverable errors.
func ParseStream(r io.Reader, callback func(res *syslog.Result)) error {
buf := bufio.NewReader(r)
firstByte, err := buf.Peek(1)
if err != nil {
return err
}
b := firstByte[0]
if b == '<' {
nontransparent.NewParser(syslog.WithListener(callback)).Parse(buf)
} else if b >= '0' && b <= '9' {
octetcounting.NewParser(syslog.WithListener(callback)).Parse(buf)
} else {
return fmt.Errorf("invalid or unsupported framing. first byte: '%s'", firstByte)
}
return nil
}

@ -0,0 +1,61 @@
package syslogparser_test
import (
"io"
"strings"
"testing"
"github.com/grafana/loki/pkg/promtail/targets/syslogparser"
"github.com/influxdata/go-syslog/v2"
"github.com/stretchr/testify/require"
)
func TestParseStream_OctetCounting(t *testing.T) {
r := strings.NewReader("23 <13>1 - - - - - - First24 <13>1 - - - - - - Second")
results := make([]*syslog.Result, 0)
cb := func(res *syslog.Result) {
results = append(results, res)
}
err := syslogparser.ParseStream(r, cb)
require.NoError(t, err)
require.Equal(t, 2, len(results))
require.NoError(t, results[0].Error)
require.Equal(t, "First", *results[0].Message.Message())
require.NoError(t, results[1].Error)
require.Equal(t, "Second", *results[1].Message.Message())
}
func TestParseStream_NewlineSeparated(t *testing.T) {
r := strings.NewReader("<13>1 - - - - - - First\n<13>1 - - - - - - Second\n")
results := make([]*syslog.Result, 0)
cb := func(res *syslog.Result) {
results = append(results, res)
}
err := syslogparser.ParseStream(r, cb)
require.NoError(t, err)
require.Equal(t, 2, len(results))
require.NoError(t, results[0].Error)
require.Equal(t, "First", *results[0].Message.Message())
require.NoError(t, results[1].Error)
require.Equal(t, "Second", *results[1].Message.Message())
}
func TestParseStream_InvalidStream(t *testing.T) {
r := strings.NewReader("invalid")
err := syslogparser.ParseStream(r, func(res *syslog.Result) {})
require.EqualError(t, err, "invalid or unsupported framing. first byte: 'i'")
}
func TestParseStream_EmptyStream(t *testing.T) {
r := strings.NewReader("")
err := syslogparser.ParseStream(r, func(res *syslog.Result) {})
require.Equal(t, err, io.EOF)
}

@ -0,0 +1,327 @@
package targets
import (
"context"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
"github.com/cortexproject/cortex/pkg/util"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/influxdata/go-syslog/v2"
"github.com/mwitkow/go-conntrack"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/labels"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/scrape"
"github.com/grafana/loki/pkg/promtail/targets/syslogparser"
)
var (
syslogEntries = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "syslog_target_entries_total",
Help: "Total number of successful entries sent to the syslog target",
})
syslogParsingErrors = promauto.NewCounter(prometheus.CounterOpts{
Namespace: "promtail",
Name: "syslog_target_parsing_errors_total",
Help: "Total number of parsing errors while receiving syslog messages",
})
defaultIdleTimeout = 120 * time.Second
)
// SyslogTarget listens to syslog messages.
type SyslogTarget struct {
logger log.Logger
handler api.EntryHandler
config *scrape.SyslogTargetConfig
relabelConfig []*relabel.Config
listener net.Listener
messages chan message
ctx context.Context
ctxCancel context.CancelFunc
openConnections *sync.WaitGroup
}
type message struct {
labels model.LabelSet
message string
}
// NewSyslogTarget configures a new SyslogTarget.
func NewSyslogTarget(
logger log.Logger,
handler api.EntryHandler,
relabel []*relabel.Config,
config *scrape.SyslogTargetConfig,
) (*SyslogTarget, error) {
ctx, cancel := context.WithCancel(context.Background())
t := &SyslogTarget{
logger: logger,
handler: handler,
config: config,
relabelConfig: relabel,
ctx: ctx,
ctxCancel: cancel,
openConnections: new(sync.WaitGroup),
}
t.messages = make(chan message)
go t.messageSender()
err := t.run()
return t, err
}
func (t *SyslogTarget) run() error {
l, err := net.Listen("tcp", t.config.ListenAddress)
l = conntrack.NewListener(l, conntrack.TrackWithName("syslog_target/"+t.config.ListenAddress))
if err != nil {
return fmt.Errorf("error setting up syslog target %w", err)
}
t.listener = l
level.Info(t.logger).Log("msg", "syslog listening on address", "address", t.ListenAddress().String())
t.openConnections.Add(1)
go t.acceptConnections()
return nil
}
func (t *SyslogTarget) acceptConnections() {
defer t.openConnections.Done()
l := log.With(t.logger, "address", t.listener.Addr().String())
backoff := util.NewBackoff(t.ctx, util.BackoffConfig{
MinBackoff: 5 * time.Millisecond,
MaxBackoff: 1 * time.Second,
})
for {
c, err := t.listener.Accept()
if err != nil {
if t.ctx.Err() != nil {
level.Info(l).Log("msg", "syslog server shutting down")
return
}
if ne, ok := err.(net.Error); ok && ne.Temporary() {
level.Warn(l).Log("msg", "failed to accept syslog connection", "err", err, "num_retries", backoff.NumRetries())
backoff.Wait()
continue
}
level.Error(l).Log("msg", "failed to accept syslog connection. quiting", "err", err)
return
}
backoff.Reset()
t.openConnections.Add(1)
go t.handleConnection(c)
}
}
func (t *SyslogTarget) handleConnection(cn net.Conn) {
defer t.openConnections.Done()
c := &idleTimeoutConn{cn, t.idleTimeout()}
handlerCtx, cancel := context.WithCancel(t.ctx)
defer cancel()
go func() {
<-handlerCtx.Done()
_ = c.Close()
}()
connLabels := t.connectionLabels(c)
err := syslogparser.ParseStream(c, func(msg *syslog.Result) {
if err := msg.Error; err != nil {
t.handleMessageError(err)
return
}
t.handleMessage(connLabels.Copy(), msg.Message)
})
if err != nil {
level.Warn(t.logger).Log("msg", "error initializing syslog stream", "err", err)
}
}
func (t *SyslogTarget) handleMessageError(err error) {
var ne net.Error
if errors.As(err, &ne) && ne.Timeout() {
level.Debug(t.logger).Log("msg", "connection timed out", "err", ne)
return
}
level.Warn(t.logger).Log("msg", "error parsing syslog stream", "err", err)
syslogParsingErrors.Inc()
}
func (t *SyslogTarget) handleMessage(connLabels labels.Labels, msg syslog.Message) {
if msg.Message() == nil {
return
}
lb := labels.NewBuilder(connLabels)
if v := msg.SeverityLevel(); v != nil {
lb.Set("__syslog_message_severity", *v)
}
if v := msg.FacilityLevel(); v != nil {
lb.Set("__syslog_message_facility", *v)
}
if v := msg.Hostname(); v != nil {
lb.Set("__syslog_message_hostname", *v)
}
if v := msg.Appname(); v != nil {
lb.Set("__syslog_message_app_name", *v)
}
if v := msg.ProcID(); v != nil {
lb.Set("__syslog_message_proc_id", *v)
}
if v := msg.MsgID(); v != nil {
lb.Set("__syslog_message_msg_id", *v)
}
if t.config.LabelStructuredData && msg.StructuredData() != nil {
for id, params := range *msg.StructuredData() {
id = strings.Replace(id, "@", "_", -1)
for name, value := range params {
key := "__syslog_message_sd_" + id + "_" + name
lb.Set(key, value)
}
}
}
processed := relabel.Process(lb.Labels(), t.relabelConfig...)
filtered := make(model.LabelSet)
for _, lbl := range processed {
if strings.HasPrefix(lbl.Name, "__") {
continue
}
filtered[model.LabelName(lbl.Name)] = model.LabelValue(lbl.Value)
}
t.messages <- message{filtered, *msg.Message()}
}
func (t *SyslogTarget) messageSender() {
for msg := range t.messages {
if err := t.handler.Handle(msg.labels, time.Now(), msg.message); err != nil {
level.Error(t.logger).Log("msg", "error handling line", "error", err)
}
syslogEntries.Inc()
}
}
func (t *SyslogTarget) connectionLabels(c net.Conn) labels.Labels {
lb := labels.NewBuilder(nil)
for k, v := range t.config.Labels {
lb.Set(string(k), string(v))
}
ip := ipFromConn(c).String()
lb.Set("__syslog_connection_ip_address", ip)
lb.Set("__syslog_connection_hostname", lookupAddr(ip))
return lb.Labels()
}
func ipFromConn(c net.Conn) net.IP {
switch addr := c.RemoteAddr().(type) {
case *net.TCPAddr:
return addr.IP
}
return nil
}
func lookupAddr(addr string) string {
names, _ := net.LookupAddr(addr)
return strings.Join(names, ",")
}
// Type returns SyslogTargetType.
func (t *SyslogTarget) Type() TargetType {
return SyslogTargetType
}
// Ready indicates whether or not the syslog target is ready to be read from.
func (t *SyslogTarget) Ready() bool {
return true
}
// DiscoveredLabels returns the set of labels discovered by the syslog target, which
// is always nil. Implements Target.
func (t *SyslogTarget) DiscoveredLabels() model.LabelSet {
return nil
}
// Labels returns the set of labels that statically apply to all log entries
// produced by the SyslogTarget.
func (t *SyslogTarget) Labels() model.LabelSet {
return t.config.Labels
}
// Details returns target-specific details.
func (t *SyslogTarget) Details() interface{} {
return map[string]string{}
}
// Stop shuts down the SyslogTarget.
func (t *SyslogTarget) Stop() error {
t.ctxCancel()
err := t.listener.Close()
t.openConnections.Wait()
close(t.messages)
return err
}
// ListenAddress returns the address SyslogTarget is listening on.
func (t *SyslogTarget) ListenAddress() net.Addr {
return t.listener.Addr()
}
func (t *SyslogTarget) idleTimeout() time.Duration {
if tm := t.config.IdleTimeout; tm != 0 {
return tm
}
return defaultIdleTimeout
}
type idleTimeoutConn struct {
net.Conn
idleTimeout time.Duration
}
func (c *idleTimeoutConn) Write(p []byte) (int, error) {
c.setDeadline()
return c.Conn.Write(p)
}
func (c *idleTimeoutConn) Read(b []byte) (int, error) {
c.setDeadline()
return c.Conn.Read(b)
}
func (c *idleTimeoutConn) setDeadline() {
_ = c.Conn.SetDeadline(time.Now().Add(c.idleTimeout))
}

@ -0,0 +1,215 @@
package targets
import (
"fmt"
"io"
"net"
"os"
"sync"
"testing"
"time"
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/pkg/relabel"
"github.com/stretchr/testify/require"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/promtail/scrape"
)
type ClientMessage struct {
Labels model.LabelSet
Timestamp time.Time
Message string
}
type TestLabeledClient struct {
log log.Logger
messagesMtx sync.RWMutex
messages []ClientMessage
}
func (c *TestLabeledClient) Handle(ls model.LabelSet, t time.Time, s string) error {
level.Debug(c.log).Log("msg", "received log", "log", s)
c.messagesMtx.Lock()
defer c.messagesMtx.Unlock()
c.messages = append(c.messages, ClientMessage{ls, t, s})
return nil
}
func (c *TestLabeledClient) Messages() []ClientMessage {
c.messagesMtx.RLock()
defer c.messagesMtx.RUnlock()
return c.messages
}
func TestSyslogTarget_NewlineSeparatedMessages(t *testing.T) {
testSyslogTarget(t, false)
}
func TestSyslogTarget_OctetCounting(t *testing.T) {
testSyslogTarget(t, true)
}
func testSyslogTarget(t *testing.T, octetCounting bool) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := &TestLabeledClient{log: logger}
tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrape.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
LabelStructuredData: true,
Labels: model.LabelSet{
"test": "syslog_target",
},
})
require.NoError(t, err)
defer func() {
require.NoError(t, tgt.Stop())
}()
addr := tgt.ListenAddress().String()
c, err := net.Dial("tcp", addr)
require.NoError(t, err)
messages := []string{
`<165>1 2018-10-11T22:14:15.003Z host5 e - id1 [custom@32473 exkey="1"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.005Z host5 e - id2 [custom@32473 exkey="2"] An application event log entry...`,
`<165>1 2018-10-11T22:14:15.007Z host5 e - id3 [custom@32473 exkey="3"] An application event log entry...`,
}
err = writeMessagesToStream(c, messages, octetCounting)
require.NoError(t, err)
require.NoError(t, c.Close())
require.Eventuallyf(t, func() bool {
return len(client.Messages()) == len(messages)
}, time.Second, time.Millisecond, "Expected to receive %d messages, got %d.", len(messages), len(client.Messages()))
require.Equal(t, model.LabelSet{
"test": "syslog_target",
"severity": "notice",
"facility": "local4",
"hostname": "host5",
"app_name": "e",
"msg_id": "id1",
"sd_custom_exkey": "1",
}, client.Messages()[0].Labels)
require.Equal(t, "An application event log entry...", client.Messages()[0].Message)
require.NotZero(t, client.Messages()[0].Timestamp)
}
func relabelConfig(t *testing.T) []*relabel.Config {
relabelCfg := `
- source_labels: ['__syslog_message_severity']
target_label: 'severity'
- source_labels: ['__syslog_message_facility']
target_label: 'facility'
- source_labels: ['__syslog_message_hostname']
target_label: 'hostname'
- source_labels: ['__syslog_message_app_name']
target_label: 'app_name'
- source_labels: ['__syslog_message_proc_id']
target_label: 'proc_id'
- source_labels: ['__syslog_message_msg_id']
target_label: 'msg_id'
- source_labels: ['__syslog_message_sd_custom_32473_exkey']
target_label: 'sd_custom_exkey'
`
var relabels []*relabel.Config
err := yaml.Unmarshal([]byte(relabelCfg), &relabels)
require.NoError(t, err)
return relabels
}
func writeMessagesToStream(w io.Writer, messages []string, octetCounting bool) error {
var formatter func(string) string
if octetCounting {
formatter = func(s string) string {
return fmt.Sprintf("%d %s", len(s), s)
}
} else {
formatter = func(s string) string {
return s + "\n"
}
}
for _, msg := range messages {
_, err := fmt.Fprint(w, formatter(msg))
if err != nil {
return err
}
}
return nil
}
func TestSyslogTarget_InvalidData(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := &TestLabeledClient{log: logger}
tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrape.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
})
require.NoError(t, err)
defer func() {
require.NoError(t, tgt.Stop())
}()
addr := tgt.ListenAddress().String()
c, err := net.Dial("tcp", addr)
require.NoError(t, err)
defer c.Close()
_, err = fmt.Fprint(c, "xxx")
require.NoError(t, err)
// syslog target should immediately close the connection if sent invalid data
err = c.SetDeadline(time.Now().Add(time.Second))
require.NoError(t, err)
buf := make([]byte, 1)
_, err = c.Read(buf)
require.EqualError(t, err, "EOF")
}
func TestSyslogTarget_IdleTimeout(t *testing.T) {
w := log.NewSyncWriter(os.Stderr)
logger := log.NewLogfmtLogger(w)
client := &TestLabeledClient{log: logger}
tgt, err := NewSyslogTarget(logger, client, relabelConfig(t), &scrape.SyslogTargetConfig{
ListenAddress: "127.0.0.1:0",
IdleTimeout: time.Millisecond,
})
require.NoError(t, err)
defer func() {
require.NoError(t, tgt.Stop())
}()
addr := tgt.ListenAddress().String()
c, err := net.Dial("tcp", addr)
require.NoError(t, err)
defer c.Close()
// connection should be closed before the higher timeout
// from SetDeadline fires
err = c.SetDeadline(time.Now().Add(time.Second))
require.NoError(t, err)
buf := make([]byte, 1)
_, err = c.Read(buf)
require.EqualError(t, err, "EOF")
}

@ -0,0 +1,83 @@
package targets
import (
"github.com/go-kit/kit/log"
"github.com/go-kit/kit/log/level"
"github.com/prometheus/client_golang/prometheus"
"github.com/grafana/loki/pkg/logentry/stages"
"github.com/grafana/loki/pkg/promtail/api"
"github.com/grafana/loki/pkg/promtail/scrape"
)
// SyslogTargetManager manages a series of SyslogTargets.
type SyslogTargetManager struct {
logger log.Logger
targets map[string]*SyslogTarget
}
// NewSyslogTargetManager creates a new SyslogTargetManager.
func NewSyslogTargetManager(
logger log.Logger,
client api.EntryHandler,
scrapeConfigs []scrape.Config,
) (*SyslogTargetManager, error) {
tm := &SyslogTargetManager{
logger: logger,
targets: make(map[string]*SyslogTarget),
}
for _, cfg := range scrapeConfigs {
registerer := prometheus.DefaultRegisterer
pipeline, err := stages.NewPipeline(log.With(logger, "component", "syslog_pipeline"), cfg.PipelineStages, &cfg.JobName, registerer)
if err != nil {
return nil, err
}
t, err := NewSyslogTarget(logger, pipeline.Wrap(client), cfg.RelabelConfigs, cfg.SyslogConfig)
if err != nil {
return nil, err
}
tm.targets[cfg.JobName] = t
}
return tm, nil
}
// Ready returns true if at least one SyslogTarget is also ready.
func (tm *SyslogTargetManager) Ready() bool {
for _, t := range tm.targets {
if t.Ready() {
return true
}
}
return false
}
// Stop stops the SyslogTargetManager and all of its SyslogTargets.
func (tm *SyslogTargetManager) Stop() {
for _, t := range tm.targets {
if err := t.Stop(); err != nil {
level.Error(t.logger).Log("msg", "error stopping SyslogTarget", "err", err.Error())
}
}
}
// ActiveTargets returns the list of SyslogTargets where syslog data
// is being read. ActiveTargets is an alias to AllTargets as
// SyslogTargets cannot be deactivated, only stopped.
func (tm *SyslogTargetManager) ActiveTargets() map[string][]Target {
return tm.AllTargets()
}
// AllTargets returns the list of all targets where syslog data
// is currently being read.
func (tm *SyslogTargetManager) AllTargets() map[string][]Target {
result := make(map[string][]Target, len(tm.targets))
for k, v := range tm.targets {
result[k] = []Target{v}
}
return result
}

@ -14,6 +14,9 @@ const (
// JournalTargetType is a journalctl target
JournalTargetType = TargetType("Journal")
// SyslogTargetType is a syslog target
SyslogTargetType = TargetType("Syslog")
// DroppedTargetType is a target that's been dropped.
DroppedTargetType = TargetType("dropped")
)

@ -0,0 +1,8 @@
debug.test
docs/*.png
.vscode/
*.out
*.html

@ -0,0 +1,21 @@
The MIT License
Copyright (c) 2018, InfluxData Inc.
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

@ -0,0 +1,232 @@
[![MIT License](http://img.shields.io/badge/license-MIT-blue.svg?style=for-the-badge)](LICENSE)
**A parser for syslog messages**.
> [Blazing fast](#Performances) RFC5424-compliant parsers
To wrap up, this package provides:
- a RFC5424-compliant parser
- a RFC5424-compliant builder
- a parser which works on streams for syslog with [octet counting](https://tools.ietf.org/html/rfc5425#section-4.3) framing technique
- a parser which works on streams for syslog with [non-transparent](https://tools.ietf.org/html/rfc6587#section-3.4.2) framing technique
This library provides the pieces to parse syslog messages transported following various RFCs.
For example:
- TLS with octet count ([RFC5425](https://tools.ietf.org/html/rfc5425))
- TCP with non-transparent framing or with octet count ([RFC 6587](https://tools.ietf.org/html/rfc6587))
- UDP carrying one message per packet ([RFC5426](https://tools.ietf.org/html/rfc5426))
## Installation
```
go get github.com/influxdata/go-syslog/v2
```
## Docs
[![Documentation](https://img.shields.io/badge/godoc-reference-blue.svg?style=for-the-badge)](http://godoc.org/github.com/influxdata/go-syslog)
The [docs](docs/) directory contains `.dot` files representing the FSM parts of a [RFC5424](https://tools.ietf.org/html/rfc5424) syslog message and also the ones representing the other transport parsers.
## Usage
Suppose you want to parse a given sequence of bytes as a RFC5424 message.
```go
i := []byte(`<165>4 2018-10-11T22:14:15.003Z mymach.it e - 1 [ex@32473 iut="3"] An application event log entry...`)
p := rfc5424.NewParser()
m, e := p.Parse(i, nil)
```
This results in `m` being equal to:
```go
// (*rfc5424.SyslogMessage)({
// priority: (*uint8)(165),
// facility: (*uint8)(20),
// severity: (*uint8)(5),
// version: (uint16) 4,
// timestamp: (*time.Time)(2018-10-11 22:14:15.003 +0000 UTC),
// hostname: (*string)((len=9) "mymach.it"),
// appname: (*string)((len=1) "e"),
// procID: (*string)(<nil>),
// msgID: (*string)((len=1) "1"),
// structuredData: (*map[string]map[string]string)((len=1) {
// (string) (len=8) "ex@32473": (map[string]string) (len=1) {
// (string) (len=3) "iut": (string) (len=1) "3"
// }
// }),
// message: (*string)((len=33) "An application event log entry...")
// })
```
And `e` being equal to `nil`, since the `i` is a perfectly valid RFC5424 message.
### Best effort mode
RFC5424 parser has the ability to perform partial matches (until it can).
With this mode enabled, when the parsing process errors out it returns the message collected until that position, and the error that caused the parser to stop.
Notice that in this modality the output is returned _iff_ it represents a minimally valid message - ie., a message containing almost a priority field in `[1,191]` within angular brackets, followed by a version in `]0,999]`.
Let's look at an example.
```go
bestEffortOn := true
i := []byte("<1>1 A - - - - - -")
p := NewParser()
m, e := p.Parse(i, &bestEffortOn)
```
This results in `m` being equal to the following `SyslogMessage` instance.
```go
// (*rfc5424.SyslogMessage)({
// priority: (*uint8)(1),
// facility: (*uint8)(0),
// severity: (*uint8)(1),
// version: (uint16) 1,
// timestamp: (*time.Time)(<nil>),
// hostname: (*string)(<nil>),
// appname: (*string)(<nil>),
// procID: (*string)(<nil>),
// msgID: (*string)(<nil>),
// structuredData: (*map[string]map[string]string)(<nil>),
// message: (*string)(<nil>)
// })
```
And, at the same time, in `e` reporting the error that actually stopped the parser.
```go
expecting a RFC3339MICRO timestamp or a nil value [col 5]
```
Both `m` and `e` have a value since at the column the parser stopped it already was able to construct a minimally valid `SyslogMessage`.
### Builder
This library also provides a builder to construct valid syslog messages.
Notice that its API ignores input values that does not match the grammar.
Let's have a look to an example.
```go
msg := &SyslogMessage{}
msg.SetTimestamp("not a RFC3339MICRO timestamp")
msg.Valid() // Not yet a valid message (try msg.Valid())
msg.SetPriority(191)
msg.SetVersion(1)
msg.Valid() // Now it is minimally valid
```
Printing `msg` you will verify it contains a `nil` timestamp (since an invalid one has been given).
```go
// (*rfc5424.SyslogMessage)({
// priority: (*uint8)(191),
// facility: (*uint8)(23),
// severity: (*uint8)(7),
// version: (uint16) 1,
// timestamp: (*time.Time)(<nil>),
// hostname: (*string)(<nil>),
// appname: (*string)(<nil>),
// procID: (*string)(<nil>),
// msgID: (*string)(<nil>),
// structuredData: (*map[string]map[string]string)(<nil>),
// message: (*string)(<nil>)
// })
```
Finally you can serialize the message into a string.
```go
str, _ := msg.String()
// <191>1 - - - - - -
```
## Message transfer
Excluding encapsulating one message for packet in packet protocols there are two ways to transfer syslog messages over streams.
The older - ie., the **non-transparent** framing - and the newer one - ie., the **octet counting** framing - which is reliable and has not been seen to cause problems noted with the non-transparent one.
This library provide stream parsers for both.
### Octet counting
In short, [RFC5425](https://tools.ietf.org/html/rfc5425#section-4.3) and [RFC6587](), aside from the protocol considerations, describe a **transparent framing** technique for syslog messages that uses the **octect counting** technique - ie., the message lenght of the incoming message.
Each syslog message is sent with a prefix representing the number of bytes it is made of.
This [package](./octetcounting) parses messages stream following such rule.
To quickly understand how to use it please have a look at the [example file](./octetcounting/example_test.go).
### Non transparent
The [RFC6587](https://tools.ietf.org/html/rfc6587#section-3.4.2) also describes the **non-transparent framing** transport of syslog messages.
In such case the messages are separated by a trailer, usually a line feed.
This [package](./nontransparent) parses message stream following such [technique](https://tools.ietf.org/html/rfc6587#section-3.4.2).
To quickly understand how to use it please have a look at the [example file](./nontransparent/example_test.go).
Things we do not support:
- trailers other than `LF` or `NUL`
- trailer which length is greater than 1 byte
- trailer change on a frame-by-frame basis
## Performances
To run the benchmark execute the following command.
```bash
make bench
```
On my machine<sup>[1](#mymachine)</sup> this are the results obtained in best effort mode.
```
[no]_empty_input__________________________________-4 30000000 253 ns/op 224 B/op 3 allocs/op
[no]_multiple_syslog_messages_on_multiple_lines___-4 20000000 433 ns/op 304 B/op 12 allocs/op
[no]_impossible_timestamp_________________________-4 10000000 1080 ns/op 528 B/op 11 allocs/op
[no]_malformed_structured_data____________________-4 20000000 552 ns/op 400 B/op 12 allocs/op
[no]_with_duplicated_structured_data_id___________-4 5000000 1246 ns/op 688 B/op 17 allocs/op
[ok]_minimal______________________________________-4 30000000 264 ns/op 247 B/op 9 allocs/op
[ok]_average_message______________________________-4 5000000 1984 ns/op 1536 B/op 26 allocs/op
[ok]_complicated_message__________________________-4 5000000 1644 ns/op 1280 B/op 25 allocs/op
[ok]_very_long_message____________________________-4 2000000 3826 ns/op 2464 B/op 28 allocs/op
[ok]_all_max_length_and_complete__________________-4 3000000 2792 ns/op 1888 B/op 28 allocs/op
[ok]_all_max_length_except_structured_data_and_mes-4 5000000 1830 ns/op 883 B/op 13 allocs/op
[ok]_minimal_with_message_containing_newline______-4 20000000 294 ns/op 250 B/op 10 allocs/op
[ok]_w/o_procid,_w/o_structured_data,_with_message-4 10000000 956 ns/op 364 B/op 11 allocs/op
[ok]_minimal_with_UTF-8_message___________________-4 20000000 586 ns/op 359 B/op 10 allocs/op
[ok]_with_structured_data_id,_w/o_structured_data_-4 10000000 998 ns/op 592 B/op 14 allocs/op
[ok]_with_multiple_structured_data________________-4 5000000 1538 ns/op 1232 B/op 22 allocs/op
[ok]_with_escaped_backslash_within_structured_data-4 5000000 1316 ns/op 920 B/op 20 allocs/op
[ok]_with_UTF-8_structured_data_param_value,_with_-4 5000000 1580 ns/op 1050 B/op 21 allocs/op
```
As you can see it takes:
* ~250ns to parse the smallest legal message
* ~2µs to parse an average legal message
* ~4µs to parse a very long legal message
Other RFC5424 implementations, like this [one](https://github.com/roguelazer/rust-syslog-rfc5424) in Rust, spend 8µs to parse an average legal message.
_TBD: comparation against other golang parsers_.
---
* <a name="mymachine">[1]</a>: Intel Core i7-7600U CPU @ 2.80GHz

@ -0,0 +1,81 @@
%%{
machine common;
# whitespace
sp = ' ';
# closing square bracket
csb = ']';
# double quote
dq = '"';
# backslash
bs = 0x5C;
# ", ], \
toescape = (dq | csb | bs);
# 0..59
sexagesimal = '0'..'5' . '0'..'9';
# 01..31
datemday = ('0' . '1'..'9' | '1'..'2' . '0'..'9' | '3' . '0'..'1');
# 01..12
datemonth = ('0' . '1'..'9' | '1' . '0'..'2');
datefullyear = digit{4};
fulldate = datefullyear '-' datemonth '-' datemday;
# 01..23
timehour = ('0'..'1' . '0'..'9' | '2' . '0'..'3');
timeminute = sexagesimal;
timesecond = sexagesimal;
timesecfrac = '.' digit{1,6};
timenumoffset = ('+' | '-') timehour ':' timeminute;
timeoffset = 'Z' | timenumoffset;
partialtime = timehour ':' timeminute ':' timesecond . timesecfrac?;
fulltime = partialtime . timeoffset;
printusascii = '!'..'~';
hostnamerange = printusascii{1,255};
appnamerange = printusascii{1,48};
procidrange = printusascii{1,128};
msgidrange = printusascii{1,32};
sdname = (printusascii - ('=' | sp | csb | dq)){1,32};
# rfc 3629
utf8tail = 0x80..0xBF;
utf81 = 0x00..0x7F;
utf82 = 0xC2..0xDF utf8tail;
utf83 = 0xE0 0xA0..0xBF utf8tail | 0xE1..0xEC utf8tail{2} | 0xED 0x80..0x9F utf8tail | 0xEE..0xEF utf8tail{2};
utf84 = 0xF0 0x90..0xBF utf8tail{2} | 0xF1..0xF3 utf8tail{3} | 0xF4 0x80..0x8F utf8tail{2};
utf8char = utf81 | utf82 | utf83 | utf84;
utf8octets = utf8char*;
bom = 0xEF 0xBB 0xBF;
# utf8char except ", ], \
utf8charwodelims = utf8char - toescape;
}%%

@ -0,0 +1,42 @@
package common
// UnsafeUTF8DecimalCodePointsToInt converts a slice containing
// a series of UTF-8 decimal code points into their integer rapresentation.
//
// It assumes input code points are in the range 48-57.
// Returns a pointer since an empty slice is equal to nil and not to the zero value of the codomain (ie., `int`).
func UnsafeUTF8DecimalCodePointsToInt(chars []uint8) int {
out := 0
ord := 1
for i := len(chars) - 1; i >= 0; i-- {
curchar := int(chars[i])
out += (curchar - '0') * ord
ord *= 10
}
return out
}
// RemoveBytes removes byte at given positions from data byte slice, starting from the given offset.
func RemoveBytes(data []byte, positions []int, offset int) []byte {
// We need a copy here to not modify original data
cp := append([]byte(nil), data...)
for i, pos := range positions {
at := pos - i - offset
cp = append(cp[:at], cp[(at+1):]...)
}
return cp
}
// EscapeBytes adds a backslash to \, ], " characters.
func EscapeBytes(value string) string {
res := ""
for i, c := range value {
// todo(leodido): generalize byte codes (the function should ideally accept a byte slice containing byte codes to escape)
if c == 92 || c == 93 || c == 34 {
res += `\`
}
res += string(value[i])
}
return res
}

@ -0,0 +1,8 @@
module github.com/influxdata/go-syslog/v2
require (
github.com/davecgh/go-spew v1.1.1
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/stretchr/testify v1.2.2
)

@ -0,0 +1,10 @@
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/google/go-cmp v0.2.0 h1:+dTQ8DZQJz0Mb/HjFlkptS1FeQ4cWSnN941F8aEG4SQ=
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165 h1:bCiVCRCs1Heq84lurVinUPy19keqGEe4jh5vtK37jcg=
github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165/go.mod h1:WZxr2/6a/Ar9bMDc2rN/LJrE/hF6bXE4LPyDSIxwAfg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/stretchr/testify v1.2.2 h1:bSDNvY7ZPG5RlJ8otE/7V6gMiyenm9RtJ7IUVIAoJ1w=
github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=

@ -0,0 +1,115 @@
SHELL := /bin/bash
RAGEL := ragel -I common
export GO_TEST=env GOTRACEBACK=all GO111MODULE=on go test $(GO_ARGS)
.PHONY: build
build: rfc5424/machine.go rfc5424/builder.go nontransparent/parser.go
@gofmt -w -s ./rfc5424
@gofmt -w -s ./octetcounting
@gofmt -w -s ./nontransparent
rfc5424/machine.go: rfc5424/machine.go.rl common/common.rl
rfc5424/builder.go: rfc5424/builder.go.rl common/common.rl
rfc5424/builder.go rfc5424/machine.go:
$(RAGEL) -Z -G2 -e -o $@ $<
@sed -i '/^\/\/line/d' $@
$(MAKE) file=$@ snake2camel
nontransparent/parser.go: nontransparent/parser.go.rl
$(RAGEL) -Z -G2 -e -o $@ $<
@sed -i '/^\/\/line/d' $@
$(MAKE) file=$@ snake2camel
.PHONY: snake2camel
snake2camel:
@awk -i inplace '{ \
while ( match($$0, /(.*)([a-z]+[0-9]*)_([a-zA-Z0-9])(.*)/, cap) ) \
$$0 = cap[1] cap[2] toupper(cap[3]) cap[4]; \
print \
}' $(file)
.PHONY: bench
bench: rfc5424/*_test.go rfc5424/machine.go
go test -bench=. -benchmem -benchtime=5s ./...
.PHONY: tests
tests:
$(GO_TEST) ./...
docs/nontransparent.dot: nontransparent/parser.go.rl
$(RAGEL) -Z -Vp $< -o $@
docs/rfc5424.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp $< -o $@
docs/rfc5424_pri.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M pri $< -o $@
docs/rfc5424_pri.png: docs/rfc5424_pri.dot
dot $< -Tpng -o $@
docs/rfc5424_version.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M version $< -o $@
docs/rfc5424_version.png: docs/rfc5424_version.dot
dot $< -Tpng -o $@
docs/rfc5424_timestamp.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M timestamp $< -o $@
docs/rfc5424_timestamp.png: docs/rfc5424_timestamp.dot
dot $< -Tpng -o $@
docs/rfc5424_hostname.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M hostname $< -o $@
docs/rfc5424_hostname.png: docs/rfc5424_hostname.dot
dot $< -Tpng -o $@
docs/rfc5424_appname.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M appname $< -o $@
docs/rfc5424_appname.png: docs/rfc5424_appname.dot
dot $< -Tpng -o $@
docs/rfc5424_procid.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M procid $< -o $@
docs/rfc5424_procid.png: docs/rfc5424_procid.dot
dot $< -Tpng -o $@
docs/rfc5424_msgid.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M msgid $< -o $@
docs/rfc5424_msgid.png: docs/rfc5424_msgid.dot
dot $< -Tpng -o $@
docs/rfc5424_structureddata.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M structureddata $< -o $@
docs/rfc5424_structureddata.png: docs/rfc5424_structureddata.dot
dot $< -Tpng -o $@
docs/rfc5424_msg.dot: rfc5424/machine.go.rl common/common.rl
$(RAGEL) -Z -Vp -M msg $< -o $@
docs/rfc5424_msg.png: docs/rfc5424_msg.dot
dot $< -Tpng -o $@
docs:
@mkdir -p docs
.PHONY: dots
dots: docs
$(MAKE) -s docs/rfc5424.dot docs/nontransparent.dot docs/rfc5424_pri.dot docs/rfc5424_version.dot docs/rfc5424_timestamp.dot docs/rfc5424_hostname.dot docs/rfc5424_appname.dot docs/rfc5424_procid.dot docs/rfc5424_msgid.dot docs/rfc5424_structureddata.dot docs/rfc5424_msg.dot
.PHONY: graph
graph: dots docs/rfc5424_pri.png docs/rfc5424_version.png docs/rfc5424_timestamp.png docs/rfc5424_hostname.png docs/rfc5424_appname.png docs/rfc5424_procid.png docs/rfc5424_msgid.png docs/rfc5424_structureddata.png docs/rfc5424_msg.png
.PHONY: clean
clean: rfc5424/machine.go nontransparent/parser.go
@rm -f $?
@rm -rf docs

@ -0,0 +1,269 @@
package nontransparent
import (
"io"
syslog "github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/rfc5424"
parser "github.com/leodido/ragel-machinery/parser"
)
const nontransparentStart int = 1
const nontransparentError int = 0
const nontransparentEnMain int = 1
type machine struct {
trailertyp TrailerType // default is 0 thus TrailerType(LF)
trailer byte
candidate []byte
bestEffort bool
internal syslog.Machine
emit syslog.ParserListener
readError error
lastChunk []byte // store last candidate message also if it does not ends with a trailer
}
// Exec implements the ragel.Parser interface.
func (m *machine) Exec(s *parser.State) (int, int) {
// Retrieve previously stored parsing variables
cs, p, pe, eof, data := s.Get()
{
var _widec int16
if p == pe {
goto _testEof
}
switch cs {
case 1:
goto stCase1
case 0:
goto stCase0
case 2:
goto stCase2
case 3:
goto stCase3
}
goto stOut
stCase1:
if data[p] == 60 {
goto tr0
}
goto st0
stCase0:
st0:
cs = 0
goto _out
tr0:
if len(m.candidate) > 0 {
m.process()
}
m.candidate = make([]byte, 0)
goto st2
st2:
if p++; p == pe {
goto _testEof2
}
stCase2:
_widec = int16(data[p])
switch {
case data[p] > 0:
if 10 <= data[p] && data[p] <= 10 {
_widec = 256 + (int16(data[p]) - 0)
if m.trailertyp == LF {
_widec += 256
}
}
default:
_widec = 768 + (int16(data[p]) - 0)
if m.trailertyp == NUL {
_widec += 256
}
}
switch _widec {
case 266:
goto st2
case 522:
goto tr3
case 768:
goto st2
case 1024:
goto tr3
}
switch {
case _widec > 9:
if 11 <= _widec {
goto st2
}
case _widec >= 1:
goto st2
}
goto st0
tr3:
m.candidate = append(m.candidate, data...)
goto st3
st3:
if p++; p == pe {
goto _testEof3
}
stCase3:
_widec = int16(data[p])
switch {
case data[p] > 0:
if 10 <= data[p] && data[p] <= 10 {
_widec = 256 + (int16(data[p]) - 0)
if m.trailertyp == LF {
_widec += 256
}
}
default:
_widec = 768 + (int16(data[p]) - 0)
if m.trailertyp == NUL {
_widec += 256
}
}
switch _widec {
case 60:
goto tr0
case 266:
goto st2
case 522:
goto tr3
case 768:
goto st2
case 1024:
goto tr3
}
switch {
case _widec > 9:
if 11 <= _widec {
goto st2
}
case _widec >= 1:
goto st2
}
goto st0
stOut:
_testEof2:
cs = 2
goto _testEof
_testEof3:
cs = 3
goto _testEof
_testEof:
{
}
_out:
{
}
}
// Update parsing variables
s.Set(cs, p, pe, eof)
return p, pe
}
func (m *machine) OnErr(chunk []byte, err error) {
// Store the last chunk of bytes ending without a trailer - ie., unexpected EOF from the reader
m.lastChunk = chunk
m.readError = err
}
func (m *machine) OnEOF(chunk []byte) {
}
func (m *machine) OnCompletion() {
if len(m.candidate) > 0 {
m.process()
}
// Try to parse last chunk as a candidate
if m.readError != nil && len(m.lastChunk) > 0 {
res, err := m.internal.Parse(m.lastChunk)
if err == nil {
err = m.readError
}
m.emit(&syslog.Result{
Message: res,
Error: err,
})
}
}
// NewParser returns a syslog.Parser suitable to parse syslog messages sent with non-transparent framing - ie. RFC 6587.
func NewParser(options ...syslog.ParserOption) syslog.Parser {
m := &machine{
emit: func(*syslog.Result) { /* noop */ },
}
for _, opt := range options {
m = opt(m).(*machine)
}
// No error can happens since during its setting we check the trailer type passed in
trailer, _ := m.trailertyp.Value()
m.trailer = byte(trailer)
// Create internal parser depending on options
if m.bestEffort {
m.internal = rfc5424.NewMachine(rfc5424.WithBestEffort())
} else {
m.internal = rfc5424.NewMachine()
}
return m
}
// HasBestEffort tells whether the receiving parser has best effort mode on or off.
func (m *machine) HasBestEffort() bool {
return m.bestEffort
}
// WithTrailer ... todo(leodido)
func WithTrailer(t TrailerType) syslog.ParserOption {
return func(m syslog.Parser) syslog.Parser {
if val, err := t.Value(); err == nil {
m.(*machine).trailer = byte(val)
m.(*machine).trailertyp = t
}
return m
}
}
// WithBestEffort implements the syslog.BestEfforter interface.
//
// The generic options uses it.
func (m *machine) WithBestEffort() {
m.bestEffort = true
}
// WithListener implements the syslog.Parser interface.
//
// The generic options uses it.
func (m *machine) WithListener(f syslog.ParserListener) {
m.emit = f
}
// Parse parses the io.Reader incoming bytes.
//
// It stops parsing when an error regarding RFC 6587 is found.
func (m *machine) Parse(reader io.Reader) {
r := parser.ArbitraryReader(reader, m.trailer)
parser.New(r, m, parser.WithStart(1)).Parse()
}
func (m *machine) process() {
lastByte := len(m.candidate) - 1
if m.candidate[lastByte] == m.trailer {
m.candidate = m.candidate[:lastByte]
}
res, err := m.internal.Parse(m.candidate)
m.emit(&syslog.Result{
Message: res,
Error: err,
})
}

@ -0,0 +1,163 @@
package nontransparent
import (
"io"
parser "github.com/leodido/ragel-machinery/parser"
syslog "github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/rfc5424"
)
%%{
machine nontransparent;
# unsigned alphabet
alphtype uint8;
action on_trailer {
m.candidate = append(m.candidate, data...)
}
action on_init {
if len(m.candidate) > 0 {
m.process()
}
m.candidate = make([]byte, 0)
}
t = 10 when { m.trailertyp == LF } |
00 when { m.trailertyp == NUL };
main :=
start: (
'<' >on_init (any)* -> trailer
),
trailer: (
t >on_trailer -> final |
t >on_trailer -> start
);
}%%
%% write data nofinal;
type machine struct{
trailertyp TrailerType // default is 0 thus TrailerType(LF)
trailer byte
candidate []byte
bestEffort bool
internal syslog.Machine
emit syslog.ParserListener
readError error
lastChunk []byte // store last candidate message also if it does not ends with a trailer
}
// Exec implements the ragel.Parser interface.
func (m *machine) Exec(s *parser.State) (int, int) {
// Retrieve previously stored parsing variables
cs, p, pe, eof, data := s.Get()
%% write exec;
// Update parsing variables
s.Set(cs, p, pe, eof)
return p, pe
}
func (m *machine) OnErr(chunk []byte, err error) {
// Store the last chunk of bytes ending without a trailer - ie., unexpected EOF from the reader
m.lastChunk = chunk
m.readError = err
}
func (m *machine) OnEOF(chunk []byte) {
}
func (m *machine) OnCompletion() {
if len(m.candidate) > 0 {
m.process()
}
// Try to parse last chunk as a candidate
if m.readError != nil && len(m.lastChunk) > 0 {
res, err := m.internal.Parse(m.lastChunk)
if err == nil {
err = m.readError
}
m.emit(&syslog.Result{
Message: res,
Error: err,
})
}
}
// NewParser returns a syslog.Parser suitable to parse syslog messages sent with non-transparent framing - ie. RFC 6587.
func NewParser(options ...syslog.ParserOption) syslog.Parser {
m := &machine{
emit: func(*syslog.Result) { /* noop */ },
}
for _, opt := range options {
m = opt(m).(*machine)
}
// No error can happens since during its setting we check the trailer type passed in
trailer, _ := m.trailertyp.Value()
m.trailer = byte(trailer)
// Create internal parser depending on options
if m.bestEffort {
m.internal = rfc5424.NewMachine(rfc5424.WithBestEffort())
} else {
m.internal = rfc5424.NewMachine()
}
return m
}
// HasBestEffort tells whether the receiving parser has best effort mode on or off.
func (m *machine) HasBestEffort() bool {
return m.bestEffort
}
// WithTrailer ... todo(leodido)
func WithTrailer(t TrailerType) syslog.ParserOption {
return func(m syslog.Parser) syslog.Parser {
if val, err := t.Value(); err == nil {
m.(*machine).trailer = byte(val)
m.(*machine).trailertyp = t
}
return m
}
}
// WithBestEffort implements the syslog.BestEfforter interface.
//
// The generic options uses it.
func (m *machine) WithBestEffort() {
m.bestEffort = true
}
// WithListener implements the syslog.Parser interface.
//
// The generic options uses it.
func (m *machine) WithListener(f syslog.ParserListener) {
m.emit = f
}
// Parse parses the io.Reader incoming bytes.
//
// It stops parsing when an error regarding RFC 6587 is found.
func (m *machine) Parse(reader io.Reader) {
r := parser.ArbitraryReader(reader, m.trailer)
parser.New(r, m, parser.WithStart(%%{ write start; }%%)).Parse()
}
func (m *machine) process() {
lastByte := len(m.candidate) - 1
if m.candidate[lastByte] == m.trailer {
m.candidate = m.candidate[:lastByte]
}
res, err := m.internal.Parse(m.candidate)
m.emit(&syslog.Result{
Message: res,
Error: err,
})
}

@ -0,0 +1,76 @@
package nontransparent
import (
"fmt"
"strings"
)
// TrailerType is the king of supported trailers for non-transparent frames.
type TrailerType int
const (
// LF is the line feed - ie., byte 10. Also the default one.
LF TrailerType = iota
// NUL is the nul byte - ie., byte 0.
NUL
)
var names = [...]string{"LF", "NUL"}
var bytes = []int{10, 0}
func (t TrailerType) String() string {
if t < LF || t > NUL {
return ""
}
return names[t]
}
// Value returns the byte corresponding to the receiving TrailerType.
func (t TrailerType) Value() (int, error) {
if t < LF || t > NUL {
return -1, fmt.Errorf("unknown TrailerType")
}
return bytes[t], nil
}
// TrailerTypeFromString returns a TrailerType given a string.
func TrailerTypeFromString(s string) (TrailerType, error) {
switch strings.ToUpper(s) {
case `"LF"`:
fallthrough
case `'LF'`:
fallthrough
case `LF`:
return LF, nil
case `"NUL"`:
fallthrough
case `'NUL'`:
fallthrough
case `NUL`:
return NUL, nil
}
return -1, fmt.Errorf("unknown TrailerType")
}
// UnmarshalTOML decodes trailer type from TOML data.
func (t *TrailerType) UnmarshalTOML(data []byte) (err error) {
return t.UnmarshalText(data)
}
// UnmarshalText implements encoding.TextUnmarshaler
func (t *TrailerType) UnmarshalText(data []byte) (err error) {
*t, err = TrailerTypeFromString(string(data))
return err
}
// MarshalText implements encoding.TextMarshaler
func (t TrailerType) MarshalText() ([]byte, error) {
s := t.String()
if s != "" {
return []byte(s), nil
}
return nil, fmt.Errorf("unknown TrailerType")
}

@ -0,0 +1,160 @@
package octetcounting
import (
"fmt"
"io"
syslog "github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/rfc5424"
)
// parser is capable to parse the input stream containing syslog messages with octetcounting framing.
//
// Use NewParser function to instantiate one.
type parser struct {
msglen int64
s Scanner
internal syslog.Machine
last Token
stepback bool // Wheter to retrieve the last token or not
bestEffort bool // Best effort mode flag
emit syslog.ParserListener
}
// NewParser returns a syslog.Parser suitable to parse syslog messages sent with transparent - ie. octet counting (RFC 5425) - framing.
func NewParser(opts ...syslog.ParserOption) syslog.Parser {
p := &parser{
emit: func(*syslog.Result) { /* noop */ },
}
for _, opt := range opts {
p = opt(p).(*parser)
}
// Create internal parser depending on options
if p.bestEffort {
p.internal = rfc5424.NewMachine(rfc5424.WithBestEffort())
} else {
p.internal = rfc5424.NewMachine()
}
return p
}
// HasBestEffort tells whether the receiving parser has best effort mode on or off.
func (p *parser) HasBestEffort() bool {
return p.bestEffort
}
// WithBestEffort implements the syslog.BestEfforter interface.
//
// The generic options uses it.
func (p *parser) WithBestEffort() {
p.bestEffort = true
}
// WithListener implements the syslog.Parser interface.
//
// The generic options uses it.
func (p *parser) WithListener(f syslog.ParserListener) {
p.emit = f
}
// Parse parses the io.Reader incoming bytes.
//
// It stops parsing when an error regarding RFC 5425 is found.
func (p *parser) Parse(r io.Reader) {
p.s = *NewScanner(r)
p.run()
}
func (p *parser) run() {
for {
var tok Token
// First token MUST be a MSGLEN
if tok = p.scan(); tok.typ != MSGLEN {
p.emit(&syslog.Result{
Error: fmt.Errorf("found %s, expecting a %s", tok, MSGLEN),
})
break
}
// Next we MUST see a WS
if tok = p.scan(); tok.typ != WS {
p.emit(&syslog.Result{
Error: fmt.Errorf("found %s, expecting a %s", tok, WS),
})
break
}
// Next we MUST see a SYSLOGMSG with length equal to MSGLEN
if tok = p.scan(); tok.typ != SYSLOGMSG {
e := fmt.Errorf(`found %s after "%s", expecting a %s containing %d octets`, tok, tok.lit, SYSLOGMSG, p.s.msglen)
// Underflow case
if len(tok.lit) < int(p.s.msglen) && p.bestEffort {
// Though MSGLEN was not respected, we try to parse the existing SYSLOGMSG as a RFC5424 syslog message
result := p.parse(tok.lit)
if result.Error == nil {
result.Error = e
}
p.emit(result)
break
}
p.emit(&syslog.Result{
Error: e,
})
break
}
// Parse the SYSLOGMSG literal pretending it is a RFC5424 syslog message
result := p.parse(tok.lit)
if p.bestEffort || result.Error == nil {
p.emit(result)
}
if !p.bestEffort && result.Error != nil {
p.emit(&syslog.Result{Error: result.Error})
break
}
// Next we MUST see an EOF otherwise the parsing we'll start again
if tok = p.scan(); tok.typ == EOF {
break
} else {
p.unscan()
}
}
}
func (p *parser) parse(input []byte) *syslog.Result {
sys, err := p.internal.Parse(input)
return &syslog.Result{
Message: sys,
Error: err,
}
}
// scan returns the next token from the underlying scanner;
// if a token has been unscanned then read that instead.
func (p *parser) scan() Token {
// If we have a token on the buffer, then return it.
if p.stepback {
p.stepback = false
return p.last
}
// Otherwise read the next token from the scanner.
tok := p.s.Scan()
// Save it to the buffer in case we unscan later.
p.last = tok
return tok
}
// unscan pushes the previously read token back onto the buffer.
func (p *parser) unscan() {
p.stepback = true
}

@ -0,0 +1,150 @@
package octetcounting
import (
"bufio"
"bytes"
"io"
"strconv"
)
// size as per RFC5425#section-4.3.1
var size = 8192
// eof represents a marker byte for the end of the reader
var eof = byte(0)
// ws represents the whitespace
var ws = byte(32)
// lt represents the "<" character
var lt = byte(60)
// isDigit returns true if the byte represents a number in [0,9]
func isDigit(ch byte) bool {
return (ch >= 47 && ch <= 57)
}
// isNonZeroDigit returns true if the byte represents a number in ]0,9]
func isNonZeroDigit(ch byte) bool {
return (ch >= 48 && ch <= 57)
}
// Scanner represents the lexical scanner for octet counting transport.
type Scanner struct {
r *bufio.Reader
msglen uint64
ready bool
}
// NewScanner returns a pointer to a new instance of Scanner.
func NewScanner(r io.Reader) *Scanner {
return &Scanner{
r: bufio.NewReaderSize(r, size+5), // "8192 " has length 5
}
}
// read reads the next byte from the buffered reader
// it returns the byte(0) if an error occurs (or io.EOF is returned)
func (s *Scanner) read() byte {
b, err := s.r.ReadByte()
if err != nil {
return eof
}
return b
}
// unread places the previously read byte back on the reader
func (s *Scanner) unread() {
_ = s.r.UnreadByte()
}
// Scan returns the next token.
func (s *Scanner) Scan() (tok Token) {
// Read the next byte.
b := s.read()
if isNonZeroDigit(b) {
s.unread()
s.ready = false
return s.scanMsgLen()
}
// Otherwise read the individual character
switch b {
case eof:
s.ready = false
return Token{
typ: EOF,
}
case ws:
s.ready = true
return Token{
typ: WS,
lit: []byte{ws},
}
case lt:
if s.msglen > 0 && s.ready {
s.unread()
return s.scanSyslogMsg()
}
}
return Token{
typ: ILLEGAL,
lit: []byte{b},
}
}
func (s *Scanner) scanMsgLen() Token {
// Create a buffer and read the current character into it
var buf bytes.Buffer
buf.WriteByte(s.read())
// Read every subsequent digit character into the buffer
// Non-digit characters and EOF will cause the loop to exit
for {
if b := s.read(); b == eof {
break
} else if !isDigit(b) {
s.unread()
break
} else {
buf.WriteByte(b)
}
}
msglen := buf.String()
s.msglen, _ = strconv.ParseUint(msglen, 10, 64)
// (todo) > return ILLEGAL if s.msglen > size (8192)
// (todo) > only when NOT in besteffort mode or always?
return Token{
typ: MSGLEN,
lit: buf.Bytes(),
}
}
func (s *Scanner) scanSyslogMsg() Token {
// Check the reader contains almost MSGLEN characters
n := int(s.msglen)
b, err := s.r.Peek(n)
if err != nil {
return Token{
typ: EOF,
lit: b,
}
}
// Advance the reader of MSGLEN characters
s.r.Discard(n)
// Reset status
s.ready = false
s.msglen = 0
// Return SYSLOGMSG token
return Token{
typ: SYSLOGMSG,
lit: b,
}
}

@ -0,0 +1,45 @@
package octetcounting
import (
"strconv"
)
// Token represents a lexical token of the octetcounting.
type Token struct {
typ TokenType
lit []byte
}
// TokenType represents a lexical token type of the octetcounting.
type TokenType int
// Tokens
const (
ILLEGAL TokenType = iota
EOF
WS
MSGLEN
SYSLOGMSG
)
// String outputs the string representation of the receiving Token.
func (t Token) String() string {
switch t.typ {
case WS, EOF:
return t.typ.String()
default:
return t.typ.String() + "(" + string(t.lit) + ")"
}
}
const tokentypename = "ILLEGALEOFWSMSGLENSYSLOGMSG"
var tokentypeindex = [...]uint8{0, 7, 10, 12, 18, 27}
// String outputs the string representation of the receiving TokenType.
func (i TokenType) String() string {
if i < 0 || i >= TokenType(len(tokentypeindex)-1) {
return "TokenType(" + strconv.FormatInt(int64(i), 10) + ")"
}
return tokentypename[tokentypeindex[i]:tokentypeindex[i+1]]
}

@ -0,0 +1,19 @@
package syslog
// WithListener returns a generic option that sets the emit function for syslog parsers.
func WithListener(f ParserListener) ParserOption {
return func(p Parser) Parser {
p.WithListener(f)
return p
}
}
// WithBestEffort returns a generic options that enables best effort mode for syslog parsers.
//
// When passed to a parser it tries to recover as much of the syslog messages as possible.
func WithBestEffort() ParserOption {
return func(p Parser) Parser {
p.WithBestEffort()
return p
}
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,312 @@
package rfc5424
import (
"time"
"sort"
"fmt"
"github.com/influxdata/go-syslog/v2/common"
)
%%{
machine builder;
include common "common.rl";
action mark {
pb = p
}
action set_timestamp {
if t, e := time.Parse(RFC3339MICRO, string(data[pb:p])); e == nil {
sm.timestamp = &t
}
}
action set_hostname {
if s := string(data[pb:p]); s != "-" {
sm.hostname = &s
}
}
action set_appname {
if s := string(data[pb:p]); s != "-" {
sm.appname = &s
}
}
action set_procid {
if s := string(data[pb:p]); s != "-" {
sm.procID = &s
}
}
action set_msgid {
if s := string(data[pb:p]); s != "-" {
sm.msgID = &s
}
}
action set_sdid {
if sm.structuredData == nil {
sm.structuredData = &(map[string]map[string]string{})
}
id := string(data[pb:p])
elements := *sm.structuredData
if _, ok := elements[id]; !ok {
elements[id] = map[string]string{}
}
}
action set_sdpn {
// Assuming SD map already exists, contains currentid key (set from outside)
elements := *sm.structuredData
elements[currentid][string(data[pb:p])] = ""
}
action markslash {
backslashes = append(backslashes, p)
}
action set_sdpv {
// Store text
text := data[pb:p]
// Strip backslashes only when there are ...
if len(backslashes) > 0 {
text = common.RemoveBytes(text, backslashes, pb)
}
// Assuming SD map already exists, contains currentid key and currentparamname key (set from outside)
elements := *sm.structuredData
elements[currentid][currentparamname] = string(text)
}
action set_msg {
if s := string(data[pb:p]); s != "" {
sm.message = &s
}
}
timestamp := (fulldate 'T' fulltime) >mark %set_timestamp;
hostname := hostnamerange >mark %set_hostname;
appname := appnamerange >mark %set_appname;
procid := procidrange >mark %set_procid;
msgid := msgidrange >mark %set_msgid;
sdid := sdname >mark %set_sdid;
sdpn := sdname >mark %set_sdpn;
escapes = (bs >markslash toescape);
sdpv := (utf8charwodelims* escapes*)+ >mark %set_sdpv;
msg := (bom? utf8octets) >mark %set_msg;
write data noerror nofinal;
}%%
type entrypoint int
const (
timestamp entrypoint = iota
hostname
appname
procid
msgid
sdid
sdpn
sdpv
msg
)
func (e entrypoint) translate() int {
switch e {
case timestamp:
return builder_en_timestamp
case hostname:
return builder_en_hostname
case appname:
return builder_en_appname
case procid:
return builder_en_procid
case msgid:
return builder_en_msgid
case sdid:
return builder_en_sdid
case sdpn:
return builder_en_sdpn
case sdpv:
return builder_en_sdpv
case msg:
return builder_en_msg
default:
return builder_start
}
}
var currentid string
var currentparamname string
func (sm *SyslogMessage) set(from entrypoint, value string) *SyslogMessage {
data := []byte(value)
p := 0
pb := 0
pe := len(data)
eof := len(data)
cs := from.translate()
backslashes := []int{}
%% write exec;
return sm
}
// SetPriority set the priority value and the computed facility and severity codes accordingly.
//
// It ignores incorrect priority values (range [0, 191]).
func (sm *SyslogMessage) SetPriority(value uint8) *SyslogMessage {
if value >= 0 && value <= 191 {
sm.setPriority(value)
}
return sm
}
// SetVersion set the version value.
//
// It ignores incorrect version values (range ]0, 999]).
func (sm *SyslogMessage) SetVersion(value uint16) *SyslogMessage {
if value > 0 && value <= 999 {
sm.version = value
}
return sm
}
// SetTimestamp set the timestamp value.
func (sm *SyslogMessage) SetTimestamp(value string) *SyslogMessage {
return sm.set(timestamp, value)
}
// SetHostname set the hostname value.
func (sm *SyslogMessage) SetHostname(value string) *SyslogMessage {
return sm.set(hostname, value)
}
// SetAppname set the appname value.
func (sm *SyslogMessage) SetAppname(value string) *SyslogMessage {
return sm.set(appname, value)
}
// SetProcID set the procid value.
func (sm *SyslogMessage) SetProcID(value string) *SyslogMessage {
return sm.set(procid, value)
}
// SetMsgID set the msgid value.
func (sm *SyslogMessage) SetMsgID(value string) *SyslogMessage {
return sm.set(msgid, value)
}
// SetElementID set a structured data id.
//
// When the provided id already exists the operation is discarded.
func (sm *SyslogMessage) SetElementID(value string) *SyslogMessage {
return sm.set(sdid, value)
}
// SetParameter set a structured data parameter belonging to the given element.
//
// If the element does not exist it creates one with the given element id.
// When a parameter with the given name already exists for the given element the operation is discarded.
func (sm *SyslogMessage) SetParameter(id string, name string, value string) *SyslogMessage {
// Create an element with the given id (or re-use the existing one)
sm.set(sdid, id)
// We can create parameter iff the given element id exists
if sm.structuredData != nil {
elements := *sm.structuredData
if _, ok := elements[id]; ok {
currentid = id
sm.set(sdpn, name)
// We can assign parameter value iff the given parameter key exists
if _, ok := elements[id][name]; ok {
currentparamname = name
sm.set(sdpv, value)
}
}
}
return sm
}
// SetMessage set the message value.
func (sm *SyslogMessage) SetMessage(value string) *SyslogMessage {
return sm.set(msg, value)
}
func (sm *SyslogMessage) String() (string, error) {
if !sm.Valid() {
return "", fmt.Errorf("invalid syslog")
}
template := "<%d>%d %s %s %s %s %s %s%s"
t := "-"
hn := "-"
an := "-"
pid := "-"
mid := "-"
sd := "-"
m := ""
if sm.timestamp != nil {
t = sm.timestamp.Format("2006-01-02T15:04:05.999999Z07:00") // verify 07:00
}
if sm.hostname != nil {
hn = *sm.hostname
}
if sm.appname != nil {
an = *sm.appname
}
if sm.procID != nil {
pid = *sm.procID
}
if sm.msgID != nil {
mid = *sm.msgID
}
if sm.structuredData != nil {
// Sort element identifiers
identifiers := make([]string, 0)
for k, _ := range *sm.structuredData {
identifiers = append(identifiers, k)
}
sort.Strings(identifiers)
sd = ""
for _, id := range identifiers {
sd += fmt.Sprintf("[%s", id)
// Sort parameter names
params := (*sm.structuredData)[id]
names := make([]string, 0)
for n, _ := range params {
names = append(names, n)
}
sort.Strings(names)
for _, name := range names {
sd += fmt.Sprintf(" %s=\"%s\"", name, common.EscapeBytes(params[name]))
}
sd += "]"
}
}
if sm.message != nil {
m = " " + *sm.message
}
return fmt.Sprintf(template, *sm.priority, sm.version, t, hn, an, pid, mid, sd, m), nil
}

File diff suppressed because it is too large Load Diff

@ -0,0 +1,383 @@
package rfc5424
import (
"time"
"fmt"
"github.com/influxdata/go-syslog/v2"
"github.com/influxdata/go-syslog/v2/common"
)
// ColumnPositionTemplate is the template used to communicate the column where errors occur.
var ColumnPositionTemplate = " [col %d]"
const (
// ErrPrival represents an error in the priority value (PRIVAL) inside the PRI part of the RFC5424 syslog message.
ErrPrival = "expecting a priority value in the range 1-191 or equal to 0"
// ErrPri represents an error in the PRI part of the RFC5424 syslog message.
ErrPri = "expecting a priority value within angle brackets"
// ErrVersion represents an error in the VERSION part of the RFC5424 syslog message.
ErrVersion = "expecting a version value in the range 1-999"
// ErrTimestamp represents an error in the TIMESTAMP part of the RFC5424 syslog message.
ErrTimestamp = "expecting a RFC3339MICRO timestamp or a nil value"
// ErrHostname represents an error in the HOSTNAME part of the RFC5424 syslog message.
ErrHostname = "expecting an hostname (from 1 to max 255 US-ASCII characters) or a nil value"
// ErrAppname represents an error in the APP-NAME part of the RFC5424 syslog message.
ErrAppname = "expecting an app-name (from 1 to max 48 US-ASCII characters) or a nil value"
// ErrProcID represents an error in the PROCID part of the RFC5424 syslog message.
ErrProcID = "expecting a procid (from 1 to max 128 US-ASCII characters) or a nil value"
// ErrMsgID represents an error in the MSGID part of the RFC5424 syslog message.
ErrMsgID = "expecting a msgid (from 1 to max 32 US-ASCII characters) or a nil value"
// ErrStructuredData represents an error in the STRUCTURED DATA part of the RFC5424 syslog message.
ErrStructuredData = "expecting a structured data section containing one or more elements (`[id( key=\"value\")*]+`) or a nil value"
// ErrSdID represents an error regarding the ID of a STRUCTURED DATA element of the RFC5424 syslog message.
ErrSdID = "expecting a structured data element id (from 1 to max 32 US-ASCII characters; except `=`, ` `, `]`, and `\"`"
// ErrSdIDDuplicated represents an error occurring when two STRUCTURED DATA elementes have the same ID in a RFC5424 syslog message.
ErrSdIDDuplicated = "duplicate structured data element id"
// ErrSdParam represents an error regarding a STRUCTURED DATA PARAM of the RFC5424 syslog message.
ErrSdParam = "expecting a structured data parameter (`key=\"value\"`, both part from 1 to max 32 US-ASCII characters; key cannot contain `=`, ` `, `]`, and `\"`, while value cannot contain `]`, backslash, and `\"` unless escaped)"
// ErrMsg represents an error in the MESSAGE part of the RFC5424 syslog message.
ErrMsg = "expecting a free-form optional message in UTF-8 (starting with or without BOM)"
// ErrEscape represents the error for a RFC5424 syslog message occurring when a STRUCTURED DATA PARAM value contains '"', '\', or ']' not escaped.
ErrEscape = "expecting chars `]`, `\"`, and `\\` to be escaped within param value"
// ErrParse represents a general parsing error for a RFC5424 syslog message.
ErrParse = "parsing error"
)
// RFC3339MICRO represents the timestamp format that RFC5424 mandates.
const RFC3339MICRO = "2006-01-02T15:04:05.999999Z07:00"
%%{
machine rfc5424;
include common "common.rl";
# unsigned alphabet
alphtype uint8;
action mark {
m.pb = m.p
}
action markmsg {
m.msgat = m.p
}
action set_prival {
output.priority = uint8(common.UnsafeUTF8DecimalCodePointsToInt(m.text()))
output.prioritySet = true
}
action set_version {
output.version = uint16(common.UnsafeUTF8DecimalCodePointsToInt(m.text()))
}
action set_timestamp {
if t, e := time.Parse(RFC3339MICRO, string(m.text())); e != nil {
m.err = fmt.Errorf("%s [col %d]", e, m.p)
fhold;
fgoto fail;
} else {
output.timestamp = t
output.timestampSet = true
}
}
action set_hostname {
output.hostname = string(m.text())
}
action set_appname {
output.appname = string(m.text())
}
action set_procid {
output.procID = string(m.text())
}
action set_msgid {
output.msgID = string(m.text())
}
action ini_elements {
output.structuredData = map[string]map[string]string{}
}
action set_id {
if _, ok := output.structuredData[string(m.text())]; ok {
// As per RFC5424 section 6.3.2 SD-ID MUST NOT exist more than once in a message
m.err = fmt.Errorf(ErrSdIDDuplicated + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
} else {
id := string(m.text())
output.structuredData[id] = map[string]string{}
output.hasElements = true
m.currentelem = id
}
}
action ini_sdparam {
m.backslashat = []int{}
}
action add_slash {
m.backslashat = append(m.backslashat, m.p)
}
action set_paramname {
m.currentparam = string(m.text())
}
action set_paramvalue {
if output.hasElements {
// (fixme) > what if SD-PARAM-NAME already exist for the current element (ie., current SD-ID)?
// Store text
text := m.text()
// Strip backslashes only when there are ...
if len(m.backslashat) > 0 {
text = common.RemoveBytes(text, m.backslashat, m.pb)
}
output.structuredData[m.currentelem][m.currentparam] = string(text)
}
}
action set_msg {
output.message = string(m.text())
}
action err_prival {
m.err = fmt.Errorf(ErrPrival + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_pri {
m.err = fmt.Errorf(ErrPri + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_version {
m.err = fmt.Errorf(ErrVersion + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_timestamp {
m.err = fmt.Errorf(ErrTimestamp + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_hostname {
m.err = fmt.Errorf(ErrHostname + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_appname {
m.err = fmt.Errorf(ErrAppname + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_procid {
m.err = fmt.Errorf(ErrProcID + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_msgid {
m.err = fmt.Errorf(ErrMsgID + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_structureddata {
m.err = fmt.Errorf(ErrStructuredData + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_sdid {
delete(output.structuredData, m.currentelem)
if len(output.structuredData) == 0 {
output.hasElements = false
}
m.err = fmt.Errorf(ErrSdID + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_sdparam {
if len(output.structuredData) > 0 {
delete(output.structuredData[m.currentelem], m.currentparam)
}
m.err = fmt.Errorf(ErrSdParam + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_msg {
// If error encountered within the message rule ...
if m.msgat > 0 {
// Save the text until valid (m.p is where the parser has stopped)
output.message = string(m.data[m.msgat:m.p])
}
m.err = fmt.Errorf(ErrMsg + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_escape {
m.err = fmt.Errorf(ErrEscape + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
action err_parse {
m.err = fmt.Errorf(ErrParse + ColumnPositionTemplate, m.p)
fhold;
fgoto fail;
}
nilvalue = '-';
nonzerodigit = '1'..'9';
# 1..191
privalrange = (('1' ('9' ('0'..'1'){,1} | '0'..'8' ('0'..'9'){,1}){,1}) | ('2'..'9' ('0'..'9'){,1}));
# 1..191 or 0
prival = (privalrange | '0') >mark %from(set_prival) $err(err_prival);
pri = ('<' prival '>') @err(err_pri);
version = (nonzerodigit digit{0,2} <err(err_version)) >mark %from(set_version) %eof(set_version) @err(err_version);
timestamp = (nilvalue | (fulldate >mark 'T' fulltime %set_timestamp %err(set_timestamp))) @err(err_timestamp);
hostname = hostnamerange >mark %set_hostname $err(err_hostname);
appname = appnamerange >mark %set_appname $err(err_appname);
procid = procidrange >mark %set_procid $err(err_procid);
msgid = msgidrange >mark %set_msgid $err(err_msgid);
header = (pri version sp timestamp sp hostname sp appname sp procid sp msgid) <>err(err_parse);
# \", \], \\
escapes = (bs >add_slash toescape) $err(err_escape);
# As per section 6.3.3 param value MUST NOT contain '"', '\' and ']', unless they are escaped.
# A backslash '\' followed by none of the this three characters is an invalid escape sequence.
# In this case, treat it as a regular backslash and the following character as a regular character (not altering the invalid sequence).
paramvalue = (utf8charwodelims* escapes*)+ >mark %set_paramvalue;
paramname = sdname >mark %set_paramname;
sdparam = (paramname '=' dq paramvalue dq) >ini_sdparam $err(err_sdparam);
# (note) > finegrained semantics of section 6.3.2 not represented here since not so useful for parsing goal
sdid = sdname >mark %set_id %err(set_id) $err(err_sdid);
sdelement = ('[' sdid (sp sdparam)* ']');
structureddata = nilvalue | sdelement+ >ini_elements $err(err_structureddata);
msg = (bom? utf8octets) >mark >markmsg %set_msg $err(err_msg);
fail := (any - [\n\r])* @err{ fgoto main; };
main := header sp structureddata (sp msg)? $err(err_parse);
}%%
%% write data noerror noprefix;
type machine struct {
data []byte
cs int
p, pe, eof int
pb int
err error
currentelem string
currentparam string
msgat int
backslashat []int
bestEffort bool
}
// NewMachine creates a new FSM able to parse RFC5424 syslog messages.
func NewMachine(options ...syslog.MachineOption) syslog.Machine {
m := &machine{}
for _, opt := range options {
opt(m)
}
%% access m.;
%% variable p m.p;
%% variable pe m.pe;
%% variable eof m.eof;
%% variable data m.data;
return m
}
func (m *machine) WithBestEffort() {
m.bestEffort = true
}
// HasBestEffort tells whether the receiving machine has best effort mode on or off.
func (m *machine) HasBestEffort() bool {
return m.bestEffort
}
// Err returns the error that occurred on the last call to Parse.
//
// If the result is nil, then the line was parsed successfully.
func (m *machine) Err() error {
return m.err
}
func (m *machine) text() []byte {
return m.data[m.pb:m.p]
}
// Parse parses the input byte array as a RFC5424 syslog message.
//
// When a valid RFC5424 syslog message is given it outputs its structured representation.
// If the parsing detects an error it returns it with the position where the error occurred.
//
// It can also partially parse input messages returning a partially valid structured representation
// and the error that stopped the parsing.
func (m *machine) Parse(input []byte) (syslog.Message, error) {
m.data = input
m.p = 0
m.pb = 0
m.msgat = 0
m.backslashat = []int{}
m.pe = len(input)
m.eof = len(input)
m.err = nil
output := &syslogMessage{}
%% write init;
%% write exec;
if m.cs < first_final || m.cs == en_fail {
if m.bestEffort && output.valid() {
// An error occurred but partial parsing is on and partial message is minimally valid
return output.export(), m.err
}
return nil, m.err
}
return output.export(), nil
}

@ -0,0 +1,13 @@
package rfc5424
import (
syslog "github.com/influxdata/go-syslog/v2"
)
// WithBestEffort enables the best effort mode.
func WithBestEffort() syslog.MachineOption {
return func(m syslog.Machine) syslog.Machine {
m.WithBestEffort()
return m
}
}

@ -0,0 +1,45 @@
package rfc5424
import (
"sync"
syslog "github.com/influxdata/go-syslog/v2"
)
// parser represent a RFC5424 parser with mutex capabilities.
type parser struct {
sync.Mutex
*machine
}
// NewParser creates a syslog.Machine that parses RFC5424 syslog messages.
func NewParser(options ...syslog.MachineOption) syslog.Machine {
p := &parser{
machine: NewMachine(options...).(*machine),
}
return p
}
// HasBestEffort tells whether the receiving parser has best effort mode on or off.
func (p *parser) HasBestEffort() bool {
return p.bestEffort
}
// Parse parses the input RFC5424 syslog message using its FSM.
//
// Best effort mode enables the partial parsing.
func (p *parser) Parse(input []byte) (syslog.Message, error) {
p.Lock()
defer p.Unlock()
msg, err := p.machine.Parse(input)
if err != nil {
if p.bestEffort {
return msg, err
}
return nil, err
}
return msg, nil
}

@ -0,0 +1,291 @@
package rfc5424
import (
"time"
)
type syslogMessage struct {
prioritySet bool // We explictly flag the setting of priority since its zero value is a valid priority by RFC 5424
timestampSet bool // We explictly flag the setting of timestamp since its zero value is a valid timestamp by RFC 5424
hasElements bool
priority uint8
version uint16
timestamp time.Time
hostname string
appname string
procID string
msgID string
structuredData map[string]map[string]string
message string
}
func (sm *syslogMessage) valid() bool {
if sm.prioritySet && sm.version > 0 && sm.version <= 999 {
return true
}
return false
}
func (sm *syslogMessage) export() *SyslogMessage {
out := &SyslogMessage{}
if sm.prioritySet {
out.setPriority(sm.priority)
}
if sm.version > 0 && sm.version <= 999 {
out.version = sm.version
}
if sm.timestampSet {
out.timestamp = &sm.timestamp
}
if sm.hostname != "-" && sm.hostname != "" {
out.hostname = &sm.hostname
}
if sm.appname != "-" && sm.appname != "" {
out.appname = &sm.appname
}
if sm.procID != "-" && sm.procID != "" {
out.procID = &sm.procID
}
if sm.msgID != "-" && sm.msgID != "" {
out.msgID = &sm.msgID
}
if sm.hasElements {
out.structuredData = &sm.structuredData
}
if sm.message != "" {
out.message = &sm.message
}
return out
}
// SyslogMessage represents a syslog message.
type SyslogMessage struct {
priority *uint8
facility *uint8
severity *uint8
version uint16 // Grammar mandates that version cannot be 0, so we can use the 0 value of uint16 to signal nil
timestamp *time.Time
hostname *string
appname *string
procID *string
msgID *string
structuredData *map[string]map[string]string
message *string
}
// Valid tells whether the receiving SyslogMessage is well-formed or not.
//
// A minimally well-formed syslog message contains at least a priority ([1, 191] or 0) and the version (]0, 999]).
func (sm *SyslogMessage) Valid() bool {
// A nil priority or a 0 version means that the message is not valid
// Not checking the priority range since it's parser responsibility
if sm.priority != nil && *sm.priority >= 0 && *sm.priority <= 191 && sm.version > 0 && sm.version <= 999 {
return true
}
return false
}
// Priority returns the syslog priority or nil when not set
func (sm *SyslogMessage) Priority() *uint8 {
return sm.priority
}
// Version returns the syslog version or nil when not set
func (sm *SyslogMessage) Version() uint16 {
return sm.version
}
func (sm *SyslogMessage) setPriority(value uint8) {
sm.priority = &value
facility := uint8(value / 8)
severity := uint8(value % 8)
sm.facility = &facility
sm.severity = &severity
}
// Facility returns the facility code.
func (sm *SyslogMessage) Facility() *uint8 {
return sm.facility
}
// Severity returns the severity code.
func (sm *SyslogMessage) Severity() *uint8 {
return sm.severity
}
// FacilityMessage returns the text message for the current facility value.
func (sm *SyslogMessage) FacilityMessage() *string {
if sm.facility != nil {
msg := facilities[*sm.facility]
return &msg
}
return nil
}
// FacilityLevel returns the
func (sm *SyslogMessage) FacilityLevel() *string {
if sm.facility != nil {
if msg, ok := facilityKeywords[*sm.facility]; ok {
return &msg
}
// Fallback to facility message
msg := facilities[*sm.facility]
return &msg
}
return nil
}
// SeverityMessage returns the text message for the current severity value.
func (sm *SyslogMessage) SeverityMessage() *string {
if sm.severity != nil {
msg := severityMessages[*sm.severity]
return &msg
}
return nil
}
// SeverityLevel returns the text level for the current severity value.
func (sm *SyslogMessage) SeverityLevel() *string {
if sm.severity != nil {
msg := severityLevels[*sm.severity]
return &msg
}
return nil
}
// SeverityShortLevel returns the short text level for the current severity value.
func (sm *SyslogMessage) SeverityShortLevel() *string {
if sm.severity != nil {
msg := severityLevelsShort[*sm.severity]
return &msg
}
return nil
}
var severityMessages = map[uint8]string{
0: "system is unusable",
1: "action must be taken immediately",
2: "critical conditions",
3: "error conditions",
4: "warning conditions",
5: "normal but significant condition",
6: "informational messages",
7: "debug-level messages",
}
var severityLevels = map[uint8]string{
0: "emergency",
1: "alert",
2: "critical",
3: "error",
4: "warning",
5: "notice",
6: "informational",
7: "debug",
}
// As per https://github.com/torvalds/linux/blob/master/tools/include/linux/kern_levels.h and syslog(3)
var severityLevelsShort = map[uint8]string{
0: "emerg",
1: "alert",
2: "crit",
3: "err",
4: "warning",
5: "notice",
6: "info",
7: "debug",
}
var facilities = map[uint8]string{
0: "kernel messages",
1: "user-level messages",
2: "mail system",
3: "system daemons",
4: "security/authorization messages",
5: "messages generated internally by syslogd",
6: "line printer subsystem",
7: "network news subsystem",
8: "UUCP subsystem",
9: "clock daemon",
10: "security/authorization messages",
11: "FTP daemon",
12: "NTP subsystem",
13: "log audit",
14: "log alert",
15: "clock daemon (note 2)", // (todo) > some sources reporting "scheduling daemon"
16: "local use 0 (local0)",
17: "local use 1 (local1)",
18: "local use 2 (local2)",
19: "local use 3 (local3)",
20: "local use 4 (local4)",
21: "local use 5 (local5)",
22: "local use 6 (local6)",
23: "local use 7 (local7)",
}
// As per syslog(3)
var facilityKeywords = map[uint8]string{
0: "kern",
1: "user",
2: "mail",
3: "daemon",
4: "auth",
5: "syslog",
6: "lpr",
7: "news",
8: "uucp",
10: "authpriv",
11: "ftp",
15: "cron",
16: "local0",
17: "local1",
18: "local2",
19: "local3",
20: "local4",
21: "local5",
22: "local6",
23: "local7",
}
// Timestamp returns the syslog timestamp or nil when not set
func (sm *SyslogMessage) Timestamp() *time.Time {
return sm.timestamp
}
// Hostname returns the syslog hostname or nil when not set
func (sm *SyslogMessage) Hostname() *string {
return sm.hostname
}
// ProcID returns the syslog proc ID or nil when not set
func (sm *SyslogMessage) ProcID() *string {
return sm.procID
}
// Appname returns the syslog appname or nil when not set
func (sm *SyslogMessage) Appname() *string {
return sm.appname
}
// MsgID returns the syslog msg ID or nil when not set
func (sm *SyslogMessage) MsgID() *string {
return sm.msgID
}
// Message returns the syslog message or nil when not set
func (sm *SyslogMessage) Message() *string {
return sm.message
}
// StructuredData returns the syslog structured data or nil when not set
func (sm *SyslogMessage) StructuredData() *map[string]map[string]string {
return sm.structuredData
}

@ -0,0 +1,63 @@
// Package syslog provides generic interfaces and structs for syslog messages and transport.
// Subpackages contains various parsers or scanners for different syslog formats.
package syslog
import (
"io"
"time"
)
// BestEfforter is an interface that wraps the HasBestEffort method.
type BestEfforter interface {
WithBestEffort()
HasBestEffort() bool
}
// Machine represent a FSM able to parse an entire syslog message and return it in an structured way.
type Machine interface {
Parse(input []byte) (Message, error)
BestEfforter
}
// MachineOption represents the type of option setters for Machine instances.
type MachineOption func(m Machine) Machine
// Parser is an interface that wraps the Parse method.
type Parser interface {
Parse(r io.Reader)
WithListener(ParserListener)
BestEfforter
}
// ParserOption represent the type of option setters for Parser instances.
type ParserOption func(p Parser) Parser
// ParserListener is a function that receives syslog parsing results, one by one.
type ParserListener func(*Result)
// Result wraps the outcomes obtained parsing a syslog message.
type Result struct {
Message Message
Error error
}
// Message represent a structured representation of a syslog message.
type Message interface {
Valid() bool
Priority() *uint8
Version() uint16
Facility() *uint8
Severity() *uint8
FacilityMessage() *string
FacilityLevel() *string
SeverityMessage() *string
SeverityLevel() *string
SeverityShortLevel() *string
Timestamp() *time.Time
Hostname() *string
ProcID() *string
Appname() *string
MsgID() *string
Message() *string
StructuredData() *map[string]map[string]string
}

@ -0,0 +1,21 @@
MIT License
Copyright (c) 2018 Leonardo Di Donato
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.

@ -0,0 +1,9 @@
# ragel machinery
Machineries to speed up and facilitate the development of ragel parsers able to accept streaming inputs.
It is only intended for use with ragel parsers.
---
[![Analytics](https://ga-beacon.appspot.com/UA-49657176-1/ragel-machinery?flat)](https://github.com/igrigorik/ga-beacon)

@ -0,0 +1,22 @@
package ragel
// ReadingError represents errors occurred in the readers.
type ReadingError struct {
message string
}
// NewReadingError creates a ReadingError with the given message.
func NewReadingError(message string) *ReadingError {
return &ReadingError{
message: message,
}
}
func (e *ReadingError) Error() string {
return e.message
}
var (
// ErrNotFound is the message representing a situation in which the needle we were looking for has not been found.
ErrNotFound = "needle not found"
)

@ -0,0 +1,3 @@
module github.com/leodido/ragel-machinery
require github.com/google/go-cmp v0.2.0

@ -0,0 +1 @@
github.com/google/go-cmp v0.2.0/go.mod h1:oXzfMopK8JAjlY9xF4vHSVASa0yLyX7SntLO5aqRK0M=

@ -0,0 +1,122 @@
package parser
import (
"bufio"
ragel "github.com/leodido/ragel-machinery"
"io"
)
// ArbitraryReader returns a Reader that reads from r
// but stops when it finds a delimiter.
// The underlying implementation is a *DelimitedReader.
func ArbitraryReader(r io.Reader, delim byte) *DelimitedReader {
return &DelimitedReader{
delim: delim,
reader: bufio.NewReader(r),
parsingState: &parsingState{
data: []byte{},
p: 0,
pe: 0,
eof: -1,
},
}
}
// DelimitedReader reads arbitrarily sized bytes slices until a delimiter is found.
// It depends on and it keeps track of Ragel's state variables.
type DelimitedReader struct {
*parsingState
delim byte
reader *bufio.Reader
}
// State returns a pointer to the current State.
func (r DelimitedReader) State() *State {
return (*State)(r.parsingState)
}
// Read reads a chunk of bytes until it finds a delimiter.
//
// It always works on the current boundaries of the data,
// and updates them accordingly.
// It returns the chunk of bytes read and, eventually, an error.
// When delim is not found it returns an io.ErrUnexpectedEOF.
func (r *DelimitedReader) Read() (line []byte, err error) {
p := r.p
// Process only the data still to read when P is greater than the half of the data
// data = a b c d e f g h i l m n
// vars = - - - - - - - p - pe- -
if p > len(r.data)/2 {
copy(r.data, r.data[p:len(r.data)])
r.p = 0
r.pe = r.pe - p
// data = h i l m n f g h i l m n
// vars = p - pe- - - - - - - - -
r.data = r.data[0 : len(r.data)-p]
// data = h i l m n
// vars = p - pe- -
}
// Read until the first occurrence of the delimiter
line, err = r.reader.ReadBytes(r.delim)
// Storing the data up to and including the delimiter
r.data = append(r.data, line...)
// Update the position of end
r.pe = len(r.data)
if err == io.EOF {
if len(line) != 0 {
err = io.ErrUnexpectedEOF
}
// Update the position of EOF
r.eof = r.pe
}
if err != nil {
err = ragel.NewReadingError(err.Error())
}
return line, err
}
// Seek look for the first instance of until.
//
// It always works on the current boundaries of the data.
// When it finds what it looks for it returns the number of bytes sought before to find it.
// Otherwise will also return an error.
// Search is only backwards at the moment,
// from the right boundary (end) to the left one (start position).
// It returns the number of bytes read to find the first occurrence of the until byte.
// It sets the right boundary (end) of the data to the character after the until byte,
// so the user can eventually start again from here.
func (r *DelimitedReader) Seek(until byte, backwards bool) (n int, err error) {
data := r.data
if len(data) == 0 {
return 0, ragel.NewReadingError(ragel.ErrNotFound)
}
if backwards {
// Data boundaries
p := r.p
i := r.pe - 1
// Until there are no more bytes or they are different from the the sought one
for ; i >= p && data[i] != until; i-- {
}
// Store the number of sought bytes
n := r.pe - i
// Did we find anything?
if i == p-1 && data[p] != until {
return r.pe - i, ragel.NewReadingError(ragel.ErrNotFound)
}
// Update the right boundary to be the character next to the sought one
r.pe = i + 1
return n, nil
}
panic("not implemented")
}

@ -0,0 +1,13 @@
package parser
// Machiner is an interface that wraps the Exec method.
type Machiner interface {
// Exec contains the ragel finite-state machine code and returns boundaries.
Exec(state *State) (p int, pe int)
// OnErr is a method called when an error is encountered.
OnErr(chunk []byte, err error)
// OnEOF is a method called when an EOF is encountered.
OnEOF(chunk []byte)
// OnCompletion is a method called when the parser loop completes.
OnCompletion()
}

@ -0,0 +1,84 @@
package parser
import (
"github.com/leodido/ragel-machinery"
"io"
)
// Parser creates ragel parsers for stream inputs.
//
// Its scope is to let the user concentrate on the definition of the ragel machines.
// To do so it allows the user to specify (and delegates to)
// how to read the input chunks and how to parse them.
type Parser parser
type parser struct {
reader Reader // Reader to which to delegate the reading logic
machine Machiner // FSM to which to delegate the parsing logic
*parsingState
}
// Option represents an option for parser brokers.
type Option func(*parser)
// WithStart serves to specify the ragel start state.
func WithStart(cs int) Option {
return func(p *parser) {
p.cs = cs
}
}
// WithError serves to specify the ragel error state.
func WithError(err int) Option {
return func(p *parser) {
p.errorState = err
}
}
// WithFirstFinal serves to specify the ragel first final state.
func WithFirstFinal(f int) Option {
return func(p *parser) {
p.finalState = f
}
}
// New cretes a new Parser.
func New(r Reader, m Machiner, opts ...Option) *Parser {
p := &parser{
reader: r,
machine: m,
}
// Tell the broker to use the reader status for parsing
p.parsingState = (*parsingState)(r.State())
// Apply options
for _, opt := range opts {
opt(p)
}
return (*Parser)(p)
}
// Parse is the standard parsing method for stream inputs.
//
// It calls the Read method of the Reader, which defines how and what to read
// and then it calls on such data window the finite-state machine to parse its content.
// It stops whenever and EOF or an error happens.
func (p *Parser) Parse() {
for {
res, err := p.reader.Read()
if err != nil {
if err == io.EOF {
p.machine.OnEOF(res)
} else {
p.machine.OnErr(res, ragel.NewReadingError(err.Error()))
}
break
}
// Execute the FSM
p.machine.Exec((*State)(p.parsingState))
}
p.machine.OnCompletion()
}

@ -0,0 +1,14 @@
package parser
// Reader is the interface that wraps the Read method.
// Its implementations are intended to be used with Ragel parsers or scanners.
type Reader interface {
Read() (res []byte, err error)
State() *State
}
// Seeker is the interface that wraps the Seek method.
// Its implementations are intended to be used with Ragel parsers or scanners.
type Seeker interface {
Seek(until byte, backward bool) (n int, err error)
}

@ -0,0 +1,23 @@
package parser
// State represents the ragel state variables for parsing.
type State parsingState
type parsingState struct {
cs int // _start at time 0, then eventually current state
errorState int // _error > fixme(leodido)
finalState int // _first_final > fixme(leodido)
p, pe, eof int // parsing pointers
data []byte // data pointer
}
// Set sets the state variables of a ragel parser.
func (s *State) Set(cs, p, pe, eof int) {
s.cs, s.p, s.pe, s.eof = cs, p, pe, eof
}
// Get retrieves the state variables of a ragel parser.
func (s *State) Get() (cs, p, pe, eof int, data []byte) {
return s.cs, s.p, s.pe, s.eof, s.data
}

@ -326,6 +326,12 @@ github.com/hpcloud/tail/ratelimiter
github.com/hpcloud/tail/util
github.com/hpcloud/tail/watch
github.com/hpcloud/tail/winfile
# github.com/influxdata/go-syslog/v2 v2.0.1
github.com/influxdata/go-syslog/v2
github.com/influxdata/go-syslog/v2/common
github.com/influxdata/go-syslog/v2/nontransparent
github.com/influxdata/go-syslog/v2/octetcounting
github.com/influxdata/go-syslog/v2/rfc5424
# github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af
github.com/jmespath/go-jmespath
# github.com/jonboulle/clockwork v0.1.0
@ -343,6 +349,9 @@ github.com/klauspost/cpuid
github.com/konsorten/go-windows-terminal-sequences
# github.com/kr/logfmt v0.0.0-20140226030751-b84e30acd515
github.com/kr/logfmt
# github.com/leodido/ragel-machinery v0.0.0-20181214104525-299bdde78165
github.com/leodido/ragel-machinery
github.com/leodido/ragel-machinery/parser
# github.com/mattn/go-colorable v0.0.9
github.com/mattn/go-colorable
# github.com/mattn/go-isatty v0.0.4

Loading…
Cancel
Save