mirror of https://github.com/grafana/grafana
Live: move centrifuge service to a web worker (#41090)
* Fix: make webpack pickup workers written in TS * Add comlink to dependencies * Temporary fix: copy paste `toDataQueryError` from @grafana/runtime to avoid web dependencies * Implemented comlink-based centrifuge worker & worker proxy * Temporary fix: implement comlink transferHandlers for subscriptions and streamingdataframes * Move liveTimer filtering from CentrifugeService into GrafanaLiveService * Switch from CentrifugeService to CentrifugeServiceWorkerProxy in GrafanaLive * Naming fix * Refactor: move liveTimer-based data filtering from GrafanaLiveService to CentrifugeServiceWorker * observe dataStream on an async scheduler * Fix: - Unsubscribe is now propagated from the main thread to the worker, - improve worker&workerProxy types * Fix: Prettify types * Fix: Add error & complete observers * Docs: Add comment explaining the `subscriberTransferHandler` * Fix: Replace `StreamingDataFrameHandler` with explicitly converting StreamingDataFrame to a DataFrameDTO * Refactor: move liveTimer filtering to service.ts to make it easy to implement a `live-service-web-worker` feature flag * Feat: add `live-service-web-worker` feature flag * Fix: extract toDataQueryError.ts to a separate file within `@grafana-runtime` to avoid having a dependency from webworker to the whole package (@grafana-runtime/index.ts) * Update public/app/features/dashboard/dashgrid/liveTimer.ts Co-authored-by: Leon Sorokin <leeoniya@gmail.com> * Fix: fixed default import class in worker file * Fix: cast worker as Endpoint * Migrate from worker-loader to webpack native worker support v1 - broken prod build * Fix: Use custom path in HtmlWebpackPlugin * Fix: Loading workers from CDNs * Fix: Avoid issues with jest ESM support by mocking `createWorker` files * Fix: move the custom mockWorker rendering layout to `test/mocks` Co-authored-by: Leon Sorokin <leeoniya@gmail.com>pull/43065/head
parent
e2ed140de2
commit
f45eb309ef
@ -0,0 +1,31 @@ |
||||
import { DataQueryError } from '@grafana/data'; |
||||
|
||||
/** |
||||
* Convert an object into a DataQueryError -- if this is an HTTP response, |
||||
* it will put the correct values in the error field |
||||
* |
||||
* @public |
||||
*/ |
||||
export function toDataQueryError(err: DataQueryError | string | Object): DataQueryError { |
||||
const error = (err || {}) as DataQueryError; |
||||
|
||||
if (!error.message) { |
||||
if (typeof err === 'string' || err instanceof String) { |
||||
return { message: err } as DataQueryError; |
||||
} |
||||
|
||||
let message = 'Query error'; |
||||
if (error.message) { |
||||
message = error.message; |
||||
} else if (error.data && error.data.message) { |
||||
message = error.data.message; |
||||
} else if (error.data && error.data.error) { |
||||
message = error.data.error; |
||||
} else if (error.status) { |
||||
message = `Query error: ${error.status} ${error.statusText}`; |
||||
} |
||||
error.message = message; |
||||
} |
||||
|
||||
return error; |
||||
} |
@ -0,0 +1,23 @@ |
||||
// works with webpack plugin: scripts/webpack/plugins/CorsWorkerPlugin.js
|
||||
export class CorsWorker extends window.Worker { |
||||
constructor(url: URL, options?: WorkerOptions) { |
||||
// by default, worker inherits HTML document's location and pathname which leads to wrong public path value
|
||||
// the CorsWorkerPlugin will override it with the value based on the initial worker chunk, ie.
|
||||
// initial worker chunk: http://host.com/cdn/scripts/worker-123.js
|
||||
// resulting public path: http://host.com/cdn/scripts
|
||||
|
||||
const scriptUrl = url.toString(); |
||||
const urlParts = scriptUrl.split('/'); |
||||
urlParts.pop(); |
||||
const scriptsBasePathUrl = `${urlParts.join('/')}/`; |
||||
|
||||
const importScripts = `importScripts('${scriptUrl}');`; |
||||
const objectURL = URL.createObjectURL( |
||||
new Blob([`__webpack_worker_public_path__ = '${scriptsBasePathUrl}'; ${importScripts}`], { |
||||
type: 'application/javascript', |
||||
}) |
||||
); |
||||
super(objectURL, options); |
||||
URL.revokeObjectURL(objectURL); |
||||
} |
||||
} |
@ -0,0 +1,3 @@ |
||||
import { CorsWorker as Worker } from 'app/core/utils/CorsWorker'; |
||||
|
||||
export const createWorker = () => new Worker(new URL('./service.worker.ts', import.meta.url)); |
@ -0,0 +1,33 @@ |
||||
import * as comlink from 'comlink'; |
||||
import { from, Observable, switchMap } from 'rxjs'; |
||||
|
||||
export const remoteObservableAsObservable = <T>(remoteObs: comlink.RemoteObject<Observable<T>>): Observable<T> => |
||||
new Observable((subscriber) => { |
||||
// Passing the callbacks as 3 separate arguments is deprecated, but it's the only option for now
|
||||
//
|
||||
// RxJS recreates the functions via `Function.bind` https://github.com/ReactiveX/rxjs/blob/62aca850a37f598b5db6085661e0594b81ec4281/src/internal/Subscriber.ts#L169
|
||||
// and thus erases the ProxyMarker created via comlink.proxy(fN) when the callbacks
|
||||
// are grouped together in a Observer object (ie. { next: (v) => ..., error: (err) => ..., complete: () => ... })
|
||||
//
|
||||
// solution: TBD (autoproxy all functions?)
|
||||
const remoteSubPromise = remoteObs.subscribe( |
||||
comlink.proxy((nextValueInRemoteObs: T) => { |
||||
subscriber.next(nextValueInRemoteObs); |
||||
}), |
||||
comlink.proxy((err) => { |
||||
subscriber.error(err); |
||||
}), |
||||
comlink.proxy(() => { |
||||
subscriber.complete(); |
||||
}) |
||||
); |
||||
return { |
||||
unsubscribe: () => { |
||||
remoteSubPromise.then((remoteSub) => remoteSub.unsubscribe()); |
||||
}, |
||||
}; |
||||
}); |
||||
|
||||
export const promiseWithRemoteObservableAsObservable = <T>( |
||||
promiseWithProxyObservable: Promise<comlink.RemoteObject<Observable<T>>> |
||||
): Observable<T> => from(promiseWithProxyObservable).pipe(switchMap((val) => remoteObservableAsObservable(val))); |
@ -0,0 +1,52 @@ |
||||
import { CentrifugeService, CentrifugeSrvDeps } from './service'; |
||||
import * as comlink from 'comlink'; |
||||
import './transferHandlers'; |
||||
import { remoteObservableAsObservable } from './remoteObservable'; |
||||
import { LiveChannelAddress, LiveChannelConfig } from '@grafana/data'; |
||||
import { LiveDataStreamOptions } from '@grafana/runtime'; |
||||
|
||||
let centrifuge: CentrifugeService; |
||||
|
||||
const initialize = ( |
||||
deps: CentrifugeSrvDeps, |
||||
remoteDataStreamSubscriberReadiness: comlink.RemoteObject< |
||||
CentrifugeSrvDeps['dataStreamSubscriberReadiness'] & comlink.ProxyMarked |
||||
> |
||||
) => { |
||||
centrifuge = new CentrifugeService({ |
||||
...deps, |
||||
dataStreamSubscriberReadiness: remoteObservableAsObservable(remoteDataStreamSubscriberReadiness), |
||||
}); |
||||
}; |
||||
|
||||
const getConnectionState = () => { |
||||
return comlink.proxy(centrifuge.getConnectionState()); |
||||
}; |
||||
|
||||
const getDataStream = (options: LiveDataStreamOptions, config: LiveChannelConfig) => { |
||||
return comlink.proxy(centrifuge.getDataStream(options, config)); |
||||
}; |
||||
|
||||
const getStream = (address: LiveChannelAddress, config: LiveChannelConfig) => { |
||||
return comlink.proxy(centrifuge.getStream(address, config)); |
||||
}; |
||||
|
||||
const getPresence = async (address: LiveChannelAddress, config: LiveChannelConfig) => { |
||||
return await centrifuge.getPresence(address, config); |
||||
}; |
||||
|
||||
const workObj = { |
||||
initialize, |
||||
getConnectionState, |
||||
getDataStream, |
||||
getStream, |
||||
getPresence, |
||||
}; |
||||
|
||||
export type RemoteCentrifugeService = typeof workObj; |
||||
|
||||
comlink.expose(workObj); |
||||
|
||||
export default class { |
||||
constructor() {} |
||||
} |
@ -0,0 +1,40 @@ |
||||
import { CentrifugeSrv, CentrifugeSrvDeps } from './service'; |
||||
import { RemoteCentrifugeService } from './service.worker'; |
||||
import './transferHandlers'; |
||||
|
||||
import * as comlink from 'comlink'; |
||||
import { asyncScheduler, Observable, observeOn } from 'rxjs'; |
||||
import { LiveChannelAddress, LiveChannelConfig, LiveChannelEvent } from '@grafana/data'; |
||||
import { promiseWithRemoteObservableAsObservable } from './remoteObservable'; |
||||
import { createWorker } from './createCentrifugeServiceWorker'; |
||||
|
||||
export class CentrifugeServiceWorkerProxy implements CentrifugeSrv { |
||||
private centrifugeWorker; |
||||
|
||||
constructor(deps: CentrifugeSrvDeps) { |
||||
this.centrifugeWorker = comlink.wrap<RemoteCentrifugeService>(createWorker() as comlink.Endpoint); |
||||
this.centrifugeWorker.initialize(deps, comlink.proxy(deps.dataStreamSubscriberReadiness)); |
||||
} |
||||
|
||||
getConnectionState: CentrifugeSrv['getConnectionState'] = () => { |
||||
return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getConnectionState()); |
||||
}; |
||||
|
||||
getDataStream: CentrifugeSrv['getDataStream'] = (options, config) => { |
||||
return promiseWithRemoteObservableAsObservable(this.centrifugeWorker.getDataStream(options, config)).pipe( |
||||
// async scheduler splits the synchronous task of deserializing data from web worker and
|
||||
// consuming the message (ie. updating react component) into two to avoid blocking the event loop
|
||||
observeOn(asyncScheduler) |
||||
); |
||||
}; |
||||
|
||||
getPresence: CentrifugeSrv['getPresence'] = (address, config) => { |
||||
return this.centrifugeWorker.getPresence(address, config); |
||||
}; |
||||
|
||||
getStream: CentrifugeSrv['getStream'] = <T>(address: LiveChannelAddress, config: LiveChannelConfig) => { |
||||
return promiseWithRemoteObservableAsObservable( |
||||
this.centrifugeWorker.getStream(address, config) as Promise<comlink.Remote<Observable<LiveChannelEvent<T>>>> |
||||
); |
||||
}; |
||||
} |
@ -0,0 +1,27 @@ |
||||
import * as comlink from 'comlink'; |
||||
import { Subscriber } from 'rxjs'; |
||||
|
||||
// Observers, ie. functions passed to `observable.subscribe(...)`, are converted to a subclass of `Subscriber` before they are sent to the source Observable.
|
||||
// The conversion happens internally in the RxJS library - this transfer handler is catches them and wraps them with a proxy
|
||||
const subscriberTransferHandler: any = { |
||||
canHandle(value: any): boolean { |
||||
return value && value instanceof Subscriber; |
||||
}, |
||||
|
||||
serialize(value: Function): [MessagePort, Transferable[]] { |
||||
const obj = comlink.proxy(value); |
||||
|
||||
const { port1, port2 } = new MessageChannel(); |
||||
|
||||
comlink.expose(obj, port1); |
||||
|
||||
return [port2, [port2]]; |
||||
}, |
||||
|
||||
deserialize(value: MessagePort): comlink.Remote<MessagePort> { |
||||
value.start(); |
||||
|
||||
return comlink.wrap<MessagePort>(value); |
||||
}, |
||||
}; |
||||
comlink.transferHandlers.set('SubscriberHandler', subscriberTransferHandler); |
@ -1,12 +0,0 @@ |
||||
const { layout } = jest.requireActual('../layout.worker.js'); |
||||
|
||||
export default class TestWorker { |
||||
constructor() {} |
||||
postMessage(data) { |
||||
const { nodes, edges, config } = data; |
||||
setTimeout(() => { |
||||
layout(nodes, edges, config); |
||||
this.onmessage({ data: { nodes, edges } }); |
||||
}, 1); |
||||
} |
||||
} |
@ -0,0 +1,3 @@ |
||||
import { CorsWorker as Worker } from 'app/core/utils/CorsWorker'; |
||||
|
||||
export const createWorker = () => new Worker(new URL('./layout.worker.js', import.meta.url)); |
@ -0,0 +1,27 @@ |
||||
const { layout } = jest.requireActual('../../app/plugins/panel/nodeGraph/layout.worker.js'); |
||||
|
||||
class LayoutMockWorker { |
||||
constructor() {} |
||||
|
||||
postMessage(data: any) { |
||||
const { nodes, edges, config } = data; |
||||
setTimeout(() => { |
||||
layout(nodes, edges, config); |
||||
// @ts-ignore
|
||||
this.onmessage({ data: { nodes, edges } }); |
||||
}, 1); |
||||
} |
||||
} |
||||
|
||||
jest.mock('../../app/plugins/panel/nodeGraph/createLayoutWorker', () => ({ |
||||
createWorker: () => new LayoutMockWorker(), |
||||
})); |
||||
|
||||
class BasicMockWorker { |
||||
postMessage() {} |
||||
} |
||||
const mockCreateWorker = { |
||||
createWorker: () => new BasicMockWorker(), |
||||
}; |
||||
|
||||
jest.mock('../../app/features/live/centrifuge/createCentrifugeServiceWorker', () => mockCreateWorker); |
@ -0,0 +1,64 @@ |
||||
const { RuntimeGlobals, RuntimeModule } = require('webpack'); |
||||
|
||||
class CorsWorkerPublicPathRuntimeModule extends RuntimeModule { |
||||
constructor(publicPath) { |
||||
super('publicPath', RuntimeModule.STAGE_BASIC); |
||||
this.publicPath = publicPath; |
||||
} |
||||
|
||||
/** |
||||
* @returns {string} runtime code |
||||
*/ |
||||
generate() { |
||||
const { compilation, publicPath } = this; |
||||
|
||||
const publicPathValue = compilation.getPath(publicPath || '', { |
||||
hash: compilation.hash || 'XXXX', |
||||
}); |
||||
return `${RuntimeGlobals.publicPath} = __webpack_worker_public_path__ || '${publicPathValue}';`; |
||||
} |
||||
} |
||||
|
||||
// https://github.com/webpack/webpack/discussions/14648#discussioncomment-1604202
|
||||
// by @ https://github.com/piotr-oles
|
||||
class CorsWorkerPlugin { |
||||
/** |
||||
* @param {import('webpack').Compiler} compiler |
||||
*/ |
||||
apply(compiler) { |
||||
compiler.hooks.compilation.tap( |
||||
'CorsWorkerPlugin', |
||||
/** |
||||
* @param {import('webpack').Compilation} compilation |
||||
*/ |
||||
(compilation) => { |
||||
const getChunkLoading = (chunk) => { |
||||
const entryOptions = chunk.getEntryOptions(); |
||||
return entryOptions && entryOptions.chunkLoading !== undefined |
||||
? entryOptions.chunkLoading |
||||
: compilation.outputOptions.chunkLoading; |
||||
}; |
||||
const getChunkPublicPath = (chunk) => { |
||||
const entryOptions = chunk.getEntryOptions(); |
||||
return entryOptions && entryOptions.publicPath !== undefined |
||||
? entryOptions.publicPath |
||||
: compilation.outputOptions.publicPath; |
||||
}; |
||||
|
||||
compilation.hooks.runtimeRequirementInTree.for(RuntimeGlobals.publicPath).tap('CorsWorkerPlugin', (chunk) => { |
||||
if (getChunkLoading(chunk) === 'import-scripts') { |
||||
const publicPath = getChunkPublicPath(chunk); |
||||
|
||||
if (publicPath !== 'auto') { |
||||
const module = new CorsWorkerPublicPathRuntimeModule(publicPath); |
||||
compilation.addRuntimeModule(chunk, module); |
||||
return true; |
||||
} |
||||
} |
||||
}); |
||||
} |
||||
); |
||||
} |
||||
} |
||||
|
||||
module.exports = CorsWorkerPlugin; |
Loading…
Reference in new issue