diff --git a/pkg/tsdb/loki/streaming.go b/pkg/tsdb/loki/streaming.go index 29eb7be93a5..adf62ae5d87 100644 --- a/pkg/tsdb/loki/streaming.go +++ b/pkg/tsdb/loki/streaming.go @@ -13,6 +13,8 @@ import ( "github.com/gorilla/websocket" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery" ) func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { @@ -24,7 +26,7 @@ func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStr } // Expect tail/${key} - if !strings.HasPrefix(req.Path, "tail/") { + if !strings.HasPrefix(req.Path, "tail/") && !strings.HasPrefix(req.Path, "mtail/") { return &backend.SubscribeStreamResponse{ Status: backend.SubscribeStreamStatusNotFound, }, fmt.Errorf("expected tail in channel path") @@ -34,7 +36,7 @@ func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStr if err != nil { return nil, err } - if query.Expr != nil { + if query.Expr == nil { return &backend.SubscribeStreamResponse{ Status: backend.SubscribeStreamStatusNotFound, }, fmt.Errorf("missing expr in channel (subscribe)") @@ -69,10 +71,14 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest, if err != nil { return err } - if query.Expr != nil { + if query.Expr == nil { return fmt.Errorf("missing expr in cuannel") } + if strings.HasPrefix(req.Path, "mtail/") { + return s.streamMetricQuery(ctx, req, sender, dsInfo) + } + logger := s.logger.FromContext(ctx) count := int64(0) @@ -172,3 +178,111 @@ func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamReque Status: backend.PublishStreamStatusPermissionDenied, }, nil } + +// TimeRange represents a time range for a query and is a property of DataQuery. +type TimeRange struct { + From time.Time `json:"from"` + To time.Time `json:"to"` +} + +type MetricQueryJSONModel struct { + dataquery.LokiDataQuery + Direction *string `json:"direction,omitempty"` + SupportingQueryType *string `json:"supportingQueryType"` + TimeRange TimeRange `json:"timeRange"` +} + +func (s *Service) streamMetricQuery(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, ds *datasourceInfo) error { + s.logger.Info("Running metric query", "model", req) + + lokiQuery, err := parseStreamingQuery(req) + if err != nil { + return err + } + + api := newLokiAPI(ds.HTTPClient, ds.URL, s.logger, s.tracer, false) + responseOpts := ResponseOpts{ + metricDataplane: s.features.IsEnabled(ctx, featuremgmt.FlagLokiMetricDataplane), + logsDataplane: s.features.IsEnabled(ctx, featuremgmt.FlagLokiLogsDataplane), + } + // cast model.QueryType to dataquery.LokiQueryType + + expr := lokiQuery.Expr + for i := 50; i > 0; i-- { + // sleep for 0.5 seconds + q := lokiQuery + braceIndex := strings.LastIndex(expr, "}") + q.Expr = fmt.Sprintf("%s ,__stream_shard__=\"%d\"%s", (expr)[:braceIndex], i, (expr)[braceIndex:]) + res, _ := runQuery(ctx, api, q, responseOpts, s.logger) + if res == nil || len(res.Frames) == 0 { + continue + } + + for _, frame := range res.Frames { + if err := sender.SendFrame(frame, data.IncludeAll); err != nil { + return err + } + } + //frame.Fields = append(frame.Fields, data.NewField("value", nil, []float64{float64(i * 10)})) + //frame.Fields = append(frame.Fields, data.NewField("time", nil, []time.Time{startTime})) + } + return nil +} + +func parseStreamingQuery(req *backend.RunStreamRequest) (*lokiQuery, error) { + var model *MetricQueryJSONModel + err := json.Unmarshal(req.Data, &model) + if err != nil { + return nil, err + } + + start := model.TimeRange.From + end := model.TimeRange.To + + resolution := int64(1) + if model.Resolution != nil && (*model.Resolution >= 1 && *model.Resolution <= 5 || *model.Resolution == 10) { + resolution = *model.Resolution + } + + interval := time.Millisecond // set this from the frontend directly + timeRange := end.Sub(start) + + step, err := calculateStep(interval, timeRange, resolution, model.Step) + if err != nil { + return nil, err + } + + queryType, err := parseQueryType(model.QueryType) + if err != nil { + return nil, err + } + + expr := interpolateVariables(depointerizer(model.Expr), interval, timeRange, queryType, step) + direction, err := parseDirection(model.Direction) + if err != nil { + return nil, err + } + + var maxLines int64 + if model.MaxLines != nil { + maxLines = *model.MaxLines + } + var legendFormat string + if model.LegendFormat != nil { + legendFormat = *model.LegendFormat + } + + supportingQueryType := parseSupportingQueryType(model.SupportingQueryType) + return &lokiQuery{ + Expr: expr, + QueryType: queryType, + Direction: direction, + Step: step, + MaxLines: int(maxLines), + LegendFormat: legendFormat, + Start: start, + End: end, + SupportingQueryType: supportingQueryType, + // add a RefID here + }, nil +} diff --git a/public/app/plugins/datasource/loki/datasource.ts b/public/app/plugins/datasource/loki/datasource.ts index e63c317ff30..55249b809d4 100644 --- a/public/app/plugins/datasource/loki/datasource.ts +++ b/public/app/plugins/datasource/loki/datasource.ts @@ -325,6 +325,20 @@ export class LokiDatasource ); } + const metricQ = fixedRequest.targets; + const req = { + ...fixedRequest, targets: metricQ + } + return merge( + ...metricQ.map((q) => + doLokiChannelStream( + this.applyTemplateVariables(q, request.scopedVars, request.filters), + this, + req + ) + ) + ) + if (fixedRequest.liveStreaming) { return this.runLiveQueryThroughBackend(fixedRequest); } diff --git a/public/app/plugins/datasource/loki/streaming.ts b/public/app/plugins/datasource/loki/streaming.ts index 57a515ee723..ec9cd21000b 100644 --- a/public/app/plugins/datasource/loki/streaming.ts +++ b/public/app/plugins/datasource/loki/streaming.ts @@ -7,6 +7,7 @@ import { LiveChannelScope, LoadingState, StreamingDataFrame, + QueryResultMetaStat, } from '@grafana/data'; import { getGrafanaLiveSrv, config } from '@grafana/runtime'; @@ -33,7 +34,7 @@ export function doLokiChannelStream( ds: LokiDatasource, options: DataQueryRequest ): Observable { - // maximum time to keep values + console.log("Starting stream"); const range = options.range; const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000; let maxLength = options.maxDataPoints ?? 1000; @@ -43,6 +44,8 @@ export function doLokiChannelStream( } let frame: StreamingDataFrame | undefined = undefined; + let combinedFrames: StreamingDataFrame | undefined = undefined; + const updateFrame = (msg: any) => { if (msg?.message) { const p: DataFrameJSON = msg.message; @@ -65,24 +68,32 @@ export function doLokiChannelStream( .getStream({ scope: LiveChannelScope.DataSource, namespace: ds.uid, - path: `tail/${key}`, + path: `mtail/${key}`, data: { ...query, timeRange: { - from: range.from.valueOf().toString(), - to: range.to.valueOf().toString(), + from: range.from.toISOString(), + to: range.to.toISOString(), }, }, }) .pipe( map((evt) => { + console.log("received event:", evt); const frame = updateFrame(evt); + if (combinedFrames === undefined) { + combinedFrames = deepClone(frame); + } else { + if (frame !== undefined) { + combineFrames(combinedFrames, frame); + } + } return { - data: frame ? [frame] : [], + data: combinedFrames ? [combinedFrames] : [], state: LoadingState.Streaming, }; }) - ); + ); }) ); } @@ -95,3 +106,70 @@ export const convertToWebSocketUrl = (url: string) => { } return `${backend}${url}`; }; + + +function combineFrames(dest: StreamingDataFrame, source: StreamingDataFrame) { + console.log("combining frames: dest: %s, source: %s", dest.fields.length, source.fields.length); + + // `dest` and `source` might have more or less fields, we need to go through all of them + const totalFields = Math.max(dest.fields.length, source.fields.length); + for (let i = 0; i < totalFields; i++) { + // For now, skip undefined fields that exist in the new frame + if (!dest.fields[i]) { + continue; + } + // Index is not reliable when frames have disordered fields, or an extra/missing field, so we find them by name. + // If the field has no name, we fallback to the old index version. + const sourceField = dest.fields[i].name + ? source.fields.find((f) => f.name === dest.fields[i].name) + : source.fields[i]; + if (!sourceField) { + continue; + } + dest.fields[i].values = [].concat.apply(sourceField.values, dest.fields[i].values); + if (sourceField.nanos) { + const nanos: number[] = dest.fields[i].nanos?.slice() || []; + dest.fields[i].nanos = source.fields[i].nanos?.concat(nanos); + } + } + dest.length += source.length; + dest.meta = { + ...dest.meta, + stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []), + }; + console.log("combined frames: dest: %s, source: %s", dest.fields.length, source.fields.length); +} + + +const TOTAL_BYTES_STAT = 'Summary: total bytes processed'; +// This is specific for Loki +function getCombinedMetadataStats( + destStats: QueryResultMetaStat[], + sourceStats: QueryResultMetaStat[] +): QueryResultMetaStat[] { + // in the current approach, we only handle a single stat + const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT); + const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT); + + if (sourceStat != null && destStat != null) { + return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }]; + } + + // maybe one of them exist + const eitherStat = sourceStat ?? destStat; + if (eitherStat != null) { + return [eitherStat]; + } + + return []; +} + +function deepClone(frame: StreamingDataFrame | undefined): StreamingDataFrame | undefined { + // clone the frame and return + if (frame === undefined) { + return undefined; + } + let f = frame.serialize(); + return StreamingDataFrame.deserialize(f); +} +