The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/tsdb/loki/loki.go

203 lines
6.0 KiB

package loki
import (
"context"
"fmt"
"net/http"
"regexp"
"strings"
"time"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/infra/httpclient"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/interval"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/opentracing/opentracing-go"
5 years ago
"github.com/prometheus/common/config"
"github.com/prometheus/common/model"
)
// LokiExecutor runs Loki queries for a data source. Its DataQuery method
// builds a Loki client per call and executes the parsed queries with it.
type LokiExecutor struct {
// intervalCalculator derives the interval from which each query's step is computed.
intervalCalculator interval.Calculator
// httpClientProvider is passed to the data source to obtain its TLS config and HTTP transport.
httpClientProvider httpclient.Provider
}
// New returns a factory that creates a LokiExecutor for a data source. Every
// executor produced by the factory shares the given httpClientProvider and
// gets its own interval calculator with a minimum interval of one second.
// The factory never returns an error.
//
// nolint:staticcheck // plugins.DataPlugin deprecated
func New(httpClientProvider httpclient.Provider) func(dsInfo *models.DataSource) (plugins.DataPlugin, error) {
	// nolint:staticcheck // plugins.DataPlugin deprecated
	factory := func(_ *models.DataSource) (plugins.DataPlugin, error) {
		calculatorOptions := interval.CalculatorOptions{MinInterval: time.Second * 1}
		executor := &LokiExecutor{
			intervalCalculator: interval.NewCalculator(calculatorOptions),
			httpClientProvider: httpClientProvider,
		}
		return executor, nil
	}
	return factory
}
var (
// plog is the package logger, created under the name "tsdb.loki".
plog = log.New("tsdb.loki")
// legendFormat matches `{{ name }}` placeholders in a legend format string;
// whitespace between the braces and the name is optional.
legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)
)
// DataQuery executes a Loki query.
//nolint: staticcheck // plugins.DataPlugin deprecated
func (e *LokiExecutor) DataQuery(ctx context.Context, dsInfo *models.DataSource,
queryContext plugins.DataQuery) (plugins.DataResponse, error) {
result := plugins.DataResponse{
Results: map[string]plugins.DataQueryResult{},
}
tlsConfig, err := dsInfo.GetTLSConfig(e.httpClientProvider)
5 years ago
if err != nil {
return plugins.DataResponse{}, err
}
transport, err := dsInfo.GetHTTPTransport(e.httpClientProvider)
if err != nil {
return plugins.DataResponse{}, err
}
client := &client.DefaultClient{
Address: dsInfo.Url,
Username: dsInfo.BasicAuthUser,
Password: dsInfo.DecryptedBasicAuthPassword(),
5 years ago
TLSConfig: config.TLSConfig{
InsecureSkipVerify: tlsConfig.InsecureSkipVerify,
},
Tripperware: func(t http.RoundTripper) http.RoundTripper {
return transport
},
}
queries, err := e.parseQuery(dsInfo, queryContext)
if err != nil {
return plugins.DataResponse{}, err
}
for _, query := range queries {
plog.Debug("Sending query", "start", query.Start, "end", query.End, "step", query.Step, "query", query.Expr)
span, _ := opentracing.StartSpanFromContext(ctx, "alerting.loki")
span.SetTag("expr", query.Expr)
span.SetTag("start_unixnano", query.Start.UnixNano())
span.SetTag("stop_unixnano", query.End.UnixNano())
defer span.Finish()
//Currently hard coded as not used - applies to log queries
limit := 1000
//Currently hard coded as not used - applies to queries which produce a stream response
interval := time.Second * 1
value, err := client.QueryRange(query.Expr, limit, query.Start, query.End, logproto.BACKWARD, query.Step, interval, false)
if err != nil {
return plugins.DataResponse{}, err
}
queryResult, err := parseResponse(value, query)
if err != nil {
return plugins.DataResponse{}, err
}
result.Results[query.RefID] = queryResult
}
return result, nil
}
// formatLegend returns the display name for a series.
//
// With an empty LegendFormat the name is the metric's own string form.
// Otherwise every `{{ label }}` placeholder in LegendFormat is replaced by the
// value of that label on the metric, or by an empty string when the metric
// has no such label.
func formatLegend(metric model.Metric, query *lokiQuery) string {
	template := query.LegendFormat
	if template == "" {
		return metric.String()
	}

	return legendFormat.ReplaceAllStringFunc(template, func(placeholder string) string {
		// Strip the first "{{" and the first "}}" and any surrounding
		// whitespace to obtain the label name.
		key := strings.Replace(placeholder, "{{", "", 1)
		key = strings.Replace(key, "}}", "", 1)
		key = strings.TrimSpace(key)

		value, found := metric[model.LabelName(key)]
		if !found {
			return ""
		}
		return string(value)
	})
}
// parseQuery converts each query in queryContext into a lokiQuery.
//
// For every query it reads the "expr" (required) and "legendFormat"
// (optional, defaults to "") model fields, parses the start and end of the
// request time range, and computes the step from the data source interval
// using the executor's interval calculator.
//
// It returns an error, wrapping the underlying cause, as soon as the
// expression, either time range bound, or the interval cannot be parsed.
func (e *LokiExecutor) parseQuery(dsInfo *models.DataSource, queryContext plugins.DataQuery) ([]*lokiQuery, error) {
	qs := make([]*lokiQuery, 0, len(queryContext.Queries))
	for _, queryModel := range queryContext.Queries {
		expr, err := queryModel.Model.Get("expr").String()
		if err != nil {
			return nil, fmt.Errorf("failed to parse Expr: %w", err)
		}

		format := queryModel.Model.Get("legendFormat").MustString("")

		start, err := queryContext.TimeRange.ParseFrom()
		if err != nil {
			return nil, fmt.Errorf("failed to parse From: %w", err)
		}

		end, err := queryContext.TimeRange.ParseTo()
		if err != nil {
			return nil, fmt.Errorf("failed to parse To: %w", err)
		}

		dsInterval, err := interval.GetIntervalFrom(dsInfo, queryModel.Model, time.Second)
		if err != nil {
			return nil, fmt.Errorf("failed to parse Interval: %w", err)
		}

		// Named calculated so the imported interval package is not shadowed.
		calculated := e.intervalCalculator.Calculate(*queryContext.TimeRange, dsInterval)
		step := time.Duration(int64(calculated.Value))

		qs = append(qs, &lokiQuery{
			Expr:         expr,
			Step:         step,
			LegendFormat: format,
			Start:        start,
			End:          end,
			RefID:        queryModel.RefID,
		})
	}

	return qs, nil
}
// parseResponse turns a Loki query response into a data query result holding
// one data frame per series.
//
// Only matrix results are supported; any other result type yields an error.
// Each frame is named via formatLegend and has a "time" field and a "value"
// field. The value field carries the series labels and uses the frame name as
// its display name. Sample timestamps are converted to whole seconds in UTC.
//
//nolint: staticcheck // plugins.DataPlugin deprecated
func parseResponse(value *loghttp.QueryResponse, query *lokiQuery) (plugins.DataQueryResult, error) {
	// We are currently processing only matrix results (for alerting)
	matrix, isMatrix := value.Data.Result.(loghttp.Matrix)
	if !isMatrix {
		return plugins.DataQueryResult{}, fmt.Errorf("unsupported result format: %q", value.Data.ResultType)
	}

	frames := data.Frames{}
	for _, series := range matrix {
		labels := make(map[string]string, len(series.Metric))
		for labelName, labelValue := range series.Metric {
			labels[string(labelName)] = string(labelValue)
		}

		sampleCount := len(series.Values)
		timestamps := make([]time.Time, 0, sampleCount)
		samples := make([]float64, 0, sampleCount)
		for _, sample := range series.Values {
			timestamps = append(timestamps, time.Unix(sample.Timestamp.Unix(), 0).UTC())
			samples = append(samples, float64(sample.Value))
		}

		name := formatLegend(series.Metric, query)
		timeField := data.NewField("time", nil, timestamps)
		valueField := data.NewField("value", labels, samples).SetConfig(&data.FieldConfig{DisplayNameFromDS: name})
		frames = append(frames, data.NewFrame(name, timeField, valueField))
	}

	var queryRes plugins.DataQueryResult
	queryRes.Dataframes = plugins.NewDecodedDataFrames(frames)
	return queryRes, nil
}