---
title: Build a streaming data source plugin
aliases:
  - ../../../plugins/build-a-streaming-data-source-plugin/
description: How to build a streaming data source plugin.
keywords:
  - grafana
  - plugins
  - plugin
  - streaming
  - streaming data source
  - datasource
weight: 600
---

# Build a streaming data source plugin

In Grafana, you can set your dashboards to refresh automatically at a fixed interval, no matter which data source you use. However, each refresh re-runs your queries and requests all of the data again, regardless of whether the data has actually changed. Adding streaming to a plugin reduces these repeated queries, so your dashboard is updated only when new data becomes available.

## Before you begin

This guide assumes that you're already familiar with how to [Build a data source plugin]({{< relref "./build-a-data-source-plugin" >}}).

Grafana uses [RxJS](https://rxjs.dev/) to continuously send data from a data source to a panel visualization.

> **Note:** To learn more about RxJS, refer to the [RxJS documentation](https://rxjs.dev/guide/overview).

## Add streaming to your data source

Enable streaming for your data source plugin to update your dashboard when new data becomes available.

For example, a streaming data source plugin can connect to a websocket, or subscribe to a message bus, and update the visualization whenever a new message is available.

### Step 1: Edit the `plugin.json` file

Enable streaming for your data source in the `plugin.json` file.

```json
{
  "streaming": true
}
```

### Step 2: Change the signature of the `query` method

Modify the signature of the `query` method to return an `Observable` from the `rxjs` package. Make sure you remove the `async` keyword, because the method no longer returns a `Promise`.

```ts
import { Observable } from 'rxjs';
```

```ts
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  // ...
}
```

### Step 3: Create an `Observable` instance for each query

Create an `Observable` instance for each query, and then combine them all using the `merge` function from the `rxjs` package.

```ts
import { Observable, merge } from 'rxjs';
```

```ts
const observables = options.targets.map((target) => {
  return new Observable<DataQueryResponse>((subscriber) => {
    // ...
  });
});

return merge(...observables);
```

### Step 4: Create a `CircularDataFrame` instance

In the `subscribe` function (the callback that you pass to the `Observable` constructor), create a `CircularDataFrame` instance.

```ts
import { CircularDataFrame, FieldType } from '@grafana/data';
```

```ts
const frame = new CircularDataFrame({
  append: 'tail',
  capacity: 1000,
});

// `query` is the target that this Observable was created for.
frame.refId = query.refId;
frame.addField({ name: 'time', type: FieldType.time });
frame.addField({ name: 'value', type: FieldType.number });
```

Circular data frames have a limited capacity. When a circular data frame reaches its capacity, the oldest data point is removed.

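
The eviction behavior described above can be illustrated with a small, dependency-free sketch. The `RingBuffer` class below is hypothetical and is not how `CircularDataFrame` is implemented. It only mimics the idea of appending at the tail and dropping the oldest item once the capacity is reached.

```ts
// Illustrative only: a tiny fixed-capacity buffer (hypothetical helper,
// not part of @grafana/data) that drops its oldest item when full.
class RingBuffer<T> {
  private items: T[] = [];
  private capacity: number;

  constructor(capacity: number) {
    if (capacity < 1) {
      throw new Error('capacity must be at least 1');
    }
    this.capacity = capacity;
  }

  // Append at the tail; once over capacity, remove the oldest item.
  add(item: T): void {
    this.items.push(item);
    if (this.items.length > this.capacity) {
      this.items.shift();
    }
  }

  // Return a copy so callers cannot mutate the internal state.
  toArray(): T[] {
    return [...this.items];
  }
}

const buffer = new RingBuffer<number>(3);
[1, 2, 3, 4, 5].forEach((n) => buffer.add(n));

// Only the three most recent values remain: 3, 4, 5.
console.log(buffer.toArray());
```

With `capacity: 1000`, as in the step above, the frame would keep at most the 1000 most recent rows in the same way.
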
### Step 5: Send the updated data frame

Use `subscriber.next()` to send the updated data frame whenever you receive new updates. The function that you return from the `subscribe` callback runs when the subscription ends, so use it to release resources such as timers.

```ts
import { LoadingState } from '@grafana/data';
```

```ts
const intervalId = setInterval(() => {
  frame.add({ time: Date.now(), value: Math.random() });

  subscriber.next({
    data: [frame],
    key: query.refId,
    state: LoadingState.Streaming,
  });
}, 500);

return () => {
  clearInterval(intervalId);
};
```
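
In a real plugin, the timer would typically be replaced by a push source. The following is a minimal sketch of that wiring, not a definitive implementation. `Sink`, `SocketLike`, `Point`, and `forwardMessages` are hypothetical names introduced here so the example stays self-contained: `Sink` stands in for the RxJS subscriber, `SocketLike` for a browser `WebSocket`, and the JSON message shape `{ time, value }` is an assumption.

```ts
// Hypothetical minimal shapes (assumptions, not Grafana or RxJS types).
interface Sink<T> {
  next(value: T): void;
  error(err: unknown): void;
  complete(): void;
}

interface SocketLike {
  onmessage: ((event: { data: string }) => void) | null;
  onerror: ((event: unknown) => void) | null;
  onclose: (() => void) | null;
  close(): void;
}

// Assumed message payload: one data point per message.
interface Point {
  time: number;
  value: number;
}

// Forwards every parsed message to the sink and returns a teardown
// function, in the same spirit as `return () => clearInterval(intervalId)`.
function forwardMessages(socket: SocketLike, sink: Sink<Point>): () => void {
  socket.onmessage = (event) => {
    try {
      sink.next(JSON.parse(event.data) as Point);
    } catch (err) {
      // Malformed JSON is reported to the sink instead of being thrown.
      sink.error(err);
    }
  };
  socket.onerror = (event) => sink.error(event);
  socket.onclose = () => sink.complete();

  // Closing the socket is the cleanup step for this source.
  return () => socket.close();
}
```

Inside the `subscribe` callback, the message handler would add the point to the frame with `frame.add()` and then call `subscriber.next()` with the frame, as in the timer-based code above.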

> **Note:** In practice, you'd call `subscriber.next()` as soon as you receive new data from a websocket or a message bus. The `setInterval` example above only simulates this by producing a new data point every 500 milliseconds.

### Example code for final `query` method

```ts
query(options: DataQueryRequest<MyQuery>): Observable<DataQueryResponse> {
  const streams = options.targets.map(target => {
    // Fill in any missing query fields with the plugin's default values.
    const query = defaults(target, defaultQuery);

    return new Observable<DataQueryResponse>(subscriber => {
      const frame = new CircularDataFrame({
        append: 'tail',
        capacity: 1000,
      });

      frame.refId = query.refId;
      frame.addField({ name: 'time', type: FieldType.time });
      frame.addField({ name: 'value', type: FieldType.number });

      const intervalId = setInterval(() => {
        frame.add({ time: Date.now(), value: Math.random() });

        subscriber.next({
          data: [frame],
          key: query.refId,
          state: LoadingState.Streaming,
        });
      }, 100);

      // Stop the timer when the subscription ends.
      return () => {
        clearInterval(intervalId);
      };
    });
  });

  return merge(...streams);
}
```

One limitation of this example is that the panel visualization is cleared every time you update the dashboard, because each new subscription starts with an empty data frame. If you have access to historical data, you can add it, or _backfill_ it, to the data frame before the first call to `subscriber.next()`.

For another example of a streaming plugin, refer to the [streaming websocket example](https://github.com/grafana/grafana-plugin-examples/tree/main/examples/datasource-streaming-websocket) on GitHub.

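
The backfilling idea mentioned above could be sketched as follows. This is an illustration under stated assumptions rather than a prescribed approach. `Appendable`, `Point`, and `backfill` are hypothetical names: `Appendable` stands in for the data frame (only its `add()` method is used), and how you obtain `history` depends entirely on your own data source.

```ts
// Hypothetical shapes for illustration (not Grafana types).
interface Point {
  time: number;
  value: number;
}

interface Appendable {
  add(row: Point): void;
}

// Adds historical points oldest-first, so that live points appended
// afterwards continue in time order. Returns the number of rows added.
function backfill(frame: Appendable, history: Point[]): number {
  // Copy before sorting to avoid mutating the caller's array.
  const sorted = [...history].sort((a, b) => a.time - b.time);
  for (const point of sorted) {
    frame.add(point);
  }
  return sorted.length;
}
```

In the `subscribe` callback, you would call a helper like this after adding the fields to the frame and before the first `subscriber.next()`. If the history is larger than the frame's capacity, a circular frame keeps only the most recent rows.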