The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/storage/unified/resource/cdk_backend.go

375 lines
8.2 KiB

package resource
import (
"bytes"
"context"
"errors"
"fmt"
"io"
"net/http"
"sort"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"go.opentelemetry.io/otel/trace"
"go.opentelemetry.io/otel/trace/noop"
"gocloud.dev/blob"
_ "gocloud.dev/blob/fileblob"
_ "gocloud.dev/blob/memblob"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
)
type CDKBackendOptions struct {
Tracer trace.Tracer
Bucket CDKBucket
RootFolder string
}
func NewCDKBackend(ctx context.Context, opts CDKBackendOptions) (StorageBackend, error) {
if opts.Tracer == nil {
opts.Tracer = noop.NewTracerProvider().Tracer("cdk-appending-store")
}
if opts.Bucket == nil {
return nil, fmt.Errorf("missing bucket")
}
found, _, err := opts.Bucket.ListPage(ctx, blob.FirstPageToken, 1, &blob.ListOptions{
Prefix: opts.RootFolder,
Delimiter: "/",
})
if err != nil {
return nil, err
}
if found == nil {
return nil, fmt.Errorf("the root folder does not exist")
}
backend := &cdkBackend{
tracer: opts.Tracer,
bucket: opts.Bucket,
root: opts.RootFolder,
}
backend.rv.Swap(time.Now().UnixMilli())
return backend, nil
}
type cdkBackend struct {
tracer trace.Tracer
bucket CDKBucket
root string
mutex sync.Mutex
rv atomic.Int64
// Simple watch stream -- NOTE, this only works for single tenant!
broadcaster Broadcaster[*WrittenEvent]
stream chan<- *WrittenEvent
}
func (s *cdkBackend) getPath(key *ResourceKey, rv int64) string {
var buffer bytes.Buffer
buffer.WriteString(s.root)
if key.Group == "" {
return buffer.String()
}
buffer.WriteString(key.Group)
if key.Resource == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Resource)
if key.Namespace == "" {
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/__cluster__")
} else {
buffer.WriteString("/")
buffer.WriteString(key.Namespace)
}
if key.Name == "" {
return buffer.String()
}
buffer.WriteString("/")
buffer.WriteString(key.Name)
if rv > 0 {
buffer.WriteString(fmt.Sprintf("/%d.json", rv))
}
return buffer.String()
}
func (s *cdkBackend) WriteEvent(ctx context.Context, event WriteEvent) (rv int64, err error) {
// Scope the lock
{
s.mutex.Lock()
defer s.mutex.Unlock()
rv = s.rv.Add(1)
err = s.bucket.WriteAll(ctx, s.getPath(event.Key, rv), event.Value, &blob.WriterOptions{
ContentType: "application/json",
})
}
// Async notify all subscribers
if s.stream != nil {
go func() {
write := &WrittenEvent{
WriteEvent: event,
Timestamp: time.Now().UnixMilli(),
ResourceVersion: rv,
}
s.stream <- write
}()
}
return rv, err
}
func (s *cdkBackend) ReadResource(ctx context.Context, req *ReadRequest) *ReadResponse {
rv := req.ResourceVersion
path := s.getPath(req.Key, rv)
if rv < 1 {
iter := s.bucket.List(&blob.ListOptions{Prefix: path + "/", Delimiter: "/"})
for {
obj, err := iter.Next(ctx)
if errors.Is(err, io.EOF) {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
v, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil && v > rv {
rv = v
path = obj.Key // find the path with biggest resource version
}
}
}
}
}
raw, err := s.bucket.ReadAll(ctx, path)
if raw == nil && req.ResourceVersion > 0 {
if req.ResourceVersion > s.rv.Load() {
return &ReadResponse{
Error: &ErrorResult{
Code: http.StatusGatewayTimeout,
Reason: string(metav1.StatusReasonTimeout), // match etcd behavior
Message: "ResourceVersion is larger than max",
Details: &ErrorDetails{
Causes: []*ErrorCause{
{
Reason: string(metav1.CauseTypeResourceVersionTooLarge),
Message: fmt.Sprintf("requested: %d, current %d", req.ResourceVersion, s.rv.Load()),
},
},
},
},
}
}
// If the there was an explicit request, get the latest
rsp := s.ReadResource(ctx, &ReadRequest{Key: req.Key})
if rsp != nil && len(rsp.Value) > 0 {
raw = rsp.Value
rv = rsp.ResourceVersion
err = nil
}
}
if err == nil && isDeletedMarker(raw) {
raw = nil
}
if raw == nil {
return &ReadResponse{Error: NewNotFoundError(req.Key)}
}
return &ReadResponse{
ResourceVersion: rv,
Value: raw,
}
}
func isDeletedMarker(raw []byte) bool {
if bytes.Contains(raw, []byte(`"DeletedMarker"`)) {
tmp := &unstructured.Unstructured{}
err := tmp.UnmarshalJSON(raw)
if err == nil && tmp.GetKind() == "DeletedMarker" {
return true
}
}
return false
}
func (s *cdkBackend) ListIterator(ctx context.Context, req *ListRequest, cb func(ListIterator) error) (int64, error) {
resources, err := buildTree(ctx, s, req.Options.Key)
if err != nil {
return 0, err
}
err = cb(resources)
return resources.listRV, err
}
func (s *cdkBackend) WatchWriteEvents(ctx context.Context) (<-chan *WrittenEvent, error) {
s.mutex.Lock()
defer s.mutex.Unlock()
if s.broadcaster == nil {
var err error
s.broadcaster, err = NewBroadcaster(context.Background(), func(c chan<- *WrittenEvent) error {
s.stream = c
return nil
})
if err != nil {
return nil, err
}
}
return s.broadcaster.Subscribe(ctx)
}
// group > resource > namespace > name > versions
type cdkResource struct {
prefix string
versions []cdkVersion
}
type cdkVersion struct {
rv int64
key string
}
type cdkListIterator struct {
bucket CDKBucket
ctx context.Context
err error
listRV int64
resources []cdkResource
index int
currentRV int64
currentKey string
currentVal []byte
}
// Next implements ListIterator.
func (c *cdkListIterator) Next() bool {
if c.err != nil {
return false
}
for {
c.currentVal = nil
c.index += 1
if c.index >= len(c.resources) {
return false
}
item := c.resources[c.index]
latest := item.versions[0]
raw, err := c.bucket.ReadAll(c.ctx, latest.key)
if err != nil {
c.err = err
return false
}
if !isDeletedMarker(raw) {
c.currentRV = latest.rv
c.currentKey = latest.key
c.currentVal = raw
return true
}
}
}
// Error implements ListIterator.
func (c *cdkListIterator) Error() error {
return c.err
}
// ResourceVersion implements ListIterator.
func (c *cdkListIterator) ResourceVersion() int64 {
return c.currentRV
}
// Value implements ListIterator.
func (c *cdkListIterator) Value() []byte {
return c.currentVal
}
// ContinueToken implements ListIterator.
func (c *cdkListIterator) ContinueToken() string {
return fmt.Sprintf("index:%d/key:%s", c.index, c.currentKey)
}
// Name implements ListIterator.
func (c *cdkListIterator) Name() string {
return c.currentKey // TODO (parse name from key)
}
// Namespace implements ListIterator.
func (c *cdkListIterator) Namespace() string {
return c.currentKey // TODO (parse namespace from key)
}
var _ ListIterator = (*cdkListIterator)(nil)
func buildTree(ctx context.Context, s *cdkBackend, key *ResourceKey) (*cdkListIterator, error) {
byPrefix := make(map[string]*cdkResource)
path := s.getPath(key, 0)
iter := s.bucket.List(&blob.ListOptions{Prefix: path, Delimiter: ""}) // "" is recursive
for {
obj, err := iter.Next(ctx)
if errors.Is(err, io.EOF) {
break
}
if strings.HasSuffix(obj.Key, ".json") {
idx := strings.LastIndex(obj.Key, "/") + 1
edx := strings.LastIndex(obj.Key, ".")
if idx > 0 {
rv, err := strconv.ParseInt(obj.Key[idx:edx], 10, 64)
if err == nil {
prefix := obj.Key[:idx]
res, ok := byPrefix[prefix]
if !ok {
res = &cdkResource{prefix: prefix}
byPrefix[prefix] = res
}
res.versions = append(res.versions, cdkVersion{
rv: rv,
key: obj.Key,
})
}
}
}
}
// Now sort all versions
resources := make([]cdkResource, 0, len(byPrefix))
for _, res := range byPrefix {
sort.Slice(res.versions, func(i, j int) bool {
return res.versions[i].rv > res.versions[j].rv
})
resources = append(resources, *res)
}
sort.Slice(resources, func(i, j int) bool {
a := resources[i].prefix
b := resources[j].prefix
return a < b
})
return &cdkListIterator{
ctx: ctx,
bucket: s.bucket,
resources: resources,
listRV: s.rv.Load(),
index: -1, // must call next first
}, nil
}