@ -38,20 +38,22 @@ const (
LatencyLabel = "filename"
HostLabel = "host"
ClientLabel = "client"
TenantLabel = "tenant"
)
var UserAgent = fmt . Sprintf ( "promtail/%s" , build . Version )
type Metrics struct {
encodedBytes * prometheus . CounterVec
sentBytes * prometheus . CounterVec
droppedBytes * prometheus . CounterVec
sentEntries * prometheus . CounterVec
droppedEntries * prometheus . CounterVec
requestDuration * prometheus . HistogramVec
batchRetries * prometheus . CounterVec
countersWithHost [ ] * prometheus . CounterVec
streamLag * prometheus . GaugeVec
encodedBytes * prometheus . CounterVec
sentBytes * prometheus . CounterVec
droppedBytes * prometheus . CounterVec
sentEntries * prometheus . CounterVec
droppedEntries * prometheus . CounterVec
requestDuration * prometheus . HistogramVec
batchRetries * prometheus . CounterVec
countersWithHost [ ] * prometheus . CounterVec
countersWithTenant [ ] * prometheus . CounterVec
streamLag * prometheus . GaugeVec
}
func NewMetrics ( reg prometheus . Registerer , streamLagLabels [ ] string ) * Metrics {
@ -71,7 +73,7 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace : "promtail" ,
Name : "dropped_bytes_total" ,
Help : "Number of bytes dropped because failed to be sent to the ingester after all retries." ,
} , [ ] string { HostLabel } )
} , [ ] string { HostLabel , TenantLabel } )
m . sentEntries = prometheus . NewCounterVec ( prometheus . CounterOpts {
Namespace : "promtail" ,
Name : "sent_entries_total" ,
@ -81,7 +83,7 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace : "promtail" ,
Name : "dropped_entries_total" ,
Help : "Number of log entries dropped because failed to be sent to the ingester after all retries." ,
} , [ ] string { HostLabel } )
} , [ ] string { HostLabel , TenantLabel } )
m . requestDuration = prometheus . NewHistogramVec ( prometheus . HistogramOpts {
Namespace : "promtail" ,
Name : "request_duration_seconds" ,
@ -91,10 +93,14 @@ func NewMetrics(reg prometheus.Registerer, streamLagLabels []string) *Metrics {
Namespace : "promtail" ,
Name : "batch_retries_total" ,
Help : "Number of times batches has had to be retried." ,
} , [ ] string { HostLabel } )
} , [ ] string { HostLabel , TenantLabel } )
m . countersWithHost = [ ] * prometheus . CounterVec {
m . encodedBytes , m . sentBytes , m . droppedBytes , m . sentEntries , m . droppedEntries , m . batchRetries ,
m . encodedBytes , m . sentBytes , m . sentEntries ,
}
m . countersWithTenant = [ ] * prometheus . CounterVec {
m . droppedBytes , m . droppedEntries , m . batchRetries ,
}
streamLagLabelsMerged := [ ] string { HostLabel , ClientLabel }
@ -270,6 +276,11 @@ func (c *client) run() {
// If the batch doesn't exist yet, we create a new one with the entry
if ! ok {
batches [ tenantID ] = newBatch ( c . maxStreams , e )
// Initialize counters to 0 so the metrics are exported before the first
// occurrence of incrementing to avoid missing metrics.
for _ , counter := range c . metrics . countersWithTenant {
counter . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Add ( 0 )
}
break
}
@ -285,8 +296,9 @@ func (c *client) run() {
// The max size of the batch isn't reached, so we can add the entry
err := batch . add ( e )
if err != nil {
level . Error ( c . logger ) . Log ( "msg" , "batch add err" , "error" , err )
c . metrics . droppedEntries . WithLabelValues ( c . cfg . URL . Host ) . Inc ( )
level . Error ( c . logger ) . Log ( "msg" , "batch add err" , "tenant" , tenantID , "error" , err )
c . metrics . droppedBytes . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Add ( float64 ( len ( e . Line ) ) )
c . metrics . droppedEntries . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Inc ( )
return
}
case <- maxWaitCheck . C :
@ -376,8 +388,8 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
break
}
level . Warn ( c . logger ) . Log ( "msg" , "error sending batch, will retry" , "status" , status , "error" , err )
c . metrics . batchRetries . WithLabelValues ( c . cfg . URL . Host ) . Inc ( )
level . Warn ( c . logger ) . Log ( "msg" , "error sending batch, will retry" , "status" , status , "tenant" , tenantID , " error" , err )
c . metrics . batchRetries . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Inc ( )
backoff . Wait ( )
// Make sure it sends at least once before checking for retry.
@ -387,9 +399,9 @@ func (c *client) sendBatch(tenantID string, batch *batch) {
}
if err != nil {
level . Error ( c . logger ) . Log ( "msg" , "final error sending batch" , "status" , status , "error" , err )
c . metrics . droppedBytes . WithLabelValues ( c . cfg . URL . Host ) . Add ( bufBytes )
c . metrics . droppedEntries . WithLabelValues ( c . cfg . URL . Host ) . Add ( float64 ( entriesCount ) )
level . Error ( c . logger ) . Log ( "msg" , "final error sending batch" , "status" , status , "tenant" , tenantID , " error" , err )
c . metrics . droppedBytes . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Add ( bufBytes )
c . metrics . droppedEntries . WithLabelValues ( c . cfg . URL . Host , tenantID ) . Add ( float64 ( entriesCount ) )
}
}