fluentd: Refactor label_keys and add extract_kubernetes_labels configuration (#1186)

* refactor label_keys to labels and add Kubernetes label extraction

* fix escaping
Branch: pull/1202/head
Authored by Sándor Guba, committed by Cyril Tovena
parent 38586f5b19
commit 51105f6f5b
  1. fluentd/fluent-plugin-grafana-loki/README.md (67)
  2. fluentd/fluent-plugin-grafana-loki/fluent-plugin-grafana-loki.gemspec (4)
  3. fluentd/fluent-plugin-grafana-loki/lib/fluent/plugin/out_loki.rb (80)
  4. fluentd/fluent-plugin-grafana-loki/spec/gems/fluent/plugin/loki_output_spec.rb (58)

@@ -6,7 +6,7 @@ This plugin offers two line formats and uses protobuf to send compressed data to
Key features:
* extra_labels - labels to be added to every line of a logfile, useful for designating environments
* label_keys - customizable list of keys for stream labels
* label - This section allows you to specify labels from your log fields
## Installation
@@ -29,6 +29,64 @@ In your Fluentd configuration, use `@type loki`. Additional configuration is opt
</match>
```
### Using labels
A simple label from a top-level attribute:
```
<match mytag>
@type loki
# ...
<label>
fluentd_worker
</label>
# ...
</match>
```
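Conceptually, a `<label>` entry copies the named key into the Loki stream labels and removes it from the record before the log line is built. A rough plain-Ruby sketch of that behaviour (an illustration only, not the plugin's actual code):
```ruby
# Illustration only: a <label> entry moves the value into the stream labels
# and drops the key from the record before it is rendered as a log line.
record = { 'fluentd_worker' => 0, 'message' => 'hello' }

chunk_labels = {}
chunk_labels['fluentd_worker'] = record.delete('fluentd_worker')

chunk_labels # => {"fluentd_worker"=>0}
record       # => {"message"=>"hello"}
```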
You can also rename the label key, as in the following example:
```
<match mytag>
@type loki
# ...
<label>
worker fluentd_worker
</label>
# ...
</match>
```
You can use [record accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax) for nested fields:
```
<match mytag>
@type loki
# ...
<label>
container $.kubernetes.container
</label>
# ...
</match>
```
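For illustration, the nested accessor above resolves the same way as a nested hash lookup; a minimal plain-Ruby sketch (using `Hash#dig` in place of Fluentd's record_accessor helper):
```ruby
# Illustration only: "$.kubernetes.container" resolves like a nested hash lookup.
record = {
  'log'        => 'some log line',
  'kubernetes' => { 'container' => 'nginx' }
}

record.dig('kubernetes', 'container') # => "nginx"
```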
### Extracting Kubernetes labels
As Kubernetes labels are a list of nested key-value pairs, there is a separate option to extract them.
Note that special characters like "`. - /`" in label keys will be replaced with `_`.
Use this together with the `remove_keys kubernetes` option to remove the Kubernetes metadata from the log line.
```
<match mytag>
@type loki
# ...
extract_kubernetes_labels true
remove_keys kubernetes
<label>
container $.kubernetes.container
</label>
# ...
</match>
```
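For reference, a minimal Ruby sketch of the key sanitization described above, using the same substitution pattern that appears in `out_loki.rb` further down in this diff:
```ruby
kubernetes_labels = {
  'app.kubernetes.io/name' => 'loki',
  'release-channel'        => 'stable'
}

chunk_labels = {}
kubernetes_labels.each do |key, value|
  # ".", "-" and "/" in label keys become "_"
  chunk_labels[key.gsub(%r{[.\-/]}, '_')] = value
end

chunk_labels # => {"app_kubernetes_io_name"=>"loki", "release_channel"=>"stable"}
```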
### Multi-worker usage
Loki doesn't currently support out-of-order inserts - if you try to insert a log entry with an earlier timestamp after a log entry with identical labels but a later timestamp, the insert will fail with `HTTP status code: 500, message: rpc error: code = Unknown desc = Entry out of order`. Therefore, in order to use this plugin in a multi-worker Fluentd setup, you'll need to include the worker ID in the labels.
@@ -45,7 +103,9 @@ For example, using [fluent-plugin-record-modifier](https://github.com/repeatedly
<match mytag>
@type loki
# ...
label_keys "fluentd_worker"
<label>
fluentd_worker
</label>
# ...
</match>
```
@@ -112,8 +172,7 @@ Loki is intended to index and group log streams using only a small set of labels
There are a few configuration settings to control the output format.
- extra_labels: (default: nil) set of labels to include with every Loki stream. e.g. `{"env":"dev", "datacenter": "dc1"}`
- remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line
- label_keys: (default: "job,instance") comma separated list of keys to use as stream labels. All other keys will be placed into the log line
- remove_keys: (default: nil) comma separated list of needless record keys to remove. All other keys will be placed into the log line. You can use [record_accessor syntax](https://docs.fluentd.org/plugin-helper-overview/api-plugin-helper-record_accessor#syntax).
- line_format: format to use when flattening the record to a log line. Valid values are "json" or "key_value". If set to "json" the log line sent to Loki will be the fluentd record (excluding any keys extracted out as labels) dumped as json. If set to "key_value", the log line will be each item in the record concatenated together (separated by a single space) in the format `<key>=<value>` (see the sketch after this list).
- drop_single_key: if set to true and after extracting label_keys a record only has a single key remaining, the log line sent to Loki will just be the value of the record key.
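As a rough illustration of the two `line_format` options (a sketch only; the plugin itself builds the JSON line with Yajl, as the spec below shows):
```ruby
require 'json'

record = { 'message' => 'GET /health 200', 'stream' => 'stdout' }

# line_format json: the record (minus any keys extracted as labels) dumped as JSON
JSON.generate(record)
# => {"message":"GET /health 200","stream":"stdout"}

# line_format key_value: each pair rendered as <key>="<value>", space separated
record.map { |k, v| %(#{k}="#{v}") }.join(' ')
# => message="GET /health 200" stream="stdout"
```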

@@ -4,7 +4,7 @@ $LOAD_PATH.push File.expand_path('lib', __dir__)
Gem::Specification.new do |spec|
spec.name = 'fluent-plugin-grafana-loki'
spec.version = '1.0.2'
spec.version = '1.1.0'
spec.authors = %w[woodsaj briangann]
spec.email = ['awoods@grafana.com', 'brian@grafana.com']
@@ -25,7 +25,7 @@ Gem::Specification.new do |spec|
# spec.test_files = test_files
# spec.require_paths = ['lib']
spec.add_development_dependency 'bundler', '~> 1.15'
spec.add_development_dependency 'bundler'
spec.add_development_dependency 'rake', '~> 12.0'
spec.add_development_dependency 'rspec', '~> 3.0'
spec.add_runtime_dependency 'fluentd', ['>= 0.14.10', '< 2']

@@ -24,36 +24,38 @@ require 'time'
module Fluent
module Plugin
# Subclass of Fluent Plugin Output
class LokiOutput < Fluent::Plugin::Output
class LokiOutput < Fluent::Plugin::Output # rubocop:disable Metrics/ClassLength
Fluent::Plugin.register_output('loki', self)
helpers :compat_parameters
helpers :compat_parameters, :record_accessor
attr_accessor :record_accessors
DEFAULT_BUFFER_TYPE = 'memory'
# url of loki server
desc 'url of loki server'
config_param :url, :string, default: 'https://logs-us-west1.grafana.net'
# BasicAuth credentials
desc 'BasicAuth credentials'
config_param :username, :string, default: nil
config_param :password, :string, default: nil, secret: true
# Loki tenant id
desc 'Loki tenant id'
config_param :tenant, :string, default: nil
# extra labels to add to all log streams
desc 'extra labels to add to all log streams'
config_param :extra_labels, :hash, default: {}
# format to use when flattening the record to a log line
desc 'format to use when flattening the record to a log line'
config_param :line_format, :enum, list: %i[json key_value], default: :key_value
# comma separated list of keys to use as stream lables. All other keys will be placed into the log line
config_param :label_keys, :string, default: 'job,instance'
desc 'extract kubernetes labels as loki labels'
config_param :extract_kubernetes_labels, :bool, default: false
# comma separated list of needless record keys to remove
config_param :remove_keys, :string, default: nil
desc 'comma separated list of needless record keys to remove'
config_param :remove_keys, :array, default: %w[], value_type: :string
# if a record only has 1 key, then just set the log line to the value and discard the key.
desc 'if a record only has 1 key, then just set the log line to the value and discard the key.'
config_param :drop_single_key, :bool, default: false
config_section :buffer do
@@ -64,9 +66,18 @@ module Fluent
def configure(conf)
compat_parameters_convert(conf, :buffer)
super
@label_keys = @label_keys.split(/\s*,\s*/) if @label_keys
@remove_keys = @remove_keys.split(',').map(&:strip) if @remove_keys
@record_accessors = {}
conf.elements.select { |element| element.name == 'label' }.each do |element|
element.each_pair do |k, v|
element.key?(k) # to suppress unread configuration warning
v = k if v.empty?
@record_accessors[k] = record_accessor_create(v)
end
end
@remove_keys_accessors = []
@remove_keys.each do |key|
@remove_keys_accessors.push(record_accessor_create(key))
end
end
def multi_workers_ready?
@@ -84,7 +95,7 @@ module Fluent
def write(chunk)
# streams by label
payload = generic_to_loki(chunk)
body = { 'streams': payload }
body = { 'streams' => payload }
# add ingest path to loki url
uri = URI.parse(url + '/api/prom/push')
@@ -101,7 +112,7 @@ module Fluent
}
log.debug "sending #{req.body.length} bytes to loki"
res = Net::HTTP.start(uri.hostname, uri.port, **opts) { |http| http.request(req) }
unless res && res.is_a?(Net::HTTPSuccess)
unless res&.is_a?(Net::HTTPSuccess)
res_summary = if res
"#{res.code} #{res.message} #{res.body}"
else
@@ -136,7 +147,7 @@ module Fluent
data_labels = data_labels.merge(@extra_labels)
data_labels.each do |k, v|
formatted_labels.push("#{k}=\"#{v.gsub('"','\\"')}\"") if v
formatted_labels.push(%(#{k}="#{v.gsub('"', '\\"')}")) if v
end
'{' + formatted_labels.join(',') + '}'
end
@@ -145,7 +156,7 @@ module Fluent
payload = []
streams.each do |k, v|
# create a stream for each label set.
# Additionally sort the entries by timestamp just incase we
# Additionally sort the entries by timestamp just in case we
# got them out of order.
# 'labels' => '{worker="0"}',
payload.push(
@@ -167,7 +178,7 @@ module Fluent
when :key_value
formatted_labels = []
record.each do |k, v|
formatted_labels.push("#{k}=\"#{v}\"")
formatted_labels.push(%(#{k}="#{v}"))
end
line = formatted_labels.join(' ')
end
@@ -175,22 +186,31 @@ module Fluent
line
end
#
# convert a line to loki line with labels
def line_to_loki(record)
chunk_labels = {}
line = ''
if record.is_a?(Hash)
# remove needless keys.
@remove_keys.each { |v|
record.delete(v)
} if @remove_keys
# extract white listed record keys into labels.
@label_keys.each do |k|
if record.key?(k)
chunk_labels[k] = record[k]
record.delete(k)
@record_accessors&.each do |name, accessor|
new_key = name.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = accessor.call(record)
accessor.delete(record)
end
if @extract_kubernetes_labels && record.key?('kubernetes')
kubernetes_labels = record['kubernetes']['labels']
kubernetes_labels.each_key do |l|
new_key = l.gsub(%r{[.\-\/]}, '_')
chunk_labels[new_key] = kubernetes_labels[l]
end
end if @label_keys
end
# remove needless keys.
@remove_keys_accessors&.each do |deleter|
deleter.delete(record)
end
line = record_to_line(record)
else
line = record.to_s

@@ -22,8 +22,12 @@ RSpec.describe Fluent::Plugin::LokiOutput do
tenant 1234
extra_labels {}
line_format key_value
label_keys "job,instance"
drop_single_key true
remove_keys a, b
<label>
job
instance instance
</label>
CONF
expect(driver.instance.url).to eq 'https://logs-us-west1.grafana.net'
@@ -32,7 +36,8 @@ RSpec.describe Fluent::Plugin::LokiOutput do
expect(driver.instance.tenant).to eq '1234'
expect(driver.instance.extra_labels).to eq({})
expect(driver.instance.line_format).to eq :key_value
expect(driver.instance.label_keys).to eq %w[job instance]
expect(driver.instance.record_accessors.keys).to eq %w[job instance]
expect(driver.instance.remove_keys).to eq %w[a b]
expect(driver.instance.drop_single_key).to eq true
end
@@ -139,7 +144,9 @@ RSpec.describe Fluent::Plugin::LokiOutput do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
label_keys "stream"
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
@@ -153,12 +160,55 @@ RSpec.describe Fluent::Plugin::LokiOutput do
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
end
it 'extracts nested record key as label' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
<label>
pod $.kubernetes.pod
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0], 'kubernetes' => {})
end
it 'extracts nested record key as label and drop key after' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
remove_keys kubernetes
<label>
pod $.kubernetes.pod
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
content = File.readlines('spec/gems/fluent/plugin/data/syslog')
line1 = [1_546_270_458, { 'message' => content[0], 'kubernetes' => { 'pod' => 'podname' } }]
payload = driver.instance.generic_to_loki([line1])
body = { 'streams': payload }
expect(body[:streams][0]['labels']).to eq '{pod="podname"}'
expect(body[:streams][0]['entries'].count).to eq 1
expect(body[:streams][0]['entries'][0]['ts']).to eq '2018-12-31T15:34:18.000000Z'
expect(body[:streams][0]['entries'][0]['line']).to eq Yajl.dump('message' => content[0])
end
it 'formats as simple string when only 1 record key' do
config = <<-CONF
url https://logs-us-west1.grafana.net
line_format json
label_keys "stream"
drop_single_key true
<label>
stream
</label>
CONF
driver = Fluent::Test::Driver::Output.new(described_class)
driver.configure(config)
