From 6834038e9164072229fb35bdd04d3f39a08686e9 Mon Sep 17 00:00:00 2001 From: Ryan McKinley Date: Thu, 20 Jun 2024 16:11:21 +0300 Subject: [PATCH] Storage: Add new resource module and move some utilities (#89443) --- .github/CODEOWNERS | 1 + go.work | 1 + go.work.sum | 1 + .../store/entity/sqlstash/sql_storage_server.go | 5 +++-- .../unified/resource}/broadcaster.go | 14 +++++++------- .../unified/resource}/broadcaster_test.go | 4 ++-- pkg/storage/unified/resource/go.mod | 14 ++++++++++++++ pkg/storage/unified/resource/go.sum | 8 ++++++++ 8 files changed, 37 insertions(+), 11 deletions(-) rename pkg/{services/store/entity/sqlstash => storage/unified/resource}/broadcaster.go (97%) rename pkg/{services/store/entity/sqlstash => storage/unified/resource}/broadcaster_test.go (96%) create mode 100644 pkg/storage/unified/resource/go.mod create mode 100644 pkg/storage/unified/resource/go.sum diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS index 61359aed23a..a2fc88371ba 100644 --- a/.github/CODEOWNERS +++ b/.github/CODEOWNERS @@ -107,6 +107,7 @@ /pkg/apimachinery/identity/ @grafana/identity-squad /pkg/apimachinery/errutil/ @grafana/grafana-backend-group /pkg/promlib @grafana/observability-metrics +/pkg/storage/ @grafana/grafana-search-and-storage /pkg/services/annotations/ @grafana/grafana-search-and-storage /pkg/services/apikey/ @grafana/identity-squad /pkg/services/cleanup/ @grafana/grafana-backend-group diff --git a/go.work b/go.work index 2f858b9675e..67f2d6728c2 100644 --- a/go.work +++ b/go.work @@ -7,6 +7,7 @@ use ( ./pkg/build ./pkg/build/wire ./pkg/promlib + ./pkg/storage/unified/resource ./pkg/util/xorm ) diff --git a/go.work.sum b/go.work.sum index d8edfcf36bd..d971095824b 100644 --- a/go.work.sum +++ b/go.work.sum @@ -942,6 +942,7 @@ github.com/grafana/grafana-plugin-sdk-go v0.228.0/go.mod h1:u4K9vVN6eU86loO68977 github.com/grafana/grafana-plugin-sdk-go v0.229.0/go.mod h1:6V6ikT4ryva8MrAp7Bdz5fTJx3/ztzKvpMJFfpzr4CI= github.com/grafana/grafana-plugin-sdk-go v0.231.1-0.20240523124942-62dae9836284/go.mod h1:bNgmNmub1I7Mc8dzIncgNqHC5jTgSZPPHlZ3aG8HKJQ= github.com/grafana/grafana-plugin-sdk-go v0.234.0/go.mod h1:FlXjmBESxaD6Hoi8ojWLkH007nyjtJM3XC8SpwzF/YE= +github.com/grafana/grafana/pkg/apimachinery v0.0.0-20240613114114-5e2f08de316d/go.mod h1:adT8O7k6ZSzUKjAC4WS6VfWlCE4G1VavPwSXVhvScCs= github.com/grafana/grafana/pkg/promlib v0.0.3/go.mod h1:3El4NlsfALz8QQCbEGHGFvJUG+538QLMuALRhZ3pcoo= github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7 h1:pdN6V1QBWetyv/0+wjACpqVH+eVULgEjkurDLq3goeM= github.com/grpc-ecosystem/go-grpc-middleware v1.0.0/go.mod h1:FiyG127CGDf3tlThmgyCl78X/SZQqEOJBCDaAfeWzPs= diff --git a/pkg/services/store/entity/sqlstash/sql_storage_server.go b/pkg/services/store/entity/sqlstash/sql_storage_server.go index 0a7d7cce617..046ed69e4c7 100644 --- a/pkg/services/store/entity/sqlstash/sql_storage_server.go +++ b/pkg/services/store/entity/sqlstash/sql_storage_server.go @@ -25,6 +25,7 @@ import ( "github.com/grafana/grafana/pkg/services/store/entity" "github.com/grafana/grafana/pkg/services/store/entity/db" "github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate" + "github.com/grafana/grafana/pkg/storage/unified/resource" ) const entityTable = "entity" @@ -75,7 +76,7 @@ type sqlEntityServer struct { db db.EntityDBInterface // needed to keep xorm engine in scope sess *session.SessionDB dialect migrator.Dialect - broadcaster Broadcaster[*entity.EntityWatchResponse] + broadcaster resource.Broadcaster[*entity.EntityWatchResponse] ctx context.Context // TODO: remove cancel context.CancelFunc tracer trace.Tracer @@ -141,7 +142,7 @@ func (s *sqlEntityServer) init() error { s.dialect = migrator.NewDialect(engine.DriverName()) // set up the broadcaster - s.broadcaster, err = NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error { + s.broadcaster, err = resource.NewBroadcaster(s.ctx, func(stream chan<- *entity.EntityWatchResponse) error { // start the poller go s.poller(stream) diff --git a/pkg/services/store/entity/sqlstash/broadcaster.go b/pkg/storage/unified/resource/broadcaster.go similarity index 97% rename from pkg/services/store/entity/sqlstash/broadcaster.go rename to pkg/storage/unified/resource/broadcaster.go index 56a010bf2a0..547bfe576e8 100644 --- a/pkg/services/store/entity/sqlstash/broadcaster.go +++ b/pkg/storage/unified/resource/broadcaster.go @@ -1,4 +1,4 @@ -package sqlstash +package resource import ( "context" @@ -109,7 +109,7 @@ type broadcaster[T any] struct { // subscription management - cache Cache[T] + cache channelCache[T] subscribe chan chan T unsubscribe chan (<-chan T) subs map[<-chan T]chan T @@ -166,7 +166,7 @@ func (b *broadcaster[T]) init(ctx context.Context, connect ConnectFunc[T]) error // initialize our internal state b.shouldTerminate = ctx.Done() - b.cache = NewCache[T](ctx, 100) + b.cache = newChannelCache[T](ctx, 100) b.subscribe = make(chan chan T, 100) b.unsubscribe = make(chan (<-chan T), 100) b.subs = make(map[<-chan T]chan T) @@ -239,9 +239,9 @@ func (b *broadcaster[T]) stream(input <-chan T) { } } -const DefaultCacheSize = 100 +const defaultCacheSize = 100 -type Cache[T any] interface { +type channelCache[T any] interface { Len() int Add(item T) Get(i int) T @@ -260,12 +260,12 @@ type cache[T any] struct { ctx context.Context } -func NewCache[T any](ctx context.Context, size int) Cache[T] { +func newChannelCache[T any](ctx context.Context, size int) channelCache[T] { c := &cache[T]{} c.ctx = ctx if size <= 0 { - size = DefaultCacheSize + size = defaultCacheSize } c.size = size c.cache = make([]T, c.size) diff --git a/pkg/services/store/entity/sqlstash/broadcaster_test.go b/pkg/storage/unified/resource/broadcaster_test.go similarity index 96% rename from pkg/services/store/entity/sqlstash/broadcaster_test.go rename to pkg/storage/unified/resource/broadcaster_test.go index fc6a6369a2d..8eedfa01ce5 100644 --- a/pkg/services/store/entity/sqlstash/broadcaster_test.go +++ b/pkg/storage/unified/resource/broadcaster_test.go @@ -1,4 +1,4 @@ -package sqlstash +package resource import ( "context" @@ -8,7 +8,7 @@ import ( ) func TestCache(t *testing.T) { - c := NewCache[int](context.Background(), 10) + c := newChannelCache[int](context.Background(), 10) e := []int{} err := c.Range(func(i int) error { diff --git a/pkg/storage/unified/resource/go.mod b/pkg/storage/unified/resource/go.mod new file mode 100644 index 00000000000..2ebcdfc5fed --- /dev/null +++ b/pkg/storage/unified/resource/go.mod @@ -0,0 +1,14 @@ +module github.com/grafana/grafana/pkg/storage/unified/resource + +go 1.21.10 + +require github.com/stretchr/testify v1.9.0 + +require ( + github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect + github.com/kr/pretty v0.3.1 // indirect + github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 // indirect + github.com/rogpeppe/go-internal v1.12.0 // indirect + gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c // indirect + gopkg.in/yaml.v3 v3.0.1 // indirect +) diff --git a/pkg/storage/unified/resource/go.sum b/pkg/storage/unified/resource/go.sum new file mode 100644 index 00000000000..4d56934746b --- /dev/null +++ b/pkg/storage/unified/resource/go.sum @@ -0,0 +1,8 @@ +github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc h1:U9qPSI2PIWSS1VwoXQT9A3Wy9MM3WgvqSxFWenqJduM= +github.com/kr/pretty v0.3.1 h1:flRD4NNwYAUpkphVc1HcthR4KEIFJ65n8Mw5qdRn3LE= +github.com/kr/text v0.2.0 h1:5Nx0Ya0ZqY2ygV366QzturHI13Jq95ApcVaJBhpS+AY= +github.com/pmezard/go-difflib v1.0.1-0.20181226105442-5d4384ee4fb2 h1:Jamvg5psRIccs7FGNTlIRMkT8wgtp5eCXdBlqhYGL6U= +github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8= +github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= +gopkg.in/check.v1 v1.0.0-20201130134442-10cb98267c6c h1:Hei/4ADfdWqJk1ZMxUNpqntNwaWcugrBjAiHlqqRiVk= +gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=