The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
grafana/pkg/storage/unified/sqlstash/sql_storage_server.go

367 lines
9.2 KiB

2 years ago
package sqlstash
import (
"context"
"database/sql"
"errors"
"fmt"
"strings"
"sync"
"github.com/prometheus/client_golang/prometheus"
"go.opentelemetry.io/otel/trace"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/infra/tracing"
"github.com/grafana/grafana/pkg/services/sqlstore/migrator"
"github.com/grafana/grafana/pkg/services/sqlstore/session"
"github.com/grafana/grafana/pkg/services/store/entity/db"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash"
"github.com/grafana/grafana/pkg/services/store/entity/sqlstash/sqltemplate"
"github.com/grafana/grafana/pkg/storage/unified/resource"
2 years ago
)
// Table-name constants used by this package.
const (
	// resoruceTable holds the name "resource".
	// NOTE(review): the identifier is misspelled ("resoruce"); it is left
	// unchanged here because other files in the package may reference it.
	resoruceTable = "resource"

	// resourceVersionTable holds the name "resource_version".
	resourceVersionTable = "resource_version"
)
// Sentinel errors exposed by this package. Compare against them with
// errors.Is.
var (
	// ErrNotFound reports that an entity could not be found.
	ErrNotFound = errors.New("entity not found")

	// ErrOptimisticLockingFailed reports an optimistic locking failure.
	ErrOptimisticLockingFailed = errors.New("optimistic locking failed")

	// ErrUserNotFoundInContext reports that no user was found in the context.
	ErrUserNotFoundInContext = errors.New("user not found in context")

	// ErrNextPageTokenNotSupported reports that nextPageToken is not yet
	// supported.
	ErrNextPageTokenNotSupported = errors.New("nextPageToken not yet supported")

	// ErrLimitNotSupported reports that limit is not yet supported.
	ErrLimitNotSupported = errors.New("limit not yet supported")

	// ErrNotImplementedYet is returned by operations that are still stubs.
	ErrNotImplementedYet = errors.New("not implemented yet")
)
// Compile-time check that *sqlResourceServer satisfies
// resource.ResourceStoreServer.
var _ resource.ResourceStoreServer = (*sqlResourceServer)(nil)
2 years ago
func ProvideSQLResourceServer(db db.EntityDBInterface, tracer tracing.Tracer) (SqlResourceServer, error) {
2 years ago
ctx, cancel := context.WithCancel(context.Background())
server := &sqlResourceServer{
db: db,
log: log.New("sql-resource-server"),
ctx: ctx,
cancel: cancel,
tracer: tracer,
}
if err := prometheus.Register(sqlstash.NewStorageMetrics()); err != nil {
server.log.Warn("error registering storage server metrics", "error", err)
}
return server, nil
}
type SqlResourceServer interface {
resource.ResourceStoreServer
2 years ago
Init() error
Stop()
}
type sqlResourceServer struct {
log log.Logger
db db.EntityDBInterface // needed to keep xorm engine in scope
sess *session.SessionDB
dialect migrator.Dialect
broadcaster sqlstash.Broadcaster[*resource.WatchResponse]
2 years ago
ctx context.Context // TODO: remove
cancel context.CancelFunc
stream chan *resource.WatchResponse
2 years ago
tracer trace.Tracer
validator resource.EventValidator
2 years ago
once sync.Once
initErr error
sqlDB db.DB
sqlDialect sqltemplate.Dialect
}
// Init performs the one-time setup of the server. It may be called any
// number of times: the underlying init runs at most once (guarded by
// sync.Once) and its result is cached, so later calls report the same
// outcome.
//
// A setup failure is returned wrapped with the prefix
// "initialize Entity Server"; on success nil is returned.
func (s *sqlResourceServer) Init() error {
	s.once.Do(func() { s.initErr = s.init() })

	if err := s.initErr; err != nil {
		return fmt.Errorf("initialize Entity Server: %w", err)
	}
	return nil
}
func (s *sqlResourceServer) init() error {
if s.sess != nil {
return nil
}
if s.db == nil {
return errors.New("missing db")
}
err := s.db.Init()
if err != nil {
return err
}
sqlDB, err := s.db.GetDB()
if err != nil {
return err
}
s.sqlDB = sqlDB
driverName := sqlDB.DriverName()
driverName = strings.TrimSuffix(driverName, "WithHooks")
switch driverName {
case db.DriverMySQL:
s.sqlDialect = sqltemplate.MySQL
case db.DriverPostgres:
s.sqlDialect = sqltemplate.PostgreSQL
case db.DriverSQLite, db.DriverSQLite3:
s.sqlDialect = sqltemplate.SQLite
default:
return fmt.Errorf("no dialect for driver %q", driverName)
}
sess, err := s.db.GetSession()
if err != nil {
return err
}
engine, err := s.db.GetEngine()
if err != nil {
return err
}
s.sess = sess
s.dialect = migrator.NewDialect(engine.DriverName())
s.validator = resource.NewEventValidator(resource.EventValidatorOptions{
// use snowflake IDs
})
2 years ago
// set up the broadcaster
s.broadcaster, err = sqlstash.NewBroadcaster(s.ctx, func(stream chan *resource.WatchResponse) error {
2 years ago
s.stream = stream
// start the poller
go s.poller(stream)
return nil
})
if err != nil {
return err
}
return nil
}
func (s *sqlResourceServer) IsHealthy(ctx context.Context, r *resource.HealthCheckRequest) (*resource.HealthCheckResponse, error) {
2 years ago
ctxLogger := s.log.FromContext(log.WithContextualAttributes(ctx, []any{"method", "isHealthy"}))
if err := s.Init(); err != nil {
ctxLogger.Error("init error", "error", err)
return nil, err
}
if err := s.sqlDB.PingContext(ctx); err != nil {
return nil, err
}
// TODO: check the status of the watcher implementation as well
return &resource.HealthCheckResponse{Status: resource.HealthCheckResponse_SERVING}, nil
2 years ago
}
// Stop cancels the server's background context by invoking the cancel
// function stored at construction time.
func (s *sqlResourceServer) Stop() {
	s.cancel()
}
func (s *sqlResourceServer) GetResource(ctx context.Context, req *resource.GetResourceRequest) (*resource.GetResourceResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.GetResource")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
if req.Key.Group == "" {
return &resource.GetResourceResponse{Status: badRequest("missing group")}, nil
}
if req.Key.Resource == "" {
return &resource.GetResourceResponse{Status: badRequest("missing resource")}, nil
}
2 years ago
fmt.Printf("TODO, GET: %+v", req.Key)
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Create(ctx context.Context, req *resource.CreateRequest) (*resource.CreateResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.Create")
defer span.End()
if req.Key.ResourceVersion > 0 {
return &resource.CreateResponse{
Status: badRequest("can not update a specific resource version"),
}, nil
}
2 years ago
if err := s.Init(); err != nil {
return nil, err
}
event, err := s.validator.PrepareCreate(ctx, req)
if err != nil {
return nil, err
}
if event.Status != nil {
return &resource.CreateResponse{Status: event.Status}, nil
2 years ago
}
fmt.Printf("TODO, CREATE: %v", event)
2 years ago
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Update(ctx context.Context, req *resource.UpdateRequest) (*resource.UpdateResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.Update")
defer span.End()
if req.Key.ResourceVersion < 0 {
return &resource.UpdateResponse{
Status: badRequest("update must include the previous version"),
}, nil
}
2 years ago
if err := s.Init(); err != nil {
return nil, err
}
latest, err := s.GetResource(ctx, &resource.GetResourceRequest{
Key: req.Key.WithoutResourceVersion(),
2 years ago
})
if err != nil {
return nil, err
}
event, err := s.validator.PrepareUpdate(ctx, req, latest)
if err != nil {
return nil, err
}
if event.Status != nil {
return &resource.UpdateResponse{Status: event.Status}, nil
2 years ago
}
fmt.Printf("TODO, UPDATE: %v", event)
2 years ago
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) Delete(ctx context.Context, req *resource.DeleteRequest) (*resource.DeleteResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.Delete")
defer span.End()
if req.Key.ResourceVersion < 0 {
return &resource.DeleteResponse{
Status: badRequest("update must include the previous version"),
}, nil
}
2 years ago
if err := s.Init(); err != nil {
return nil, err
}
latest, err := s.GetResource(ctx, &resource.GetResourceRequest{
Key: req.Key.WithoutResourceVersion(),
})
if err != nil {
return nil, err
}
event, err := s.validator.PrepareDelete(ctx, req, latest)
if err != nil {
return nil, err
}
if event.Status != nil {
return &resource.DeleteResponse{Status: event.Status}, nil
}
fmt.Printf("TODO, DELETE: %+v ", req.Key)
2 years ago
return nil, ErrNotImplementedYet
}
func (s *sqlResourceServer) List(ctx context.Context, req *resource.ListRequest) (*resource.ListResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
var rv int64
err := s.sqlDB.WithTx(ctx, ReadCommitted, func(ctx context.Context, tx db.Tx) error {
req := sqlResourceVersionGetRequest{
SQLTemplate: sqltemplate.New(s.sqlDialect),
Group: req.Options.Key.Group,
Resource: req.Options.Key.Resource,
returnsResourceVersion: new(returnsResourceVersion),
}
res, err := queryRow(ctx, tx, sqlResourceVersionGet, req)
if err != nil && !errors.Is(err, sql.ErrNoRows) {
return err
}
if res != nil {
rv = res.ResourceVersion
}
return nil
})
if err != nil {
return nil, err
}
fmt.Printf("TODO, LIST: %+v // %d", req.Options.Key, rv)
return nil, ErrNotImplementedYet
}
// Get the raw blob bytes and metadata
func (s *sqlResourceServer) GetBlob(ctx context.Context, req *resource.GetBlobRequest) (*resource.GetBlobResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.List")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET BLOB: %+v", req.Key)
return nil, ErrNotImplementedYet
}
// Show resource history (and trash)
func (s *sqlResourceServer) History(ctx context.Context, req *resource.HistoryRequest) (*resource.HistoryResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.History")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET History: %+v", req.Key)
return nil, ErrNotImplementedYet
}
// Used for efficient provisioning
func (s *sqlResourceServer) Origin(ctx context.Context, req *resource.OriginRequest) (*resource.OriginResponse, error) {
2 years ago
ctx, span := s.tracer.Start(ctx, "storage_server.History")
defer span.End()
if err := s.Init(); err != nil {
return nil, err
}
fmt.Printf("TODO, GET History: %+v", req.Key)
return nil, ErrNotImplementedYet
}