Logstash: Add config option which allows seting up an allowlist for labels to be mapped to Loki (#5244)

* add a config called `include_fields` which lets you allowlist a specific set of fields which will be mapped to labels and sent to Loki.

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* update docs

Signed-off-by: Edward Welch <edward.welch@grafana.com>

* restore path for Dockerfile so it works in CI

Signed-off-by: Edward Welch <edward.welch@grafana.com>
pull/5268/head
Ed Welch 4 years ago committed by GitHub
parent f5d570177a
commit 09a5a2cee3
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 5
      clients/cmd/logstash/lib/logstash/outputs/loki.rb
  2. 3
      clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb
  3. 2
      clients/cmd/logstash/logstash-output-loki.gemspec
  4. 3
      clients/cmd/logstash/loki.conf
  5. 15
      clients/cmd/logstash/spec/outputs/loki/entry_spec.rb
  6. 25
      clients/cmd/logstash/spec/outputs/loki_spec.rb
  7. 11
      docs/sources/clients/logstash/_index.md

@ -47,6 +47,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'Backoff configuration. Initial backoff time between retries. Default 1s'
config :min_delay, :validate => :number, :default => 1, :required => false
## 'An array of fields to map to labels, if defined only fields in this list will be mapped.'
config :include_fields, :validate => :array, :default => [], :required => false
## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false
@ -198,7 +201,7 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## Receives logstash events
public
def receive(event)
@entries << Entry.new(event, @message_field)
@entries << Entry.new(event, @message_field, @include_fields)
end
def close

@ -5,7 +5,7 @@ module Loki
class Entry
include Loki
attr_reader :labels, :entry
def initialize(event,message_field)
def initialize(event,message_field,include_fields)
@entry = {
"ts" => to_ns(event.get("@timestamp")),
"line" => event.get(message_field).to_s
@ -18,6 +18,7 @@ module Loki
event.to_hash.each { |key,value|
next if key.start_with?('@')
next if value.is_a?(Hash)
next if include_fields.length() > 0 and not include_fields.include?(key)
@labels[key] = value.to_s
}
end

@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.0.4'
s.version = '1.1.0'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

@ -12,6 +12,9 @@ output {
#message_field => "message" #default message
# If include_fields is set, only fields in this list will be sent to Loki as labels.
#include_fields => ["service","host","app","env"] #default empty array, all labels included.
#batch_wait => 1 ## in seconds #default 1 second
#batch_size => 102400 #bytes #default 102400 bytes

@ -27,18 +27,25 @@ describe Loki::Entry do
}
it 'labels extracted should not contains object and metadata or timestamp' do
entry = Entry.new(event,"message")
entry = Entry.new(event,"message", [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end
it 'labels extracted should only contain allowlisted labels' do
entry = Entry.new(event, "message", %w[agent foo])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end
end
context 'test batch generation with label order' do
let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message"),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log"),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message"),
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []),
]}
let (:expected) {

@ -28,12 +28,15 @@ describe LogStash::Outputs::Loki do
context 'when adding en entry to the batch' do
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")}
let (:lbs) { {"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])}
let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h}
it 'should not add empty line' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config)
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo")
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [])
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true
expect(plugin.batch).to eql nil
end
@ -50,6 +53,18 @@ describe LogStash::Outputs::Loki do
expect(plugin.batch.size_bytes).to eq 14
end
it 'should only allowed labels defined in include_fields' do
plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.streams.length).to eq 1
expect(plugin.batch.streams[include_lbs.to_s]['entries'].length).to eq 2
expect(plugin.batch.streams[include_lbs.to_s]['labels']).to eq include_lbs
expect(plugin.batch.size_bytes).to eq 14
end
it 'should not add if full' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config.merge!({'batch_size'=>10}))
expect(plugin.batch).to eql nil
@ -69,7 +84,7 @@ describe LogStash::Outputs::Loki do
end
context 'batch expiration' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
it 'should not expire if empty' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5}))
@ -138,7 +153,7 @@ describe LogStash::Outputs::Loki do
end
context 'http requests' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
it 'should send credentials' do
conf = {

@ -57,6 +57,8 @@ output {
[tenant_id => string | default = nil | required=false]
[message_field => string | default = "message" | required=false]
[include_fields => array | default = [] | required=false]
[batch_wait => number | default = 1(s) | required=false]
@ -106,6 +108,8 @@ Contains a `message` and `@timestamp` fields, which are respectively used to for
All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](https://grafana.com/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs.
You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter.
**Note:** In version 1.1.0 and greater of this plugin you can also specify a list of labels to allowlist via the `include_fields` configuration.
For example the configuration below :
```conf
@ -204,6 +208,10 @@ If using the [GrafanaLab's hosted Loki](https://grafana.com/products/cloud/), th
Message field to use for log lines. You can use logstash key accessor language to grab nested property, for example : `[log][message]`.
#### include_fields
An array of fields which will be mapped to labels and sent to Loki, when this list is configured **only** these fields will be sent, all other fields will be ignored.
#### batch_wait
Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data.
@ -259,7 +267,7 @@ filter {
}
}
mutate {
remove_field => ["tags"]
remove_field => ["tags"] # Note: with include_fields defined below this wouldn't be necessary
}
}
@ -273,6 +281,7 @@ output {
min_delay => 3
max_delay => 500
message_field => "message"
include_fields => ["container_name","namespace","pod","host"]
}
# stdout { codec => rubydebug }
}

Loading…
Cancel
Save