@ -276,45 +276,60 @@ func (e *sqlQueryEndpoint) transformToTable(query *tsdb.Query, rows *core.Rows,
return nil
}
func ( e * sqlQueryEndpoint ) transformToTimeSeries ( query * tsdb . Query , rows * core . Rows , result * tsdb . QueryResult , tsdbQuery * tsdb . TsdbQuery ) error {
pointsBySeries := make ( map [ string ] * tsdb . TimeSeries )
seriesByQueryOrder := list . New ( )
func newProcessCfg ( query * tsdb . Query , tsdbQuery * tsdb . TsdbQuery , rows * core . Rows ) ( * processCfg , error ) {
columnNames , err := rows . Columns ( )
if err != nil {
return err
return nil , err
}
columnTypes , err := rows . ColumnTypes ( )
if err != nil {
return err
return nil , err
}
rowCount := 0
timeIndex := - 1
metricIndex := - 1
metricPrefix := false
var metricPrefixValue string
fillMissing := query . Model . Get ( "fill" ) . MustBool ( false )
cfg := & processCfg {
rowCount : 0 ,
columnTypes : columnTypes ,
columnNames : columnNames ,
rows : rows ,
timeIndex : - 1 ,
metricIndex : - 1 ,
metricPrefix : false ,
fillMissing : fillMissing ,
seriesByQueryOrder : list . New ( ) ,
pointsBySeries : make ( map [ string ] * tsdb . TimeSeries ) ,
tsdbQuery : tsdbQuery ,
}
return cfg , nil
}
func ( e * sqlQueryEndpoint ) transformToTimeSeries ( query * tsdb . Query , rows * core . Rows , result * tsdb . QueryResult ,
tsdbQuery * tsdb . TsdbQuery ) error {
cfg , err := newProcessCfg ( query , tsdbQuery , rows )
if err != nil {
return err
}
// check columns of resultset: a column named time is mandatory
// the first text column is treated as metric name unless a column named metric is present
for i , col := range columnNames {
for i , col := range cfg . c olumnNames {
for _ , tc := range e . timeColumnNames {
if col == tc {
timeIndex = i
cfg . timeIndex = i
continue
}
}
switch col {
case "metric" :
metricIndex = i
cfg . metricIndex = i
default :
if metricIndex == - 1 {
columnType := columnTypes [ i ] . DatabaseTypeName ( )
if cfg . metricIndex == - 1 {
columnType := cfg . c olumnTypes [ i ] . DatabaseTypeName ( )
for _ , mct := range e . metricColumnTypes {
if columnType == mct {
metricIndex = i
cfg . metricIndex = i
continue
}
}
@ -323,154 +338,179 @@ func (e *sqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core.R
}
// use metric column as prefix with multiple value columns
if metricIndex != - 1 && len ( columnNames ) > 3 {
metricPrefix = true
if cfg . metricIndex != - 1 && len ( cfg . columnNames ) > 3 {
cfg . metricPrefix = true
}
if timeIndex == - 1 {
if cfg . timeIndex == - 1 {
return fmt . Errorf ( "Found no column named %s" , strings . Join ( e . timeColumnNames , " or " ) )
}
fillMissing := query . Model . Get ( "fill" ) . MustBool ( false )
var fillInterval float64
fillValue := null . Float { }
fillPrevious := false
if fillMissing {
fillInterval = query . Model . Get ( "fillInterval" ) . MustFloat64 ( ) * 1000
if cfg . fillMissing {
cfg . fillInterval = query . Model . Get ( "fillInterval" ) . MustFloat64 ( ) * 1000
switch query . Model . Get ( "fillMode" ) . MustString ( ) {
case "null" :
case "previous" :
fillPrevious = true
cfg . fillPrevious = true
case "value" :
fillValue . Float64 = query . Model . Get ( "fillValue" ) . MustFloat64 ( )
fillValue . Valid = true
cfg . fillValue . Float64 = query . Model . Get ( "fillValue" ) . MustFloat64 ( )
cfg . fillValue . Valid = true
}
}
for rows . Next ( ) {
var timestamp float64
var value null . Float
var metric string
if rowCount > rowLimit {
return fmt . Errorf ( "query row limit exceeded, limit %d" , rowLimit )
}
values , err := e . queryResultTransformer . TransformQueryResult ( columnTypes , rows )
if err != nil {
if err := e . processRow ( cfg ) ; err != nil {
return err
}
}
// converts column named time to unix timestamp in milliseconds to make
// native mysql datetime types and epoch dates work in
// annotation and table queries.
ConvertSqlTimeColumnToEpochMs ( values , timeIndex )
switch columnValue := values [ timeIndex ] . ( type ) {
case int64 :
timestamp = float64 ( columnValue )
case float64 :
timestamp = columnValue
default :
return fmt . Errorf ( "Invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v" , columnValue , columnValue )
for elem := cfg . seriesByQueryOrder . Front ( ) ; elem != nil ; elem = elem . Next ( ) {
key := elem . Value . ( string )
result . Series = append ( result . Series , cfg . pointsBySeries [ key ] )
if ! cfg . fillMissing {
continue
}
if metricIndex >= 0 {
if columnValue , ok := values [ metricIndex ] . ( string ) ; ok {
if metricPrefix {
metricPrefixValue = columnValue
} else {
metric = columnValue
}
series := cfg . pointsBySeries [ key ]
// fill in values from last fetched value till interval end
intervalStart := series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64
intervalEnd := float64 ( tsdbQuery . TimeRange . MustGetTo ( ) . UnixNano ( ) / 1e6 )
if cfg . fillPrevious {
if len ( series . Points ) > 0 {
cfg . fillValue = series . Points [ len ( series . Points ) - 1 ] [ 0 ]
} else {
return fmt . Errorf ( "Column metric must be of type %s. metric column name: %s type: %s but datatype is %T" , strings . Join ( e . metricColumnTypes , ", " ) , columnNames [ metricIndex ] , columnTypes [ metricIndex ] . DatabaseTypeName ( ) , values [ metricIndex ] )
cfg . fillValue . Valid = false
}
}
for i , col := range columnNames {
if i == timeIndex || i == metricIndex {
continue
}
if value , err = ConvertSqlValueColumnToFloat ( col , values [ i ] ) ; err != nil {
return err
}
// align interval start
intervalStart = math . Floor ( intervalStart / cfg . fillInterval ) * cfg . fillInterval
for i := intervalStart + cfg . fillInterval ; i < intervalEnd ; i += cfg . fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { cfg . fillValue , null . FloatFrom ( i ) } )
cfg . rowCount ++
}
}
if metricIndex == - 1 {
metric = col
} else if metricPrefix {
metric = metricPrefixValue + " " + col
}
result . Meta . Set ( "rowCount" , cfg . rowCount )
return nil
}
series , exist := pointsBySeries [ metric ]
if ! exist {
series = & tsdb . TimeSeries { Name : metric }
pointsBySeries [ metric ] = series
seriesByQueryOrder . PushBack ( metric )
}
type processCfg struct {
rowCount int
columnTypes [ ] * sql . ColumnType
columnNames [ ] string
rows * core . Rows
timeIndex int
metricIndex int
metricPrefix bool
metricPrefixValue string
fillMissing bool
pointsBySeries map [ string ] * tsdb . TimeSeries
seriesByQueryOrder * list . List
fillValue null . Float
tsdbQuery * tsdb . TsdbQuery
fillInterval float64
fillPrevious bool
}
if fillMissing {
var intervalStart float64
if ! exist {
intervalStart = float64 ( tsdbQuery . TimeRange . MustGetFrom ( ) . UnixNano ( ) / 1e6 )
} else {
intervalStart = series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64 + fillInterval
}
func ( e * sqlQueryEndpoint ) processRow ( cfg * processCfg ) error {
var timestamp float64
var value null . Float
var metric string
if fillPrevious {
if len ( series . Points ) > 0 {
fillValue = series . Points [ len ( series . Points ) - 1 ] [ 0 ]
} else {
fillValue . Valid = false
}
}
if cfg . rowCount > rowLimit {
return fmt . Errorf ( "query row limit exceeded, limit %d" , rowLimit )
}
// align interval start
intervalStart = math . Floor ( intervalStart / fillInterval ) * fillInterval
values , err := e . queryResultTransformer . TransformQueryResult ( cfg . columnTypes , cfg . rows )
if err != nil {
return err
}
for i := intervalStart ; i < timestamp ; i += fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { fillValue , null . FloatFrom ( i ) } )
rowCount ++
}
}
// converts column named time to unix timestamp in milliseconds to make
// native mysql datetime types and epoch dates work in
// annotation and table queries.
ConvertSqlTimeColumnToEpochMs ( values , cfg . timeIndex )
series . Points = append ( series . Points , tsdb . TimePoint { value , null . FloatFrom ( timestamp ) } )
switch columnValue := values [ cfg . timeIndex ] . ( type ) {
case int64 :
timestamp = float64 ( columnValue )
case float64 :
timestamp = columnValue
default :
return fmt . Errorf ( "invalid type for column time, must be of type timestamp or unix timestamp, got: %T %v" ,
columnValue , columnValue )
}
if setting . Env == setting . DEV {
e . log . Debug ( "Rows" , "metric" , metric , "time" , timestamp , "value" , value )
if cfg . metricIndex >= 0 {
if columnValue , ok := values [ cfg . metricIndex ] . ( string ) ; ok {
if cfg . metricPrefix {
cfg . metricPrefixValue = columnValue
} else {
metric = columnValue
}
} else {
return fmt . Errorf ( "column metric must be of type %s. metric column name: %s type: %s but datatype is %T" ,
strings . Join ( e . metricColumnTypes , ", " ) , cfg . columnNames [ cfg . metricIndex ] ,
cfg . columnTypes [ cfg . metricIndex ] . DatabaseTypeName ( ) , values [ cfg . metricIndex ] )
}
}
for elem := seriesByQueryOrder . Front ( ) ; elem != nil ; elem = elem . Next ( ) {
key := elem . Value . ( string )
result . Series = append ( result . Series , pointsBySeries [ key ] )
for i , col := range cfg . columnNames {
if i == cfg . timeIndex || i == cfg . metricIndex {
continue
}
if fillMissing {
series := pointsBySeries [ key ]
// fill in values from last fetched value till interval end
intervalStart := series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64
intervalEnd := float64 ( tsdbQuery . TimeRange . MustGetTo ( ) . UnixNano ( ) / 1e6 )
if value , err = ConvertSqlValueColumnToFloat ( col , values [ i ] ) ; err != nil {
return err
}
if fillPrevious {
if cfg . metricIndex == - 1 {
metric = col
} else if cfg . metricPrefix {
metric = cfg . metricPrefixValue + " " + col
}
series , exist := cfg . pointsBySeries [ metric ]
if ! exist {
series = & tsdb . TimeSeries { Name : metric }
cfg . pointsBySeries [ metric ] = series
cfg . seriesByQueryOrder . PushBack ( metric )
}
if cfg . fillMissing {
var intervalStart float64
if ! exist {
intervalStart = float64 ( cfg . tsdbQuery . TimeRange . MustGetFrom ( ) . UnixNano ( ) / 1e6 )
} else {
intervalStart = series . Points [ len ( series . Points ) - 1 ] [ 1 ] . Float64 + cfg . fillInterval
}
if cfg . fillPrevious {
if len ( series . Points ) > 0 {
fillValue = series . Points [ len ( series . Points ) - 1 ] [ 0 ]
cfg . fillValue = series . Points [ len ( series . Points ) - 1 ] [ 0 ]
} else {
fillValue . Valid = false
cfg . fillValue . Valid = false
}
}
// align interval start
intervalStart = math . Floor ( intervalStart / fillInterval ) * fillInterval
for i := intervalStart + fillInterval ; i < intervalEnd ; i += fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { fillValue , null . FloatFrom ( i ) } )
rowCount ++
intervalStart = math . Floor ( intervalStart / cfg . fillInterval ) * cfg . fillInterval
for i := intervalStart ; i < timestamp ; i += cfg . fillInterval {
series . Points = append ( series . Points , tsdb . TimePoint { cfg . fillValue , null . FloatFrom ( i ) } )
cfg . rowCount ++
}
}
series . Points = append ( series . Points , tsdb . TimePoint { value , null . FloatFrom ( timestamp ) } )
if setting . Env == setting . DEV {
e . log . Debug ( "Rows" , "metric" , metric , "time" , timestamp , "value" , value )
}
}
result . Meta . Set ( "rowCount" , rowCount )
return nil
}
@ -526,6 +566,7 @@ func ConvertSqlTimeColumnToEpochMs(values tsdb.RowValues, timeIndex int) {
}
// ConvertSqlValueColumnToFloat converts timeseries value column to float.
//nolint: gocyclo
func ConvertSqlValueColumnToFloat ( columnName string , columnValue interface { } ) ( null . Float , error ) {
var value null . Float