fluent-bit-plugin: Auto add Kubernetes labels to Loki labels (#1204)

* auto parser kubernetes metadata when lineformat set kubernetes

* fix format

* fix test

* refactor

* fix typo, use switch instead of if and go fmt

* fix switch
pull/1333/head
allanhung 7 years ago committed by Cyril Tovena
parent 4202ceda8f
commit 30ec4eba79
  1. 15
      cmd/fluent-bit/README.md
  2. 25
      cmd/fluent-bit/config.go
  3. 34
      cmd/fluent-bit/loki.go
  4. 1
      cmd/fluent-bit/out_loki.go

@ -17,6 +17,7 @@ This plugin is implemented with [Fluent Bit's Go plugin](https://github.com/flue
| Labels | labels for API requests. | {job="fluent-bit"} |
| LogLevel | LogLevel for plugin logger. | "info" |
| RemoveKeys | Specify removing keys. | none |
| AutoKubernetesLabels | If set to true, it will add all Kubernetes labels to Loki labels | false |
| LabelKeys | Comma separated list of keys to use as stream labels. All other keys will be placed into the log line. LabelKeys is deactivated when using `LabelMapPath` label mapping configuration. | none |
| LineFormat | Format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format <key>=<value>. | json |
| DropSingleKey | If set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.| true |
@ -28,6 +29,10 @@ Labels are used to [query logs](../../docs/logql.md) `{container_name="nginx", c
You can use `Labels`, `RemoveKeys` , `LabelKeys` and `LabelMapPath` to how the output plugin will perform labels extraction.
### AutoKubernetesLabels
If set to true, it will add all Kubernetes labels to Loki labels automatically and ignore paramaters `LabelKeys`, LabelMapPath.
### LabelMapPath
When using the `Parser` and `Filter` plugins Fluent Bit can extract and add data to the current record/log data. While Loki labels are key value pair, record data can be nested structures.
@ -89,6 +94,16 @@ To configure the Loki output plugin add this section to fluent-bit.conf
LineFormat key_value
```
```properties
[Output]
Name loki
Match *
Url http://localhost:3100/loki/api/v1/push
BatchWait 1 # (1sec)
BatchSize 30720 # (30KiB)
AutoKubernetesLabels true
RemoveKeys key1,key2
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.
## Building

@ -36,13 +36,14 @@ const (
)
type config struct {
clientConfig client.Config
logLevel logging.Level
removeKeys []string
labelKeys []string
lineFormat format
dropSingleKey bool
labelMap map[string]interface{}
clientConfig client.Config
logLevel logging.Level
autoKubernetesLabels bool
removeKeys []string
labelKeys []string
lineFormat format
dropSingleKey bool
labelMap map[string]interface{}
}
func parseConfig(cfg ConfigGetter) (*config, error) {
@ -106,6 +107,16 @@ func parseConfig(cfg ConfigGetter) (*config, error) {
}
res.logLevel = level
autoKubernetesLabels := cfg.Get("AutoKubernetesLabels")
switch autoKubernetesLabels {
case "false", "":
res.autoKubernetesLabels = false
case "true":
res.autoKubernetesLabels = true
default:
return nil, fmt.Errorf("invalid boolean AutoKubernetesLabels: %v", autoKubernetesLabels)
}
removeKey := cfg.Get("RemoveKeys")
if removeKey != "" {
res.removeKeys = strings.Split(removeKey, ",")

@ -5,6 +5,7 @@ import (
"fmt"
"os"
"sort"
"strings"
"time"
"github.com/go-kit/kit/log"
@ -38,7 +39,9 @@ func (l *loki) sendRecord(r map[interface{}]interface{}, ts time.Time) error {
records := toStringMap(r)
level.Debug(l.logger).Log("msg", "processing records", "records", fmt.Sprintf("%+v", records))
lbs := model.LabelSet{}
if l.cfg.labelMap != nil {
if l.cfg.autoKubernetesLabels {
lbs = autoLabels(records)
} else if l.cfg.labelMap != nil {
mapLabels(records, l.cfg.labelMap, lbs)
} else {
lbs = extractLabels(records, l.cfg.labelKeys)
@ -81,6 +84,35 @@ func toStringMap(record map[interface{}]interface{}) map[string]interface{} {
return m
}
func autoLabels(records map[string]interface{}) model.LabelSet {
kuberneteslbs := model.LabelSet{}
replacer := strings.NewReplacer("/", "_", ".", "_", "-", "_")
for k, v := range records["kubernetes"].(map[interface{}]interface{}) {
switch key := k.(string); key {
case "labels":
for m, n := range v.(map[interface{}]interface{}) {
switch t := n.(type) {
case []byte:
kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(string(t))
default:
kuberneteslbs[model.LabelName(replacer.Replace(m.(string)))] = model.LabelValue(fmt.Sprintf("%v", n))
}
}
case "docker_id", "pod_id", "annotations":
// do nothing
continue
default:
switch t := v.(type) {
case []byte:
kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(string(t))
default:
kuberneteslbs[model.LabelName(k.(string))] = model.LabelValue(fmt.Sprintf("%v", v))
}
}
}
return kuberneteslbs
}
func extractLabels(records map[string]interface{}, keys []string) model.LabelSet {
res := model.LabelSet{}
for _, k := range keys {

@ -53,6 +53,7 @@ func FLBPluginInit(ctx unsafe.Pointer) int {
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)

Loading…
Cancel
Save