The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/tsdb/loki/loki.go

324 lines
10 KiB

package loki
import (
"context"
"encoding/json"
"fmt"
"net/http"
"regexp"
"strings"
"sync"
"time"
"github.com/grafana/dskit/concurrency"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/backend/datasource"
"github.com/grafana/grafana-plugin-sdk-go/backend/instancemgmt"
"github.com/grafana/grafana-plugin-sdk-go/backend/log"
"github.com/grafana/grafana-plugin-sdk-go/data"
"go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana-plugin-sdk-go/backend/httpclient"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/promlib/models"
"github.com/grafana/grafana/pkg/services/contexthandler"
"github.com/grafana/grafana/pkg/services/featuremgmt"
ngalertmodels "github.com/grafana/grafana/pkg/services/ngalert/models"
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
)
// Service is the Loki data source backend. It holds the instance manager that
// yields per-data-source *datasourceInfo values (see getDSInfo), plus the
// tracer and logger shared by the query, resource and stream handlers.
type Service struct {
	im     instancemgmt.InstanceManager
	tracer tracing.Tracer
	logger log.Logger
}

// Compile-time assertions that *Service implements the SDK handler interfaces.
var _ backend.QueryDataHandler = (*Service)(nil)
var _ backend.StreamHandler = (*Service)(nil)
var _ backend.CallResourceHandler = (*Service)(nil)
// ProvideService constructs the Loki Service. Data source instances are built
// by the factory returned from newInstanceSettings, the given tracer is shared
// by all requests, and the logger is tagged with logger=tsdb.loki.
func ProvideService(httpClientProvider *httpclient.Provider, tracer tracing.Tracer) *Service {
	manager := datasource.NewInstanceManager(newInstanceSettings(httpClientProvider))
	baseLogger := backend.NewLoggerWith("logger", "tsdb.loki")
	return &Service{im: manager, tracer: tracer, logger: baseLogger}
}
// legendFormat matches "{{ label }}" placeholders and captures the label name
// with surrounding whitespace trimmed.
// NOTE(review): presumably used for legend templating elsewhere in the package — confirm.
var legendFormat = regexp.MustCompile(`\{\{\s*(.+?)\s*\}\}`)

// Values for the "stage" attribute on log lines, identifying which phase of a
// query a log entry belongs to.
var (
	stagePrepareRequest  = "prepareRequest"
	stageDatabaseRequest = "databaseRequest"
	stageParseResponse   = "parseResponse"
)

