|
|
|
|
@ -5,6 +5,7 @@ import ( |
|
|
|
|
"context" |
|
|
|
|
"database/sql" |
|
|
|
|
"fmt" |
|
|
|
|
"math" |
|
|
|
|
"strconv" |
|
|
|
|
|
|
|
|
|
"time" |
|
|
|
|
@ -176,6 +177,18 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. |
|
|
|
|
rowLimit := 1000000 |
|
|
|
|
rowCount := 0 |
|
|
|
|
|
|
|
|
|
fillMissing := query.Model.Get("fill").MustBool(false) |
|
|
|
|
var fillInterval float64 |
|
|
|
|
fillValue := null.Float{} |
|
|
|
|
if fillMissing { |
|
|
|
|
fillInterval = query.Model.Get("fillInterval").MustFloat64() * 1000 |
|
|
|
|
if query.Model.Get("fillNull").MustBool(false) == false { |
|
|
|
|
fillValue.Float64 = query.Model.Get("fillValue").MustFloat64() |
|
|
|
|
fillValue.Valid = true |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for ; rows.Next(); rowCount++ { |
|
|
|
|
if rowCount > rowLimit { |
|
|
|
|
return fmt.Errorf("MySQL query row limit exceeded, limit %d", rowLimit) |
|
|
|
|
@ -195,19 +208,50 @@ func (e MysqlQueryEndpoint) transformToTimeSeries(query *tsdb.Query, rows *core. |
|
|
|
|
return fmt.Errorf("Found row with no time value") |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if series, exist := pointsBySeries[rowData.metric]; exist { |
|
|
|
|
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) |
|
|
|
|
} else { |
|
|
|
|
series := &tsdb.TimeSeries{Name: rowData.metric} |
|
|
|
|
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) |
|
|
|
|
series, exist := pointsBySeries[rowData.metric] |
|
|
|
|
if exist == false { |
|
|
|
|
series = &tsdb.TimeSeries{Name: rowData.metric} |
|
|
|
|
pointsBySeries[rowData.metric] = series |
|
|
|
|
seriesByQueryOrder.PushBack(rowData.metric) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if fillMissing { |
|
|
|
|
var intervalStart float64 |
|
|
|
|
if exist == false { |
|
|
|
|
intervalStart = float64(tsdbQuery.TimeRange.MustGetFrom().UnixNano() / 1e6) |
|
|
|
|
} else { |
|
|
|
|
intervalStart = series.Points[len(series.Points)-1][1].Float64 + fillInterval |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// align interval start
|
|
|
|
|
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval |
|
|
|
|
|
|
|
|
|
for i := intervalStart; i < rowData.time.Float64; i += fillInterval { |
|
|
|
|
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) |
|
|
|
|
rowCount++ |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
series.Points = append(series.Points, tsdb.TimePoint{rowData.value, rowData.time}) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
for elem := seriesByQueryOrder.Front(); elem != nil; elem = elem.Next() { |
|
|
|
|
key := elem.Value.(string) |
|
|
|
|
result.Series = append(result.Series, pointsBySeries[key]) |
|
|
|
|
|
|
|
|
|
if fillMissing { |
|
|
|
|
series := pointsBySeries[key] |
|
|
|
|
// fill in values from last fetched value till interval end
|
|
|
|
|
intervalStart := series.Points[len(series.Points)-1][1].Float64 |
|
|
|
|
intervalEnd := float64(tsdbQuery.TimeRange.MustGetTo().UnixNano() / 1e6) |
|
|
|
|
|
|
|
|
|
// align interval start
|
|
|
|
|
intervalStart = math.Floor(intervalStart/fillInterval) * fillInterval |
|
|
|
|
for i := intervalStart + fillInterval; i < intervalEnd; i += fillInterval { |
|
|
|
|
series.Points = append(series.Points, tsdb.TimePoint{fillValue, null.FloatFrom(i)}) |
|
|
|
|
rowCount++ |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
result.Meta.Set("rowCount", rowCount) |
|
|
|
|
|