diff --git a/clients/cmd/logstash/lib/logstash/outputs/loki.rb b/clients/cmd/logstash/lib/logstash/outputs/loki.rb index 1b130399d9..01f9a0aa2e 100644 --- a/clients/cmd/logstash/lib/logstash/outputs/loki.rb +++ b/clients/cmd/logstash/lib/logstash/outputs/loki.rb @@ -47,6 +47,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base ## 'Backoff configuration. Initial backoff time between retries. Default 1s' config :min_delay, :validate => :number, :default => 1, :required => false + ## 'An array of fields to map to labels, if defined only fields in this list will be mapped.' + config :include_fields, :validate => :array, :default => [], :required => false + ## 'Backoff configuration. Maximum backoff time between retries. Default 300s' config :max_delay, :validate => :number, :default => 300, :required => false @@ -198,7 +201,7 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base ## Receives logstash events public def receive(event) - @entries << Entry.new(event, @message_field) + @entries << Entry.new(event, @message_field, @include_fields) end def close diff --git a/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb b/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb index dcfd70bb20..26f04fa36b 100644 --- a/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb +++ b/clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb @@ -5,7 +5,7 @@ module Loki class Entry include Loki attr_reader :labels, :entry - def initialize(event,message_field) + def initialize(event,message_field,include_fields) @entry = { "ts" => to_ns(event.get("@timestamp")), "line" => event.get(message_field).to_s @@ -18,6 +18,7 @@ module Loki event.to_hash.each { |key,value| next if key.start_with?('@') next if value.is_a?(Hash) + next if include_fields.length() > 0 and not include_fields.include?(key) @labels[key] = value.to_s } end diff --git a/clients/cmd/logstash/logstash-output-loki.gemspec b/clients/cmd/logstash/logstash-output-loki.gemspec index d58c702d14..cbd66eb062 100644 --- a/clients/cmd/logstash/logstash-output-loki.gemspec +++ b/clients/cmd/logstash/logstash-output-loki.gemspec @@ -1,6 +1,6 @@ Gem::Specification.new do |s| s.name = 'logstash-output-loki' - s.version = '1.0.4' + s.version = '1.1.0' s.authors = ['Aditya C S','Cyril Tovena'] s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com'] diff --git a/clients/cmd/logstash/loki.conf b/clients/cmd/logstash/loki.conf index f8ae19ba48..a0ab6e062a 100644 --- a/clients/cmd/logstash/loki.conf +++ b/clients/cmd/logstash/loki.conf @@ -12,6 +12,9 @@ output { #message_field => "message" #default message + # If include_fields is set, only fields in this list will be sent to Loki as labels. + #include_fields => ["service","host","app","env"] #default empty array, all labels included. + #batch_wait => 1 ## in seconds #default 1 second #batch_size => 102400 #bytes #default 102400 bytes diff --git a/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb b/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb index 4f545706c6..615d873bcc 100644 --- a/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb +++ b/clients/cmd/logstash/spec/outputs/loki/entry_spec.rb @@ -27,18 +27,25 @@ describe Loki::Entry do } it 'labels extracted should not contains object and metadata or timestamp' do - entry = Entry.new(event,"message") + entry = Entry.new(event,"message", []) expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'}) expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) expect(entry.entry['line']).to eql 'hello' end + + it 'labels extracted should only contain allowlisted labels' do + entry = Entry.new(event, "message", %w[agent foo]) + expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'}) + expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) + expect(entry.entry['line']).to eql 'hello' + end end context 'test batch generation with label order' do let (:entries) {[ - Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message"), - Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log"), - Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message"), + Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []), + Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []), + Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []), ]} let (:expected) { diff --git a/clients/cmd/logstash/spec/outputs/loki_spec.rb b/clients/cmd/logstash/spec/outputs/loki_spec.rb index bf1d42ee6a..8183798f23 100644 --- a/clients/cmd/logstash/spec/outputs/loki_spec.rb +++ b/clients/cmd/logstash/spec/outputs/loki_spec.rb @@ -28,12 +28,15 @@ describe LogStash::Outputs::Loki do context 'when adding en entry to the batch' do let (:simple_loki_config) {{'url' => 'http://localhost:3100'}} - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} - let (:lbs) { {"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} + let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h} + let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }} + let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])} + let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h} it 'should not add empty line' do plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config) - emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo") + emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", []) expect(plugin.add_entry_to_batch(emptyEntry)).to eql true expect(plugin.batch).to eql nil end @@ -50,6 +53,18 @@ describe LogStash::Outputs::Loki do expect(plugin.batch.size_bytes).to eq 14 end + it 'should only allowed labels defined in include_fields' do + plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config) + expect(plugin.batch).to eql nil + expect(plugin.add_entry_to_batch(include_entry)).to eql true + expect(plugin.add_entry_to_batch(include_entry)).to eql true + expect(plugin.batch).not_to be_nil + expect(plugin.batch.streams.length).to eq 1 + expect(plugin.batch.streams[include_lbs.to_s]['entries'].length).to eq 2 + expect(plugin.batch.streams[include_lbs.to_s]['labels']).to eq include_lbs + expect(plugin.batch.size_bytes).to eq 14 + end + it 'should not add if full' do plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config.merge!({'batch_size'=>10})) expect(plugin.batch).to eql nil @@ -69,7 +84,7 @@ describe LogStash::Outputs::Loki do end context 'batch expiration' do - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} it 'should not expire if empty' do loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5})) @@ -138,7 +153,7 @@ describe LogStash::Outputs::Loki do end context 'http requests' do - let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} + let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])} it 'should send credentials' do conf = { diff --git a/docs/sources/clients/logstash/_index.md b/docs/sources/clients/logstash/_index.md index da70791e47..7bdf2305ba 100644 --- a/docs/sources/clients/logstash/_index.md +++ b/docs/sources/clients/logstash/_index.md @@ -57,6 +57,8 @@ output { [tenant_id => string | default = nil | required=false] [message_field => string | default = "message" | required=false] + + [include_fields => array | default = [] | required=false] [batch_wait => number | default = 1(s) | required=false] @@ -106,6 +108,8 @@ Contains a `message` and `@timestamp` fields, which are respectively used to for All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](https://grafana.com/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs. You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter. +**Note:** In version 1.1.0 and greater of this plugin you can also specify a list of labels to allowlist via the `include_fields` configuration. + For example the configuration below : ```conf @@ -204,6 +208,10 @@ If using the [GrafanaLab's hosted Loki](https://grafana.com/products/cloud/), th Message field to use for log lines. You can use logstash key accessor language to grab nested property, for example : `[log][message]`. +#### include_fields + +An array of fields which will be mapped to labels and sent to Loki, when this list is configured **only** these fields will be sent, all other fields will be ignored. + #### batch_wait Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data. @@ -259,7 +267,7 @@ filter { } } mutate { - remove_field => ["tags"] + remove_field => ["tags"] # Note: with include_fields defined below this wouldn't be necessary } } @@ -273,6 +281,7 @@ output { min_delay => 3 max_delay => 500 message_field => "message" + include_fields => ["container_name","namespace","pod","host"] } # stdout { codec => rubydebug } }