mirror of https://github.com/grafana/grafana
commit
15b958b2d1
@ -1,212 +0,0 @@ |
||||
package persistentcollection |
||||
|
||||
import ( |
||||
"context" |
||||
"encoding/json" |
||||
"fmt" |
||||
"os" |
||||
"path/filepath" |
||||
"sync" |
||||
) |
||||
|
||||
func NewLocalFSPersistentCollection[T any](name string, directory string, version int) PersistentCollection[T] { |
||||
c := &localFsCollection[T]{ |
||||
name: name, |
||||
version: version, |
||||
collectionsDir: filepath.Join(directory, "file-collections"), |
||||
} |
||||
err := c.createCollectionsDirectory() |
||||
if err != nil { |
||||
panic(err) |
||||
} |
||||
return c |
||||
} |
||||
|
||||
type CollectionFileContents[T any] struct { |
||||
Version int `json:"version"` |
||||
Items []T `json:"items"` |
||||
} |
||||
|
||||
type localFsCollection[T any] struct { |
||||
version int |
||||
name string |
||||
collectionsDir string |
||||
mu sync.Mutex |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) Insert(ctx context.Context, namespace string, item T) error { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
items, err := s.load(ctx, namespace) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return s.save(ctx, namespace, append(items, item)) |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) Delete(ctx context.Context, namespace string, predicate Predicate[T]) (int, error) { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
items, err := s.load(ctx, namespace) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
deletedCount := 0 |
||||
newItems := make([]T, 0) |
||||
for idx := range items { |
||||
del, err := predicate(items[idx]) |
||||
if err != nil { |
||||
return deletedCount, err |
||||
} |
||||
|
||||
if del { |
||||
deletedCount += 1 |
||||
} else { |
||||
newItems = append(newItems, items[idx]) |
||||
} |
||||
} |
||||
|
||||
if deletedCount != 0 { |
||||
return deletedCount, s.save(ctx, namespace, newItems) |
||||
} |
||||
|
||||
return deletedCount, nil |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) { |
||||
var nilResult T |
||||
|
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
items, err := s.load(ctx, namespace) |
||||
if err != nil { |
||||
return nilResult, err |
||||
} |
||||
|
||||
for idx := range items { |
||||
match, err := predicate(items[idx]) |
||||
if err != nil { |
||||
return nilResult, err |
||||
} |
||||
if match { |
||||
return items[idx], nil |
||||
} |
||||
} |
||||
|
||||
return nilResult, nil |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
items, err := s.load(ctx, namespace) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
result := make([]T, 0) |
||||
for idx := range items { |
||||
match, err := predicate(items[idx]) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if match { |
||||
result = append(result, items[idx]) |
||||
} |
||||
} |
||||
|
||||
return result, nil |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (int, error) { |
||||
s.mu.Lock() |
||||
defer s.mu.Unlock() |
||||
|
||||
items, err := s.load(ctx, namespace) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
|
||||
newItems := make([]T, 0) |
||||
updatedCount := 0 |
||||
for idx := range items { |
||||
updated, updatedItem, err := updateFn(items[idx]) |
||||
if err != nil { |
||||
return updatedCount, err |
||||
} |
||||
|
||||
if updated { |
||||
updatedCount += 1 |
||||
newItems = append(newItems, updatedItem) |
||||
} else { |
||||
newItems = append(newItems, items[idx]) |
||||
} |
||||
} |
||||
|
||||
if updatedCount != 0 { |
||||
return updatedCount, s.save(ctx, namespace, newItems) |
||||
} |
||||
|
||||
return updatedCount, nil |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) load(ctx context.Context, namespace string) ([]T, error) { |
||||
filePath := s.collectionFilePath(namespace) |
||||
// Safe to ignore gosec warning G304, the path comes from grafana settings rather than the user input
|
||||
// nolint:gosec
|
||||
bytes, err := os.ReadFile(filePath) |
||||
if err != nil { |
||||
if os.IsNotExist(err) { |
||||
return []T{}, nil |
||||
} |
||||
return nil, fmt.Errorf("can't read %s file: %w", filePath, err) |
||||
} |
||||
var db CollectionFileContents[T] |
||||
if err = json.Unmarshal(bytes, &db); err != nil { |
||||
return nil, fmt.Errorf("can't unmarshal %s data: %w", filePath, err) |
||||
} |
||||
|
||||
if db.Version != s.version { |
||||
if err := s.save(ctx, namespace, []T{}); err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return []T{}, nil |
||||
} |
||||
|
||||
return db.Items, nil |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) save(_ context.Context, namespace string, items []T) error { |
||||
filePath := s.collectionFilePath(namespace) |
||||
|
||||
bytes, err := json.MarshalIndent(&CollectionFileContents[T]{ |
||||
Version: s.version, |
||||
Items: items, |
||||
}, "", " ") |
||||
if err != nil { |
||||
return fmt.Errorf("can't marshal items: %w", err) |
||||
} |
||||
|
||||
return os.WriteFile(filePath, bytes, 0600) |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) createCollectionsDirectory() error { |
||||
_, err := os.Stat(s.collectionsDir) |
||||
if os.IsNotExist(err) { |
||||
return os.MkdirAll(s.collectionsDir, 0750) |
||||
} |
||||
|
||||
return err |
||||
} |
||||
|
||||
func (s *localFsCollection[T]) collectionFilePath(namespace string) string { |
||||
return filepath.Join(s.collectionsDir, fmt.Sprintf("%s-namespace-%s.json", s.name, namespace)) |
||||
} |
||||
@ -1,89 +0,0 @@ |
||||
package persistentcollection |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"os" |
||||
"path" |
||||
"testing" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
type item struct { |
||||
Name string `json:"name"` |
||||
Val int64 `json:"val"` |
||||
} |
||||
|
||||
func TestLocalFSPersistentCollection(t *testing.T) { |
||||
namespace := "1" |
||||
ctx := context.Background() |
||||
dir := path.Join(os.TempDir(), "persistent-collection-test") |
||||
defer func() { |
||||
if err := os.RemoveAll(dir); err != nil { |
||||
fmt.Printf("Failed to remove temporary directory %q: %s\n", dir, err.Error()) |
||||
} |
||||
}() |
||||
|
||||
coll := NewLocalFSPersistentCollection[*item]("test", dir, 1) |
||||
|
||||
firstInserted := &item{ |
||||
Name: "test", |
||||
Val: 10, |
||||
} |
||||
err := coll.Insert(ctx, namespace, firstInserted) |
||||
require.NoError(t, err) |
||||
|
||||
err = coll.Insert(ctx, namespace, &item{ |
||||
Name: "test", |
||||
Val: 20, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
err = coll.Insert(ctx, namespace, &item{ |
||||
Name: "test", |
||||
Val: 30, |
||||
}) |
||||
require.NoError(t, err) |
||||
|
||||
updatedCount, err := coll.Update(ctx, namespace, func(i *item) (bool, *item, error) { |
||||
if i.Val == 20 { |
||||
return true, &item{Val: 25, Name: "test"}, nil |
||||
} |
||||
return false, nil, nil |
||||
}) |
||||
require.Equal(t, 1, updatedCount) |
||||
require.NoError(t, err) |
||||
|
||||
deletedCount, err := coll.Delete(ctx, namespace, func(i *item) (bool, error) { |
||||
if i.Val == 30 { |
||||
return true, nil |
||||
} |
||||
return false, nil |
||||
}) |
||||
require.Equal(t, 1, deletedCount) |
||||
require.NoError(t, err) |
||||
|
||||
firstFound, err := coll.FindFirst(ctx, namespace, func(i *item) (bool, error) { |
||||
if i.Name == "test" { |
||||
return true, nil |
||||
} |
||||
|
||||
return false, nil |
||||
}) |
||||
require.NoError(t, err) |
||||
require.Equal(t, firstInserted, firstFound) |
||||
|
||||
all, err := coll.Find(ctx, namespace, func(i *item) (bool, error) { return true, nil }) |
||||
require.NoError(t, err) |
||||
require.Equal(t, []*item{ |
||||
{ |
||||
Name: "test", |
||||
Val: 10, |
||||
}, |
||||
{ |
||||
Name: "test", |
||||
Val: 25, |
||||
}, |
||||
}, all) |
||||
} |
||||
@ -1,21 +0,0 @@ |
||||
package persistentcollection |
||||
|
||||
import ( |
||||
"context" |
||||
) |
||||
|
||||
type Predicate[T any] func(item T) (bool, error) |
||||
type UpdateFn[T any] func(item T) (updated bool, updatedItem T, err error) |
||||
|
||||
// PersistentCollection is a collection of items that's going to retain its state between Grafana restarts.
|
||||
// The main purpose of this API is to reduce the time-to-Proof-of-Concept - this is NOT intended for production use.
|
||||
//
|
||||
// The item type needs to be serializable to JSON.
|
||||
// @alpha -- EXPERIMENTAL
|
||||
type PersistentCollection[T any] interface { |
||||
Delete(ctx context.Context, namespace string, predicate Predicate[T]) (deletedCount int, err error) |
||||
FindFirst(ctx context.Context, namespace string, predicate Predicate[T]) (T, error) |
||||
Find(ctx context.Context, namespace string, predicate Predicate[T]) ([]T, error) |
||||
Update(ctx context.Context, namespace string, updateFn UpdateFn[T]) (updatedCount int, err error) |
||||
Insert(ctx context.Context, namespace string, item T) error |
||||
} |
||||
Loading…
Reference in new issue