fluent-bit: multi-instance support (#1294)

To run multiple plugin instances at the same time, the plugin instance
must be registered and later retrieved.

Additionally, a list of registered plugin instances must be stored for
proper disposal during fluent-bit shutdown.

Based on the [out_multiinstance example] in the upstream repository.

[out_multiinstance example]: fc386d2638/examples/out_multiinstance/out.go

Signed-off-by: Jens Erat <email@jenserat.de>
Co-authored-by: Hiroshi Hatake <cosmo0920.oucc@gmail.com>
pull/1473/head
Jens Erat 6 years ago committed by Cyril Tovena
parent c9016f4b2a
commit 557f9ccd7b
  1. 4
      cmd/fluent-bit/README.md
  2. 68
      cmd/fluent-bit/out_loki.go

@ -106,6 +106,10 @@ To configure the Loki output plugin add this section to fluent-bit.conf
```
A full [example configuration file](fluent-bit.conf) is also available in this repository.
### Running multiple plugin instances
You can run multiple plugin instances in the same fluent-bit process, for example if you want to push to different Loki servers or route logs into different Loki tenant IDs. To do so, add additional `[Output]` sections.
## Building
## Prerequisites

@ -14,8 +14,11 @@ import (
"github.com/weaveworks/common/logging"
)
var plugin *loki
var logger log.Logger
var (
// registered loki plugin instances, required for disposal during shutdown
plugins []*loki
logger log.Logger
)
func init() {
var logLevel logging.Level
@ -40,38 +43,53 @@ func FLBPluginRegister(ctx unsafe.Pointer) int {
// (fluentbit will call this)
// ctx (context) pointer to fluentbit context (state/ c code)
func FLBPluginInit(ctx unsafe.Pointer) int {
conf, err := parseConfig(&pluginConfig{ctx: ctx})
if err != nil {
level.Error(logger).Log("[flb-go]", "failed to launch", "error", err)
return output.FLB_ERROR
}
logger = newLogger(conf.logLevel)
// numeric plugin ID, only used for user-facing purpose (logging, ...)
id := len(plugins)
logger := log.With(newLogger(conf.logLevel), "id", id)
level.Info(logger).Log("[flb-go]", "Starting fluent-bit-go-loki", "version", version.Info())
level.Info(logger).Log("[flb-go]", "provided parameter", "URL", conf.clientConfig.URL)
level.Info(logger).Log("[flb-go]", "provided parameter", "TenantID", conf.clientConfig.TenantID)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchWait", conf.clientConfig.BatchWait)
level.Info(logger).Log("[flb-go]", "provided parameter", "BatchSize", conf.clientConfig.BatchSize)
level.Info(logger).Log("[flb-go]", "provided parameter", "Labels", conf.clientConfig.ExternalLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "LogLevel", conf.logLevel)
level.Info(logger).Log("[flb-go]", "provided parameter", "AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(logger).Log("[flb-go]", "provided parameter", "RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(logger).Log("[flb-go]", "provided parameter", "LineFormat", conf.lineFormat)
level.Info(logger).Log("[flb-go]", "provided parameter", "DropSingleKey", conf.dropSingleKey)
level.Info(logger).Log("[flb-go]", "provided parameter", "LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))
plugin, err = newPlugin(conf, logger)
paramLogger := log.With(logger, "[flb-go]", "provided parameter")
level.Info(paramLogger).Log("URL", conf.clientConfig.URL)
level.Info(paramLogger).Log("TenantID", conf.clientConfig.TenantID)
level.Info(paramLogger).Log("BatchWait", conf.clientConfig.BatchWait)
level.Info(paramLogger).Log("BatchSize", conf.clientConfig.BatchSize)
level.Info(paramLogger).Log("Labels", conf.clientConfig.ExternalLabels)
level.Info(paramLogger).Log("LogLevel", conf.logLevel.String())
level.Info(paramLogger).Log("AutoKubernetesLabels", conf.autoKubernetesLabels)
level.Info(paramLogger).Log("RemoveKeys", fmt.Sprintf("%+v", conf.removeKeys))
level.Info(paramLogger).Log("LabelKeys", fmt.Sprintf("%+v", conf.labelKeys))
level.Info(paramLogger).Log("LineFormat", conf.lineFormat)
level.Info(paramLogger).Log("DropSingleKey", conf.dropSingleKey)
level.Info(paramLogger).Log("LabelMapPath", fmt.Sprintf("%+v", conf.labelMap))
plugin, err := newPlugin(conf, logger)
if err != nil {
level.Error(logger).Log("newPlugin", err)
return output.FLB_ERROR
}
// register plugin instance, to be retrievable when sending logs
output.FLBPluginSetContext(ctx, plugin)
// remember plugin instance, required to cleanly dispose when fluent-bit is shutting down
plugins = append(plugins, plugin)
return output.FLB_OK
}
//export FLBPluginFlush
func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginFlushCtx
func FLBPluginFlushCtx(ctx, data unsafe.Pointer, length C.int, _ *C.char) int {
plugin := output.FLBPluginGetContext(ctx).(*loki)
if plugin == nil {
level.Error(logger).Log("[flb-go]", "plugin not initialized")
return output.FLB_ERROR
}
var ret int
var ts interface{}
var record map[interface{}]interface{}
@ -92,13 +110,13 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
case uint64:
timestamp = time.Unix(int64(t), 0)
default:
level.Warn(logger).Log("msg", "timestamp isn't known format. Use current time.")
level.Warn(plugin.logger).Log("msg", "timestamp isn't known format. Use current time.")
timestamp = time.Now()
}
err := plugin.sendRecord(record, timestamp)
if err != nil {
level.Error(logger).Log("msg", "error sending record to Loki", "error", err)
level.Error(plugin.logger).Log("msg", "error sending record to Loki", "error", err)
return output.FLB_ERROR
}
}
@ -113,8 +131,10 @@ func FLBPluginFlush(data unsafe.Pointer, length C.int, tag *C.char) int {
//export FLBPluginExit
func FLBPluginExit() int {
if plugin.client != nil {
plugin.client.Stop()
for _, plugin := range plugins {
if plugin.client != nil {
plugin.client.Stop()
}
}
return output.FLB_OK
}

Loading…
Cancel
Save