@ -14,15 +14,17 @@
package influxdb
import (
"encoding/json "
"context "
"errors"
"fmt"
"log/slog"
"math"
"os"
"strings"
"time"
influx "github.com/influxdata/influxdb/client/v2"
influx "github.com/influxdata/influxdb-client-go/v2"
"github.com/influxdata/influxdb-client-go/v2/api/query"
"github.com/influxdata/influxdb-client-go/v2/api/write"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/common/promslog"
@ -34,36 +36,39 @@ import (
type Client struct {
logger * slog . Logger
client influx . Client
database string
retentionPolicy string
ignoredSamples prometheus . Counter
client influx . Client
organization string
bucket string
ignoredSamples prometheus . Counter
context context . Context
}
// NewClient creates a new Client.
func NewClient ( logger * slog . Logger , conf influx . HTTPConfig , db , rp string ) * Client {
c , err := influx . NewHTTPClient ( conf )
// Currently influx.NewClient() *should* never return an error.
if err != nil {
logger . Error ( "Error creating influx HTTP client" , "err" , err )
os . Exit ( 1 )
}
func NewClient ( logger * slog . Logger , url , authToken , organization , bucket string ) * Client {
c := influx . NewClientWithOptions (
url ,
authToken ,
influx . DefaultOptions ( ) . SetPrecision ( time . Millisecond ) ,
)
if logger == nil {
logger = promslog . NewNopLogger ( )
}
return & Client {
logger : logger ,
client : c ,
database : db ,
retentionPolicy : rp ,
logger : logger ,
client : c ,
organization : organization ,
bucket : bucket ,
ignoredSamples : prometheus . NewCounter (
prometheus . CounterOpts {
Name : "prometheus_influxdb_ignored_samples_total" ,
Help : "The total number of samples not sent to InfluxDB due to unsupported float values (Inf, -Inf, NaN)." ,
} ,
) ,
context : context . Background ( ) ,
}
}
@ -80,39 +85,41 @@ func tagsFromMetric(m model.Metric) map[string]string {
// Write sends a batch of samples to InfluxDB via its HTTP API.
func ( c * Client ) Write ( samples model . Samples ) error {
points := make ( [ ] * influx . Point , 0 , len ( samples ) )
points := make ( [ ] * write . Point , 0 , len ( samples ) )
for _ , s := range samples {
v := float64 ( s . Value )
if math . IsNaN ( v ) || math . IsInf ( v , 0 ) {
c . logger . Debug ( "Cannot send to InfluxDB, skipping sample" , "value" , v , "sample" , s )
c . logger . Debug ( "Cannot send to InfluxDB, skipping sample" , "value" , v , "sample" , s )
c . ignoredSamples . Inc ( )
continue
}
p , err := influx . NewPoint (
p := influx . NewPoint (
string ( s . Metric [ model . MetricNameLabel ] ) ,
tagsFromMetric ( s . Metric ) ,
map [ string ] interface { } { "value" : v } ,
s . Timestamp . Time ( ) ,
)
if err != nil {
return err
}
points = append ( points , p )
}
bps , err := influx . NewBatchPoints ( influx . BatchPointsConfig {
Precision : "ms" ,
Database : c . database ,
RetentionPolicy : c . retentionPolicy ,
} )
if err != nil {
writeAPI := c . client . WriteAPIBlocking ( c . organization , c . bucket )
writeAPI . EnableBatching ( ) // default 5_000
var err error
for _ , p := range points {
if err = writeAPI . WritePoint ( c . context , p ) ; err != nil {
return err
}
}
if err = writeAPI . Flush ( c . context ) ; err != nil {
return err
}
bps . AddPoints ( points )
return c . client . Write ( bps )
return nil
}
func ( c * Client ) Read ( req * prompb . ReadRequest ) ( * prompb . ReadResponse , error ) {
queryAPI := c . client . QueryAPI ( c . organization )
labelsToSeries := map [ string ] * prompb . TimeSeries { }
for _ , q := range req . Queries {
command , err := c . buildCommand ( q )
@ -120,17 +127,18 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
return nil , err
}
query := influx . NewQuery ( command , c . database , "ms" )
resp , err := c . client . Query ( query )
resp , err := queryAPI . Query ( c . context , command )
if err != nil {
return nil , err
}
if resp . Err != "" {
return nil , errors . New ( resp . Err )
if resp . Err ( ) != nil {
return nil , resp . Err ( )
}
if err = mergeResult ( labelsToSeries , resp . Results ) ; err != nil {
return nil , err
for resp . Next ( ) {
if err = mergeResult ( labelsToSeries , resp . Record ( ) ) ; err != nil {
return nil , err
}
}
}
@ -146,17 +154,20 @@ func (c *Client) Read(req *prompb.ReadRequest) (*prompb.ReadResponse, error) {
}
func ( c * Client ) buildCommand ( q * prompb . Query ) ( string , error ) {
matchers := make ( [ ] string , 0 , len ( q . Matchers ) )
rangeInNs := fmt . Sprintf ( "start: time(v: %v), stop: time(v: %v)" , q . StartTimestampMs * time . Millisecond . Nanoseconds ( ) , q . EndTimestampMs * time . Millisecond . Nanoseconds ( ) )
// If we don't find a metric name matcher, query all metrics
// (InfluxDB measurements) by default.
from := "FROM /.+/"
measurement := ` r._measurement `
matchers := make ( [ ] string , 0 , len ( q . Matchers ) )
var joinedMatchers string
for _ , m := range q . Matchers {
if m . Name == model . MetricNameLabel {
switch m . Type {
case prompb . LabelMatcher_EQ :
from = fmt . Sprintf ( "FROM %q.%q" , c . retentionPolicy , m . Value )
measurement + = fmt . Sprintf ( " == \"%s\"" , m . Value )
case prompb . LabelMatcher_RE :
from = fmt . Sprintf ( "FROM %q./^%s$/" , c . retentionPolicy , escapeSlashes ( m . Value ) )
measurement + = fmt . Sprintf ( " =~ /%s/" , escapeSlashes ( m . Value ) )
default :
// TODO: Figure out how to support these efficiently.
return "" , errors . New ( "non-equal or regex-non-equal matchers are not supported on the metric name yet" )
@ -166,21 +177,28 @@ func (c *Client) buildCommand(q *prompb.Query) (string, error) {
switch m . Type {
case prompb . LabelMatcher_EQ :
matchers = append ( matchers , fmt . Sprintf ( "%q = '%s' " , m . Name , escapeSingleQuotes ( m . Value ) ) )
matchers = append ( matchers , fmt . Sprintf ( "r.%s == \"%s\" " , m . Name , escapeSingleQuotes ( m . Value ) ) )
case prompb . LabelMatcher_NEQ :
matchers = append ( matchers , fmt . Sprintf ( "%q != '%s' " , m . Name , escapeSingleQuotes ( m . Value ) ) )
matchers = append ( matchers , fmt . Sprintf ( "r.%s != \"%s\" " , m . Name , escapeSingleQuotes ( m . Value ) ) )
case prompb . LabelMatcher_RE :
matchers = append ( matchers , fmt . Sprintf ( "%q =~ /^%s$ /" , m . Name , escapeSlash es ( m . Value ) ) )
matchers = append ( matchers , fmt . Sprintf ( "r.%s =~ /%s /" , m . Name , escapeSingleQuot es ( m . Value ) ) )
case prompb . LabelMatcher_NRE :
matchers = append ( matchers , fmt . Sprintf ( "%q !~ /^%s$ /" , m . Name , escapeSlash es ( m . Value ) ) )
matchers = append ( matchers , fmt . Sprintf ( "r.%s !~ /%s /" , m . Name , escapeSingleQuot es ( m . Value ) ) )
default :
return "" , fmt . Errorf ( "unknown match type %v" , m . Type )
}
}
matchers = append ( matchers , fmt . Sprintf ( "time >= %vms" , q . StartTimestampMs ) )
matchers = append ( matchers , fmt . Sprintf ( "time <= %vms" , q . EndTimestampMs ) )
if len ( matchers ) > 0 {
joinedMatchers = fmt . Sprintf ( " and %s" , strings . Join ( matchers , " and " ) )
}
return fmt . Sprintf ( "SELECT value %s WHERE %v GROUP BY *" , from , strings . Join ( matchers , " AND " ) ) , nil
// _measurement must be retained, otherwise "invalid metric name" shall be thrown
command := fmt . Sprintf (
"from(bucket: \"%s\") |> range(%s) |> filter(fn: (r) => %s%s)" ,
c . bucket , rangeInNs , measurement , joinedMatchers ,
)
return command , nil
}
func escapeSingleQuotes ( str string ) string {
@ -191,44 +209,60 @@ func escapeSlashes(str string) string {
return strings . ReplaceAll ( str , ` / ` , ` \/ ` )
}
func mergeResult ( labelsToSeries map [ string ] * prompb . TimeSeries , results [ ] influx . Result ) error {
for _ , r := range results {
for _ , s := range r . Series {
k := concatLabels ( s . Tags )
ts , ok := labelsToSeries [ k ]
if ! ok {
ts = & prompb . TimeSeries {
Labels : tagsToLabelPairs ( s . Name , s . Tags ) ,
}
labelsToSeries [ k ] = ts
}
func mergeResult ( labelsToSeries map [ string ] * prompb . TimeSeries , record * query . FluxRecord ) error {
builtIntime := record . Time ( )
builtInvalue := record . Value ( )
builtInMeasurement := record . Measurement ( )
labels := record . Values ( )
samples , err := valuesToSamples ( s . Values )
if err != nil {
return err
}
filterOutBuiltInLabels ( labels )
k := concatLabels ( labels )
ts . Samples = mergeSamples ( ts . Samples , samples )
ts , ok := labelsToSeries [ k ]
if ! ok {
ts = & prompb . TimeSeries {
Labels : tagsToLabelPairs ( builtInMeasurement , labels ) ,
}
labelsToSeries [ k ] = ts
}
sample , err := valuesToSamples ( builtIntime , builtInvalue )
if err != nil {
return err
}
ts . Samples = mergeSamples ( ts . Samples , [ ] prompb . Sample { sample } )
return nil
}
func concatLabels ( labels map [ string ] string ) string {
func filterOutBuiltInLabels ( labels map [ string ] interface { } ) {
delete ( labels , "table" )
delete ( labels , "_start" )
delete ( labels , "_stop" )
delete ( labels , "_time" )
delete ( labels , "_value" )
delete ( labels , "_field" )
delete ( labels , "result" )
delete ( labels , "_measurement" )
}
func concatLabels ( labels map [ string ] interface { } ) string {
// 0xff cannot occur in valid UTF-8 sequences, so use it
// as a separator here.
separator := "\xff"
pairs := make ( [ ] string , 0 , len ( labels ) )
for k , v := range labels {
pairs = append ( pairs , k + separator + v )
pairs = append ( pairs , fmt . Sprintf ( "%s%s%v" , k , separator , v ) )
}
return strings . Join ( pairs , separator )
}
func tagsToLabelPairs ( name string , tags map [ string ] string ) [ ] prompb . Label {
func tagsToLabelPairs ( name string , tags map [ string ] interface { } ) [ ] prompb . Label {
pairs := make ( [ ] prompb . Label , 0 , len ( tags ) )
for k , v := range tags {
if v == "" {
if v == nil {
// If we select metrics with different sets of labels names,
// InfluxDB returns *all* possible tag names on all returned
// series, with empty tag values on series where they don't
@ -239,7 +273,7 @@ func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label {
}
pairs = append ( pairs , prompb . Label {
Name : k ,
Value : v ,
Value : fmt . Sprintf ( "% v" , v ) ,
} )
}
pairs = append ( pairs , prompb . Label {
@ -249,39 +283,22 @@ func tagsToLabelPairs(name string, tags map[string]string) []prompb.Label {
return pairs
}
func valuesToSamples ( values [ ] [ ] interface { } ) ( [ ] prompb . Sample , error ) {
samples := make ( [ ] prompb . Sample , 0 , len ( values ) )
for _ , v := range values {
if len ( v ) != 2 {
return nil , fmt . Errorf ( "bad sample tuple length, expected [<timestamp>, <value>], got %v" , v )
}
jsonTimestamp , ok := v [ 0 ] . ( json . Number )
if ! ok {
return nil , fmt . Errorf ( "bad timestamp: %v" , v [ 0 ] )
func valuesToSamples ( timestamp time . Time , value interface { } ) ( prompb . Sample , error ) {
var valueFloat64 float64
var valueInt64 int64
var ok bool
if valueFloat64 , ok = value . ( float64 ) ; ! ok {
if valueInt64 , ok = value . ( int64 ) ; ok {
valueFloat64 = float64 ( valueInt64 )
} else {
return prompb . Sample { } , fmt . Errorf ( "unable to convert sample value to float64: %v" , value )
}
jsonValue , ok := v [ 1 ] . ( json . Number )
if ! ok {
return nil , fmt . Errorf ( "bad sample value: %v" , v [ 1 ] )
}
timestamp , err := jsonTimestamp . Int64 ( )
if err != nil {
return nil , fmt . Errorf ( "unable to convert sample timestamp to int64: %w" , err )
}
value , err := jsonValue . Float64 ( )
if err != nil {
return nil , fmt . Errorf ( "unable to convert sample value to float64: %w" , err )
}
samples = append ( samples , prompb . Sample {
Timestamp : timestamp ,
Value : value ,
} )
}
return samples , nil
return prompb . Sample {
Timestamp : timestamp . UnixMilli ( ) ,
Value : valueFloat64 ,
} , nil
}
// mergeSamples merges two lists of sample pairs and removes duplicate