From 726bb4477b0a9354466d324ee3c4e9ff7bd43b60 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Sun, 4 Oct 2020 23:53:52 -0700 Subject: [PATCH] Live: cleanup and simple changes (#28028) --- devenv/docker/blocks/influxdb/telegraf.conf | 2 +- packages/grafana-data/src/types/live.ts | 19 ++++--- .../grafana-data/src/utils/labels.test.ts | 25 ++++++++- packages/grafana-data/src/utils/labels.ts | 15 ++++++ packages/grafana-runtime/src/services/live.ts | 8 +-- pkg/api/api.go | 2 +- pkg/services/live/live.go | 4 +- public/app/features/admin/LivePanel.tsx | 2 +- public/app/features/live/channel.ts | 51 +++++++++---------- .../live/dashboard/dashboardWatcher.ts | 6 ++- public/app/features/live/live.ts | 25 ++++----- .../influxdb/components/FluxQueryEditor.tsx | 2 +- 12 files changed, 99 insertions(+), 62 deletions(-) diff --git a/devenv/docker/blocks/influxdb/telegraf.conf b/devenv/docker/blocks/influxdb/telegraf.conf index 2a56e4671d8..890544f0395 100644 --- a/devenv/docker/blocks/influxdb/telegraf.conf +++ b/devenv/docker/blocks/influxdb/telegraf.conf @@ -3346,7 +3346,7 @@ # # pid_finder = "pgrep" -# # Reads last_run_summary.yaml file and converts to measurments +# # Reads last_run_summary.yaml file and converts to measurements # [[inputs.puppetagent]] # ## Location of puppet last run summary file # location = "/var/lib/puppet/state/last_run_summary.yaml" diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index 6dfccd6fa86..ef23caa880e 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -143,6 +143,15 @@ export interface LiveChannelPresenceStatus { users: any; // @experimental -- will be filled in when we improve the UI } +/** + * @experimental + */ +export interface LiveChannelAddress { + scope: LiveChannelScope; + namespace: string; // depends on the scope + path: string; +} + /** * @experimental */ @@ -150,14 +159,8 @@ export interface LiveChannel { /** The fully qualified channel id: ${scope}/${namespace}/${path} */ id: string; - /** The scope for this channel */ - scope: LiveChannelScope; - - /** datasourceId/plugin name/feature depending on scope */ - namespace: string; - - /** additional qualifier */ - path: string; + /** The channel address */ + addr: LiveChannelAddress; /** Unix timestamp for when the channel connected */ opened: number; diff --git a/packages/grafana-data/src/utils/labels.test.ts b/packages/grafana-data/src/utils/labels.test.ts index 82169e54711..1fb84f5f23c 100644 --- a/packages/grafana-data/src/utils/labels.test.ts +++ b/packages/grafana-data/src/utils/labels.test.ts @@ -1,4 +1,5 @@ -import { parseLabels, formatLabels, findCommonLabels, findUniqueLabels } from './labels'; +import { parseLabels, formatLabels, findCommonLabels, findUniqueLabels, matchAllLabels } from './labels'; +import { Labels } from '../types/data'; describe('parseLabels()', () => { it('returns no labels on empty labels string', () => { @@ -53,3 +54,25 @@ describe('findUniqueLabels()', () => { expect(findUniqueLabels({ foo: '"bar"', baz: '"42"' }, { foo: '"bar"' })).toEqual({ baz: '"42"' }); }); }); + +describe('matchAllLabels()', () => { + it('empty labels do math', () => { + expect(matchAllLabels({}, {})).toBeTruthy(); + }); + + it('missing labels', () => { + expect(matchAllLabels({ foo: 'bar' }, {})).toBeFalsy(); + }); + + it('extra labels should match', () => { + expect(matchAllLabels({ foo: 'bar' }, { foo: 'bar', baz: '22' })).toBeTruthy(); + }); + + it('be graceful with null values (match)', () => { + expect(matchAllLabels({ foo: 'bar' })).toBeFalsy(); + }); + + it('be graceful with null values (match)', () => { + expect(matchAllLabels((undefined as unknown) as Labels, { foo: 'bar' })).toBeTruthy(); + }); +}); diff --git a/packages/grafana-data/src/utils/labels.ts b/packages/grafana-data/src/utils/labels.ts index 5e72421d4bb..d81d95d34ef 100644 --- a/packages/grafana-data/src/utils/labels.ts +++ b/packages/grafana-data/src/utils/labels.ts @@ -59,6 +59,21 @@ export function findUniqueLabels(labels: Labels | undefined, commonLabels: Label return uncommonLabels; } +/** + * Check that all labels exist in another set of labels + */ +export function matchAllLabels(expect: Labels, against?: Labels): boolean { + if (!expect) { + return true; // nothing to match + } + for (const [key, value] of Object.entries(expect)) { + if (!against || against[key] !== value) { + return false; + } + } + return true; +} + /** * Serializes the given labels to a string. */ diff --git a/packages/grafana-runtime/src/services/live.ts b/packages/grafana-runtime/src/services/live.ts index 94f0d3663be..df39df6f1bd 100644 --- a/packages/grafana-runtime/src/services/live.ts +++ b/packages/grafana-runtime/src/services/live.ts @@ -1,4 +1,4 @@ -import { LiveChannel, LiveChannelScope } from '@grafana/data'; +import { LiveChannel, LiveChannelAddress } from '@grafana/data'; import { Observable } from 'rxjs'; /** @@ -23,11 +23,7 @@ export interface GrafanaLiveSrv { * Multiple requests for this channel will return the same object until * the channel is shutdown */ - getChannel( - scope: LiveChannelScope, - namespace: string, - path: string - ): LiveChannel; + getChannel(address: LiveChannelAddress): LiveChannel; } let singletonInstance: GrafanaLiveSrv; diff --git a/pkg/api/api.go b/pkg/api/api.go index 982f0b55603..7e3a1d5a209 100644 --- a/pkg/api/api.go +++ b/pkg/api/api.go @@ -426,7 +426,7 @@ func (hs *HTTPServer) registerRoutes() { // Live streaming if hs.Live != nil { - r.Any("/live/*", hs.Live.Handler) + r.Any("/live/*", hs.Live.WebsocketHandler) } // Snapshots diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index e693f20d8e7..28e9175323d 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -31,7 +31,7 @@ type GrafanaLive struct { node *centrifuge.Node // The websocket handler - Handler interface{} + WebsocketHandler interface{} // Full channel handler channels map[string]models.ChannelHandler @@ -171,7 +171,7 @@ func InitializeBroker() (*GrafanaLive, error) { WriteBufferSize: 1024, }) - glive.Handler = func(ctx *models.ReqContext) { + glive.WebsocketHandler = func(ctx *models.ReqContext) { user := ctx.SignedInUser if user == nil { ctx.Resp.WriteHeader(401) diff --git a/public/app/features/admin/LivePanel.tsx b/public/app/features/admin/LivePanel.tsx index 07de747d92b..5177f3b1674 100644 --- a/public/app/features/admin/LivePanel.tsx +++ b/public/app/features/admin/LivePanel.tsx @@ -62,7 +62,7 @@ export class LivePanel extends PureComponent { startSubscription = () => { const { scope, namespace, path } = this.props; - const channel = getGrafanaLiveSrv().getChannel(scope, namespace, path); + const channel = getGrafanaLiveSrv().getChannel({ scope, namespace, path }); if (this.state.channel === channel) { return; // no change! } diff --git a/public/app/features/live/channel.ts b/public/app/features/live/channel.ts index 18424e93e35..88ee79e1617 100644 --- a/public/app/features/live/channel.ts +++ b/public/app/features/live/channel.ts @@ -1,12 +1,12 @@ import { LiveChannelConfig, LiveChannel, - LiveChannelScope, LiveChannelStatusEvent, LiveChannelEvent, LiveChannelEventType, LiveChannelConnectionState, LiveChannelPresenceStatus, + LiveChannelAddress, } from '@grafana/data'; import Centrifuge, { JoinLeaveContext, @@ -26,9 +26,7 @@ export class CentrifugeLiveChannel implements Li readonly opened = Date.now(); readonly id: string; - readonly scope: LiveChannelScope; - readonly namespace: string; - readonly path: string; + readonly addr: LiveChannelAddress; readonly stream = new Subject>(); @@ -37,11 +35,9 @@ export class CentrifugeLiveChannel implements Li subscription?: Centrifuge.Subscription; shutdownCallback?: () => void; - constructor(id: string, scope: LiveChannelScope, namespace: string, path: string) { + constructor(id: string, addr: LiveChannelAddress) { this.id = id; - this.scope = scope; - this.namespace = namespace; - this.path = path; + this.addr = addr; this.currentStatus = { type: LiveChannelEventType.Status, id, @@ -61,15 +57,25 @@ export class CentrifugeLiveChannel implements Li const events: SubscriptionEvents = { // This means a message was received from the server publish: (ctx: PublicationContext) => { - this.stream.next({ - type: LiveChannelEventType.Message, - message: prepare(ctx.data), - }); - - // Clear any error messages - if (this.currentStatus.error) { + try { + const message = prepare(ctx.data); + if (message) { + this.stream.next({ + type: LiveChannelEventType.Message, + message, + }); + } + + // Clear any error messages + if (this.currentStatus.error) { + this.currentStatus.timestamp = Date.now(); + delete this.currentStatus.error; + this.sendStatus(); + } + } catch (err) { + console.log('publish error', config.path, err); + this.currentStatus.error = err; this.currentStatus.timestamp = Date.now(); - delete this.currentStatus.error; this.sendStatus(); } }, @@ -81,6 +87,7 @@ export class CentrifugeLiveChannel implements Li subscribe: (ctx: SubscribeSuccessContext) => { this.currentStatus.timestamp = Date.now(); this.currentStatus.state = LiveChannelConnectionState.Connected; + delete this.currentStatus.error; this.sendStatus(); }, unsubscribe: (ctx: UnsubscribeContext) => { @@ -159,19 +166,11 @@ export class CentrifugeLiveChannel implements Li } } -export function getErrorChannel( - msg: string, - id: string, - scope: LiveChannelScope, - namespace: string, - path: string -): LiveChannel { +export function getErrorChannel(msg: string, id: string, addr: LiveChannelAddress): LiveChannel { return { id, opened: Date.now(), - scope, - namespace, - path, + addr, // return an error getStream: () => diff --git a/public/app/features/live/dashboard/dashboardWatcher.ts b/public/app/features/live/dashboard/dashboardWatcher.ts index 6dc6a9fd726..b7c78df5e49 100644 --- a/public/app/features/live/dashboard/dashboardWatcher.ts +++ b/public/app/features/live/dashboard/dashboardWatcher.ts @@ -56,7 +56,11 @@ class DashboardWatcher { // Check for changes if (uid !== this.uid) { this.leave(); - this.channel = live.getChannel(LiveChannelScope.Grafana, 'dashboard', uid); + this.channel = live.getChannel({ + scope: LiveChannelScope.Grafana, + namespace: 'dashboard', + path: uid, + }); this.channel.getStream().subscribe(this.observer); this.uid = uid; } diff --git a/public/app/features/live/live.ts b/public/app/features/live/live.ts index 0898b5453ad..cc87beaa96c 100644 --- a/public/app/features/live/live.ts +++ b/public/app/features/live/live.ts @@ -2,7 +2,7 @@ import Centrifuge from 'centrifuge/dist/centrifuge.protobuf'; import SockJS from 'sockjs-client'; import { GrafanaLiveSrv, setGrafanaLiveSrv, getGrafanaLiveSrv, config } from '@grafana/runtime'; import { BehaviorSubject } from 'rxjs'; -import { LiveChannel, LiveChannelScope } from '@grafana/data'; +import { LiveChannel, LiveChannelScope, LiveChannelAddress } from '@grafana/data'; import { CentrifugeLiveChannel, getErrorChannel } from './channel'; import { GrafanaLiveScope, @@ -84,23 +84,19 @@ export class CentrifugeSrv implements GrafanaLiveSrv { * Get a channel. If the scope, namespace, or path is invalid, a shutdown * channel will be returned with an error state indicated in its status */ - getChannel( - scopeId: LiveChannelScope, - namespace: string, - path: string - ): LiveChannel { - const id = `${scopeId}/${namespace}/${path}`; + getChannel(addr: LiveChannelAddress): LiveChannel { + const id = `${addr.scope}/${addr.namespace}/${addr.path}`; let channel = this.open.get(id); if (channel != null) { return channel; } - const scope = this.scopes[scopeId]; + const scope = this.scopes[addr.scope]; if (!scope) { - return getErrorChannel('invalid scope', id, scopeId, namespace, path); + return getErrorChannel('invalid scope', id, addr); } - channel = new CentrifugeLiveChannel(id, scopeId, namespace, path); + channel = new CentrifugeLiveChannel(id, addr); channel.shutdownCallback = () => { this.open.delete(id); // remove it from the list of open channels }; @@ -117,13 +113,14 @@ export class CentrifugeSrv implements GrafanaLiveSrv { } private async initChannel(scope: GrafanaLiveScope, channel: CentrifugeLiveChannel): Promise { - const support = await scope.getChannelSupport(channel.namespace); + const { addr } = channel; + const support = await scope.getChannelSupport(addr.namespace); if (!support) { - throw new Error(channel.namespace + 'does not support streaming'); + throw new Error(channel.addr.namespace + 'does not support streaming'); } - const config = support.getChannelConfig(channel.path); + const config = support.getChannelConfig(addr.path); if (!config) { - throw new Error('unknown path: ' + channel.path); + throw new Error('unknown path: ' + addr.path); } if (config.canPublish?.()) { channel.publish = (data: any) => this.centrifuge.publish(channel.id, data); diff --git a/public/app/plugins/datasource/influxdb/components/FluxQueryEditor.tsx b/public/app/plugins/datasource/influxdb/components/FluxQueryEditor.tsx index a7a22e231cb..4f6861d9577 100644 --- a/public/app/plugins/datasource/influxdb/components/FluxQueryEditor.tsx +++ b/public/app/plugins/datasource/influxdb/components/FluxQueryEditor.tsx @@ -21,7 +21,7 @@ const samples: Array> = [ { label: 'Show buckets', description: 'List the avaliable buckets (table)', value: 'buckets()' }, { label: 'Simple query', - description: 'filter by measurment and field', + description: 'filter by measurement and field', value: `from(bucket: "db/rp") |> range(start: v.timeRangeStart, stop:v.timeRangeStop) |> filter(fn: (r) =>