From 5bb1b1602d1e5bd614e2c11fa3a1e984df988357 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Wed, 17 Mar 2021 21:46:56 -0700 Subject: [PATCH] Live: use StreamingDataSource for live measurments (#31713) --- .../src/dataframe/StreamingDataFrame.ts | 20 +- .../src/measurement/collector.test.ts | 241 +++--------------- .../src/measurement/collector.ts | 206 +++------------ .../grafana-runtime/src/measurement/types.ts | 51 +--- .../grafana/components/QueryEditor.tsx | 16 +- 5 files changed, 102 insertions(+), 432 deletions(-) diff --git a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts index 57b86830d2f..e78693e77a7 100644 --- a/packages/grafana-data/src/dataframe/StreamingDataFrame.ts +++ b/packages/grafana-data/src/dataframe/StreamingDataFrame.ts @@ -50,8 +50,20 @@ export class StreamingDataFrame implements DataFrame { update(msg: DataFrameJSON) { const { schema, data } = msg; if (schema) { - if (this.fields.length > 0) { - // ?? keep existing data? + // Keep old values if they are the same shape + let oldValues: ArrayVector[] | undefined; + if (schema.fields.length === this.fields.length) { + let same = true; + oldValues = this.fields.map((f, idx) => { + const oldField = this.fields[idx]; + if (f.name !== oldField.name || f.type !== oldField.type) { + same = false; + } + return f.values; + }); + if (!same) { + oldValues = undefined; + } } this.name = schema.name; @@ -59,13 +71,13 @@ export class StreamingDataFrame implements DataFrame { this.meta = schema.meta; // Create new fields from the schema - this.fields = schema.fields.map((f) => { + this.fields = schema.fields.map((f, idx) => { return { config: f.config ?? {}, name: f.name, labels: f.labels, type: f.type ?? FieldType.other, - values: new ArrayVector(), + values: oldValues ? oldValues[idx] : new ArrayVector(), }; }); diff --git a/packages/grafana-runtime/src/measurement/collector.test.ts b/packages/grafana-runtime/src/measurement/collector.test.ts index 31e6258530a..074c28bfc13 100644 --- a/packages/grafana-runtime/src/measurement/collector.test.ts +++ b/packages/grafana-runtime/src/measurement/collector.test.ts @@ -1,52 +1,42 @@ +import { FieldType } from '@grafana/data'; import { MeasurementCollector } from './collector'; -import { MeasurementAction } from './types'; describe('MeasurementCollector', () => { it('should collect values', () => { const collector = new MeasurementCollector(); collector.addBatch({ - measurements: [ + batch: [ { - name: 'test', - labels: { host: 'a' }, - time: 100, - values: { - f0: 0, - f1: 1, - f2: 'hello', + key: 'aaa', + schema: { + fields: [ + { name: 'time', type: FieldType.time }, + { name: 'value', type: FieldType.number }, + ], + }, + data: { + values: [ + [100, 200], + [1, 2], + ], }, }, { - name: 'test', - labels: { host: 'b' }, - time: 101, - values: { - f0: 0, - f1: 1, - f2: 'hello', - }, - config: { - f2: { - unit: 'mph', - }, - }, + key: 'aaa', + data: { values: [[300], [3]] }, }, { - name: 'test', - time: 102, - labels: { host: 'a' }, // should append to first value - values: { - // note the missing values for f0/1 - f2: 'world', - }, + key: 'aaa', + data: { values: [[400], [4]] }, }, ], }); const frames = collector.getData(); - expect(frames.length).toEqual(2); + expect(frames.length).toEqual(1); + (frames[0] as any).lastUpdateTime = 0; expect(frames[0]).toMatchInlineSnapshot(` - Object { + StreamingDataFrame { "fields": Array [ Object { "config": Object {}, @@ -55,196 +45,33 @@ describe('MeasurementCollector', () => { "type": "time", "values": Array [ 100, - 102, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f0", - "type": "number", - "values": Array [ - 0, - undefined, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f1", - "type": "number", - "values": Array [ - 1, - undefined, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f2", - "type": "string", - "values": Array [ - "hello", - "world", + 200, + 300, + 400, ], }, - ], - "meta": Object { - "custom": Object { - "labels": Object { - "host": "a", - }, - }, - }, - "name": "test", - "refId": undefined, - } - `); - expect(frames[1]).toMatchInlineSnapshot(` - Object { - "fields": Array [ Object { "config": Object {}, "labels": undefined, - "name": "time", - "type": "time", - "values": Array [ - 101, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "b", - }, - "name": "f0", - "type": "number", - "values": Array [ - 0, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "b", - }, - "name": "f1", + "name": "value", "type": "number", "values": Array [ 1, - ], - }, - Object { - "config": Object { - "unit": "mph", - }, - "labels": Object { - "host": "b", - }, - "name": "f2", - "type": "string", - "values": Array [ - "hello", - ], - }, - ], - "meta": Object { - "custom": Object { - "labels": Object { - "host": "b", - }, - }, - }, - "name": "test", - "refId": undefined, - } - `); - - collector.addBatch({ - action: MeasurementAction.Replace, - measurements: [ - { - name: 'test', - time: 105, - labels: { host: 'a' }, - values: { - f1: 10, - }, - }, - ], - }); - - const frames2 = collector.getData(); - expect(frames2.length).toEqual(2); - expect(frames2[0].length).toEqual(1); // not three! - expect(frames2[0]).toMatchInlineSnapshot(` - Object { - "fields": Array [ - Object { - "config": Object {}, - "labels": undefined, - "name": "time", - "type": "time", - "values": Array [ - 105, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f0", - "type": "number", - "values": Array [ - undefined, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f1", - "type": "number", - "values": Array [ - 10, - ], - }, - Object { - "config": Object {}, - "labels": Object { - "host": "a", - }, - "name": "f2", - "type": "string", - "values": Array [ - undefined, + 2, + 3, + 4, ], }, ], - "meta": Object { - "custom": Object { - "labels": Object { - "host": "a", - }, - }, + "lastUpdateTime": 0, + "meta": undefined, + "name": undefined, + "options": Object { + "maxLength": 600, }, - "name": "test", "refId": undefined, + "timeFieldIndex": 0, } `); - - collector.addBatch({ - action: MeasurementAction.Clear, - measurements: [], - }); - expect(collector.getData().length).toEqual(0); }); }); diff --git a/packages/grafana-runtime/src/measurement/collector.ts b/packages/grafana-runtime/src/measurement/collector.ts index f84070ebb03..caf29ab848b 100644 --- a/packages/grafana-runtime/src/measurement/collector.ts +++ b/packages/grafana-runtime/src/measurement/collector.ts @@ -1,118 +1,15 @@ -import { - CircularDataFrame, - Labels, - formatLabels, - FieldType, - DataFrame, - matchAllLabels, - parseLabels, - CircularVector, - ArrayVector, -} from '@grafana/data'; -import { Measurement, MeasurementBatch, LiveMeasurements, MeasurementsQuery, MeasurementAction } from './types'; - -interface MeasurementCacheConfig { - append?: 'head' | 'tail'; - capacity?: number; -} - -/** This is a cache scoped to a the measurement name - * - * @alpha -- experimental - */ -export class MeasurementCache { - readonly frames: Record = {}; // key is the labels - - constructor(public name: string, private config: MeasurementCacheConfig) { - if (!this.config) { - this.config = { - append: 'tail', - capacity: 600, // Default capacity 10min @ 1hz - }; - } - } - - getFrames(match?: Labels): DataFrame[] { - const frames = Object.values(this.frames); - if (!match) { - return frames; - } - return frames.filter((f) => { - return matchAllLabels(match, f.meta?.custom?.labels); - }); - } - - addMeasurement(m: Measurement, action: MeasurementAction): DataFrame { - const key = m.labels ? formatLabels(m.labels) : ''; - let frame = this.frames[key]; - - if (!frame) { - frame = new CircularDataFrame(this.config); - frame.name = this.name; - frame.addField({ - name: 'time', - type: FieldType.time, - }); - - for (const [key, value] of Object.entries(m.values)) { - frame.addFieldFor(value, key).labels = m.labels; - } - - frame.meta = { - custom: { - labels: m.labels, - }, - }; - - this.frames[key] = frame; - } - - // Clear existing values - if (action === MeasurementAction.Replace) { - for (const field of frame.fields) { - (field.values as ArrayVector).buffer.length = 0; // same buffer, but reset to empty length - } - } - - // Add the timestamp - frame.fields[0].values.add(m.time || Date.now()); - - // Attach field config to the current fields - if (m.config) { - for (const [key, value] of Object.entries(m.config)) { - const f = frame.fields.find((f) => f.name === key); - if (f) { - f.config = value; - } - } - } - - // Append all values (a row) - for (const [key, value] of Object.entries(m.values)) { - const existingField = frame.fields.find((v) => v.name === key); - if (!existingField) { - const f = frame.addFieldFor(value, key); - f.labels = m.labels; - f.values.add(value); - } else { - existingField.values.add(value); - } - } - - // Make sure all fields have the same length - frame.validate(); - return frame; - } -} +import { DataFrame, DataFrameJSON, StreamingDataFrame, StreamingFrameOptions } from '@grafana/data'; +import { MeasurementBatch, LiveMeasurements, MeasurementsQuery } from './types'; /** + * This will collect + * * @alpha -- experimental */ export class MeasurementCollector implements LiveMeasurements { - measurements = new Map(); - config: MeasurementCacheConfig = { - append: 'tail', - capacity: 600, // Default capacity 10min @ 1hz + measurements = new Map(); + config: StreamingFrameOptions = { + maxLength: 600, // Default capacity 10min @ 1hz }; //------------------------------------------------------ @@ -120,93 +17,68 @@ export class MeasurementCollector implements LiveMeasurements { //------------------------------------------------------ getData(query?: MeasurementsQuery): DataFrame[] { - const { name, labels, fields } = query || {}; - - let data: DataFrame[] = []; - if (name) { - // for now we only match exact names - const m = this.measurements.get(name); - if (m) { - data = m.getFrames(labels); + const { key, fields } = query || {}; + + // Find the data + let data: StreamingDataFrame[] = []; + if (key) { + const f = this.measurements.get(key); + if (!f) { + return []; } + data.push(f); } else { + // Add all frames for (const f of this.measurements.values()) { - data.push.apply(data, f.getFrames(labels)); + data.push(f); } } + // Filter the fields we want if (fields && fields.length) { let filtered: DataFrame[] = []; for (const frame of data) { const match = frame.fields.filter((f) => fields.includes(f.name)); if (match.length > 0) { - filtered.push({ ...frame, fields: match }); // Copy the frame with fewer fields + filtered.push({ ...frame, fields: match, length: frame.length }); // Copy the frame with fewer fields } } + if (filtered.length) { + return filtered; + } } return data; } - getDistinctNames(): string[] { + getKeys(): string[] { return Object.keys(this.measurements); } - getDistinctLabels(name: string): Labels[] { - const m = this.measurements.get(name); - if (m) { - return Object.keys(m.frames).map((k) => parseLabels(k)); - } - return []; - } - - setCapacity(size: number) { - this.config.capacity = size; - - // Now update all the circular buffers - for (const wrap of this.measurements.values()) { - for (const frame of Object.values(wrap.frames)) { - for (const field of frame.fields) { - (field.values as CircularVector).setCapacity(size); - } - } - } - } - - getCapacity() { - return this.config.capacity!; - } - - clear() { - this.measurements.clear(); + ensureCapacity(size: number) { + // TODO... } //------------------------------------------------------ // Collector //------------------------------------------------------ - addBatch = (batch: MeasurementBatch) => { - let action = batch.action ?? MeasurementAction.Append; - if (action === MeasurementAction.Clear) { - this.measurements.clear(); - action = MeasurementAction.Append; + addBatch = (msg: MeasurementBatch) => { + // HACK! sending one message from the backend, not a batch + if (!msg.batch) { + const df: DataFrameJSON = msg as any; + msg = { batch: [df] }; + console.log('NOTE converting message to batch'); } - // Change the local buffer size - if (batch.capacity && batch.capacity !== this.config.capacity) { - this.setCapacity(batch.capacity); - } + for (const measure of msg.batch) { + const key = measure.key ?? measure.schema?.name ?? ''; - for (const measure of batch.measurements) { - const name = measure.name || ''; - let m = this.measurements.get(name); - if (!m) { - m = new MeasurementCache(name, this.config); - this.measurements.set(name, m); - } - if (measure.values) { - m.addMeasurement(measure, action); + let s = this.measurements.get(key); + if (s) { + s.update(measure); } else { - console.log('invalid measurement', measure); + s = new StreamingDataFrame(measure, this.config); // + this.measurements.set(key, s); } } return this; diff --git a/packages/grafana-runtime/src/measurement/types.ts b/packages/grafana-runtime/src/measurement/types.ts index 687e4947346..7727b3173f0 100644 --- a/packages/grafana-runtime/src/measurement/types.ts +++ b/packages/grafana-runtime/src/measurement/types.ts @@ -1,31 +1,4 @@ -import { DataFrame, Labels, FieldConfig } from '@grafana/data'; - -/** - * the raw channel events are batches of Measurements - * - * @alpha -- experimental - */ -export interface Measurement { - name: string; - time?: number; // Missing will use the browser time - values: Record; - config?: Record; - labels?: Labels; -} - -/** - * @alpha -- experimental - */ -export enum MeasurementAction { - /** The measurements will be added to the client buffer */ - Append = 'append', - - /** The measurements will replace the client buffer */ - Replace = 'replace', - - /** All measurements will be removed from the client buffer before processing */ - Clear = 'clear', -} +import { DataFrame, DataFrameJSON } from '@grafana/data'; /** * List of Measurements sent in a batch @@ -33,29 +6,17 @@ export enum MeasurementAction { * @alpha -- experimental */ export interface MeasurementBatch { - /** - * The default action is to append values to the client buffer - */ - action?: MeasurementAction; - /** * List of measurements to process */ - measurements: Measurement[]; - - /** - * This will set the capacity on the client buffer for everything - * in the measurement channel - */ - capacity?: number; + batch: DataFrameJSON[]; } /** * @alpha -- experimental */ export interface MeasurementsQuery { - name?: string; - labels?: Labels; + key?: string; fields?: string[]; // only include the fields with these names } @@ -66,8 +27,6 @@ export interface MeasurementsQuery { */ export interface LiveMeasurements { getData(query?: MeasurementsQuery): DataFrame[]; - getDistinctNames(): string[]; - getDistinctLabels(name: string): Labels[]; - setCapacity(size: number): void; - getCapacity(): number; + getKeys(): string[]; + ensureCapacity(size: number): void; } diff --git a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx index acb3240f02a..54b7d602d50 100644 --- a/public/app/plugins/datasource/grafana/components/QueryEditor.tsx +++ b/public/app/plugins/datasource/grafana/components/QueryEditor.tsx @@ -43,7 +43,7 @@ export class QueryEditor extends PureComponent { ...query, measurements: { ...query.measurements, - name: sel?.value, + key: sel?.value, }, }); onRunQuery(); @@ -79,12 +79,12 @@ export class QueryEditor extends PureComponent { let foundName = false; if (info) { - for (const name of info.getDistinctNames()) { + for (const name of info.getKeys()) { names.push({ value: name, label: name, }); - if (name === measurements.name) { + if (name === measurements.key) { foundName = true; } } @@ -92,11 +92,11 @@ export class QueryEditor extends PureComponent { console.log('NO INFO for', channel); } - if (measurements.name && !foundName) { + if (measurements.key && !foundName) { names.push({ - label: measurements.name, - value: measurements.name, - description: `Frames with name ${measurements.name}`, + label: measurements.key, + value: measurements.key, + description: `Frames with key ${measurements.key}`, }); } } @@ -123,7 +123,7 @@ export class QueryEditor extends PureComponent {