The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/public/app/plugins/datasource/loki/streaming.ts

97 lines
2.9 KiB

import { map, Observable, defer, mergeMap } from 'rxjs';
import {
DataFrameJSON,
DataQueryRequest,
DataQueryResponse,
LiveChannelScope,
LoadingState,
StreamingDataFrame,
} from '@grafana/data';
import { getGrafanaLiveSrv, config } from '@grafana/runtime';
import { LokiDatasource } from './datasource';
import { LokiQuery } from './types';
/**
* Calculate a unique key for the query. The key is used to pick a channel and should
* be unique for each distinct query execution plan. This key is not secure and is only picked to avoid
* possible collisions
*/
export async function getLiveStreamKey(query: LokiQuery): Promise<string> {
const str = JSON.stringify({ expr: query.expr });
const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array
const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message
const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes
return hashArray.map((b) => b.toString(16).padStart(2, '0')).join('');
}
// This will get both v1 and v2 result formats
export function doLokiChannelStream(
query: LokiQuery,
ds: LokiDatasource,
options: DataQueryRequest<LokiQuery>
): Observable<DataQueryResponse> {
// maximum time to keep values
const range = options.range;
const maxDelta = range.to.valueOf() - range.from.valueOf() + 1000;
let maxLength = options.maxDataPoints ?? 1000;
if (maxLength > 100) {
// for small buffers, keep them small
maxLength *= 2;
}
let frame: StreamingDataFrame | undefined = undefined;
const updateFrame = (msg: any) => {
if (msg?.message) {
const p: DataFrameJSON = msg.message;
if (!frame) {
frame = StreamingDataFrame.fromDataFrameJSON(p, {
maxLength,
maxDelta,
displayNameFormat: query.legendFormat,
});
} else {
frame.push(p);
}
}
return frame;
};
return defer(() => getLiveStreamKey(query)).pipe(
mergeMap((key) => {
return getGrafanaLiveSrv()
.getStream<any>({
scope: LiveChannelScope.DataSource,
namespace: ds.uid,
path: `tail/${key}`,
data: {
...query,
timeRange: {
from: range.from.valueOf().toString(),
to: range.to.valueOf().toString(),
},
},
})
.pipe(
map((evt) => {
const frame = updateFrame(evt);
return {
data: frame ? [frame] : [],
state: LoadingState.Streaming,
};
})
);
})
);
}
export const convertToWebSocketUrl = (url: string) => {
const protocol = window.location.protocol === 'https:' ? 'wss://' : 'ws://';
let backend = `${protocol}${window.location.host}${config.appSubUrl}`;
if (backend.endsWith('/')) {
backend = backend.slice(0, -1);
}
return `${backend}${url}`;
};