WIP - stream loki queries on websocket

wip-streaming-loki-queries
shantanualshi 1 year ago
parent d8137083d9
commit b722f20995
  1. pkg/tsdb/loki/streaming.go (120 lines changed)
  2. public/app/plugins/datasource/loki/datasource.ts (14 lines changed)
  3. public/app/plugins/datasource/loki/streaming.ts (90 lines changed)

@@ -13,6 +13,8 @@ import (
"github.com/gorilla/websocket"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/services/featuremgmt"
"github.com/grafana/grafana/pkg/tsdb/loki/kinds/dataquery"
)
func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) {
@@ -24,7 +26,7 @@ func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStr
}
// Expect tail/${key}
if !strings.HasPrefix(req.Path, "tail/") {
if !strings.HasPrefix(req.Path, "tail/") && !strings.HasPrefix(req.Path, "mtail/") {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("expected tail in channel path")
@@ -34,7 +36,7 @@ func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStr
if err != nil {
return nil, err
}
if query.Expr != nil {
if query.Expr == nil {
return &backend.SubscribeStreamResponse{
Status: backend.SubscribeStreamStatusNotFound,
}, fmt.Errorf("missing expr in channel (subscribe)")
@@ -69,10 +71,14 @@ func (s *Service) RunStream(ctx context.Context, req *backend.RunStreamRequest,
if err != nil {
return err
}
if query.Expr != nil {
if query.Expr == nil {
return fmt.Errorf("missing expr in cuannel")
}
if strings.HasPrefix(req.Path, "mtail/") {
return s.streamMetricQuery(ctx, req, sender, dsInfo)
}
logger := s.logger.FromContext(ctx)
count := int64(0)
@@ -172,3 +178,111 @@ func (s *Service) PublishStream(_ context.Context, _ *backend.PublishStreamReque
Status: backend.PublishStreamStatusPermissionDenied,
}, nil
}
// TimeRange represents a time range for a query and is a property of DataQuery.
type TimeRange struct {
From time.Time `json:"from"`
To time.Time `json:"to"`
}
type MetricQueryJSONModel struct {
dataquery.LokiDataQuery
Direction *string `json:"direction,omitempty"`
SupportingQueryType *string `json:"supportingQueryType"`
TimeRange TimeRange `json:"timeRange"`
}
func (s *Service) streamMetricQuery(ctx context.Context, req *backend.RunStreamRequest, sender *backend.StreamSender, ds *datasourceInfo) error {
s.logger.Info("Running metric query", "model", req)
lokiQuery, err := parseStreamingQuery(req)
if err != nil {
return err
}
api := newLokiAPI(ds.HTTPClient, ds.URL, s.logger, s.tracer, false)
responseOpts := ResponseOpts{
metricDataplane: s.features.IsEnabled(ctx, featuremgmt.FlagLokiMetricDataplane),
logsDataplane: s.features.IsEnabled(ctx, featuremgmt.FlagLokiLogsDataplane),
}
// keep the original expression so each per-shard query in the loop below can be derived from it
expr := lokiQuery.Expr
for i := 50; i > 0; i-- {
// derive a per-shard expression by injecting a __stream_shard__ matcher into the selector
q := lokiQuery
braceIndex := strings.LastIndex(expr, "}")
q.Expr = fmt.Sprintf("%s ,__stream_shard__=\"%d\"%s", expr[:braceIndex], i, expr[braceIndex:])
res, _ := runQuery(ctx, api, q, responseOpts, s.logger)
if res == nil || len(res.Frames) == 0 {
continue
}
for _, frame := range res.Frames {
if err := sender.SendFrame(frame, data.IncludeAll); err != nil {
return err
}
}
//frame.Fields = append(frame.Fields, data.NewField("value", nil, []float64{float64(i * 10)}))
//frame.Fields = append(frame.Fields, data.NewField("time", nil, []time.Time{startTime}))
}
return nil
}
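For illustration, a minimal standalone sketch of the shard-injection step performed in the loop above; injectStreamShard is a hypothetical helper name, and the strings.LastIndex placement mirrors the inline logic in the commit.

package main

import (
	"fmt"
	"strings"
)

// injectStreamShard inserts a __stream_shard__ matcher just before the
// closing brace of the last label selector found in the expression.
func injectStreamShard(expr string, shard int) string {
	braceIndex := strings.LastIndex(expr, "}")
	if braceIndex < 0 {
		return expr // no selector found; leave the expression untouched
	}
	return fmt.Sprintf("%s ,__stream_shard__=\"%d\"%s", expr[:braceIndex], shard, expr[braceIndex:])
}

func main() {
	// Prints: count_over_time({app="foo" ,__stream_shard__="3"}[1m])
	fmt.Println(injectStreamShard(`count_over_time({app="foo"}[1m])`, 3))
}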
func parseStreamingQuery(req *backend.RunStreamRequest) (*lokiQuery, error) {
var model *MetricQueryJSONModel
err := json.Unmarshal(req.Data, &model)
if err != nil {
return nil, err
}
start := model.TimeRange.From
end := model.TimeRange.To
resolution := int64(1)
if model.Resolution != nil && (*model.Resolution >= 1 && *model.Resolution <= 5 || *model.Resolution == 10) {
resolution = *model.Resolution
}
interval := time.Millisecond // set this from the frontend directly
timeRange := end.Sub(start)
step, err := calculateStep(interval, timeRange, resolution, model.Step)
if err != nil {
return nil, err
}
queryType, err := parseQueryType(model.QueryType)
if err != nil {
return nil, err
}
expr := interpolateVariables(depointerizer(model.Expr), interval, timeRange, queryType, step)
direction, err := parseDirection(model.Direction)
if err != nil {
return nil, err
}
var maxLines int64
if model.MaxLines != nil {
maxLines = *model.MaxLines
}
var legendFormat string
if model.LegendFormat != nil {
legendFormat = *model.LegendFormat
}
supportingQueryType := parseSupportingQueryType(model.SupportingQueryType)
return &lokiQuery{
Expr: expr,
QueryType: queryType,
Direction: direction,
Step: step,
MaxLines: int(maxLines),
LegendFormat: legendFormat,
Start: start,
End: end,
SupportingQueryType: supportingQueryType,
// add a RefID here
}, nil
}
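For reference, a minimal sketch showing that the ISO-8601 timestamps the frontend now sends (range.from.toISOString() in the streaming.ts change below) unmarshal directly into time.Time, since encoding/json parses time.Time from RFC 3339 strings. The payload values and the trimmed struct are illustrative, not the full MetricQueryJSONModel.

package main

import (
	"encoding/json"
	"fmt"
	"time"
)

// Trimmed copies of the types above, just enough to exercise the unmarshalling.
type timeRange struct {
	From time.Time `json:"from"`
	To   time.Time `json:"to"`
}

type metricQueryModel struct {
	Expr      string    `json:"expr"`
	TimeRange timeRange `json:"timeRange"`
}

func main() {
	payload := []byte(`{
		"expr": "count_over_time({app=\"foo\"}[1m])",
		"timeRange": {"from": "2024-01-01T00:00:00Z", "to": "2024-01-01T01:00:00Z"}
	}`)

	var m metricQueryModel
	if err := json.Unmarshal(payload, &m); err != nil {
		panic(err)
	}
	// Prints: count_over_time({app="foo"}[1m]) 1h0m0s
	fmt.Println(m.Expr, m.TimeRange.To.Sub(m.TimeRange.From))
}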

@@ -325,6 +325,20 @@ export class LokiDatasource
);
}
const metricQ = fixedRequest.targets;
const req = { ...fixedRequest, targets: metricQ };
return merge(
...metricQ.map((q) =>
doLokiChannelStream(
this.applyTemplateVariables(q, request.scopedVars, request.filters),
this,
req
)
)
);
if (fixedRequest.liveStreaming) {
return this.runLiveQueryThroughBackend(fixedRequest);
}

@@ -7,6 +7,7 @@ import {
LiveChannelScope,
LoadingState,
StreamingDataFrame,
QueryResultMetaStat,
} from '@grafana/data';
import { getGrafanaLiveSrv, config } from '@grafana/runtime';
@@ -33,7 +34,7 @@ export function doLokiChannelStream(
ds: LokiDatasource,
options: DataQueryRequest<LokiQuery>
): Observable<DataQueryResponse> {
// maximum time to keep values
console.log("Starting stream");
const range = options.range;
const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000;
let maxLength = options.maxDataPoints ?? 1000;
@@ -43,6 +44,8 @@
}
let frame: StreamingDataFrame | undefined = undefined;
let combinedFrames: StreamingDataFrame | undefined = undefined;
const updateFrame = (msg: any) => {
if (msg?.message) {
const p: DataFrameJSON = msg.message;
@@ -65,24 +68,32 @@
.getStream({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `tail/${key}`,
path: `mtail/${key}`,
data: {
...query,
timeRange: {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
from: range.from.toISOString(),
to: range.to.toISOString(),
},
},
})
.pipe(
map((evt) => {
console.log("received event:", evt);
const frame = updateFrame(evt);
if (combinedFrames === undefined) {
combinedFrames = deepClone(frame);
} else {
if (frame !== undefined) {
combineFrames(combinedFrames, frame);
}
}
return {
data: frame ? [frame] : [],
data: combinedFrames ? [combinedFrames] : [],
state: LoadingState.Streaming,
};
})
);
);
})
);
}
@@ -95,3 +106,70 @@ export const convertToWebSocketUrl = (url: string) => {
}
return `${backend}${url}`;
};
function combineFrames(dest: StreamingDataFrame, source: StreamingDataFrame) {
console.log("combining frames: dest: %s, source: %s", dest.fields.length, source.fields.length);
// `dest` and `source` might have more or less fields, we need to go through all of them
const totalFields = Math.max(dest.fields.length, source.fields.length);
for (let i = 0; i < totalFields; i++) {
// For now, skip undefined fields that exist in the new frame
if (!dest.fields[i]) {
continue;
}
// Index is not reliable when frames have disordered fields, or an extra/missing field, so we find them by name.
// If the field has no name, we fallback to the old index version.
const sourceField = dest.fields[i].name
? source.fields.find((f) => f.name === dest.fields[i].name)
: source.fields[i];
if (!sourceField) {
continue;
}
dest.fields[i].values = [].concat.apply(sourceField.values, dest.fields[i].values);
if (sourceField.nanos) {
const nanos: number[] = dest.fields[i].nanos?.slice() || [];
dest.fields[i].nanos = sourceField.nanos?.concat(nanos);
}
}
dest.length += source.length;
dest.meta = {
...dest.meta,
stats: getCombinedMetadataStats(dest.meta?.stats ?? [], source.meta?.stats ?? []),
};
console.log("combined frames: dest: %s, source: %s", dest.fields.length, source.fields.length);
}
const TOTAL_BYTES_STAT = 'Summary: total bytes processed';
// This is specific for Loki
function getCombinedMetadataStats(
destStats: QueryResultMetaStat[],
sourceStats: QueryResultMetaStat[]
): QueryResultMetaStat[] {
// in the current approach, we only handle a single stat
const destStat = destStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
const sourceStat = sourceStats.find((s) => s.displayName === TOTAL_BYTES_STAT);
if (sourceStat != null && destStat != null) {
return [{ value: sourceStat.value + destStat.value, displayName: TOTAL_BYTES_STAT, unit: destStat.unit }];
}
// maybe one of them exist
const eitherStat = sourceStat ?? destStat;
if (eitherStat != null) {
return [eitherStat];
}
return [];
}
function deepClone(frame: StreamingDataFrame | undefined): StreamingDataFrame | undefined {
// clone the frame and return
if (frame === undefined) {
return undefined;
}
let f = frame.serialize();
return StreamingDataFrame.deserialize(f);
}
