@ -4,6 +4,7 @@ import (
"bufio"
"bytes"
"context"
"crypto/sha256"
"errors"
"fmt"
"io"
@ -20,7 +21,6 @@ import (
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/grafana/loki/clients/pkg/logentry/metric"
"github.com/grafana/loki/clients/pkg/promtail/api"
lokiutil "github.com/grafana/loki/pkg/util"
@ -37,11 +37,12 @@ const (
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
)
// UserAgent is the string "promtail/" followed by build.Version.
var UserAgent = fmt . Sprintf ( "promtail/%s" , build . Version )
type m etrics struct {
type M etrics struct {
encodedBytes * prometheus . CounterVec
sentBytes * prometheus . CounterVec
droppedBytes * prometheus . CounterVec
@ -49,12 +50,12 @@ type metrics struct {
droppedEntries * prometheus . CounterVec
requestDuration * prometheus . HistogramVec
batchRetries * prometheus . CounterVec
streamLag * metric . Gauges
countersWithHost [ ] * prometheus . CounterVec
streamLag * prometheus . GaugeVec
}
func n ewMetrics( reg prometheus . Registerer ) * m etrics {
var m m etrics
func N ewMetrics( reg prometheus . Registerer , streamLagLabels [ ] string ) * M etrics {
var m M etrics
m . encodedBytes = prometheus . NewCounterVec ( prometheus . CounterOpts {
Namespace : "promtail" ,
@ -92,20 +93,18 @@ func newMetrics(reg prometheus.Registerer) *metrics {
Help : "Number of times batches has had to be retried." ,
} , [ ] string { HostLabel } )
var err error
m . streamLag , err = metric . NewGauges ( "promtail_stream_lag_seconds" ,
"Difference between current time and last batch timestamp for successful sends" ,
metric . GaugeConfig { Action : "set" } ,
int64 ( 1 * time . Minute . Seconds ( ) ) , // This strips out files which update slowly and reduces noise in this metric.
)
if err != nil {
panic ( err )
}
m . countersWithHost = [ ] * prometheus . CounterVec {
m . encodedBytes , m . sentBytes , m . droppedBytes , m . sentEntries , m . droppedEntries ,
}
streamLagLabelsMerged := [ ] string { HostLabel , ClientLabel }
streamLagLabelsMerged = append ( streamLagLabelsMerged , streamLagLabels ... )
m . streamLag = prometheus . NewGaugeVec ( prometheus . GaugeOpts {
Namespace : "promtail" ,
Name : "stream_lag_seconds" ,
Help : "Difference between current time and last batch timestamp for successful sends" ,
} , streamLagLabelsMerged )
if reg != nil {
m . encodedBytes = mustRegisterOrGet ( reg , m . encodedBytes ) . ( * prometheus . CounterVec )
m . sentBytes = mustRegisterOrGet ( reg , m . sentBytes ) . ( * prometheus . CounterVec )
@ -114,7 +113,7 @@ func newMetrics(reg prometheus.Registerer) *metrics {
m . droppedEntries = mustRegisterOrGet ( reg , m . droppedEntries ) . ( * prometheus . CounterVec )
m . requestDuration = mustRegisterOrGet ( reg , m . requestDuration ) . ( * prometheus . HistogramVec )
m . batchRetries = mustRegisterOrGet ( reg , m . batchRetries ) . ( * prometheus . CounterVec )
m . streamLag = mustRegisterOrGet ( reg , m . streamLag ) . ( * metric . Gauges )
m . streamLag = mustRegisterOrGet ( reg , m . streamLag ) . ( * prometheus . GaugeVec )
}
return & m
@ -139,11 +138,13 @@ type Client interface {
// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
metrics * metrics
logger log . Logger
cfg Config
client * http . Client
entries chan api . Entry
name string
metrics * Metrics
streamLagLabels [ ] string
logger log . Logger
cfg Config
client * http . Client
entries chan api . Entry
once sync . Once
wg sync . WaitGroup
@ -159,11 +160,12 @@ type client struct {
// Tripperware is a function that takes an http.RoundTripper and returns an
// http.RoundTripper. NewWithTripperware accepts one as its tp argument.
type Tripperware func ( http . RoundTripper ) http . RoundTripper
// New makes a new Client.
//
// It is a thin exported wrapper: the arguments are forwarded unchanged to
// newClient, and newClient's result and error are returned as-is.
func New ( reg prometheus . Registerer , cfg Confi g, logger log . Logger ) ( Client , error ) {
return newClient ( reg , cfg , logger )
func New ( metrics * Metrics , cfg Config , streamLagLabels [ ] strin g, logger log . Logger ) ( Client , error ) {
return newClient ( metrics , cfg , streamLagLabels , logger )
}
func newClient ( reg prometheus . Registerer , cfg Config , logger log . Logger ) ( * client , error ) {
func newClient ( metrics * Metrics , cfg Config , streamLagLabels [ ] string , logger log . Logger ) ( * client , error ) {
if cfg . URL . URL == nil {
return nil , errors . New ( "client needs target URL" )
}
@ -171,15 +173,20 @@ func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*clien
ctx , cancel := context . WithCancel ( context . Background ( ) )
c := & client {
logger : log . With ( logger , "component" , "client" , "host" , cfg . URL . Host ) ,
cfg : cfg ,
entries : make ( chan api . Entry ) ,
metrics : newMetrics ( reg ) ,
logger : log . With ( logger , "component" , "client" , "host" , cfg . URL . Host ) ,
cfg : cfg ,
entries : make ( chan api . Entry ) ,
metrics : metrics ,
streamLagLabels : streamLagLabels ,
name : asSha256 ( cfg ) ,
externalLabels : cfg . ExternalLabels . LabelSet ,
ctx : ctx ,
cancel : cancel ,
}
if cfg . Name != "" {
c . name = cfg . Name
}
err := cfg . Client . Validate ( )
if err != nil {
@ -205,8 +212,8 @@ func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*clien
}
// NewWithTripperware creates a new Loki client with a custom tripperware.
func NewWithTripperware ( reg prometheus . Registerer , cfg Confi g, logger log . Logger , tp Tripperware ) ( Client , error ) {
c , err := newClient ( reg , cfg , logger )
func NewWithTripperware ( metrics * Metrics , cfg Config , streamLagLabels [ ] strin g, logger log . Logger , tp Tripperware ) ( Client , error ) {
c , err := newClient ( metrics , cfg , streamLagLabels , logger )
if err != nil {
return nil , err
}
@ -290,6 +297,14 @@ func (c *client) Chan() chan<- api.Entry {
return c . entries
}
// asSha256 returns a short fingerprint of o: the first six lowercase hex
// characters of the SHA-256 digest of o's default ("%v") formatting.
func asSha256(o interface{}) string {
	digest := sha256.Sum256([]byte(fmt.Sprintf("%v", o)))
	// Three digest bytes render as exactly six hex characters, which is the
	// same prefix obtained by hex-encoding the whole digest and truncating.
	return fmt.Sprintf("%x", digest[:3])
}
func ( c * client ) sendBatch ( tenantID string , batch * batch ) {
buf , entriesCount , err := batch . encode ( )
if err != nil {
@ -318,23 +333,20 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
level . Warn ( c . logger ) . Log ( "msg" , "error converting stream label string to label.Labels, cannot update lagging metric" , "error" , err )
return
}
var lblSet model . LabelSet
lblSet := make ( prometheus . Labels )
for i := range lbls {
for _ , lbl := range c . cfg . S treamLagLabels {
for _ , lbl := range c . s treamLagLabels {
if lbls [ i ] . Name == lbl {
if lblSet == nil {
lblSet = model . LabelSet { }
}
lblSet = lblSet . Merge ( model . LabelSet {
model . LabelName ( lbl ) : model . LabelValue ( lbls [ i ] . Value ) ,
} )
lblSet [ lbl ] = lbls [ i ] . Value
}
}
}
if lblSet != nil {
// always set host
lblSet = lblSet . Merge ( model . LabelSet { model . LabelName ( HostLabel ) : model . LabelValue ( c . cfg . URL . Host ) } )
lblSet [ HostLabel ] = c . cfg . URL . Host
// also set client name since if we have multiple promtail clients configured we will run into a
// duplicate metric collected with same labels error when trying to hit the /metrics endpoint
lblSet [ ClientLabel ] = c . name
c . metrics . streamLag . With ( lblSet ) . Set ( time . Since ( s . Entries [ len ( s . Entries ) - 1 ] . Timestamp ) . Seconds ( ) )
}
}
@ -434,7 +446,7 @@ func (c *client) processEntry(e api.Entry) (api.Entry, string) {
return e , tenantID
}
// UnregisterLatencyMetric deletes the streamLag series identified by labels.
//
// The HostLabel entry of labels is overwritten with the client's configured
// URL host before the delete, so the caller's map is mutated.
//
// NOTE(review): sendBatch also sets ClientLabel on the series it records, but
// this method does not add it; presumably the delete only matches when the
// caller supplies ClientLabel itself — confirm.
func ( c * client ) UnregisterLatencyMetric ( labels model . LabelSet ) {
labels [ HostLabel ] = model . LabelValue ( c . cfg . URL . Host )
func ( c * client ) UnregisterLatencyMetric ( labels prometheus . Labels ) {
labels [ HostLabel ] = c . cfg . URL . Host
c . metrics . streamLag . Delete ( labels )
}