// Incoming request headers that applyHeaders copies onto the outgoing Loki
// request when the LokiSendDashboardPanelNames feature flag is enabled.
var (
	dashboardTitleHeader = "X-Dashboard-Title"
	panelTitleHeader     = "X-Panel-Title"
)
// datasourceInfo is the per-data-source instance created by the factory in
// newInstanceSettings and retrieved through getDSInfo. It contains a
// sync.RWMutex field, so it must not be copied after use; the code here always
// handles it as *datasourceInfo.
type datasourceInfo struct {
// HTTPClient is built from the data source's HTTP client options (with
// ForwardHTTPHeaders enabled) and is passed to newLokiAPI for queries and
// resource calls.
HTTPClient *http.Client
// URL is the data source's configured URL, passed to newLokiAPI alongside HTTPClient.
URL string
// streams holds open streams and is initialised empty when the instance is created.
// NOTE(review): the stream handlers that populate it are not visible here; the key format is unknown — confirm.
streams map[string]data.FrameJSONCache
// NOTE(review): streamsMu presumably guards streams — confirm against the stream handler code.
streamsMu sync.RWMutex
}
// QueryJSONModel is the JSON model of a single Loki query, decoded by
// parseQueryModel. dataquery.LokiDataQuery is embedded without a JSON tag, so
// encoding/json treats its exported fields as top-level fields of this model,
// next to the extra fields declared below.
type QueryJSONModel struct {
dataquery.LokiDataQuery
// Direction is omitted from encoded JSON when nil (omitempty).
Direction *string `json:"direction,omitempty"`
// SupportingQueryType has no omitempty, so a nil value encodes as null.
SupportingQueryType *string `json:"supportingQueryType"`
// Scopes lists scope filters attached to the query.
// NOTE(review): presumably only applied when the LogQLScope feature flag is on (parseQuery receives logQLScopes) — confirm.
Scopes []models.ScopeFilter `json:"scopes"`
}
// ResponseOpts carries options that shape how query responses are built. It is
// passed to api.DataQuery for every query.
type ResponseOpts struct {
// logsDataplane is set in QueryData from featuremgmt.FlagLokiLogsDataplane and
// forwarded to adjustFrame for each returned frame.
logsDataplane bool
}
// parseQueryModel decodes the raw JSON of one query into a QueryJSONModel.
// The model pointer is returned even when decoding fails, so on error it may
// be partially populated; callers must check err before using it.
func parseQueryModel(raw json.RawMessage) (*QueryJSONModel, error) {
	var decoded QueryJSONModel
	if err := json.Unmarshal(raw, &decoded); err != nil {
		return &decoded, err
	}
	return &decoded, nil
}
func newInstanceSettings(httpClientProvider *httpclient.Provider) datasource.InstanceFactoryFunc {
return func(ctx context.Context, settings backend.DataSourceInstanceSettings) (instancemgmt.Instance, error) {
opts, err := settings.HTTPClientOptions(ctx)
if err != nil {
return nil, err
}
opts.ForwardHTTPHeaders = true
client, err := httpClientProvider.New(opts)
if err != nil {
return nil, err
}
model := &datasourceInfo{
HTTPClient: client,
URL: settings.URL,
streams: make(map[string]data.FrameJSONCache),
}
return model, nil
4 years ago
}
}
4 years ago
// CallResource implements backend.CallResourceHandler. It resolves the data
// source instance for the request's plugin context and delegates to
// callResource; a failure to resolve the instance is logged and returned.
func (s *Service) CallResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender) error {
	logger := s.logger.FromContext(ctx)
	dsInfo, err := s.getDSInfo(ctx, req.PluginContext)
	if err != nil {
		logger.Error("Failed to get data source info", "error", err)
		return err
	}
	return callResource(ctx, req, sender, dsInfo, logger, s.tracer)
}
// callResource proxies a resource request to Loki and sends Loki's response
// back through sender.
//
// A POST to the "suggestions" path (case-insensitive) is answered by
// GetSuggestions; every other request is forwarded as a raw query to
// /loki/api/v1/<req.URL>. Failures are recorded on the tracing span, logged
// and returned. On success the response carries Loki's status and body with a
// JSON content type, plus a content-encoding header when Loki reported one.
func callResource(ctx context.Context, req *backend.CallResourceRequest, sender backend.CallResourceResponseSender, dsInfo *datasourceInfo, plog log.Logger, tracer tracing.Tracer) error {
	lokiURL := fmt.Sprintf("/loki/api/v1/%s", req.URL)

	ctx, span := tracer.Start(ctx, "datasource.loki.CallResource", trace.WithAttributes(attribute.String("url", lokiURL)))
	defer span.End()

	api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, false)

	// The suggestions endpoint runs on the backend so the logql parser can
	// safely rewrite queries using the current scope.
	wantsSuggestions := req.Method == http.MethodPost && strings.EqualFold(req.Path, "suggestions")

	var (
		upstream RawLokiResponse
		err      error
	)
	if wantsSuggestions {
		upstream, err = GetSuggestions(ctx, api, req)
	} else {
		upstream, err = api.RawQuery(ctx, lokiURL)
	}

	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		if wantsSuggestions {
			plog.FromContext(ctx).Error("Failed to get suggestions from loki", "err", err)
		} else {
			plog.Error("Failed resource call from loki", "err", err, "url", lokiURL)
		}
		return err
	}

	headers := map[string][]string{"content-type": {"application/json"}}
	if enc := upstream.Encoding; enc != "" {
		headers["content-encoding"] = []string{enc}
	}

	return sender.Send(&backend.CallResourceResponse{
		Status:  upstream.Status,
		Headers: headers,
		Body:    upstream.Body,
	})
}
// QueryData implements backend.QueryDataHandler. It:
//   - resolves the data source instance;
//   - reads the Loki feature flags from the request context;
//   - when LokiSendDashboardPanelNames is enabled, copies the dashboard and
//     panel title headers onto the request;
//   - runs the queries via queryData.
//
// If the instance cannot be resolved, an empty response and the error are
// returned.
func (s *Service) QueryData(ctx context.Context, req *backend.QueryDataRequest) (*backend.QueryDataResponse, error) {
	dsInfo, err := s.getDSInfo(ctx, req.PluginContext)

	// Tag every log line with whether the from-alert header is present
	// (presumably set by alerting evaluations).
	_, fromAlert := req.Headers[ngalertmodels.FromAlertHeaderName]
	logger := s.logger.FromContext(ctx).With("fromAlert", fromAlert)

	if err != nil {
		logger.Error("Failed to get data source info", "err", err)
		return backend.NewQueryDataResponse(), err
	}

	opts := ResponseOpts{logsDataplane: isFeatureEnabled(ctx, featuremgmt.FlagLokiLogsDataplane)}

	if isFeatureEnabled(ctx, featuremgmt.FlagLokiSendDashboardPanelNames) {
		s.applyHeaders(ctx, req)
	}

	runInParallel := isFeatureEnabled(ctx, featuremgmt.FlagLokiRunQueriesInParallel)
	withStructuredMetadata := isFeatureEnabled(ctx, featuremgmt.FlagLokiStructuredMetadata)
	withLogQLScopes := isFeatureEnabled(ctx, featuremgmt.FlagLogQLScope)

	return queryData(ctx, req, dsInfo, opts, s.tracer, logger, runInParallel, withStructuredMetadata, withLogQLScopes)
}
// applyHeaders copies the dashboard-title and panel-title headers from the
// incoming HTTP request stored in ctx onto req, skipping any header whose
// value is empty. It does nothing when req, the request context or its HTTP
// request is missing.
func (s *Service) applyHeaders(ctx context.Context, req backend.ForwardHTTPHeaders) {
	reqCtx := contexthandler.FromContext(ctx)
	if req == nil || reqCtx == nil || reqCtx.Req == nil {
		return
	}

	incoming := reqCtx.Req.Header
	for _, name := range [...]string{dashboardTitleHeader, panelTitleHeader} {
		if value := incoming.Get(name); value != "" {
			req.SetHTTPHeader(name, value)
		}
	}
}
// queryData parses the queries in req, runs each against Loki and collects the
// per-query results in the response, keyed by RefID.
//
// Per-query failures are stored on the corresponding DataResponse instead of
// being returned. The returned error is non-nil only when parsing the queries
// fails, or in parallel mode when concurrency.ForEachJob returns an error (the
// job function itself always returns nil). When runInParallel is set, up to 10
// queries run concurrently and writes to the shared response map are guarded
// by a mutex.
func queryData(ctx context.Context, req *backend.QueryDataRequest, dsInfo *datasourceInfo, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger, runInParallel bool, requestStructuredMetadata, logQLScopes bool) (*backend.QueryDataResponse, error) {
	result := backend.NewQueryDataResponse()
	api := newLokiAPI(dsInfo.HTTPClient, dsInfo.URL, plog, tracer, requestStructuredMetadata)

	prepareStart := time.Now()
	queries, err := parseQuery(req, logQLScopes)
	if err != nil {
		plog.Error("Failed to prepare request to Loki", "error", err, "duration", time.Since(prepareStart), "queriesLength", len(queries), "stage", stagePrepareRequest)
		return result, err
	}
	plog.Info("Prepared request to Loki", "duration", time.Since(prepareStart), "queriesLength", len(queries), "stage", stagePrepareRequest, "runInParallel", runInParallel)

	ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries", trace.WithAttributes(
		attribute.Bool("runInParallel", runInParallel),
		attribute.Int("queriesLength", len(queries)),
	))
	if groupID := req.GetHTTPHeader("X-Query-Group-Id"); groupID != "" {
		span.SetAttributes(attribute.String("query_group_id", groupID))
	}
	defer span.End()

	execStart := time.Now()
	if runInParallel {
		// Parallel execution is still gated behind a feature flag.
		var mu sync.Mutex
		err = concurrency.ForEachJob(ctx, len(queries), 10, func(jobCtx context.Context, idx int) error {
			q := queries[idx]
			res := executeQuery(jobCtx, q, req, runInParallel, api, responseOpts, tracer, plog)
			mu.Lock()
			defer mu.Unlock()
			result.Responses[q.RefID] = res
			return nil // per-query errors live in res, so the job never fails
		})
	} else {
		for _, q := range queries {
			result.Responses[q.RefID] = executeQuery(ctx, q, req, runInParallel, api, responseOpts, tracer, plog)
		}
	}
	plog.Debug("Executed queries", "duration", time.Since(execStart), "queriesLength", len(queries), "runInParallel", runInParallel)

	return result, err
}
// executeQuery runs one Loki query inside its own tracing span. It always
// returns a DataResponse: if runQuery returns no response, an empty one is
// used. If runQuery fails, the error is recorded on the span and stored in the
// response's Error field.
func executeQuery(ctx context.Context, query *lokiQuery, req *backend.QueryDataRequest, runInParallel bool, api *LokiAPI, responseOpts ResponseOpts, tracer tracing.Tracer, plog log.Logger) backend.DataResponse {
	spanAttrs := []attribute.KeyValue{
		attribute.Bool("runInParallel", runInParallel),
		attribute.String("expr", query.Expr),
		attribute.Int64("start_unixnano", query.Start.UnixNano()),
		attribute.Int64("stop_unixnano", query.End.UnixNano()),
	}
	ctx, span := tracer.Start(ctx, "datasource.loki.queryData.runQueries.runQuery", trace.WithAttributes(spanAttrs...))
	if groupID := req.GetHTTPHeader("X-Query-Group-Id"); groupID != "" {
		span.SetAttributes(attribute.String("query_group_id", groupID))
	}
	defer span.End()

	res, err := runQuery(ctx, api, query, responseOpts, plog)

	var out backend.DataResponse
	if res != nil {
		out = *res
	}
	if err != nil {
		span.RecordError(err)
		span.SetStatus(codes.Error, err.Error())
		out.Error = err
	}
	return out
}
// runQuery executes a single query through api and post-processes the returned
// frames with adjustFrame. It is kept separate from executeQuery so it can be
// unit-tested without tracing.
//
// Frames with fewer than two fields are passed through unadjusted. On a query
// or adjustment error, the error is logged and returned together with
// whatever response api.DataQuery produced.
func runQuery(ctx context.Context, api *LokiAPI, query *lokiQuery, responseOpts ResponseOpts, plog log.Logger) (*backend.DataResponse, error) {
	res, err := api.DataQuery(ctx, *query, responseOpts)
	if err != nil {
		plog.Error("Error querying loki", "error", err)
		return res, err
	}

	for _, fr := range res.Frames {
		if len(fr.Fields) >= 2 {
			if adjErr := adjustFrame(fr, query, false, responseOpts.logsDataplane); adjErr != nil {
				plog.Error("Error adjusting frame", "error", adjErr)
				return res, adjErr
			}
		}
	}
	return res, nil
}
// getDSInfo fetches the instance for pluginCtx from the instance manager and
// asserts it to *datasourceInfo. Errors from the instance manager are returned
// unchanged; an instance of any other type yields a cast error.
func (s *Service) getDSInfo(ctx context.Context, pluginCtx backend.PluginContext) (*datasourceInfo, error) {
	inst, err := s.im.Get(ctx, pluginCtx)
	if err != nil {
		return nil, err
	}
	if dsInfo, ok := inst.(*datasourceInfo); ok {
		return dsInfo, nil
	}
	return nil, fmt.Errorf("failed to cast data source info")
}
// isFeatureEnabled reports whether the named feature toggle is enabled in the
// Grafana configuration carried by ctx.
func isFeatureEnabled(ctx context.Context, feature string) bool {
	cfg := backend.GrafanaConfigFromContext(ctx)
	toggles := cfg.FeatureToggles()
	return toggles.IsEnabled(feature)
}