feat(logstash): clients logstash output structured metadata support (#10899)

**What this PR does / why we need it**:
This PR adds support for Loki Structured Metadata to the logstash output plugin.

**Special notes for your reviewer**:
Given that Structured Metadata is enabled as an experimental feature, the plugin changes were made in a way that is backward compatible for users who do not want structured metadata.

Co-authored-by: Jared King <kingjs@gmail.com>
Co-authored-by: J Stickler <julie.stickler@grafana.com>
pull/11200/head
fostec82 2 years ago committed by GitHub
parent 42e81487e1
commit 32f1ec2fda
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 13
      clients/cmd/logstash/lib/logstash/outputs/loki.rb
  2. 14
      clients/cmd/logstash/lib/logstash/outputs/loki/batch.rb
  3. 18
      clients/cmd/logstash/lib/logstash/outputs/loki/entry.rb
  4. 2
      clients/cmd/logstash/logstash-output-loki.gemspec
  5. 15
      clients/cmd/logstash/loki-metadata.conf
  6. 3
      clients/cmd/logstash/loki.conf
  7. 21
      clients/cmd/logstash/spec/outputs/loki/entry_spec.rb
  8. 55
      clients/cmd/logstash/spec/outputs/loki_spec.rb
  9. 4
      docs/sources/get-started/labels/structured-metadata.md
  10. 24
      docs/sources/send-data/logstash/_index.md

@ -50,6 +50,9 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## 'An array of fields to map to labels, if defined only fields in this list will be mapped.'
config :include_fields, :validate => :array, :default => [], :required => false
## 'An array of fields to map to structured metadata, if defined only fields in this list will be mapped.'
config :metadata_fields, :validate => :array, :default => [], :required => false
## 'Backoff configuration. Maximum backoff time between retries. Default 300s'
config :max_delay, :validate => :number, :default => 300, :required => false
@ -71,7 +74,7 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
@logger.info("Loki output plugin", :class => self.class.name)
# initialize Queue and Mutex
@entries = Queue.new
@entries = Queue.new
@mutex = Mutex.new
@stop = false
@ -94,7 +97,7 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
@mutex.synchronize do
return if @stop
end
e = @entries.deq
return if e.nil?
@ -201,13 +204,13 @@ class LogStash::Outputs::Loki < LogStash::Outputs::Base
## Receives logstash events
public
def receive(event)
@entries << Entry.new(event, @message_field, @include_fields)
@entries << Entry.new(event, @message_field, @include_fields, @metadata_fields)
end
def close
@entries.close
@mutex.synchronize do
@stop = true
@mutex.synchronize do
@stop = true
end
@batch_wait_thread.join
@batch_size_thread.join

@ -52,7 +52,19 @@ module Loki
def build_stream(stream)
values = []
stream['entries'].each { |entry|
values.append([entry['ts'].to_s, entry['line']])
if entry.key?('metadata')
sorted_metadata = entry['metadata'].sort.to_h
values.append([
entry['ts'].to_s,
entry['line'],
sorted_metadata
])
else
values.append([
entry['ts'].to_s,
entry['line']
])
end
}
return {
'stream'=>stream['labels'],

@ -5,7 +5,7 @@ module Loki
class Entry
include Loki
attr_reader :labels, :entry
def initialize(event,message_field,include_fields)
def initialize(event,message_field,include_fields,metadata_fields)
@entry = {
"ts" => to_ns(event.get("@timestamp")),
"line" => event.get(message_field).to_s
@ -21,6 +21,22 @@ module Loki
next if include_fields.length() > 0 and not include_fields.include?(key)
@labels[key] = value.to_s
}
# Unlike include_fields, skip structured metadata handling entirely when no metadata_fields are configured
if metadata_fields.length() > 0
@metadata = {}
event.to_hash.each { |key,value|
next if key.start_with?('@')
next if value.is_a?(Hash)
next if metadata_fields.length() > 0 and not metadata_fields.include?(key)
@metadata[key] = value.to_s
}
# Add @metadata to @entry if there was a match
if @metadata.size > 0
@entry.merge!('metadata' => @metadata)
end
end
end
end
end

@ -1,6 +1,6 @@
Gem::Specification.new do |s|
s.name = 'logstash-output-loki'
s.version = '1.1.0'
s.version = '1.2.0'
s.authors = ['Aditya C S','Cyril Tovena']
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com']

@ -0,0 +1,15 @@
input {
generator {
message => "Hello world!"
count => 10
add_field => { "cluster" => "foo" "namespace" => "bar" "trace_id" => "trace_001" }
}
}
output {
loki {
url => "http://localhost:3100"
include_fields => ["cluster"]
metadata_fields => ["trace_id"]
}
}

@ -15,6 +15,9 @@ output {
# If include_fields is set, only fields in this list will be sent to Loki as labels.
#include_fields => ["service","host","app","env"] #default empty array, all labels included.
# If metadata_fields is set, fields in this list will be sent to Loki as structured metadata for the associated log.
#metadata_fields => ["trace_id"] #default empty array, no structured metadata will be included
#batch_wait => 1 ## in seconds #default 1 second
#batch_size => 102400 #bytes #default 102400 bytes

@ -21,31 +21,40 @@ describe Loki::Entry do
{'@path' => '/path/to/file.log'},
},
'host' => '172.0.0.1',
'trace_id' => 'trace_001',
'@timestamp' => Time.now
}
)
}
it 'labels extracted should not contains object and metadata or timestamp' do
entry = Entry.new(event,"message", [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'})
entry = Entry.new(event,"message", [], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5', 'trace_id'=>'trace_001'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end
it 'labels extracted should only contain allowlisted labels' do
entry = Entry.new(event, "message", %w[agent foo])
entry = Entry.new(event, "message", %w[agent foo], [])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
end
it 'labels and structured metadata extracted should only contain allow listed labels and metadata' do
entry = Entry.new(event, "message", %w[agent foo], %w[trace_id])
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'foo'=>'5'})
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp"))
expect(entry.entry['line']).to eql 'hello'
expect(entry.entry['metadata']).to eql({'trace_id' => 'trace_001'})
end
end
context 'test batch generation with label order' do
let (:entries) {[
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", []),
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], []),
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log", [], []),
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message", [], []),
]}
let (:expected) {

@ -28,15 +28,15 @@ describe LogStash::Outputs::Loki do
context 'when adding en entry to the batch' do
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:lbs) {{"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"])}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:include_lbs) {{"cluster"=>"us-central1"}.sort.to_h}
it 'should not add empty line' do
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config)
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [])
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo", [], [])
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true
expect(plugin.batch).to eql nil
end
@ -83,8 +83,51 @@ describe LogStash::Outputs::Loki do
end
end
context 'when building json from batch to send' do
let (:basic_loki_config) {{'url' => 'http://localhost:3100'}}
let (:basic_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", [], [])}
let (:include_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"] }}
let (:include_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], [])}
let (:metadata_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id"] }}
let (:metadata_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id"])}
let (:metadata_multi_loki_config) {{ 'url' => 'http://localhost:3100', 'include_fields' => ["cluster"], 'metadata_fields' => ["trace_id", "user_id"] }}
let (:metadata_multi_entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","trace_id"=>"trace_001","user_id"=>"user_001","@timestamp"=>Time.at(1)}),"message", ["cluster"], ["trace_id", "user_id"])}
it 'should not include labels or metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(basic_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(basic_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"buzz":"bar","cluster":"us-central1","trace_id":"trace_001"},"values":[["1000000000","foobuzz"]]}]}'
end
it 'should include metadata with no labels' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001"}]]}]}'
end
it 'should include labels with no metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(include_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(include_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz"]]}]}'
end
it 'should include labels with multiple metadata' do
plugin = LogStash::Plugin.lookup("output", "loki").new(metadata_multi_loki_config)
expect(plugin.batch).to eql nil
expect(plugin.add_entry_to_batch(metadata_multi_entry)).to eql true
expect(plugin.batch).not_to be_nil
expect(plugin.batch.to_json).to eq '{"streams":[{"stream":{"cluster":"us-central1"},"values":[["1000000000","foobuzz",{"trace_id":"trace_001","user_id":"user_001"}]]}]}'
end
end
context 'batch expiration' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
it 'should not expire if empty' do
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5}))
@ -147,13 +190,13 @@ describe LogStash::Outputs::Loki do
loki.receive(event)
sent.deq
sleep(0.01) # Adding a minimal sleep. In few cases @batch=nil might happen after evaluating for nil
expect(loki.batch).to be_nil
expect(loki.batch).to be_nil
loki.close
end
end
context 'http requests' do
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [])}
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message", [], [])}
it 'should send credentials' do
conf = {

@ -32,6 +32,8 @@ For more information on how to push logs to Loki via the HTTP endpoint, refer to
Alternatively, you can use the Grafana Agent or Promtail to extract and attach structured metadata to your log lines.
See the [Promtail: Structured metadata stage]({{< relref "../../send-data/promtail/stages/structured_metadata" >}}) for more information.
With version 1.2.0 of the Logstash output plugin, support for structured metadata has been added. For more information, see [logstash]({{< relref "../../send-data/logstash/_index.md" >}}).
## Querying structured metadata
Structured metadata is extracted automatically for each returned log line and added to the labels returned for the query.
@ -49,7 +51,7 @@ Of course, you can filter by multiple labels of structured metadata at the same
{job="example"} | trace_id="0242ac120002" | user_id="superUser123"
```
Note that since structured metadata is extracted automatically to the results labels, some metric queries might return
Note that since structured metadata is extracted automatically to the results labels, some metric queries might return
an error like `maximum of series (50000) reached for a single query`. You can use the [Keep]({{< relref "../../query/log_queries#keep-labels-expression" >}}) and [Drop]({{< relref "../../query/log_queries#drop-labels-expression" >}}) stages to filter out labels that you don't need.
For example:

@ -1,8 +1,8 @@
---
title: Logstash plugin
menuTitle:
menuTitle:
description: Instructions to install, configure, and use the Logstash plugin to send logs to Loki.
aliases:
aliases:
- ../send-data/a/logstash/
weight: 800
---
@ -61,9 +61,11 @@ output {
[tenant_id => string | default = nil | required=false]
[message_field => string | default = "message" | required=false]
[include_fields => array | default = [] | required=false]
[metadata_fields => array | default = [] | required=false]
[batch_wait => number | default = 1(s) | required=false]
[batch_size => number | default = 102400(bytes) | required=false]
@ -112,8 +114,6 @@ Contains a `message` and `@timestamp` fields, which are respectively used to for
All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs.
You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter.
**Note:** In version 1.1.0 and greater of this plugin you can also specify a list of labels to allowlist via the `include_fields` configuration.
For example the configuration below :
```conf
@ -194,6 +194,13 @@ filter {
}
```
### Version Notes
Important notes regarding versions:
- In version 1.1.0 and greater of this plugin, you can specify a list of labels to allowlist via the `include_fields` configuration.
- In version 1.2.0 and greater of this plugin, you can specify structured metadata via the `metadata_fields` configuration.
### Configuration Properties
#### url
@ -216,6 +223,10 @@ Message field to use for log lines. You can use logstash key accessor language t
An array of fields which will be mapped to labels and sent to Loki, when this list is configured **only** these fields will be sent, all other fields will be ignored.
#### metadata_fields
An array of fields which will be mapped to [structured metadata]({{< relref "../../get-started/labels/structured-metadata.md" >}}) and sent to Loki for each log line.
#### batch_wait
Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data.
@ -246,7 +257,7 @@ Loki is a multi-tenant log storage platform and all requests sent must include a
Specify a pair of client certificate and private key with `cert` and `key` if a reverse proxy with client certificate verification is configured in front of Loki. `ca_cert` can also be specified if the server uses custom certificate authority.
### insecure_skip_verify
#### insecure_skip_verify
A flag to disable server certificate verification. By default it is set to `false`.
@ -286,6 +297,7 @@ output {
max_delay => 500
message_field => "message"
include_fields => ["container_name","namespace","pod","host"]
metadata_fields => ["pod"]
}
# stdout { codec => rubydebug }
}

Loading…
Cancel
Save