mirror of https://github.com/grafana/loki
Loki Logstash Plugin (#1822)
* Logstash plugin * include_labels * include_labels * Removes binary. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improve documentation and remove the push path. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Move to cmd. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Add more precision for jruby. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update docs/clients/logstash/README.md * p Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * ignore Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * remove ignore file/ Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * More precision for installing jruby Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Rename without Grafana Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * A lot of refactoring and testing. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * change delay logic * Fully tested version. Now testing/writing docs and charts. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Forgot to save merge. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * working version. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Makefile + easier docker build. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * adds ci to build logstash image. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix build for logstash. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Adds example with helm charts. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fix target to send 10 logs with logstash. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Improved documentation. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * add missing helm add repo for external repo Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Review comment. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Fixes loki service in Promtail. Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> * Update loki-stack version Signed-off-by: Cyril Tovena <cyril.tovena@gmail.com> Co-authored-by: Cyril Tovena <cyril.tovena@gmail.com>pull/2371/head
parent
99604a556d
commit
4c6090756f
@ -0,0 +1,7 @@ |
||||
*.gem |
||||
.ruby-version |
||||
.bundle |
||||
logstash |
||||
path |
||||
!lib |
||||
.rakeTasks |
@ -0,0 +1,24 @@ |
||||
FROM logstash:7.6.2 |
||||
|
||||
USER logstash |
||||
ENV PATH /usr/share/logstash/vendor/jruby/bin:/usr/share/logstash/vendor/bundle/jruby/2.5.0/bin:$PATH |
||||
ENV LOGSTASH_PATH="/usr/share/logstash" |
||||
ENV GEM_PATH /usr/share/logstash/vendor/bundle/jruby/2.5.0 |
||||
ENV GEM_HOME /usr/share/logstash/vendor/bundle/jruby/2.5.0 |
||||
|
||||
RUN gem install bundler:2.1.4 |
||||
|
||||
COPY --chown=logstash:logstash ./cmd/logstash/ /home/logstash/ |
||||
WORKDIR /home/logstash/ |
||||
|
||||
|
||||
RUN bundle install --path=/usr/share/logstash/vendor/bundle && \ |
||||
bundle exec rake vendor && \ |
||||
bundle exec rspec |
||||
|
||||
RUN cat logstash-output-loki.gemspec | grep s.version | awk '{print $3}' | cut -d "'" -f 2 > VERSION |
||||
|
||||
RUN gem build logstash-output-loki.gemspec && \ |
||||
PLUGIN_VERSION=$(cat VERSION); /usr/share/logstash/bin/logstash-plugin install logstash-output-loki-${PLUGIN_VERSION}.gem |
||||
|
||||
EXPOSE 5044 |
@ -0,0 +1,14 @@ |
||||
source 'https://rubygems.org' |
||||
|
||||
gemspec |
||||
|
||||
logstash_path = ENV["LOGSTASH_PATH"] || "./logstash" |
||||
|
||||
if Dir.exist?(logstash_path) |
||||
gem 'logstash-core', :path => "#{logstash_path}/logstash-core" |
||||
gem 'logstash-core-plugin-api', :path => "#{logstash_path}/logstash-core-plugin-api" |
||||
else |
||||
raise 'missing logstash vendoring' |
||||
end |
||||
|
||||
gem "webmock", "~> 3.8" |
@ -0,0 +1,74 @@ |
||||
# Contributing to Loki Logstash Output Plugin |
||||
|
||||
For information about hwo to use this plugin see this [documentation](../../docs/clients/logstash/README.md). |
||||
|
||||
## Install dependencies |
||||
|
||||
First you need to setup JRuby environment to build this plugin. Refer https://github.com/rbenv/rbenv for setting up your rbenv environment. |
||||
|
||||
After setting up `rbenv`. Install JRuby |
||||
|
||||
```bash |
||||
rbenv install jruby-9.2.10.0 |
||||
rbenv local jruby-9.2.10.0 |
||||
``` |
||||
|
||||
Check that the environment is configured |
||||
|
||||
```bash |
||||
ruby --version |
||||
jruby 9.2.10 |
||||
``` |
||||
|
||||
You should use make sure you are running jruby and not ruby. If the command below still shows ruby and not jruby, check that PATH contains `$HOME/.rbenv/shims` and `$HOME/.rbenv/bin`. Also verify that you have this in your bash profile: |
||||
|
||||
```bash |
||||
export PATH="$HOME/.rbenv/bin:$PATH" |
||||
eval "$(rbenv init -)" |
||||
``` |
||||
|
||||
Then install bundler |
||||
`gem install bundler:2.1.4` |
||||
|
||||
Follow those instructions to [install logstash](https://www.elastic.co/guide/en/logstash/current/installing-logstash.html) before moving to the next section. |
||||
|
||||
## Install dependencies and Build plugin |
||||
|
||||
### Install required packages |
||||
|
||||
```bash |
||||
git clone git@github.com:elastic/logstash.git |
||||
cd logstash |
||||
git checkout tags/v7.6.2 |
||||
export LOGSTASH_PATH=`pwd` |
||||
export GEM_PATH=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0 |
||||
export GEM_HOME=$LOGSTASH_PATH/vendor/bundle/jruby/2.5.0 |
||||
./gradlew assemble |
||||
cd .. |
||||
ruby -S bundle install |
||||
ruby -S bundle exec rake vendor |
||||
``` |
||||
|
||||
### Build the plugin |
||||
|
||||
`gem build logstash-output-loki.gemspec` |
||||
|
||||
### Test |
||||
|
||||
`ruby -S bundle exec rspec` |
||||
|
||||
Alternatively if you don't want to install JRuby. Enter inside logstash-loki container. |
||||
|
||||
```bash |
||||
docker build -t logstash-loki ./ |
||||
docker run -v `pwd`/spec:/home/logstash/spec -it --rm --entrypoint /bin/sh logstash-loki |
||||
bundle exec rspec |
||||
``` |
||||
|
||||
## Install plugin to local logstash |
||||
|
||||
`bin/logstash-plugin install --no-verify --local logstash-output-loki-1.0.0.gem` |
||||
|
||||
## Send sample event and check plugin is working |
||||
|
||||
`bin/logstash -f loki.conf` |
@ -0,0 +1 @@ |
||||
require "logstash/devutils/rake" |
@ -0,0 +1,240 @@ |
||||
# encoding: utf-8 |
||||
require "logstash/outputs/base" |
||||
require "logstash/outputs/loki/entry" |
||||
require "logstash/outputs/loki/batch" |
||||
require "logstash/namespace" |
||||
require 'net/http' |
||||
require 'concurrent-edge' |
||||
require 'time' |
||||
require 'uri' |
||||
require 'json' |
||||
|
||||
class LogStash::Outputs::Loki < LogStash::Outputs::Base |
||||
include Loki |
||||
config_name "loki" |
||||
|
||||
## 'A single instance of the Output will be shared among the pipeline worker threads' |
||||
concurrency :single |
||||
|
||||
## 'Loki URL' |
||||
config :url, :validate => :string, :required => true |
||||
|
||||
## 'BasicAuth credentials' |
||||
config :username, :validate => :string, :required => false |
||||
config :password, :validate => :string, secret: true, :required => false |
||||
|
||||
## 'Client certificate' |
||||
config :cert, :validate => :path, :required => false |
||||
config :key, :validate => :path, :required => false |
||||
|
||||
## 'TLS' |
||||
config :ca_cert, :validate => :path, :required => false |
||||
|
||||
## 'Loki Tenant ID' |
||||
config :tenant_id, :validate => :string, :required => false |
||||
|
||||
## 'Maximum batch size to accrue before pushing to loki. Defaults to 102400 bytes' |
||||
config :batch_size, :validate => :number, :default => 102400, :required => false |
||||
|
||||
## 'Interval in seconds to wait before pushing a batch of records to loki. Defaults to 1 second' |
||||
config :batch_wait, :validate => :number, :default => 1, :required => false |
||||
|
||||
## 'Log line field to pick from logstash. Defaults to "message"' |
||||
config :message_field, :validate => :string, :default => "message", :required => false |
||||
|
||||
## 'Backoff configuration. Initial backoff time between retries. Default 1s' |
||||
config :min_delay, :validate => :number, :default => 1, :required => false |
||||
|
||||
## 'Backoff configuration. Maximum backoff time between retries. Default 300s' |
||||
config :max_delay, :validate => :number, :default => 300, :required => false |
||||
|
||||
## 'Backoff configuration. Maximum number of retries to do' |
||||
config :retries, :validate => :number, :default => 10, :required => false |
||||
|
||||
attr_reader :batch |
||||
public |
||||
def register |
||||
@uri = URI.parse(@url) |
||||
unless @uri.is_a?(URI::HTTP) || @uri.is_a?(URI::HTTPS) |
||||
raise LogStash::ConfigurationError, "url parameter must be valid HTTP, currently '#{@url}'" |
||||
end |
||||
|
||||
if @min_delay > @max_delay |
||||
raise LogStash::ConfigurationError, "Min delay should be less than Max delay, currently 'Min delay is #{@min_delay} and Max delay is #{@max_delay}'" |
||||
end |
||||
|
||||
@logger.info("Loki output plugin", :class => self.class.name) |
||||
|
||||
# initialize channels |
||||
@Channel = Concurrent::Channel |
||||
@entries = @Channel.new |
||||
@stop = @Channel.new |
||||
|
||||
# create nil batch object. |
||||
@batch = nil |
||||
|
||||
# validate certs |
||||
if ssl_cert? |
||||
load_ssl |
||||
validate_ssl_key |
||||
end |
||||
|
||||
@Channel.go{run()} |
||||
end |
||||
|
||||
def ssl_cert? |
||||
!@key.nil? && !@cert.nil? |
||||
end |
||||
|
||||
def load_ssl |
||||
@cert = OpenSSL::X509::Certificate.new(File.read(@cert)) if @cert |
||||
@key = OpenSSL::PKey.read(File.read(@key)) if @key |
||||
end |
||||
|
||||
def validate_ssl_key |
||||
if !@key.is_a?(OpenSSL::PKey::RSA) && !@key.is_a?(OpenSSL::PKey::DSA) |
||||
raise LogStash::ConfigurationError, "Unsupported private key type '#{@key.class}''" |
||||
end |
||||
end |
||||
|
||||
def ssl_opts(uri) |
||||
opts = { |
||||
use_ssl: uri.scheme == 'https' |
||||
} |
||||
|
||||
if !@cert.nil? && !@key.nil? |
||||
opts = opts.merge( |
||||
verify_mode: OpenSSL::SSL::VERIFY_PEER, |
||||
cert: @cert, |
||||
key: @key |
||||
) |
||||
end |
||||
|
||||
unless @ca_cert.nil? |
||||
opts = opts.merge( |
||||
ca_file: @ca_cert |
||||
) |
||||
end |
||||
opts |
||||
end |
||||
|
||||
def run() |
||||
min_wait_checkfrequency = 1/100 #1 millisecond |
||||
max_wait_checkfrequency = @batch_wait / 10 |
||||
if max_wait_checkfrequency < min_wait_checkfrequency |
||||
max_wait_checkfrequency = min_wait_checkfrequency |
||||
end |
||||
|
||||
@max_wait_check = Concurrent::Channel.tick(max_wait_checkfrequency) |
||||
loop do |
||||
Concurrent::Channel.select do |s| |
||||
s.take(@stop) { |
||||
return |
||||
} |
||||
s.take(@entries) { |e| |
||||
if !add_entry_to_batch(e) |
||||
@logger.debug("Max batch_size is reached. Sending batch to loki") |
||||
send(@batch) |
||||
@batch = Batch.new(e) |
||||
end |
||||
} |
||||
s.take(@max_wait_check) { |
||||
# Send batch if max wait time has been reached |
||||
if is_batch_expired |
||||
@logger.debug("Max batch_wait time is reached. Sending batch to loki") |
||||
send(@batch) |
||||
@batch = nil |
||||
end |
||||
} |
||||
end |
||||
end |
||||
end |
||||
|
||||
# add an entry to the current batch return false if the batch is full |
||||
# and the entry can't be added. |
||||
def add_entry_to_batch(e) |
||||
line = e.entry['line'] |
||||
# we don't want to send empty lines. |
||||
return true if line.to_s.strip.empty? |
||||
|
||||
if @batch.nil? |
||||
@batch = Batch.new(e) |
||||
return true |
||||
end |
||||
|
||||
if @batch.size_bytes_after(line) > @batch_size |
||||
return false |
||||
end |
||||
@batch.add(e) |
||||
return true |
||||
end |
||||
|
||||
def is_batch_expired |
||||
return !@batch.nil? && @batch.age() >= @batch_wait |
||||
end |
||||
|
||||
## Receives logstash events |
||||
public |
||||
def receive(event) |
||||
@entries << Entry.new(event, @message_field) |
||||
end |
||||
|
||||
def close |
||||
@entries.close |
||||
@max_wait_check.close if !@max_wait_check.nil? |
||||
@stop << true # stop will block until it's accepted by the worker. |
||||
|
||||
# if by any chance we still have a forming batch, we need to send it. |
||||
send(@batch) if !@batch.nil? |
||||
@batch = nil |
||||
end |
||||
|
||||
def send(batch) |
||||
payload = batch.to_json |
||||
res = loki_http_request(payload) |
||||
if res.is_a?(Net::HTTPSuccess) |
||||
@logger.debug("Successfully pushed data to loki") |
||||
else |
||||
@logger.debug("failed payload", :payload => payload) |
||||
end |
||||
end |
||||
|
||||
def loki_http_request(payload) |
||||
req = Net::HTTP::Post.new( |
||||
@uri.request_uri |
||||
) |
||||
req.add_field('Content-Type', 'application/json') |
||||
req.add_field('X-Scope-OrgID', @tenant_id) if @tenant_id |
||||
req['User-Agent']= 'loki-logstash' |
||||
req.basic_auth(@username, @password) if @username |
||||
req.body = payload |
||||
|
||||
opts = ssl_opts(@uri) |
||||
|
||||
@logger.debug("sending #{req.body.length} bytes to loki") |
||||
retry_count = 0 |
||||
delay = @min_delay |
||||
begin |
||||
res = Net::HTTP.start(@uri.host, @uri.port, **opts) { |http| |
||||
http.request(req) |
||||
} |
||||
return res if !res.nil? && res.code.to_i != 429 && res.code.to_i.div(100) != 5 |
||||
raise StandardError.new res |
||||
rescue StandardError => e |
||||
retry_count += 1 |
||||
@logger.warn("Failed to send batch attempt: #{retry_count}/#{@retries}", :error_inspect => e.inspect, :error => e) |
||||
if retry_count < @retries |
||||
sleep delay |
||||
if (delay * 2 - delay) > @max_delay |
||||
delay = delay |
||||
else |
||||
delay = delay * 2 |
||||
end |
||||
retry |
||||
else |
||||
@logger.error("Failed to send batch", :error_inspect => e.inspect, :error => e) |
||||
return res |
||||
end |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,63 @@ |
||||
require 'time' |
||||
|
||||
module Loki |
||||
class Batch |
||||
attr_reader :streams |
||||
def initialize(e) |
||||
@bytes = 0 |
||||
@createdAt = Time.now |
||||
@streams = {} |
||||
add(e) |
||||
end |
||||
|
||||
def size_bytes |
||||
return @bytes |
||||
end |
||||
|
||||
def add(e) |
||||
@bytes = @bytes + e.entry['line'].length |
||||
|
||||
# Append the entry to an already existing stream (if any) |
||||
labels = e.labels.sort.to_h |
||||
labelkey = labels.to_s |
||||
if @streams.has_key?(labelkey) |
||||
stream = @streams[labelkey] |
||||
stream['entries'].append(e.entry) |
||||
return |
||||
else |
||||
# Add the entry as a new stream |
||||
@streams[labelkey] = { |
||||
"labels" => labels, |
||||
"entries" => [e.entry], |
||||
} |
||||
end |
||||
end |
||||
|
||||
def size_bytes_after(line) |
||||
return @bytes + line.length |
||||
end |
||||
|
||||
def age() |
||||
return Time.now - @createdAt |
||||
end |
||||
|
||||
def to_json |
||||
streams = [] |
||||
@streams.each { |_ , stream| |
||||
streams.append(build_stream(stream)) |
||||
} |
||||
return {"streams"=>streams}.to_json |
||||
end |
||||
|
||||
def build_stream(stream) |
||||
values = [] |
||||
stream['entries'].each { |entry| |
||||
values.append([entry['ts'].to_s, entry['line']]) |
||||
} |
||||
return { |
||||
'stream'=>stream['labels'], |
||||
'values' => values |
||||
} |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,25 @@ |
||||
module Loki |
||||
def to_ns(s) |
||||
(s.to_f * (10**9)).to_i |
||||
end |
||||
class Entry |
||||
include Loki |
||||
attr_reader :labels, :entry |
||||
def initialize(event,message_field) |
||||
@entry = { |
||||
"ts" => to_ns(event.get("@timestamp")), |
||||
"line" => event.get(message_field).to_s |
||||
} |
||||
event = event.clone() |
||||
event.remove(message_field) |
||||
event.remove("@timestamp") |
||||
|
||||
@labels = {} |
||||
event.to_hash.each { |key,value| |
||||
next if key.start_with?('@') |
||||
next if value.is_a?(Hash) |
||||
@labels[key] = value.to_s |
||||
} |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,27 @@ |
||||
Gem::Specification.new do |s| |
||||
s.name = 'logstash-output-loki' |
||||
s.version = '1.0.1' |
||||
s.authors = ['Aditya C S','Cyril Tovena'] |
||||
s.email = ['aditya.gnu@gmail.com','cyril.tovena@grafana.com'] |
||||
|
||||
s.summary = 'Output plugin to ship logs to a Grafana Loki server' |
||||
s.description = 'Output plugin to ship logs to a Grafana Loki server' |
||||
s.homepage = 'https://github.com/grafana/loki/' |
||||
s.license = 'Apache-2.0' |
||||
s.require_paths = ["lib"] |
||||
|
||||
# Files |
||||
s.files = Dir['lib/**/*','spec/**/*','vendor/**/*','*.gemspec','*.md','CONTRIBUTORS','Gemfile'] |
||||
# Tests |
||||
s.test_files = s.files.grep(%r{^(test|spec|features)/}) |
||||
|
||||
# Special flag to let us know this is actually a logstash plugin |
||||
s.metadata = { "logstash_plugin" => "true", "logstash_group" => "output" } |
||||
|
||||
# Gem dependencies |
||||
# |
||||
s.add_runtime_dependency "logstash-core-plugin-api", ">= 1.60", "<= 2.99" |
||||
s.add_runtime_dependency "logstash-codec-plain", "3.0.6" |
||||
s.add_runtime_dependency "concurrent-ruby-edge", "0.6.0" |
||||
s.add_development_dependency 'logstash-devutils', "2.0.2" |
||||
end |
@ -0,0 +1,16 @@ |
||||
input { |
||||
generator { |
||||
message => "Hello world!" |
||||
count => 10 |
||||
add_field => {cluster=> "foo" namespace=>"bar"} |
||||
} |
||||
beats { |
||||
port => 5044 |
||||
} |
||||
} |
||||
|
||||
output { |
||||
loki { |
||||
url => "${LOKI_URL}" |
||||
} |
||||
} |
@ -0,0 +1,37 @@ |
||||
input { |
||||
beats { |
||||
port => 5044 |
||||
} |
||||
} |
||||
|
||||
output { |
||||
loki { |
||||
url => "http://localhost:3100/loki/api/v1/push" |
||||
|
||||
#tenant_id => "fake" #default none |
||||
|
||||
#message_field => "message" #default message |
||||
|
||||
#batch_wait => 1 ## in seconds #default 1 second |
||||
|
||||
#batch_size => 102400 #bytes #default 102400 bytes |
||||
|
||||
#min_delay => 1 |
||||
|
||||
#max_delay => 300 |
||||
|
||||
#retries => 10 |
||||
|
||||
# Basic auth credentials |
||||
#username => "test" |
||||
|
||||
#password => "test" |
||||
|
||||
# TLS config |
||||
# cert => /path/to/certificate.pem |
||||
|
||||
# key => /path/to/key.key |
||||
|
||||
# ca_cert => /path/to/ca.pem |
||||
} |
||||
} |
@ -0,0 +1,59 @@ |
||||
# encoding: utf-8 |
||||
require "logstash/devutils/rspec/spec_helper" |
||||
require "logstash/outputs/loki" |
||||
require "logstash/codecs/plain" |
||||
require "logstash/event" |
||||
require "net/http" |
||||
include Loki |
||||
|
||||
describe Loki::Entry do |
||||
context 'test entry generation' do |
||||
let (:event) { |
||||
LogStash::Event.new( |
||||
{ |
||||
'message' => 'hello', |
||||
'@metadata' => {'foo'=>'bar'}, |
||||
'@version' => '1', |
||||
'foo' => 5, |
||||
'agent' => 'filebeat', |
||||
'log' => { |
||||
'file' => |
||||
{'@path' => '/path/to/file.log'}, |
||||
}, |
||||
'host' => '172.0.0.1', |
||||
'@timestamp' => Time.now |
||||
} |
||||
) |
||||
} |
||||
|
||||
it 'labels extracted should not contains object and metadata or timestamp' do |
||||
entry = Entry.new(event,"message") |
||||
expect(entry.labels).to eql({ 'agent' => 'filebeat', 'host' => '172.0.0.1', 'foo'=>'5'}) |
||||
expect(entry.entry['ts']).to eql to_ns(event.get("@timestamp")) |
||||
expect(entry.entry['line']).to eql 'hello' |
||||
end |
||||
end |
||||
|
||||
context 'test batch generation with label order' do |
||||
let (:entries) {[ |
||||
Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message"), |
||||
Entry.new(LogStash::Event.new({"log"=>"foobar","bar"=>"bar","@timestamp"=>Time.at(2)}),"log"), |
||||
Entry.new(LogStash::Event.new({"cluster"=>"us-central1","message"=>"foobuzz","buzz"=>"bar","@timestamp"=>Time.at(3)}),"message"), |
||||
|
||||
]} |
||||
let (:expected) { |
||||
{"streams" => [ |
||||
{"stream"=> {"buzz"=>"bar","cluster"=>"us-central1"}, "values" => [[to_ns(Time.at(1)).to_s,"foobuzz"],[to_ns(Time.at(3)).to_s,"foobuzz"]]}, |
||||
{"stream"=> {"bar"=>"bar"}, "values"=>[[to_ns(Time.at(2)).to_s,"foobar"]]}, |
||||
] } |
||||
} |
||||
|
||||
it 'to_json' do |
||||
@batch = Loki::Batch.new(entries.first) |
||||
entries.drop(1).each { |e| @batch.add(e)} |
||||
expect(JSON.parse(@batch.to_json)).to eql expected |
||||
end |
||||
end |
||||
|
||||
|
||||
end |
@ -0,0 +1,247 @@ |
||||
# encoding: utf-8 |
||||
require "logstash/devutils/rspec/spec_helper" |
||||
require "logstash/outputs/loki" |
||||
require "logstash/codecs/plain" |
||||
require "logstash/event" |
||||
require "net/http" |
||||
require 'webmock/rspec' |
||||
include Loki |
||||
|
||||
describe LogStash::Outputs::Loki do |
||||
|
||||
let (:simple_loki_config) { {'url' => 'http://localhost:3100'} } |
||||
|
||||
context 'when initializing' do |
||||
it "should register" do |
||||
loki = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config) |
||||
expect { loki.register }.to_not raise_error |
||||
end |
||||
|
||||
it 'should populate loki config with default or initialized values' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config) |
||||
expect(loki.url).to eql 'http://localhost:3100' |
||||
expect(loki.tenant_id).to eql nil |
||||
expect(loki.batch_size).to eql 102400 |
||||
expect(loki.batch_wait).to eql 1 |
||||
end |
||||
end |
||||
|
||||
context 'when adding en entry to the batch' do |
||||
let (:simple_loki_config) {{'url' => 'http://localhost:3100'}} |
||||
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} |
||||
let (:lbs) { {"buzz"=>"bar","cluster"=>"us-central1"}.sort.to_h} |
||||
|
||||
it 'should not add empty line' do |
||||
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config) |
||||
emptyEntry = Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"foo") |
||||
expect(plugin.add_entry_to_batch(emptyEntry)).to eql true |
||||
expect(plugin.batch).to eql nil |
||||
end |
||||
|
||||
it 'should add entry' do |
||||
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config) |
||||
expect(plugin.batch).to eql nil |
||||
expect(plugin.add_entry_to_batch(entry)).to eql true |
||||
expect(plugin.add_entry_to_batch(entry)).to eql true |
||||
expect(plugin.batch).not_to be_nil |
||||
expect(plugin.batch.streams.length).to eq 1 |
||||
expect(plugin.batch.streams[lbs.to_s]['entries'].length).to eq 2 |
||||
expect(plugin.batch.streams[lbs.to_s]['labels']).to eq lbs |
||||
expect(plugin.batch.size_bytes).to eq 14 |
||||
end |
||||
|
||||
it 'should not add if full' do |
||||
plugin = LogStash::Plugin.lookup("output", "loki").new(simple_loki_config.merge!({'batch_size'=>10})) |
||||
expect(plugin.batch).to eql nil |
||||
expect(plugin.add_entry_to_batch(entry)).to eql true # first entry is fine. |
||||
expect(plugin.batch).not_to be_nil |
||||
expect(plugin.batch.streams.length).to eq 1 |
||||
expect(plugin.batch.streams[lbs.to_s]['entries'].length).to eq 1 |
||||
expect(plugin.batch.streams[lbs.to_s]['labels']).to eq lbs |
||||
expect(plugin.batch.size_bytes).to eq 7 |
||||
expect(plugin.add_entry_to_batch(entry)).to eql false # second entry goes over the limit. |
||||
expect(plugin.batch).not_to be_nil |
||||
expect(plugin.batch.streams.length).to eq 1 |
||||
expect(plugin.batch.streams[lbs.to_s]['entries'].length).to eq 1 |
||||
expect(plugin.batch.streams[lbs.to_s]['labels']).to eq lbs |
||||
expect(plugin.batch.size_bytes).to eq 7 |
||||
end |
||||
end |
||||
|
||||
context 'batch expiration' do |
||||
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} |
||||
|
||||
it 'should not expire if empty' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5})) |
||||
sleep(1) |
||||
expect(loki.is_batch_expired).to be false |
||||
end |
||||
it 'should not expire batch if not old' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5})) |
||||
expect(loki.add_entry_to_batch(entry)).to eql true |
||||
expect(loki.is_batch_expired).to be false |
||||
end |
||||
it 'should expire if old' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5})) |
||||
expect(loki.add_entry_to_batch(entry)).to eql true |
||||
sleep(1) |
||||
expect(loki.is_batch_expired).to be true |
||||
end |
||||
end |
||||
|
||||
context 'channel' do |
||||
let (:event) {LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)})} |
||||
|
||||
it 'should send entry if batch size reached with no tenant' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10})) |
||||
loki.register |
||||
sent = Concurrent::Channel.new(capacity: 3) |
||||
allow(loki).to receive(:send) do |batch| |
||||
Thread.new do |
||||
sent << batch |
||||
end |
||||
end |
||||
loki.receive(event) |
||||
loki.receive(event) |
||||
loki.close |
||||
~sent |
||||
~sent |
||||
end |
||||
it 'should send entry while closing' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>10,'batch_size'=>10})) |
||||
loki.register |
||||
sent = Concurrent::Channel.new(capacity: 3) |
||||
allow(loki).to receive(:send) do | batch| |
||||
Thread.new do |
||||
sent << batch |
||||
end |
||||
end |
||||
loki.receive(event) |
||||
loki.close |
||||
~sent |
||||
end |
||||
it 'should send entry when batch is expiring' do |
||||
loki = LogStash::Outputs::Loki.new(simple_loki_config.merge!({'batch_wait'=>0.5,'batch_size'=>10})) |
||||
loki.register |
||||
sent = Concurrent::Channel.new(capacity: 3) |
||||
allow(loki).to receive(:send) do | batch| |
||||
Thread.new do |
||||
sent << batch |
||||
end |
||||
end |
||||
loki.receive(event) |
||||
~sent |
||||
expect(loki.batch).to be_nil |
||||
loki.close |
||||
end |
||||
end |
||||
|
||||
context 'http requests' do |
||||
let (:entry) {Entry.new(LogStash::Event.new({"message"=>"foobuzz","buzz"=>"bar","cluster"=>"us-central1","@timestamp"=>Time.at(1)}),"message")} |
||||
|
||||
it 'should send credentials' do |
||||
conf = { |
||||
'url'=>'http://localhost:3100/loki/api/v1/push', |
||||
'username' => 'foo', |
||||
'password' => 'bar', |
||||
'tenant_id' => 't' |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://localhost:3100/loki/api/v1/push").with( |
||||
basic_auth: ['foo', 'bar'], |
||||
body: b.to_json, |
||||
headers:{ |
||||
'Content-Type' => 'application/json' , |
||||
'User-Agent' => 'loki-logstash', |
||||
'X-Scope-OrgID'=>'t', |
||||
'Accept'=>'*/*', |
||||
'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', |
||||
} |
||||
) |
||||
loki.send(b) |
||||
expect(post).to have_been_requested.times(1) |
||||
end |
||||
|
||||
it 'should not send credentials' do |
||||
conf = { |
||||
'url'=>'http://foo.com/loki/api/v1/push', |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://foo.com/loki/api/v1/push").with( |
||||
body: b.to_json, |
||||
headers:{ |
||||
'Content-Type' => 'application/json' , |
||||
'User-Agent' => 'loki-logstash', |
||||
'Accept'=>'*/*', |
||||
'Accept-Encoding'=>'gzip;q=1.0,deflate;q=0.6,identity;q=0.3', |
||||
} |
||||
) |
||||
loki.send(b) |
||||
expect(post).to have_been_requested.times(1) |
||||
end |
||||
it 'should retry 500' do |
||||
conf = { |
||||
'url'=>'http://foo.com/loki/api/v1/push', |
||||
'retries' => 3, |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://foo.com/loki/api/v1/push").with( |
||||
body: b.to_json, |
||||
).to_return(status: [500, "Internal Server Error"]) |
||||
loki.send(b) |
||||
loki.close |
||||
expect(post).to have_been_requested.times(3) |
||||
end |
||||
it 'should retry 429' do |
||||
conf = { |
||||
'url'=>'http://foo.com/loki/api/v1/push', |
||||
'retries' => 2, |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://foo.com/loki/api/v1/push").with( |
||||
body: b.to_json, |
||||
).to_return(status: [429, "stop spamming"]) |
||||
loki.send(b) |
||||
loki.close |
||||
expect(post).to have_been_requested.times(2) |
||||
end |
||||
it 'should not retry 400' do |
||||
conf = { |
||||
'url'=>'http://foo.com/loki/api/v1/push', |
||||
'retries' => 11, |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://foo.com/loki/api/v1/push").with( |
||||
body: b.to_json, |
||||
).to_return(status: [400, "bad request"]) |
||||
loki.send(b) |
||||
loki.close |
||||
expect(post).to have_been_requested.times(1) |
||||
end |
||||
it 'should retry exception' do |
||||
conf = { |
||||
'url'=>'http://foo.com/loki/api/v1/push', |
||||
'retries' => 11, |
||||
} |
||||
loki = LogStash::Outputs::Loki.new(conf) |
||||
loki.register |
||||
b = Batch.new(entry) |
||||
post = stub_request(:post, "http://foo.com/loki/api/v1/push").with( |
||||
body: b.to_json, |
||||
).to_raise("some error").then.to_return(status: [200, "fine !"]) |
||||
loki.send(b) |
||||
loki.close |
||||
expect(post).to have_been_requested.times(2) |
||||
end |
||||
end |
||||
end |
@ -0,0 +1,269 @@ |
||||
# Logstash |
||||
|
||||
Loki has a [Logstash](https://www.elastic.co/logstash) output plugin called |
||||
`logstash-output-loki` that enables shipping logs to a Loki |
||||
instance or [Grafana Cloud](https://grafana.com/products/cloud/). |
||||
|
||||
## Installation |
||||
|
||||
### Local |
||||
|
||||
If you need to install the Loki output plugin manually you can do simply so by using the command below: |
||||
|
||||
```bash |
||||
$ bin/logstash-plugin install logstash-output-loki |
||||
``` |
||||
|
||||
This will download the latest gem for the output plugin and install it in logstash. |
||||
|
||||
### Docker |
||||
|
||||
We also provide a docker image on [docker hub](https://hub.docker.com/r/grafana/logstash-output-loki). The image contains logstash and the Loki output plugin |
||||
already pre-installed. |
||||
|
||||
For example if you want to run logstash in docker with the `loki.conf` as pipeline configuration you can use the command bellow : |
||||
|
||||
```bash |
||||
docker run -v `pwd`/loki-test.conf:/home/logstash/ --rm grafana/logstash-output-loki:1.0.1 -f loki-test.conf |
||||
``` |
||||
|
||||
### Kubernetes |
||||
|
||||
We also provides default helm values for scraping logs with Filebeat and forward them to Loki with logstash in our `loki-stack` umbrella chart. |
||||
You can switch from Promtail to logstash by using the following command: |
||||
|
||||
```bash |
||||
helm upgrade --install loki loki/loki-stack \ |
||||
--set filebeat.enabled=true,logstash.enabled=true,promtail.enabled=false |
||||
``` |
||||
|
||||
This will automatically scrape all pods logs in the cluster and send them to Loki with Kubernetes metadata attached as labels. |
||||
You can use the [`values.yaml`](../../../production/helm/loki-stack/values.yaml) file as a starting point for your own configuration. |
||||
|
||||
## Usage and Configuration |
||||
|
||||
To configure Logstash to forward logs to Loki, simply add the `loki` output to your [Logstash configuration file](https://www.elastic.co/guide/en/logstash/current/configuration-file-structure.html) as documented below : |
||||
|
||||
```conf |
||||
output { |
||||
loki { |
||||
[url => "" | default = none | required=true] |
||||
|
||||
[tenant_id => string | default = nil | required=false] |
||||
|
||||
[message_field => string | default = "message" | required=false] |
||||
|
||||
[batch_wait => number | default = 1(s) | required=false] |
||||
|
||||
[batch_size => number | default = 102400(bytes) | required=false] |
||||
|
||||
[min_delay => number | default = 1(s) | required=false] |
||||
|
||||
[max_delay => number | default = 300(s) | required=false] |
||||
|
||||
[retries => number | default = 10 | required=false] |
||||
|
||||
[username => string | default = nil | required=false] |
||||
|
||||
[password => secret | default = nil | required=false] |
||||
|
||||
[cert => path | default = nil | required=false] |
||||
|
||||
[key => path | default = nil| required=false] |
||||
|
||||
[ca_cert => path | default = nil | required=false] |
||||
|
||||
} |
||||
} |
||||
``` |
||||
|
||||
By default Loki will create entry from event fields it receives. |
||||
A logstash event as shown below. |
||||
|
||||
```conf |
||||
{ |
||||
"@timestamp" => 2017-04-26T19:33:39.257Z, |
||||
"src" => "localhost", |
||||
"@version" => "1", |
||||
"host" => "localhost.localdomain", |
||||
"pid" => "1", |
||||
"message" => "Apr 26 12:20:02 localhost systemd[1]: Starting system activity accounting tool...", |
||||
"type" => "stdin", |
||||
"prog" => "systemd", |
||||
} |
||||
``` |
||||
|
||||
Contains a `message` and `@timestamp` fields, which are respectively used to form the Loki entry log line and timestamp. |
||||
|
||||
> You can use a different property for the log line by using the configuration property [`message_field`](message_field). If you also need to change the timestamp value use the Logstash `date` filter to change the `@timestamp` field. |
||||
|
||||
All other fields (except nested fields) will form the label set (key value pairs) attached to the log line. [This means you're responsible for mutating and dropping high cardinality labels](https://grafana.com/blog/2020/04/21/how-labels-in-loki-can-make-log-queries-faster-and-easier/) such as client IPs. |
||||
You can usually do so by using a [`mutate`](https://www.elastic.co/guide/en/logstash/current/plugins-filters-mutate.html) filter. |
||||
|
||||
For example the configuration below : |
||||
|
||||
```conf |
||||
input { |
||||
... |
||||
} |
||||
|
||||
filter { |
||||
mutate { |
||||
add_field => { |
||||
"cluster" => "us-central1" |
||||
"job" => "logstash" |
||||
} |
||||
replace => { "type" => "stream"} |
||||
remove_field => ["src"] |
||||
} |
||||
} |
||||
output { |
||||
loki { |
||||
url => "http://myloki.domain:3100/loki/api/v1/push" |
||||
} |
||||
} |
||||
``` |
||||
|
||||
Will add `cluster` and `job` static labels, remove `src` fields and replace `type` to be named `stream`. |
||||
|
||||
If you want to include nested fields or metadata fields (starting with `@`) you need to rename them. |
||||
|
||||
For example when using Filebeat with the [`add_kubernetes_metadata`](https://www.elastic.co/guide/en/beats/filebeat/current/add-kubernetes-metadata.html) processor, it will attach Kubernetes metadata to your events like below: |
||||
|
||||
```json |
||||
{ |
||||
"kubernetes" : { |
||||
"labels" : { |
||||
"app" : "MY-APP", |
||||
"pod-template-hash" : "959f54cd", |
||||
"serving" : "true", |
||||
"version" : "1.0", |
||||
"visualize" : "true" |
||||
}, |
||||
"pod" : { |
||||
"uid" : "e20173cb-3c5f-11ea-836e-02c1ee65b375", |
||||
"name" : "MY-APP-959f54cd-lhd5p" |
||||
}, |
||||
"node" : { |
||||
"name" : "ip-xxx-xx-xx-xxx.ec2.internal" |
||||
}, |
||||
"container" : { |
||||
"name" : "istio" |
||||
}, |
||||
"namespace" : "production", |
||||
"replicaset" : { |
||||
"name" : "MY-APP-959f54cd" |
||||
} |
||||
}, |
||||
"message": "Failed to parse configuration", |
||||
"@timestamp": "2017-04-26T19:33:39.257Z", |
||||
} |
||||
``` |
||||
|
||||
The filter below show you how to extract those Kubernetes fields into labels (`container_name`,`namespace`,`pod` and `host`): |
||||
|
||||
```conf |
||||
filter { |
||||
if [kubernetes] { |
||||
mutate { |
||||
add_field => { |
||||
"container_name" => "%{[kubernetes][container][name]}" |
||||
"namespace" => "%{[kubernetes][namespace]}" |
||||
"pod" => "%{[kubernetes][pod][name]}" |
||||
} |
||||
replace => { "host" => "%{[kubernetes][node][name]}"} |
||||
} |
||||
} |
||||
mutate { |
||||
remove_field => ["tags"] |
||||
} |
||||
} |
||||
``` |
||||
|
||||
### Configuration Properties |
||||
|
||||
#### url |
||||
|
||||
The url of the Loki server to send logs to. |
||||
When sending data the push path need to also be provided e.g. `http://localhost:3100/loki/api/v1/push`. |
||||
|
||||
If you want to send to [GrafanaCloud](https://grafana.com/products/cloud/) you would use `https://logs-prod-us-central1.grafana.net/loki/api/v1/push`. |
||||
|
||||
#### username / password |
||||
|
||||
Specify a username and password if the Loki server requires basic authentication. |
||||
If using the [GrafanaLab's hosted Loki](https://grafana.com/products/cloud/), the username needs to be set to your instance/user id and the password should be a Grafana.com api key. |
||||
|
||||
#### message_field |
||||
|
||||
Message field to use for log lines. You can use logstash key accessor language to grab nested property, for example : `[log][message]`. |
||||
|
||||
#### batch_wait |
||||
|
||||
Interval in seconds to wait before pushing a batch of records to Loki. This means even if the [batch size](#batch_size) is not reached after `batch_wait` a partial batch will be sent, this is to ensure freshness of the data. |
||||
|
||||
#### batch_size |
||||
|
||||
Maximum batch size to accrue before pushing to loki. Defaults to 102400 bytes |
||||
|
||||
#### Backoff config |
||||
|
||||
##### min_delay => 1(1s) |
||||
|
||||
Initial backoff time between retries |
||||
|
||||
##### max_delay => 300(5m) |
||||
|
||||
Maximum backoff time between retries |
||||
|
||||
##### retries => 10 |
||||
|
||||
Maximum number of retries to do |
||||
|
||||
#### tenant_id |
||||
|
||||
Loki is a multi-tenant log storage platform and all requests sent must include a tenant. For some installations the tenant will be set automatically by an authenticating proxy. Otherwise you can define a tenant to be passed through. The tenant can be any string value. |
||||
|
||||
#### client certificate verification |
||||
|
||||
Specify a pair of client certificate and private key with `cert` and `key` if a reverse proxy with client certificate verification is configured in front of Loki. `ca_cert` can also be specified if the server uses custom certificate authority. |
||||
|
||||
### Full configuration example |
||||
|
||||
```conf |
||||
input { |
||||
beats { |
||||
port => 5044 |
||||
} |
||||
} |
||||
|
||||
filter { |
||||
if [kubernetes] { |
||||
mutate { |
||||
add_field => { |
||||
"container_name" => "%{[kubernetes][container][name]}" |
||||
"namespace" => "%{[kubernetes][namespace]}" |
||||
"pod" => "%{[kubernetes][pod][name]}" |
||||
} |
||||
replace => { "host" => "%{[kubernetes][node][name]}"} |
||||
} |
||||
} |
||||
mutate { |
||||
remove_field => ["tags"] |
||||
} |
||||
} |
||||
|
||||
output { |
||||
loki { |
||||
url => "https://logs-prod-us-central1.grafana.net/loki/api/v1/push" |
||||
username => "3241" |
||||
password => "REDACTED" |
||||
batch_size => 112640 #112.64 kilobytes |
||||
retries => 5 |
||||
min_delay => 3 |
||||
max_delay => 500 |
||||
message_field => "message" |
||||
} |
||||
# stdout { codec => rubydebug } |
||||
} |
||||
``` |
Loading…
Reference in new issue