Storage: Add new resource module and move some utilities (#89443)

pull/89397/head^2
Ryan McKinley 11 months ago committed by GitHub
parent 1cc58d19f5
commit 6834038e91
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 1
      .github/CODEOWNERS
  2. 1
      go.work
  3. 1
      go.work.sum
  4. 5
      pkg/services/store/entity/sqlstash/sql_storage_server.go
  5. 14
      pkg/storage/unified/resource/broadcaster.go
  6. 4
      pkg/storage/unified/resource/broadcaster_test.go
  7. 14
      pkg/storage/unified/resource/go.mod
  8. 8
      pkg/storage/unified/resource/go.sum

@ -107,6 +107,7 @@
/pkg/apimachinery/identity/ @grafana/identity-squad
/pkg/apimachinery/errutil/ @grafana/grafana-backend-group
/pkg/promlib @grafana/observability-metrics
/pkg/storage/ @grafana/grafana-search-and-storage
/pkg/services/annotations/ @grafana/grafana-search-and-storage
/pkg/services/apikey/ @grafana/identity-squad
/pkg/services/cleanup/ @grafana/grafana-backend-group

@ -7,6 +7,7 @@ use (
./pkg/build
./pkg/build/wire
./pkg/promlib
./pkg/storage/unified/resource
./pkg/util/xorm
)

@ -942,6 +942,7 @@ github.com/grafana/grafana-plugin-sdk-go v0.228.0/go.mod h1:u4K9vVN6eU86loO68977
github.com/grafana/grafana-plugin-sdk-go v0.229.0/go.mod h1:6V6ikT4ryva8MrAp7Bdz5fTJx3/ztzKvpMJFfpzr4CI=
github.com/grafana/grafana-plugin-sdk-go v0.231.1-0.20240523124942-62dae9836284/go.mod h1:bNgmNmub1I7Mc8dzIncgNqHC5jTgSZPPHlZ3aG8HKJQ=
github.com/grafana/grafana-plugin-sdk-go v0.234.0/go.mod h1:FlXjmBESxaD6Hoi8ojWLkH007nyjtJM3XC8SpwzF/YE=
github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240613114114-5e2f08de316d/go.mod h1:adT8O7k6ZSzUKjAC4WS6VfWlCE4G1VavPwSXVhvScCs=
github.com/grafana/grafana/pkg/promlib v0.0.3/go.mod h1:3El4NlsfALz8QQCbEGHGFvJUG+538QLMuALRhZ3pcoo=
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM=
github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs=

@ -25,6 +25,7 @@ import (
"github.com/grafana/grafana/pkg/services/store/entity"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/resource"
)
const entityTable = "entity"
@ -75,7 +76,7 @@ type sqlEntityServer struct {
db db.EntityDBInterface // needed to keep xorm engine in scope
sess *session.SessionDB
dialect migrator.Dialect
broadcaster Broadcaster[*entity.EntityWatchResponse]
broadcaster resource.Broadcaster[*entity.EntityWatchResponse]
ctx context.Context // TODO: remove
cancel context.CancelFunc
tracer trace.Tracer
@ -141,7 +142,7 @@ func (s *sqlEntityServer) init() error {
s.dialect = migrator.NewDialect(engine.DriverName())
// set up the broadcaster
s.broadcaster, err = NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error {
s.broadcaster, err = resource.NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error {
// start the poller
go s.poller(stream)

@ -1,4 +1,4 @@
package sqlstash
package resource
import (
"context"
@ -109,7 +109,7 @@ type broadcaster[T any] struct {
// subscription management
cache Cache[T]
cache channelCache[T]
subscribe chan chan T
unsubscribe chan (<-chan T)
subs map[<-chan T]chan T
@ -166,7 +166,7 @@ func (b *broadcaster[T]) init(ctx context.Context, connect ConnectFunc[T]) error
// initialize our internal state
b.shouldTerminate = ctx.Done()
b.cache = NewCache[T](ctx, 100)
b.cache = newChannelCache[T](ctx, 100)
b.subscribe = make(chan chan T, 100)
b.unsubscribe = make(chan (<-chan T), 100)
b.subs = make(map[<-chan T]chan T)
@ -239,9 +239,9 @@ func (b *broadcaster[T]) stream(input <-chan T) {
}
}
const DefaultCacheSize = 100
const defaultCacheSize = 100
type Cache[T any] interface {
type channelCache[T any] interface {
Len() int
Add(item T)
Get(i int) T
@ -260,12 +260,12 @@ type cache[T any] struct {
ctx context.Context
}
func NewCache[T any](ctx context.Context, size int) Cache[T] {
func newChannelCache[T any](ctx context.Context, size int) channelCache[T] {
c := &cache[T]{}
c.ctx = ctx
if size <= 0 {
size = DefaultCacheSize
size = defaultCacheSize
}
c.size = size
c.cache = make([]T, c.size)

@ -1,4 +1,4 @@
package sqlstash
package resource
import (
"context"
@ -8,7 +8,7 @@ import (
)
func TestCache(t *testing.T) {
c := NewCache[int](context.Background(), 10)
c := newChannelCache[int](context.Background(), 10)
e := []int{}
err := c.Range(func(i int) error {

@ -0,0 +1,14 @@
module github.com/grafana/grafana/pkg/storage/unified/resource
go 1.21.10
require github.com/stretchr/testify v1.9.0
require (
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect
github.com/kr/pretty v0.3.1 // indirect
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect
github.com/rogpeppe/go-internal v1.12.0 // indirect
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect
gopkg.in/yaml.v3 v3.0.1 // indirect
)

@ -0,0 +1,8 @@
github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM=
github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE=
github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY=
github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U=
github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
Loading…
Cancel
Save