@ -2,6 +2,7 @@ package reader
import (
"context"
"crypto/tls"
"encoding/base64"
"fmt"
"io"
@ -21,6 +22,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/common/config"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel"
@ -48,32 +50,36 @@ type LokiReader interface {
}
type Reader struct {
header http . Header
tls bool
addr string
user string
pass string
tenantID string
queryTimeout time . Duration
sName string
sValue string
lName string
lVal string
backoff * backoff . Backoff
nextQuery time . Time
backoffMtx sync . RWMutex
interval time . Duration
conn * websocket . Conn
w io . Writer
recv chan time . Time
quit chan struct { }
shuttingDown bool
done chan struct { }
header http . Header
tls bool
clientTLSConfig * tls . Config
caFile string
addr string
user string
pass string
tenantID string
queryTimeout time . Duration
sName string
sValue string
lName string
lVal string
backoff * backoff . Backoff
nextQuery time . Time
backoffMtx sync . RWMutex
interval time . Duration
conn * websocket . Conn
w io . Writer
recv chan time . Time
quit chan struct { }
shuttingDown bool
done chan struct { }
}
func NewReader ( writer io . Writer ,
receivedChan chan time . Time ,
tls bool ,
tlsConfig * tls . Config ,
caFile string ,
address string ,
user string ,
pass string ,
@ -102,25 +108,27 @@ func NewReader(writer io.Writer,
bkoff := backoff . New ( context . Background ( ) , bkcfg )
rd := Reader {
header : h ,
tls : tls ,
addr : address ,
user : user ,
pass : pass ,
tenantID : tenantID ,
queryTimeout : queryTimeout ,
sName : streamName ,
sValue : streamValue ,
lName : labelName ,
lVal : labelVal ,
nextQuery : next ,
backoff : bkoff ,
interval : interval ,
w : writer ,
recv : receivedChan ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
shuttingDown : false ,
header : h ,
tls : tls ,
clientTLSConfig : tlsConfig ,
caFile : caFile ,
addr : address ,
user : user ,
pass : pass ,
tenantID : tenantID ,
queryTimeout : queryTimeout ,
sName : streamName ,
sValue : streamValue ,
lName : labelName ,
lVal : labelVal ,
nextQuery : next ,
backoff : bkoff ,
interval : interval ,
w : writer ,
recv : receivedChan ,
quit : make ( chan struct { } ) ,
done : make ( chan struct { } ) ,
shuttingDown : false ,
}
go rd . run ( )
@ -189,7 +197,11 @@ func (r *Reader) QueryCountOverTime(queryRange string) (float64, error) {
}
req . Header . Set ( "User-Agent" , userAgent )
resp , err := http . DefaultClient . Do ( req )
httpClient , err := r . httpClient ( )
if err != nil {
return 0 , errors . Wrap ( err , "failed to create httpClient when querying Loki for count of logs over time" )
}
resp , err := httpClient . Do ( req )
if err != nil {
return 0 , err
}
@ -280,10 +292,14 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
}
req . Header . Set ( "User-Agent" , userAgent )
resp , err := http . DefaultClient . Do ( req )
httpClient , err := r . httpClient ( )
if err != nil {
return nil , err
}
resp , err := httpClient . Do ( req )
if err != nil {
return nil , errors . Wrap ( err , "failed to create httpClient when issuing Loki query" )
}
defer func ( ) {
if err := resp . Body . Close ( ) ; err != nil {
log . Println ( "error closing body" , err )
@ -329,7 +345,7 @@ func (r *Reader) Query(start time.Time, end time.Time) ([]time.Time, error) {
return tss , nil
}
// run uses the established websocket connection to tail logs from Loki and
// run uses the established websocket connection to tail logs from Loki
func ( r * Reader ) run ( ) {
r . closeAndReconnect ( )
@ -421,7 +437,8 @@ func (r *Reader) closeAndReconnect() {
fmt . Fprintf ( r . w , "Connecting to loki at %v, querying for label '%v' with value '%v'\n" , u . String ( ) , r . lName , r . lVal )
c , _ , err := websocket . DefaultDialer . Dial ( u . String ( ) , r . header )
dialer := r . webSocketDialer ( )
c , _ , err := dialer . Dial ( u . String ( ) , r . header )
if err != nil {
fmt . Fprintf ( r . w , "failed to connect to %s with err %s\n" , u . String ( ) , err )
<- time . After ( 10 * time . Second )
@ -442,6 +459,35 @@ func (r *Reader) closeAndReconnect() {
}
}
// httpClient uses the config in Reader to return a http client.
// http.DefaultClient will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a http.Client configured to use the client side certificates.
func ( r * Reader ) httpClient ( ) ( * http . Client , error ) {
if r . clientTLSConfig == nil {
return http . DefaultClient , nil
}
rt , err := config . NewTLSRoundTripper ( r . clientTLSConfig , r . caFile , func ( tls * tls . Config ) ( http . RoundTripper , error ) {
return & http . Transport { TLSClientConfig : tls } , nil
} )
if err != nil {
return nil , err
}
return & http . Client {
Transport : rt ,
} , nil
}
// webSocketDialer creates a dialer for the web socket connection to Loki
// websocket.DefaultDialer will be returned in the case that the connection to Loki is http or TLS without client certs.
// For the mTLS case, return a websocket.Dialer configured to use client side certificates.
func ( r * Reader ) webSocketDialer ( ) * websocket . Dialer {
return & websocket . Dialer {
Proxy : http . ProxyFromEnvironment ,
TLSClientConfig : r . clientTLSConfig ,
HandshakeTimeout : 45 * time . Second ,
}
}
func parseResponse ( entry * loghttp . Entry ) ( * time . Time , error ) {
sp := strings . Split ( entry . Line , " " )
if len ( sp ) != 2 {