Live: Migrate to centrifuge-js v3 (new API and client protocol) (#51977)

pull/52700/head
Alexander Emelin 3 years ago committed by GitHub
parent 42cf92a3f2
commit 0e6a8cc6ac
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 19
      go.mod
  2. 37
      go.sum
  3. 2
      package.json
  4. 10
      packages/grafana-data/src/types/live.ts
  5. 34
      pkg/services/live/live.go
  6. 2
      pkg/services/live/survey/survey.go
  7. 99
      public/app/features/live/centrifuge/channel.ts
  8. 37
      public/app/features/live/centrifuge/service.ts
  9. 3
      public/app/plugins/panel/live/LivePanel.tsx
  10. 17
      yarn.lock

@ -13,6 +13,9 @@ replace k8s.io/client-go => k8s.io/client-go v0.22.1
replace github.com/russellhaering/goxmldsig@v1.1.0 => github.com/russellhaering/goxmldsig v1.1.1
// Avoid using v2.0.0+incompatible Redigo used by dependencies as the latest maintained branch of Redigo is v1.
replace github.com/gomodule/redigo => github.com/gomodule/redigo v1.8.9
require (
cloud.google.com/go/storage v1.21.0
cuelang.org/go v0.4.3
@ -25,7 +28,7 @@ require (
github.com/beevik/etree v1.1.0
github.com/benbjohnson/clock v1.1.0
github.com/bradfitz/gomemcache v0.0.0-20190913173617-a41fca850d0b
github.com/centrifugal/centrifuge v0.19.0
github.com/centrifugal/centrifuge v0.25.0
github.com/cortexproject/cortex v1.10.1-0.20211014125347-85c378182d0d
github.com/crewjam/saml v0.4.8
github.com/davecgh/go-spew v1.1.1
@ -42,14 +45,14 @@ require (
github.com/go-sql-driver/mysql v1.6.0
github.com/go-stack/stack v1.8.1
github.com/gobwas/glob v0.2.3
github.com/gofrs/uuid v4.2.0+incompatible
github.com/gofrs/uuid v4.2.0+incompatible // indirect
github.com/gogo/protobuf v1.3.2
github.com/golang/mock v1.6.0
github.com/golang/snappy v0.0.4
github.com/google/go-cmp v0.5.8
github.com/google/uuid v1.3.0
github.com/google/wire v0.5.0
github.com/gorilla/websocket v1.4.2
github.com/gorilla/websocket v1.5.0
github.com/gosimple/slug v1.12.0
github.com/grafana/cuetsy v0.0.4-0.20220714174355-ebd987fdab27
github.com/grafana/grafana-aws-sdk v0.10.8
@ -132,7 +135,7 @@ require (
github.com/Azure/go-autorest/autorest/validation v0.3.1 // indirect
github.com/Azure/go-autorest/logger v0.2.1 // indirect
github.com/Azure/go-autorest/tracing v0.6.0 // indirect
github.com/FZambia/eagle v0.0.1 // indirect
github.com/FZambia/eagle v0.0.2 // indirect
github.com/FZambia/sentinel v1.1.0 // indirect
github.com/PuerkitoBio/purell v1.1.1 // indirect
github.com/PuerkitoBio/urlesc v0.0.0-20170810143723-de5bf2ad4578 // indirect
@ -142,7 +145,7 @@ require (
github.com/asaskevich/govalidator v0.0.0-20210307081110-f21760c49a8d // indirect
github.com/beorn7/perks v1.0.1 // indirect
github.com/cenkalti/backoff/v4 v4.1.2 // indirect
github.com/centrifugal/protocol v0.7.6 // indirect
github.com/centrifugal/protocol v0.8.10 // indirect
github.com/cespare/xxhash/v2 v2.1.2 // indirect
github.com/cheekybits/genny v1.0.0 // indirect
github.com/cockroachdb/apd/v2 v2.0.2 // indirect
@ -185,7 +188,7 @@ require (
github.com/hashicorp/go-sockaddr v1.0.2 // indirect
github.com/hashicorp/golang-lru v0.5.4 // indirect
github.com/hashicorp/yamux v0.0.0-20210826001029-26ff87cf9493 // indirect
github.com/igm/sockjs-go/v3 v3.0.1 // indirect
github.com/igm/sockjs-go/v3 v3.0.2 // indirect
github.com/jessevdk/go-flags v1.5.0 // indirect
github.com/jonboulle/clockwork v0.3.0 // indirect
github.com/josharian/intern v1.0.0 // indirect
@ -213,7 +216,7 @@ require (
github.com/protocolbuffers/txtpbfmt v0.0.0-20220428173112-74888fd59c2b // indirect
github.com/rs/cors v1.8.2 // indirect
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 // indirect
github.com/segmentio/encoding v0.3.2
github.com/segmentio/encoding v0.3.5
github.com/sercand/kuberesolver v2.4.0+incompatible // indirect
github.com/sergi/go-diff v1.1.0 // indirect
github.com/shurcooL/httpfs v0.0.0-20190707220628-8d4bc4ba7749 // indirect
@ -275,6 +278,7 @@ require (
github.com/mattn/go-colorable v0.1.12 // indirect
github.com/mitchellh/mapstructure v1.4.3 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/segmentio/asm v1.1.4 // indirect
go.starlark.net v0.0.0-20201118183435-e55f603d8c79 // indirect
)
@ -323,7 +327,6 @@ require (
github.com/mschoch/smat v0.2.0 // indirect
github.com/opencontainers/image-spec v1.0.3-0.20211202183452-c5a74bcca799 // indirect
github.com/pierrec/lz4/v4 v4.1.8 // indirect
github.com/segmentio/asm v1.1.1 // indirect
github.com/valyala/fasttemplate v1.2.1 // indirect
github.com/wk8/go-ordered-map v1.0.0
github.com/xanzy/ssh-agent v0.3.0 // indirect

@ -212,8 +212,8 @@ github.com/DATA-DOG/go-sqlmock v1.4.1/go.mod h1:f/Ixk793poVmq4qj/V1dPUg2JEAKC73Q
github.com/DataDog/datadog-go v2.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/datadog-go v3.2.0+incompatible/go.mod h1:LButxg5PwREeZtORoXG3tL4fMGNddJ+vMq1mwgfaqoQ=
github.com/DataDog/zstd v1.4.1/go.mod h1:1jcaCB/ufaK+sKp1NBhlGmpz41jOoPQ35bpF36t7BBo=
github.com/FZambia/eagle v0.0.1 h1:FN1yTkPihMb5nE8SrlRjoCf7T9H9bTKJFQOm6ach2YU=
github.com/FZambia/eagle v0.0.1/go.mod h1:xq6u/JeNZ5/8mrAQ76MMhzNTodASh9FavQlCgg4j48w=
github.com/FZambia/eagle v0.0.2 h1:35qHDuXSQevZ4w9A51k4wU7OE/tPHTEWXoywA93hvkY=
github.com/FZambia/eagle v0.0.2/go.mod h1:xq6u/JeNZ5/8mrAQ76MMhzNTodASh9FavQlCgg4j48w=
github.com/FZambia/sentinel v1.1.0 h1:qrCBfxc8SvJihYNjBWgwUI93ZCvFe/PJIPTHKmlp8a8=
github.com/FZambia/sentinel v1.1.0/go.mod h1:ytL1Am/RLlAoAXG6Kj5LNuw/TRRQrv2rt2FT26vP5gI=
github.com/GoogleCloudPlatform/cloudsql-proxy v1.29.0/go.mod h1:spvB9eLJH9dutlbPSRmHvSXXHOwGRyeXh1jVdquA2G8=
@ -485,10 +485,10 @@ github.com/cenkalti/backoff/v4 v4.1.2 h1:6Yo7N8UP2K6LWZnW94DLVSSrbobcWdVzAYOisuD
github.com/cenkalti/backoff/v4 v4.1.2/go.mod h1:scbssz8iZGpm3xbr14ovlUdkxfGXNInqkPWOWmG2CLw=
github.com/census-instrumentation/opencensus-proto v0.2.1/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/census-instrumentation/opencensus-proto v0.3.0/go.mod h1:f6KPmirojxKA12rnyqOA5BBL4O983OfeGPqjHWSTneU=
github.com/centrifugal/centrifuge v0.19.0 h1:YHws0dRpgsBiI73tRl1wwaB13gzuaI1AM4IFcQQQqcw=
github.com/centrifugal/centrifuge v0.19.0/go.mod h1:O2elf8q3Qkie3z97wkqVqxB52pnOpPsfFUa7L88Lpy0=
github.com/centrifugal/protocol v0.7.6 h1:AfMwTZfwnFwZslIzQL4QtRnWSVO32RPSuk4iNS/V9tg=
github.com/centrifugal/protocol v0.7.6/go.mod h1:cJo0/BuXglhPfg0fgSgTXvBZ7y+9rdg4+nPbIDOVmlA=
github.com/centrifugal/centrifuge v0.25.0 h1:QivFZRPWcN8w3I/gZ8Zs9rMe/KGZoKIS9Kgo1/bY4JE=
github.com/centrifugal/centrifuge v0.25.0/go.mod h1:bFcSFalnROq/wcFeRiTG+wIbHsxEMW66QUnq95RY1d0=
github.com/centrifugal/protocol v0.8.10 h1:eezzBIU/4pWyl7a+NUnANYojJBASqbkPZcQh9b8YQRI=
github.com/centrifugal/protocol v0.8.10/go.mod h1:dlHBjKakr0r+f1pkfwSMfZ+cnpvidN7pQe1ZrsKfhtE=
github.com/certifi/gocertifi v0.0.0-20191021191039-0944d244cd40/go.mod h1:sGbDF6GwGcLpkNXPUTkMRoywsNa/ol15pxFe6ERfguA=
github.com/cespare/xxhash v0.0.0-20181017004759-096ff4a8a059/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
github.com/cespare/xxhash v1.1.0/go.mod h1:XrSqR1VqqWfGrhpAt58auRo0WTKS1nRRg3ghfAqPWnc=
@ -1227,10 +1227,8 @@ github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219 h1:utua3L2IbQJmauC5IXdEA547bcoU5dozgQAfc8Onsg4=
github.com/golangci/lint-1 v0.0.0-20181222135242-d2cdd8c08219/go.mod h1:/X8TswGSh1pIozq4ZwCfxS0WA5JGXguxk94ar/4c87Y=
github.com/gomodule/redigo v1.8.4/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/gomodule/redigo v1.8.5/go.mod h1:P9dn9mFrCBvWhGE1wpxx6fgq7BAeLBk+UUUzlpkBYO0=
github.com/gomodule/redigo v2.0.0+incompatible h1:K/R+8tc58AaqLkqG2Ol3Qk+DR/TlNuhuh457pBFPtt0=
github.com/gomodule/redigo v2.0.0+incompatible/go.mod h1:B4C85qUVwatsJoIUNIfCRsp7qO0iAmpGFZ4EELWSbC4=
github.com/gomodule/redigo v1.8.9 h1:Sl3u+2BI/kk+VEatbj0scLdrFhjPmbxOc1myhDP41ws=
github.com/gomodule/redigo v1.8.9/go.mod h1:7ArFNvsTjH8GMMzB4uy1snslv2BwmginuMs06a1uzZE=
github.com/google/btree v0.0.0-20180124185431-e89373fe6b4a/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v0.0.0-20180813153112-4030bb1f1f0c/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
github.com/google/btree v1.0.0/go.mod h1:lNA+9X1NB3Zf8V7Ke586lFgjr2dZNuvo3lPJSGZ5JPQ=
@ -1363,8 +1361,9 @@ github.com/gorilla/sessions v1.2.1/go.mod h1:dk2InVEVJ0sfLlnXv9EAgkf6ecYs/i80K/z
github.com/gorilla/websocket v0.0.0-20170926233335-4201258b820c/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.0/go.mod h1:E7qHFY5m1UJ88s3WnNqhKjPHQ0heANvMoAMk2YaljkQ=
github.com/gorilla/websocket v1.4.1/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.4.2 h1:+/TMaTYc4QFitKJxsQ7Yye35DkWvkdLcvGKqM+x0Ufc=
github.com/gorilla/websocket v1.4.2/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gorilla/websocket v1.5.0 h1:PPwGk2jz7EePpoHN/+ClbZu8SPxiqlu12wZP/3sWmnc=
github.com/gorilla/websocket v1.5.0/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=
github.com/gosimple/slug v1.12.0 h1:xzuhj7G7cGtd34NXnW/yF0l+AGNfWqwgh/IXgFy7dnc=
github.com/gosimple/slug v1.12.0/go.mod h1:UiRaFH+GEilHstLUmcBgWcI42viBN7mAb818JrYOeFQ=
github.com/gosimple/unidecode v1.0.1 h1:hZzFTMMqSswvf0LBJZCZgThIZrpDHFXux9KeGmn6T/o=
@ -1529,8 +1528,8 @@ github.com/hudl/fargo v1.3.0/go.mod h1:y3CKSmjA+wD2gak7sUSXTAoopbhU08POFhmITJgmK
github.com/iancoleman/strcase v0.0.0-20180726023541-3605ed457bf7/go.mod h1:SK73tn/9oHe+/Y0h39VT4UCxmurVJkR5NA7kMEAOgSE=
github.com/ianlancetaylor/demangle v0.0.0-20181102032728-5e5cf60278f6/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/igm/sockjs-go/v3 v3.0.1 h1:rmgEkeKqBHCFf7uIAipYrYSX8x9LBB2nOxAac2sooak=
github.com/igm/sockjs-go/v3 v3.0.1/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
github.com/igm/sockjs-go/v3 v3.0.2 h1:2m0k53w0DBiGozeQUIEPR6snZFmpFpYvVsGnfLPNXbE=
github.com/igm/sockjs-go/v3 v3.0.2/go.mod h1:UqchsOjeagIBFHvd+RZpLaVRbCwGilEC08EDHsD1jYE=
github.com/imdario/mergo v0.3.4/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.5/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
github.com/imdario/mergo v0.3.6/go.mod h1:2EnlNZ0deacrJVfApfmtdGgDfMuh/nq6Ok1EcJh5FfA=
@ -1701,7 +1700,6 @@ github.com/klauspost/compress v1.15.2/go.mod h1:PhcZ0MbTNciWF3rruxRgKxI5NkcHHrHU
github.com/klauspost/cpuid v0.0.0-20170728055534-ae7887de9fa5/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.2.3/go.mod h1:Pj4uuM528wm8OyEC2QMXAi2YiTZ96dNQPGgoMS4s3ek=
github.com/klauspost/cpuid v1.3.1/go.mod h1:bYW4mA6ZgKPob1/Dlai2LviZJO7KGI3uoWLd42rAQw4=
github.com/klauspost/cpuid/v2 v2.0.6/go.mod h1:FInQzS24/EEf25PyTYn52gqo7WaD8xa0213Md/qVLRg=
github.com/klauspost/crc32 v0.0.0-20161016154125-cb6bfca970f6/go.mod h1:+ZoRqAPRLkC4NPOvfYeR5KNOrY6TD+/sAC3HXPZgDYg=
github.com/klauspost/pgzip v1.0.2-0.20170402124221-0bf5dcad4ada/go.mod h1:Ch1tH69qFZu15pkjo5kYi6mth2Zzwzt50oCQKQE9RUs=
github.com/knadh/koanf v1.2.0/go.mod h1:xpPTwMhsA/aaQLAilyCCqfpEiY1gpa160AiCuWHJUjY=
@ -2287,11 +2285,11 @@ github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529 h1:nn5Wsu0esKSJiIVhscUt
github.com/sean-/seed v0.0.0-20170313163322-e2103e2c3529/go.mod h1:DxrIzT+xaE7yg65j358z/aeFdxmN0P9QXhEzd20vsDc=
github.com/seccomp/libseccomp-golang v0.9.1/go.mod h1:GbW5+tmTXfcxTToHLXlScSlAvWlF4P2Ca7zGrPiEpWo=
github.com/securego/gosec v0.0.0-20200203094520-d13bb6d2420c/go.mod h1:gp0gaHj0WlmPh9BdsTmo1aq6C27yIPWdxCKGFGdVKBE=
github.com/segmentio/asm v1.1.0/go.mod h1:4EUJGaKsB8ImLUwOGORVsNd9vTRDeh44JGsY4aKp5I4=
github.com/segmentio/asm v1.1.1 h1:WzUpP9BRnRgoP+v8qywthpSe9U5KLq1uDGdJBEcSeQo=
github.com/segmentio/asm v1.1.1/go.mod h1:VCkA6aQH8usgIAUp4QMLQsAeaMl6iW7+VMLrW9Vkv0Y=
github.com/segmentio/encoding v0.3.2 h1:gkXXteOfNaPPlrXTEf/e5tWvaQGVJWnvT3LqMzUeH7U=
github.com/segmentio/encoding v0.3.2/go.mod h1:waft2p6XI4z2pk07M0YzZV4wEiqaRvsBSyWNHxVx4gU=
github.com/segmentio/asm v1.1.3/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg=
github.com/segmentio/asm v1.1.4 h1:Q/FKBtrgnmDc0YMrurLROqG9mXE6Ndn276EtDnoWtMM=
github.com/segmentio/asm v1.1.4/go.mod h1:Ld3L4ZXGNcSLRg4JBsZ3//1+f/TjYl0Mzen/DQy1EJg=
github.com/segmentio/encoding v0.3.5 h1:UZEiaZ55nlXGDL92scoVuw00RmiRCazIEmvPSbSvt8Y=
github.com/segmentio/encoding v0.3.5/go.mod h1:n0JeuIqEQrQoPDGsjo8UNd1iA0U8d8+oHAA4E3G3OxM=
github.com/segmentio/fasthash v0.0.0-20180216231524-a72b379d632e/go.mod h1:tm/wZFQ8e24NYaBGIlnO2WGCAi67re4HHuOm0sftE/M=
github.com/segmentio/kafka-go v0.1.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
github.com/segmentio/kafka-go v0.2.0/go.mod h1:X6itGqS9L4jDletMsxZ7Dz+JFWxM6JHfPOCvTvk+EJo=
@ -3097,6 +3095,7 @@ golang.org/x/sys v0.0.0-20220227234510-4e6760a101f9/go.mod h1:oPkhp1MJrh7nUepCBc
golang.org/x/sys v0.0.0-20220328115105-d36c6a25d886/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220330033206-e17cdc41300f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220412211240-33da011f77ad/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220422013727-9388b58f7150/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f h1:v4INt8xihDGvnrfjMDVXGxw9wrfxYyCjk0KbXjhR55s=
golang.org/x/sys v0.0.0-20220722155257-8c9f86f7a55f/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=

@ -305,7 +305,7 @@
"baron": "3.0.3",
"brace": "0.11.1",
"calculate-size": "1.1.1",
"centrifuge": "2.8.5",
"centrifuge": "3.0.1",
"classnames": "2.3.1",
"comlink": "4.3.1",
"common-tags": "1.8.2",

@ -20,9 +20,9 @@ export enum LiveChannelScope {
* @alpha
*/
export enum LiveChannelType {
DataStream = 'stream', // each message contains a batch of rows that will be appened to previous values
DataStream = 'stream', // each message contains a batch of rows that will be appended to previous values
DataFrame = 'frame', // each message is an entire data frame and should *replace* previous content
JSON = 'json', // arbitray json message
JSON = 'json', // arbitrary json message
}
export enum LiveChannelConnectionState {
@ -30,11 +30,13 @@ export enum LiveChannelConnectionState {
Pending = 'pending',
/** Connected to the channel */
Connected = 'connected',
/** Connecting to a channel */
Connecting = 'connecting',
/** Disconnected from the channel. The channel will reconnect when possible */
Disconnected = 'disconnected',
/** Was at some point connected, and will not try to reconnect */
Shutdown = 'shutdown',
/** Channel configuraiton was invalid and will not connect */
/** Channel configuration was invalid and will not connect */
Invalid = 'invalid',
}
@ -139,7 +141,7 @@ export interface LiveChannelAddress {
path: string;
/**
* Additional metadata passed to a channel. The backend will propigate this JSON object to
* Additional metadata passed to a channel. The backend will propagate this JSON object to
* each OnSubscribe and RunStream calls. This value should be constant across multiple requests
* to the same channel path
*/

@ -99,19 +99,14 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
logger.Debug("GrafanaLive initialization", "ha", g.IsHA())
// We use default config here as starting point. Default config contains
// reasonable values for available options.
scfg := centrifuge.DefaultConfig
// scfg.LogLevel = centrifuge.LogLevelDebug
scfg.LogHandler = handleLog
scfg.LogLevel = centrifuge.LogLevelError
scfg.MetricsNamespace = "grafana_live"
// Node is the core object in Centrifuge library responsible for many useful
// things. For example Node allows to publish messages to channels from server
// side with its Publish method.
node, err := centrifuge.New(scfg)
node, err := centrifuge.New(centrifuge.Config{
LogHandler: handleLog,
LogLevel: centrifuge.LogLevelError,
MetricsNamespace: "grafana_live",
})
if err != nil {
return nil, err
}
@ -312,12 +307,9 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
})
client.OnDisconnect(func(e centrifuge.DisconnectEvent) {
reason := "normal"
if e.Disconnect != nil {
reason = e.Disconnect.Reason
if e.Disconnect.Code == 3001 { // Shutdown
return
}
reason := e.Disconnect.Reason
if e.Disconnect.Code == 3001 { // Shutdown
return
}
logger.Debug("Client disconnected", "user", client.UserID(), "client", client.ID(), "reason", reason, "elapsed", time.Since(connectedAt))
})
@ -339,6 +331,7 @@ func ProvideService(plugCtxProvider *plugincontext.Provider, cfg *setting.Cfg, r
// Use a pure websocket transport.
wsHandler := centrifuge.NewWebsocketHandler(node, centrifuge.WebsocketConfig{
ProtocolVersion: centrifuge.ProtocolVersion2,
ReadBufferSize: 1024,
WriteBufferSize: 1024,
CheckOrigin: checkOrigin,
@ -711,10 +704,11 @@ func (g *GrafanaLive) handleOnSubscribe(ctx context.Context, client *centrifuge.
logger.Debug("Client subscribed", "user", client.UserID(), "client", client.ID(), "channel", e.Channel)
return centrifuge.SubscribeReply{
Options: centrifuge.SubscribeOptions{
Presence: reply.Presence,
JoinLeave: reply.JoinLeave,
Recover: reply.Recover,
Data: reply.Data,
EmitPresence: reply.Presence,
EmitJoinLeave: reply.JoinLeave,
PushJoinLeave: reply.JoinLeave,
EnableRecovery: reply.Recover,
Data: reply.Data,
},
}, nil
}

@ -89,7 +89,7 @@ func (c *Caller) CallManagedStreams(orgID int64) ([]*managedstream.ManagedChanne
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
defer cancel()
resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData)
resp, err := c.node.Survey(ctx, managedStreamsCall, jsonData, "")
if err != nil {
return nil, err
}

@ -1,11 +1,11 @@
import Centrifuge, {
JoinLeaveContext,
import {
Subscription,
JoinContext,
LeaveContext,
PublicationContext,
SubscribeErrorContext,
SubscribeSuccessContext,
SubscriptionEvents,
UnsubscribeContext,
} from 'centrifuge/dist/centrifuge';
SubscriptionErrorContext,
SubscribedContext,
} from 'centrifuge';
import { Subject, of, Observable } from 'rxjs';
import {
@ -34,7 +34,7 @@ export class CentrifugeLiveChannel<T = any> {
// Hold on to the last header with schema
lastMessageWithSchema?: DataFrameJSON;
subscription?: Centrifuge.Subscription;
subscription?: Subscription;
shutdownCallback?: () => void;
initalized?: boolean;
@ -54,46 +54,44 @@ export class CentrifugeLiveChannel<T = any> {
}
// This should only be called when centrifuge is connected
initalize(): SubscriptionEvents {
initalize(): void {
if (this.initalized) {
throw new Error('Channel already initalized: ' + this.id);
}
this.initalized = true;
const events: SubscriptionEvents = {
// Called when a message is received from the socket
publish: (ctx: PublicationContext) => {
try {
if (ctx.data) {
if (ctx.data.schema) {
this.lastMessageWithSchema = ctx.data as DataFrameJSON;
}
this.stream.next({
type: LiveChannelEventType.Message,
message: ctx.data,
});
this.subscription!.on('publication', (ctx: PublicationContext) => {
try {
if (ctx.data) {
if (ctx.data.schema) {
this.lastMessageWithSchema = ctx.data as DataFrameJSON;
}
// Clear any error messages
if (this.currentStatus.error) {
this.currentStatus.timestamp = Date.now();
delete this.currentStatus.error;
this.sendStatus();
}
} catch (err) {
console.log('publish error', this.addr, err);
this.currentStatus.error = err;
this.stream.next({
type: LiveChannelEventType.Message,
message: ctx.data,
});
}
// Clear any error messages
if (this.currentStatus.error) {
this.currentStatus.timestamp = Date.now();
delete this.currentStatus.error;
this.sendStatus();
}
},
error: (ctx: SubscribeErrorContext) => {
} catch (err) {
console.log('publish error', this.addr, err);
this.currentStatus.error = err;
this.currentStatus.timestamp = Date.now();
this.sendStatus();
}
})
.on('error', (ctx: SubscriptionErrorContext) => {
this.currentStatus.timestamp = Date.now();
this.currentStatus.error = ctx.message;
this.currentStatus.error = ctx.error.message;
this.sendStatus();
},
subscribe: (ctx: SubscribeSuccessContext) => {
})
.on('subscribed', (ctx: SubscribedContext) => {
this.currentStatus.timestamp = Date.now();
this.currentStatus.state = LiveChannelConnectionState.Connected;
delete this.currentStatus.error;
@ -101,23 +99,24 @@ export class CentrifugeLiveChannel<T = any> {
if (ctx.data?.schema) {
this.lastMessageWithSchema = ctx.data as DataFrameJSON;
}
this.sendStatus(ctx.data);
},
unsubscribe: (ctx: UnsubscribeContext) => {
})
.on('unsubscribed', () => {
this.currentStatus.timestamp = Date.now();
this.currentStatus.state = LiveChannelConnectionState.Disconnected;
this.sendStatus();
},
};
events.join = (ctx: JoinLeaveContext) => {
this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
};
events.leave = (ctx: JoinLeaveContext) => {
this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
};
return events;
})
.on('subscribing', () => {
this.currentStatus.timestamp = Date.now();
this.currentStatus.state = LiveChannelConnectionState.Connecting;
this.sendStatus();
})
.on('join', (ctx: JoinContext) => {
this.stream.next({ type: LiveChannelEventType.Join, user: ctx.info.user });
})
.on('leave', (ctx: LeaveContext) => {
this.stream.next({ type: LiveChannelEventType.Leave, user: ctx.info.user });
});
}
private sendStatus(message?: any) {
@ -171,7 +170,7 @@ export class CentrifugeLiveChannel<T = any> {
return this.subscription!.presence().then((v) => {
return {
users: Object.keys(v.presence),
users: Object.keys(v.clients),
};
});
}

@ -1,4 +1,4 @@
import Centrifuge from 'centrifuge/dist/centrifuge';
import { Centrifuge, State } from 'centrifuge';
import { BehaviorSubject, Observable, share, startWith } from 'rxjs';
import {
@ -78,30 +78,27 @@ export class CentrifugeService implements CentrifugeSrv {
this.centrifuge = new Centrifuge(liveUrl, {
timeout: 30000,
});
this.centrifuge.setConnectData({
sessionId: deps.sessionId,
orgId: deps.orgId,
});
// orgRole is set when logged in *or* anonomus users can use grafana
// orgRole is set when logged in *or* anonymous users can use grafana
if (deps.liveEnabled && deps.orgRole !== '') {
this.centrifuge.connect(); // do connection
}
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.isConnected());
this.connectionState = new BehaviorSubject<boolean>(this.centrifuge.state === State.Connected);
this.connectionBlocker = new Promise<void>((resolve) => {
if (this.centrifuge.isConnected()) {
if (this.centrifuge.state === State.Connected) {
return resolve();
}
const connectListener = () => {
resolve();
this.centrifuge.removeListener('connect', connectListener);
this.centrifuge.removeListener('connected', connectListener);
};
this.centrifuge.addListener('connect', connectListener);
this.centrifuge.addListener('connected', connectListener);
});
// Register global listeners
this.centrifuge.on('connect', this.onConnect);
this.centrifuge.on('disconnect', this.onDisconnect);
this.centrifuge.on('publish', this.onServerSideMessage);
this.centrifuge.on('connected', this.onConnect);
this.centrifuge.on('connecting', this.onDisconnect);
this.centrifuge.on('disconnected', this.onDisconnect);
this.centrifuge.on('publication', this.onServerSideMessage);
}
//----------------------------------------------------------
@ -154,11 +151,15 @@ export class CentrifugeService implements CentrifugeSrv {
}
private async initChannel(channel: CentrifugeLiveChannel): Promise<void> {
const events = channel.initalize();
if (!this.centrifuge.isConnected()) {
if (this.centrifuge.state !== State.Connected) {
await this.connectionBlocker;
}
channel.subscription = this.centrifuge.subscribe(channel.id, events, { data: channel.addr.data });
const subscription = this.centrifuge.newSubscription(channel.id, {
data: channel.addr.data,
});
channel.subscription = subscription;
channel.initalize();
subscription.subscribe();
return;
}
@ -220,10 +221,10 @@ export class CentrifugeService implements CentrifugeSrv {
* Since the initial request and subscription are on the same socket, this will support HA setups
*/
getQueryData: CentrifugeSrv['getQueryData'] = async (options) => {
if (!this.centrifuge.isConnected()) {
if (this.centrifuge.state !== State.Connected) {
await this.connectionBlocker;
}
return this.centrifuge.namedRPC('grafana.query', options.body);
return this.centrifuge.rpc('grafana.query', options.body);
};
/**

@ -318,6 +318,9 @@ const getStyles = stylesFactory((theme: GrafanaTheme) => ({
[LiveChannelConnectionState.Connected]: css`
border: 1px solid ${theme.palette.brandSuccess};
`,
[LiveChannelConnectionState.Connecting]: css`
border: 1px solid ${theme.palette.brandWarning};
`,
[LiveChannelConnectionState.Disconnected]: css`
border: 1px solid ${theme.palette.brandWarning};
`,

@ -16067,12 +16067,13 @@ __metadata:
languageName: node
linkType: hard
"centrifuge@npm:2.8.5":
version: 2.8.5
resolution: "centrifuge@npm:2.8.5"
"centrifuge@npm:3.0.1":
version: 3.0.1
resolution: "centrifuge@npm:3.0.1"
dependencies:
protobufjs: ^6.11.2
checksum: 27db6d3b92aa52fa7a5f7ac9a5643338bae0a231db61d791bbad0d4858a67f72991f12a63e6f042179bafd0854ebf832c42bde2ae5ddd2a26124d094c1d5cf65
events: ^3.3.0
protobufjs: ^6.11.3
checksum: cf17b63fe9c684f121348a8597da0686a1f45fff2f85eaed6a3452504caeb1098c1524b10f197fe3811a11199214b88b75f5935ac86cb9753146fcc0d11f418c
languageName: node
linkType: hard
@ -20546,7 +20547,7 @@ __metadata:
languageName: node
linkType: hard
"events@npm:^3.0.0, events@npm:^3.2.0":
"events@npm:^3.0.0, events@npm:^3.2.0, events@npm:^3.3.0":
version: 3.3.0
resolution: "events@npm:3.3.0"
checksum: f6f487ad2198aa41d878fa31452f1a3c00958f46e9019286ff4787c84aac329332ab45c9cdc8c445928fc6d7ded294b9e005a7fce9426488518017831b272780
@ -22389,7 +22390,7 @@ __metadata:
baron: 3.0.3
brace: 0.11.1
calculate-size: 1.1.1
centrifuge: 2.8.5
centrifuge: 3.0.1
classnames: 2.3.1
comlink: 4.3.1
common-tags: 1.8.2
@ -31394,7 +31395,7 @@ __metadata:
languageName: node
linkType: hard
"protobufjs@npm:^6.11.2":
"protobufjs@npm:^6.11.3":
version: 6.11.3
resolution: "protobufjs@npm:6.11.3"
dependencies:

Loading…
Cancel
Save