Live: use StreamingDataSource for live measurments (#31713)

pull/32077/head^2
Ryan McKinley 4 years ago committed by GitHub
parent 5286ddbbc0
commit 5bb1b1602d
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 20
      packages/grafana-data/src/dataframe/StreamingDataFrame.ts
  2. 241
      packages/grafana-runtime/src/measurement/collector.test.ts
  3. 206
      packages/grafana-runtime/src/measurement/collector.ts
  4. 51
      packages/grafana-runtime/src/measurement/types.ts
  5. 16
      public/app/plugins/datasource/grafana/components/QueryEditor.tsx

@ -50,8 +50,20 @@ export class StreamingDataFrame implements DataFrame {
update(msg: DataFrameJSON) {
const { schema, data } = msg;
if (schema) {
if (this.fields.length > 0) {
// ?? keep existing data?
// Keep old values if they are the same shape
let oldValues: ArrayVector[] | undefined;
if (schema.fields.length === this.fields.length) {
let same = true;
oldValues = this.fields.map((f, idx) => {
const oldField = this.fields[idx];
if (f.name !== oldField.name || f.type !== oldField.type) {
same = false;
}
return f.values;
});
if (!same) {
oldValues = undefined;
}
}
this.name = schema.name;
@ -59,13 +71,13 @@ export class StreamingDataFrame implements DataFrame {
this.meta = schema.meta;
// Create new fields from the schema
this.fields = schema.fields.map((f) => {
this.fields = schema.fields.map((f, idx) => {
return {
config: f.config ?? {},
name: f.name,
labels: f.labels,
type: f.type ?? FieldType.other,
values: new ArrayVector(),
values: oldValues ? oldValues[idx] : new ArrayVector(),
};
});

@ -1,52 +1,42 @@
import { FieldType } from '@grafana/data';
import { MeasurementCollector } from './collector';
import { MeasurementAction } from './types';
describe('MeasurementCollector', () => {
it('should collect values', () => {
const collector = new MeasurementCollector();
collector.addBatch({
measurements: [
batch: [
{
name: 'test',
labels: { host: 'a' },
time: 100,
values: {
f0: 0,
f1: 1,
f2: 'hello',
key: 'aaa',
schema: {
fields: [
{ name: 'time', type: FieldType.time },
{ name: 'value', type: FieldType.number },
],
},
data: {
values: [
[100, 200],
[1, 2],
],
},
},
{
name: 'test',
labels: { host: 'b' },
time: 101,
values: {
f0: 0,
f1: 1,
f2: 'hello',
},
config: {
f2: {
unit: 'mph',
},
},
key: 'aaa',
data: { values: [[300], [3]] },
},
{
name: 'test',
time: 102,
labels: { host: 'a' }, // should append to first value
values: {
// note the missing values for f0/1
f2: 'world',
},
key: 'aaa',
data: { values: [[400], [4]] },
},
],
});
const frames = collector.getData();
expect(frames.length).toEqual(2);
expect(frames.length).toEqual(1);
(frames[0] as any).lastUpdateTime = 0;
expect(frames[0]).toMatchInlineSnapshot(`
Object {
StreamingDataFrame {
"fields": Array [
Object {
"config": Object {},
@ -55,196 +45,33 @@ describe('MeasurementCollector', () => {
"type": "time",
"values": Array [
100,
102,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f0",
"type": "number",
"values": Array [
0,
undefined,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f1",
"type": "number",
"values": Array [
1,
undefined,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f2",
"type": "string",
"values": Array [
"hello",
"world",
200,
300,
400,
],
},
],
"meta": Object {
"custom": Object {
"labels": Object {
"host": "a",
},
},
},
"name": "test",
"refId": undefined,
}
`);
expect(frames[1]).toMatchInlineSnapshot(`
Object {
"fields": Array [
Object {
"config": Object {},
"labels": undefined,
"name": "time",
"type": "time",
"values": Array [
101,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "b",
},
"name": "f0",
"type": "number",
"values": Array [
0,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "b",
},
"name": "f1",
"name": "value",
"type": "number",
"values": Array [
1,
],
},
Object {
"config": Object {
"unit": "mph",
},
"labels": Object {
"host": "b",
},
"name": "f2",
"type": "string",
"values": Array [
"hello",
],
},
],
"meta": Object {
"custom": Object {
"labels": Object {
"host": "b",
},
},
},
"name": "test",
"refId": undefined,
}
`);
collector.addBatch({
action: MeasurementAction.Replace,
measurements: [
{
name: 'test',
time: 105,
labels: { host: 'a' },
values: {
f1: 10,
},
},
],
});
const frames2 = collector.getData();
expect(frames2.length).toEqual(2);
expect(frames2[0].length).toEqual(1); // not three!
expect(frames2[0]).toMatchInlineSnapshot(`
Object {
"fields": Array [
Object {
"config": Object {},
"labels": undefined,
"name": "time",
"type": "time",
"values": Array [
105,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f0",
"type": "number",
"values": Array [
undefined,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f1",
"type": "number",
"values": Array [
10,
],
},
Object {
"config": Object {},
"labels": Object {
"host": "a",
},
"name": "f2",
"type": "string",
"values": Array [
undefined,
2,
3,
4,
],
},
],
"meta": Object {
"custom": Object {
"labels": Object {
"host": "a",
},
},
"lastUpdateTime": 0,
"meta": undefined,
"name": undefined,
"options": Object {
"maxLength": 600,
},
"name": "test",
"refId": undefined,
"timeFieldIndex": 0,
}
`);
collector.addBatch({
action: MeasurementAction.Clear,
measurements: [],
});
expect(collector.getData().length).toEqual(0);
});
});

@ -1,118 +1,15 @@
import {
CircularDataFrame,
Labels,
formatLabels,
FieldType,
DataFrame,
matchAllLabels,
parseLabels,
CircularVector,
ArrayVector,
} from '@grafana/data';
import { Measurement, MeasurementBatch, LiveMeasurements, MeasurementsQuery, MeasurementAction } from './types';
interface MeasurementCacheConfig {
append?: 'head' | 'tail';
capacity?: number;
}
/** This is a cache scoped to a the measurement name
*
* @alpha -- experimental
*/
export class MeasurementCache {
readonly frames: Record<string, CircularDataFrame> = {}; // key is the labels
constructor(public name: string, private config: MeasurementCacheConfig) {
if (!this.config) {
this.config = {
append: 'tail',
capacity: 600, // Default capacity 10min @ 1hz
};
}
}
getFrames(match?: Labels): DataFrame[] {
const frames = Object.values(this.frames);
if (!match) {
return frames;
}
return frames.filter((f) => {
return matchAllLabels(match, f.meta?.custom?.labels);
});
}
addMeasurement(m: Measurement, action: MeasurementAction): DataFrame {
const key = m.labels ? formatLabels(m.labels) : '';
let frame = this.frames[key];
if (!frame) {
frame = new CircularDataFrame(this.config);
frame.name = this.name;
frame.addField({
name: 'time',
type: FieldType.time,
});
for (const [key, value] of Object.entries(m.values)) {
frame.addFieldFor(value, key).labels = m.labels;
}
frame.meta = {
custom: {
labels: m.labels,
},
};
this.frames[key] = frame;
}
// Clear existing values
if (action === MeasurementAction.Replace) {
for (const field of frame.fields) {
(field.values as ArrayVector).buffer.length = 0; // same buffer, but reset to empty length
}
}
// Add the timestamp
frame.fields[0].values.add(m.time || Date.now());
// Attach field config to the current fields
if (m.config) {
for (const [key, value] of Object.entries(m.config)) {
const f = frame.fields.find((f) => f.name === key);
if (f) {
f.config = value;
}
}
}
// Append all values (a row)
for (const [key, value] of Object.entries(m.values)) {
const existingField = frame.fields.find((v) => v.name === key);
if (!existingField) {
const f = frame.addFieldFor(value, key);
f.labels = m.labels;
f.values.add(value);
} else {
existingField.values.add(value);
}
}
// Make sure all fields have the same length
frame.validate();
return frame;
}
}
import { DataFrame, DataFrameJSON, StreamingDataFrame, StreamingFrameOptions } from '@grafana/data';
import { MeasurementBatch, LiveMeasurements, MeasurementsQuery } from './types';
/**
* This will collect
*
* @alpha -- experimental
*/
export class MeasurementCollector implements LiveMeasurements {
measurements = new Map<string, MeasurementCache>();
config: MeasurementCacheConfig = {
append: 'tail',
capacity: 600, // Default capacity 10min @ 1hz
measurements = new Map<string, StreamingDataFrame>();
config: StreamingFrameOptions = {
maxLength: 600, // Default capacity 10min @ 1hz
};
//------------------------------------------------------
@ -120,93 +17,68 @@ export class MeasurementCollector implements LiveMeasurements {
//------------------------------------------------------
getData(query?: MeasurementsQuery): DataFrame[] {
const { name, labels, fields } = query || {};
let data: DataFrame[] = [];
if (name) {
// for now we only match exact names
const m = this.measurements.get(name);
if (m) {
data = m.getFrames(labels);
const { key, fields } = query || {};
// Find the data
let data: StreamingDataFrame[] = [];
if (key) {
const f = this.measurements.get(key);
if (!f) {
return [];
}
data.push(f);
} else {
// Add all frames
for (const f of this.measurements.values()) {
data.push.apply(data, f.getFrames(labels));
data.push(f);
}
}
// Filter the fields we want
if (fields && fields.length) {
let filtered: DataFrame[] = [];
for (const frame of data) {
const match = frame.fields.filter((f) => fields.includes(f.name));
if (match.length > 0) {
filtered.push({ ...frame, fields: match }); // Copy the frame with fewer fields
filtered.push({ ...frame, fields: match, length: frame.length }); // Copy the frame with fewer fields
}
}
if (filtered.length) {
return filtered;
}
}
return data;
}
getDistinctNames(): string[] {
getKeys(): string[] {
return Object.keys(this.measurements);
}
getDistinctLabels(name: string): Labels[] {
const m = this.measurements.get(name);
if (m) {
return Object.keys(m.frames).map((k) => parseLabels(k));
}
return [];
}
setCapacity(size: number) {
this.config.capacity = size;
// Now update all the circular buffers
for (const wrap of this.measurements.values()) {
for (const frame of Object.values(wrap.frames)) {
for (const field of frame.fields) {
(field.values as CircularVector).setCapacity(size);
}
}
}
}
getCapacity() {
return this.config.capacity!;
}
clear() {
this.measurements.clear();
ensureCapacity(size: number) {
// TODO...
}
//------------------------------------------------------
// Collector
//------------------------------------------------------
addBatch = (batch: MeasurementBatch) => {
let action = batch.action ?? MeasurementAction.Append;
if (action === MeasurementAction.Clear) {
this.measurements.clear();
action = MeasurementAction.Append;
addBatch = (msg: MeasurementBatch) => {
// HACK! sending one message from the backend, not a batch
if (!msg.batch) {
const df: DataFrameJSON = msg as any;
msg = { batch: [df] };
console.log('NOTE converting message to batch');
}
// Change the local buffer size
if (batch.capacity && batch.capacity !== this.config.capacity) {
this.setCapacity(batch.capacity);
}
for (const measure of msg.batch) {
const key = measure.key ?? measure.schema?.name ?? '';
for (const measure of batch.measurements) {
const name = measure.name || '';
let m = this.measurements.get(name);
if (!m) {
m = new MeasurementCache(name, this.config);
this.measurements.set(name, m);
}
if (measure.values) {
m.addMeasurement(measure, action);
let s = this.measurements.get(key);
if (s) {
s.update(measure);
} else {
console.log('invalid measurement', measure);
s = new StreamingDataFrame(measure, this.config); //
this.measurements.set(key, s);
}
}
return this;

@ -1,31 +1,4 @@
import { DataFrame, Labels, FieldConfig } from '@grafana/data';
/**
* the raw channel events are batches of Measurements
*
* @alpha -- experimental
*/
export interface Measurement {
name: string;
time?: number; // Missing will use the browser time
values: Record<string, any>;
config?: Record<string, FieldConfig>;
labels?: Labels;
}
/**
* @alpha -- experimental
*/
export enum MeasurementAction {
/** The measurements will be added to the client buffer */
Append = 'append',
/** The measurements will replace the client buffer */
Replace = 'replace',
/** All measurements will be removed from the client buffer before processing */
Clear = 'clear',
}
import { DataFrame, DataFrameJSON } from '@grafana/data';
/**
* List of Measurements sent in a batch
@ -33,29 +6,17 @@ export enum MeasurementAction {
* @alpha -- experimental
*/
export interface MeasurementBatch {
/**
* The default action is to append values to the client buffer
*/
action?: MeasurementAction;
/**
* List of measurements to process
*/
measurements: Measurement[];
/**
* This will set the capacity on the client buffer for everything
* in the measurement channel
*/
capacity?: number;
batch: DataFrameJSON[];
}
/**
* @alpha -- experimental
*/
export interface MeasurementsQuery {
name?: string;
labels?: Labels;
key?: string;
fields?: string[]; // only include the fields with these names
}
@ -66,8 +27,6 @@ export interface MeasurementsQuery {
*/
export interface LiveMeasurements {
getData(query?: MeasurementsQuery): DataFrame[];
getDistinctNames(): string[];
getDistinctLabels(name: string): Labels[];
setCapacity(size: number): void;
getCapacity(): number;
getKeys(): string[];
ensureCapacity(size: number): void;
}

@ -43,7 +43,7 @@ export class QueryEditor extends PureComponent<Props> {
...query,
measurements: {
...query.measurements,
name: sel?.value,
key: sel?.value,
},
});
onRunQuery();
@ -79,12 +79,12 @@ export class QueryEditor extends PureComponent<Props> {
let foundName = false;
if (info) {
for (const name of info.getDistinctNames()) {
for (const name of info.getKeys()) {
names.push({
value: name,
label: name,
});
if (name === measurements.name) {
if (name === measurements.key) {
foundName = true;
}
}
@ -92,11 +92,11 @@ export class QueryEditor extends PureComponent<Props> {
console.log('NO INFO for', channel);
}
if (measurements.name && !foundName) {
if (measurements.key && !foundName) {
names.push({
label: measurements.name,
value: measurements.name,
description: `Frames with name ${measurements.name}`,
label: measurements.key,
value: measurements.key,
description: `Frames with key ${measurements.key}`,
});
}
}
@ -123,7 +123,7 @@ export class QueryEditor extends PureComponent<Props> {
<InlineField label="Measurement" grow={true} labelWidth={labelWidth}>
<Select
options={names}
value={names.find((v) => v.value === measurements?.name) || names[0]}
value={names.find((v) => v.value === measurements?.key) || names[0]}
onChange={this.onMeasurementNameChanged}
allowCustomValue={true}
backspaceRemovesValue={true}

Loading…
Cancel
Save