From 0e6a8cc6ac6286aba1a165d7ea2674b557edf59a Mon Sep 17 00:00:00 2001 From: Alexander Emelin Date: Tue, 20 Sep 2022 18:39:46 +0300 Subject: [PATCH] Live: Migrate to centrifuge-js v3 (new API and client protocol) (#51977) --- go.mod | 19 ++-- go.sum | 37 ++++--- package.json | 2 +- packages/grafana-data/src/types/live.ts | 10 +- pkg/services/live/live.go | 34 +++---- pkg/services/live/survey/survey.go | 2 +- .../app/features/live/centrifuge/channel.ts | 99 +++++++++---------- .../app/features/live/centrifuge/service.ts | 37 +++---- public/app/plugins/panel/live/LivePanel.tsx | 3 + yarn.lock | 17 ++-- 10 files changed, 131 insertions(+), 129 deletions(-) diff --git a/go.mod b/go.mod index e0e86b35bf5..f30d4328d38 100644 --- a/go.mod +++ b/go.mod @@ -13,6 +13,9 @@ replace k8s.io/client-go => k8s.io/client-go v0.22.1 replace github.com/russellhaering/goxmldsig@v1.1.0 => github.com/russellhaering/goxmldsig v1.1.1 +// Avoid using v2.0.0+incompatible Redigo used by dependencies as the latest maintained branch of Redigo is v1. +replace github.com/gomodule/redigo => github.com/gomodule/redigo v1.8.9 + require ( cloud.google.com/go/storage v1.21.0 cuelang.org/go v0.4.3 @@ -25,7 +28,7 @@ require ( github.com/beevik/etree v1.1.0 github.com/benbjohnson/clock v1.1.0 github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b - github.com/centrifugal/centrifuge v0.19.0 + github.com/centrifugal/centrifuge v0.25.0 github.com/cortexproject/cortex v1.10.1-0.20211014125347-85c378182d0d github.com/crewjam/saml v0.4.8 github.com/davecgh/go-spew v1.1.1 @@ -42,14 +45,14 @@ require ( github.com/go-sql-driver/mysql v1.6.0 github.com/go-stack/stack v1.8.1 github.com/gobwas/glob v0.2.3 - github.com/gofrs/uuid v4.2.0+incompatible + github.com/gofrs/uuid v4.2.0+incompatible // indirect github.com/gogo/protobuf v1.3.2 github.com/golang/mock v1.6.0 github.com/golang/snappy v0.0.4 github.com/google/go-cmp v0.5.8 github.com/google/uuid v1.3.0 github.com/google/wire v0.5.0 - github.com/gorilla/websocket v1.4.2 + github.com/gorilla/websocket v1.5.0 github.com/gosimple/slug v1.12.0 github.com/grafana/cuetsy v0.0.4-0.20220714174355-ebd987fdab27 github.com/grafana/grafana-aws-sdk v0.10.8 @@ -132,7 +135,7 @@ require ( github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect github.com/Azure/go-autorest/logger v0.2.1 // indirect github.com/Azure/go-autorest/tracing v0.6.0 // indirect - github.com/FZambia/eagle v0.0.1 // indirect + github.com/FZambia/eagle v0.0.2 // indirect github.com/FZambia/sentinel v1.1.0 // indirect github.com/PuerkitoBio/purell v1.1.1 // indirect github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect @@ -142,7 +145,7 @@ require ( github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cenkalti/backoff/v4 v4.1.2 // indirect - github.com/centrifugal/protocol v0.7.6 // indirect + github.com/centrifugal/protocol v0.8.10 // indirect github.com/cespare/xxhash/v2 v2.1.2 // indirect github.com/cheekybits/genny v1.0.0 // indirect github.com/cockroachdb/apd/v2 v2.0.2 // indirect @@ -185,7 +188,7 @@ require ( github.com/hashicorp/go-sockaddr v1.0.2 // indirect github.com/hashicorp/golang-lru v0.5.4 // indirect github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493 // indirect - github.com/igm/sockjs-go/v3 v3.0.1 // indirect + github.com/igm/sockjs-go/v3 v3.0.2 // indirect github.com/jessevdk/go-flags v1.5.0 // indirect github.com/jonboulle/clockwork v0.3.0 // indirect github.com/josharian/intern v1.0.0 // indirect @@ -213,7 +216,7 @@ require ( github.com/protocolbuffers/txtpbfmt v0.0.0-20220428173112-74888fd59c2b // indirect github.com/rs/cors v1.8.2 // indirect github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect - github.com/segmentio/encoding v0.3.2 + github.com/segmentio/encoding v0.3.5 github.com/sercand/kuberesolver v2.4.0+incompatible // indirect github.com/sergi/go-diff v1.1.0 // indirect github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect @@ -275,6 +278,7 @@ require ( github.com/mattn/go-colorable v0.1.12 // indirect github.com/mitchellh/mapstructure v1.4.3 // indirect github.com/russross/blackfriday/v2 v2.1.0 // indirect + github.com/segmentio/asm v1.1.4 // indirect go.starlark.net v0.0.0-20201118183435-e55f603d8c79 // indirect ) @@ -323,7 +327,6 @@ require ( github.com/mschoch/smat v0.2.0 // indirect github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect github.com/pierrec/lz4/v4 v4.1.8 // indirect - github.com/segmentio/asm v1.1.1 // indirect github.com/valyala/fasttemplate v1.2.1 // indirect github.com/wk8/go-ordered-map v1.0.0 github.com/xanzy/ssh-agent v0.3.0 // indirect diff --git a/go.sum b/go.sum index 4eec2e68fef..ddef442eea2 100644 --- a/go.sum +++ b/go.sum @@ -212,8 +212,8 @@ github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ= github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo= -github.com/FZambia/eagle v0.0.1 h1:FN1yTkPihMb5nE8SrlRjoCf7T9H9bTKJFQOm6ach2YU= -github.com/FZambia/eagle v0.0.1/go.mod h1:xq6u/JeNZ5/8mrAQ76MMhzNTodASh9FavQlCgg4j48w= +github.com/FZambia/eagle v0.0.2 h1:35qHDuXSQevZ4w9A51k4wU7OE/tPHTEWXoywA93hvkY= +github.com/FZambia/eagle v0.0.2/go.mod h1:xq6u/JeNZ5/8mrAQ76MMhzNTodASh9FavQlCgg4j48w= github.com/FZambia/sentinel v1.1.0 h1:qrCBfxc8SvJihYNjBWgwUI93ZCvFe/PJIPTHKmlp8a8= github.com/FZambia/sentinel v1.1.0/go.mod h1:ytL1Am/RLlAoAXG6Kj5LNuw/TRRQrv2rt2FT26vP5gI= github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8= @@ -485,10 +485,10 @@ github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuD github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw= github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU= -github.com/centrifugal/centrifuge v0.19.0 h1:YHws0dRpgsBiI73tRl1wwaB13gzuaI1AM4IFcQQQqcw= -github.com/centrifugal/centrifuge v0.19.0/go.mod h1:O2elf8q3Qkie3z97wkqVqxB52pnOpPsfFUa7L88Lpy0= -github.com/centrifugal/protocol v0.7.6 h1:AfMwTZfwnFwZslIzQL4QtRnWSVO32RPSuk4iNS/V9tg= -github.com/centrifugal/protocol v0.7.6/go.mod h1:cJo0/BuXglhPfg0fgSgTXvBZ7y+9rdg4+nPbIDOVmlA= +github.com/centrifugal/centrifuge v0.25.0 h1:QivFZRPWcN8w3I/gZ8Zs9rMe/KGZoKIS9Kgo1/bY4JE= +github.com/centrifugal/centrifuge v0.25.0/go.mod h1:bFcSFalnROq/wcFeRiTG+wIbHsxEMW66QUnq95RY1d0= +github.com/centrifugal/protocol v0.8.10 h1:eezzBIU/4pWyl7a+NUnANYojJBASqbkPZcQh9b8YQRI= +github.com/centrifugal/protocol v0.8.10/go.mod h1:dlHBjKakr0r+f1pkfwSMfZ+cnpvidN7pQe1ZrsKfhtE= github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA= github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc= @@ -1227,10 +1227,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC5IXdEA547bcoU5dozgQAfc8Onsg4= github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y= -github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/gomodule/redigo v1.8.5/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0= -github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0= -github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4= +github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws= +github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE= github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ= @@ -1363,8 +1361,9 @@ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/z github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ= github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= -github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc= github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= +github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc= +github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= github.com/gosimple/slug v1.12.0 h1:xzuhj7G7cGtd34NXnW/yF0l+AGNfWqwgh/IXgFy7dnc= github.com/gosimple/slug v1.12.0/go.mod h1:UiRaFH+GEilHstLUmcBgWcI42viBN7mAb818JrYOeFQ= github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6T/o= @@ -1529,8 +1528,8 @@ github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmK github.com/iancoleman/strcase v0.0.0-20180726023541-3605ed457bf7/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE= github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/igm/sockjs-go/v3 v3.0.1 h1:rmgEkeKqBHCFf7uIAipYrYSX8x9LBB2nOxAac2sooak= -github.com/igm/sockjs-go/v3 v3.0.1/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= +github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE= +github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE= github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA= @@ -1701,7 +1700,6 @@ github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHU github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek= github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4= -github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg= github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg= github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs= github.com/knadh/koanf v1.2.0/go.mod h1:xpPTwMhsA/aaQLAilyCCqfpEiY1gpa160AiCuWHJUjY= @@ -2287,11 +2285,11 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc= github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo= github.com/securego/gosec v0.0.0-20200203094520-d13bb6d2420c/go.mod h1:gp0gaHj0WlmPh9BdsTmo1aq6C27yIPWdxCKGFGdVKBE= -github.com/segmentio/asm v1.1.0/go.mod h1:4EUJGaKsB8ImLUwOGORVsNd9vTRDeh44JGsY4aKp5I4= -github.com/segmentio/asm v1.1.1 h1:WzUpP9BRnRgoP+v8qywthpSe9U5KLq1uDGdJBEcSeQo= -github.com/segmentio/asm v1.1.1/go.mod h1:VCkA6aQH8usgIAUp4QMLQsAeaMl6iW7+VMLrW9Vkv0Y= -github.com/segmentio/encoding v0.3.2 h1:gkXXteOfNaPPlrXTEf/e5tWvaQGVJWnvT3LqMzUeH7U= -github.com/segmentio/encoding v0.3.2/go.mod h1:waft2p6XI4z2pk07M0YzZV4wEiqaRvsBSyWNHxVx4gU= +github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= +github.com/segmentio/asm v1.1.4 h1:Q/FKBtrgnmDc0YMrurLROqG9mXE6Ndn276EtDnoWtMM= +github.com/segmentio/asm v1.1.4/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg= +github.com/segmentio/encoding v0.3.5 h1:UZEiaZ55nlXGDL92scoVuw00RmiRCazIEmvPSbSvt8Y= +github.com/segmentio/encoding v0.3.5/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM= github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M= github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo= @@ -3097,6 +3095,7 @@ golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBc golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s= golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/package.json b/package.json index 441b3c0d190..b5ed070b6d0 100644 --- a/package.json +++ b/package.json @@ -305,7 +305,7 @@ "baron": "3.0.3", "brace": "0.11.1", "calculate-size": "1.1.1", - "centrifuge": "2.8.5", + "centrifuge": "3.0.1", "classnames": "2.3.1", "comlink": "4.3.1", "common-tags": "1.8.2", diff --git a/packages/grafana-data/src/types/live.ts b/packages/grafana-data/src/types/live.ts index f4cb0618e33..0321ef79e00 100644 --- a/packages/grafana-data/src/types/live.ts +++ b/packages/grafana-data/src/types/live.ts @@ -20,9 +20,9 @@ export enum LiveChannelScope { * @alpha */ export enum LiveChannelType { - DataStream = 'stream', // each message contains a batch of rows that will be appened to previous values + DataStream = 'stream', // each message contains a batch of rows that will be appended to previous values DataFrame = 'frame', // each message is an entire data frame and should *replace* previous content - JSON = 'json', // arbitray json message + JSON = 'json', // arbitrary json message } export enum LiveChannelConnectionState { @@ -30,11 +30,13 @@ export enum LiveChannelConnectionState { Pending = 'pending', /** Connected to the channel */ Connected = 'connected', + /** Connecting to a channel */ + Connecting = 'connecting', /** Disconnected from the channel. The channel will reconnect when possible */ Disconnected = 'disconnected', /** Was at some point connected, and will not try to reconnect */ Shutdown = 'shutdown', - /** Channel configuraiton was invalid and will not connect */ + /** Channel configuration was invalid and will not connect */ Invalid = 'invalid', } @@ -139,7 +141,7 @@ export interface LiveChannelAddress { path: string; /** - * Additional metadata passed to a channel. The backend will propigate this JSON object to + * Additional metadata passed to a channel. The backend will propagate this JSON object to * each OnSubscribe and RunStream calls. This value should be constant across multiple requests * to the same channel path */ diff --git a/pkg/services/live/live.go b/pkg/services/live/live.go index 83176463324..0169366133f 100644 --- a/pkg/services/live/live.go +++ b/pkg/services/live/live.go @@ -99,19 +99,14 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r logger.Debug("GrafanaLive initialization", "ha", g.IsHA()) - // We use default config here as starting point. Default config contains - // reasonable values for available options. - scfg := centrifuge.DefaultConfig - - // scfg.LogLevel = centrifuge.LogLevelDebug - scfg.LogHandler = handleLog - scfg.LogLevel = centrifuge.LogLevelError - scfg.MetricsNamespace = "grafana_live" - // Node is the core object in Centrifuge library responsible for many useful // things. For example Node allows to publish messages to channels from server // side with its Publish method. - node, err := centrifuge.New(scfg) + node, err := centrifuge.New(centrifuge.Config{ + LogHandler: handleLog, + LogLevel: centrifuge.LogLevelError, + MetricsNamespace: "grafana_live", + }) if err != nil { return nil, err } @@ -312,12 +307,9 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r }) client.OnDisconnect(func(e centrifuge.DisconnectEvent) { - reason := "normal" - if e.Disconnect != nil { - reason = e.Disconnect.Reason - if e.Disconnect.Code == 3001 { // Shutdown - return - } + reason := e.Disconnect.Reason + if e.Disconnect.Code == 3001 { // Shutdown + return } logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt)) }) @@ -339,6 +331,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r // Use a pure websocket transport. wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{ + ProtocolVersion: centrifuge.ProtocolVersion2, ReadBufferSize: 1024, WriteBufferSize: 1024, CheckOrigin: checkOrigin, @@ -711,10 +704,11 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge. logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel) return centrifuge.SubscribeReply{ Options: centrifuge.SubscribeOptions{ - Presence: reply.Presence, - JoinLeave: reply.JoinLeave, - Recover: reply.Recover, - Data: reply.Data, + EmitPresence: reply.Presence, + EmitJoinLeave: reply.JoinLeave, + PushJoinLeave: reply.JoinLeave, + EnableRecovery: reply.Recover, + Data: reply.Data, }, }, nil } diff --git a/pkg/services/live/survey/survey.go b/pkg/services/live/survey/survey.go index e3d98fc1fc4..98474f70d73 100644 --- a/pkg/services/live/survey/survey.go +++ b/pkg/services/live/survey/survey.go @@ -89,7 +89,7 @@ func (c *Caller) CallManagedStreams(orgID int64) ([]*managedstream.ManagedChanne ctx, cancel := context.WithTimeout(context.Background(), time.Second) defer cancel() - resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData) + resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData, "") if err != nil { return nil, err } diff --git a/public/app/features/live/centrifuge/channel.ts b/public/app/features/live/centrifuge/channel.ts index 134aee45c51..62e989ea687 100644 --- a/public/app/features/live/centrifuge/channel.ts +++ b/public/app/features/live/centrifuge/channel.ts @@ -1,11 +1,11 @@ -import Centrifuge, { - JoinLeaveContext, +import { + Subscription, + JoinContext, + LeaveContext, PublicationContext, - SubscribeErrorContext, - SubscribeSuccessContext, - SubscriptionEvents, - UnsubscribeContext, -} from 'centrifuge/dist/centrifuge'; + SubscriptionErrorContext, + SubscribedContext, +} from 'centrifuge'; import { Subject, of, Observable } from 'rxjs'; import { @@ -34,7 +34,7 @@ export class CentrifugeLiveChannel { // Hold on to the last header with schema lastMessageWithSchema?: DataFrameJSON; - subscription?: Centrifuge.Subscription; + subscription?: Subscription; shutdownCallback?: () => void; initalized?: boolean; @@ -54,46 +54,44 @@ export class CentrifugeLiveChannel { } // This should only be called when centrifuge is connected - initalize(): SubscriptionEvents { + initalize(): void { if (this.initalized) { throw new Error('Channel already initalized: ' + this.id); } this.initalized = true; - const events: SubscriptionEvents = { - // Called when a message is received from the socket - publish: (ctx: PublicationContext) => { - try { - if (ctx.data) { - if (ctx.data.schema) { - this.lastMessageWithSchema = ctx.data as DataFrameJSON; - } - - this.stream.next({ - type: LiveChannelEventType.Message, - message: ctx.data, - }); + this.subscription!.on('publication', (ctx: PublicationContext) => { + try { + if (ctx.data) { + if (ctx.data.schema) { + this.lastMessageWithSchema = ctx.data as DataFrameJSON; } - // Clear any error messages - if (this.currentStatus.error) { - this.currentStatus.timestamp = Date.now(); - delete this.currentStatus.error; - this.sendStatus(); - } - } catch (err) { - console.log('publish error', this.addr, err); - this.currentStatus.error = err; + this.stream.next({ + type: LiveChannelEventType.Message, + message: ctx.data, + }); + } + + // Clear any error messages + if (this.currentStatus.error) { this.currentStatus.timestamp = Date.now(); + delete this.currentStatus.error; this.sendStatus(); } - }, - error: (ctx: SubscribeErrorContext) => { + } catch (err) { + console.log('publish error', this.addr, err); + this.currentStatus.error = err; + this.currentStatus.timestamp = Date.now(); + this.sendStatus(); + } + }) + .on('error', (ctx: SubscriptionErrorContext) => { this.currentStatus.timestamp = Date.now(); - this.currentStatus.error = ctx.message; + this.currentStatus.error = ctx.error.message; this.sendStatus(); - }, - subscribe: (ctx: SubscribeSuccessContext) => { + }) + .on('subscribed', (ctx: SubscribedContext) => { this.currentStatus.timestamp = Date.now(); this.currentStatus.state = LiveChannelConnectionState.Connected; delete this.currentStatus.error; @@ -101,23 +99,24 @@ export class CentrifugeLiveChannel { if (ctx.data?.schema) { this.lastMessageWithSchema = ctx.data as DataFrameJSON; } - this.sendStatus(ctx.data); - }, - unsubscribe: (ctx: UnsubscribeContext) => { + }) + .on('unsubscribed', () => { this.currentStatus.timestamp = Date.now(); this.currentStatus.state = LiveChannelConnectionState.Disconnected; this.sendStatus(); - }, - }; - - events.join = (ctx: JoinLeaveContext) => { - this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user }); - }; - events.leave = (ctx: JoinLeaveContext) => { - this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user }); - }; - return events; + }) + .on('subscribing', () => { + this.currentStatus.timestamp = Date.now(); + this.currentStatus.state = LiveChannelConnectionState.Connecting; + this.sendStatus(); + }) + .on('join', (ctx: JoinContext) => { + this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user }); + }) + .on('leave', (ctx: LeaveContext) => { + this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user }); + }); } private sendStatus(message?: any) { @@ -171,7 +170,7 @@ export class CentrifugeLiveChannel { return this.subscription!.presence().then((v) => { return { - users: Object.keys(v.presence), + users: Object.keys(v.clients), }; }); } diff --git a/public/app/features/live/centrifuge/service.ts b/public/app/features/live/centrifuge/service.ts index 1bf1e8056d0..c6414d4425d 100644 --- a/public/app/features/live/centrifuge/service.ts +++ b/public/app/features/live/centrifuge/service.ts @@ -1,4 +1,4 @@ -import Centrifuge from 'centrifuge/dist/centrifuge'; +import { Centrifuge, State } from 'centrifuge'; import { BehaviorSubject, Observable, share, startWith } from 'rxjs'; import { @@ -78,30 +78,27 @@ export class CentrifugeService implements CentrifugeSrv { this.centrifuge = new Centrifuge(liveUrl, { timeout: 30000, }); - this.centrifuge.setConnectData({ - sessionId: deps.sessionId, - orgId: deps.orgId, - }); - // orgRole is set when logged in *or* anonomus users can use grafana + // orgRole is set when logged in *or* anonymous users can use grafana if (deps.liveEnabled && deps.orgRole !== '') { this.centrifuge.connect(); // do connection } - this.connectionState = new BehaviorSubject(this.centrifuge.isConnected()); + this.connectionState = new BehaviorSubject(this.centrifuge.state === State.Connected); this.connectionBlocker = new Promise((resolve) => { - if (this.centrifuge.isConnected()) { + if (this.centrifuge.state === State.Connected) { return resolve(); } const connectListener = () => { resolve(); - this.centrifuge.removeListener('connect', connectListener); + this.centrifuge.removeListener('connected', connectListener); }; - this.centrifuge.addListener('connect', connectListener); + this.centrifuge.addListener('connected', connectListener); }); // Register global listeners - this.centrifuge.on('connect', this.onConnect); - this.centrifuge.on('disconnect', this.onDisconnect); - this.centrifuge.on('publish', this.onServerSideMessage); + this.centrifuge.on('connected', this.onConnect); + this.centrifuge.on('connecting', this.onDisconnect); + this.centrifuge.on('disconnected', this.onDisconnect); + this.centrifuge.on('publication', this.onServerSideMessage); } //---------------------------------------------------------- @@ -154,11 +151,15 @@ export class CentrifugeService implements CentrifugeSrv { } private async initChannel(channel: CentrifugeLiveChannel): Promise { - const events = channel.initalize(); - if (!this.centrifuge.isConnected()) { + if (this.centrifuge.state !== State.Connected) { await this.connectionBlocker; } - channel.subscription = this.centrifuge.subscribe(channel.id, events, { data: channel.addr.data }); + const subscription = this.centrifuge.newSubscription(channel.id, { + data: channel.addr.data, + }); + channel.subscription = subscription; + channel.initalize(); + subscription.subscribe(); return; } @@ -220,10 +221,10 @@ export class CentrifugeService implements CentrifugeSrv { * Since the initial request and subscription are on the same socket, this will support HA setups */ getQueryData: CentrifugeSrv['getQueryData'] = async (options) => { - if (!this.centrifuge.isConnected()) { + if (this.centrifuge.state !== State.Connected) { await this.connectionBlocker; } - return this.centrifuge.namedRPC('grafana.query', options.body); + return this.centrifuge.rpc('grafana.query', options.body); }; /** diff --git a/public/app/plugins/panel/live/LivePanel.tsx b/public/app/plugins/panel/live/LivePanel.tsx index e9c2aebb0ab..a3091e45b2a 100644 --- a/public/app/plugins/panel/live/LivePanel.tsx +++ b/public/app/plugins/panel/live/LivePanel.tsx @@ -318,6 +318,9 @@ const getStyles = stylesFactory((theme: GrafanaTheme) => ({ [LiveChannelConnectionState.Connected]: css` border: 1px solid ${theme.palette.brandSuccess}; `, + [LiveChannelConnectionState.Connecting]: css` + border: 1px solid ${theme.palette.brandWarning}; + `, [LiveChannelConnectionState.Disconnected]: css` border: 1px solid ${theme.palette.brandWarning}; `, diff --git a/yarn.lock b/yarn.lock index 0e0fb09d8c4..fa2b564a187 100644 --- a/yarn.lock +++ b/yarn.lock @@ -16067,12 +16067,13 @@ __metadata: languageName: node linkType: hard -"centrifuge@npm:2.8.5": - version: 2.8.5 - resolution: "centrifuge@npm:2.8.5" +"centrifuge@npm:3.0.1": + version: 3.0.1 + resolution: "centrifuge@npm:3.0.1" dependencies: - protobufjs: ^6.11.2 - checksum: 27db6d3b92aa52fa7a5f7ac9a5643338bae0a231db61d791bbad0d4858a67f72991f12a63e6f042179bafd0854ebf832c42bde2ae5ddd2a26124d094c1d5cf65 + events: ^3.3.0 + protobufjs: ^6.11.3 + checksum: cf17b63fe9c684f121348a8597da0686a1f45fff2f85eaed6a3452504caeb1098c1524b10f197fe3811a11199214b88b75f5935ac86cb9753146fcc0d11f418c languageName: node linkType: hard @@ -20546,7 +20547,7 @@ __metadata: languageName: node linkType: hard -"events@npm:^3.0.0, events@npm:^3.2.0": +"events@npm:^3.0.0, events@npm:^3.2.0, events@npm:^3.3.0": version: 3.3.0 resolution: "events@npm:3.3.0" checksum: f6f487ad2198aa41d878fa31452f1a3c00958f46e9019286ff4787c84aac329332ab45c9cdc8c445928fc6d7ded294b9e005a7fce9426488518017831b272780 @@ -22389,7 +22390,7 @@ __metadata: baron: 3.0.3 brace: 0.11.1 calculate-size: 1.1.1 - centrifuge: 2.8.5 + centrifuge: 3.0.1 classnames: 2.3.1 comlink: 4.3.1 common-tags: 1.8.2 @@ -31394,7 +31395,7 @@ __metadata: languageName: node linkType: hard -"protobufjs@npm:^6.11.2": +"protobufjs@npm:^6.11.3": version: 6.11.3 resolution: "protobufjs@npm:6.11.3" dependencies: