feat(goldfish): add endpoints for serving stored results (#19640)

pull/19670/head
Trevor Whitney 2 months ago committed by GitHub
parent 94096b7452
commit e17ae2d98f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 13
      docs/sources/shared/configuration.md
  2. 1
      pkg/goldfish/storage.go
  3. 86
      pkg/goldfish/storage_mysql.go
  4. 5
      pkg/goldfish/storage_noop.go
  5. 9
      pkg/ui/config.go
  6. 98
      pkg/ui/goldfish_test.go
  7. 130
      pkg/ui/handler.go
  8. 192
      pkg/ui/handler_test.go
  9. 18
      pkg/ui/service.go
  10. 5
      tools/querytee/goldfish/manager_test.go
  11. 21
      tools/querytee/goldfish/result_store.go
  12. 46
      tools/querytee/goldfish/result_store_test.go
  13. 4
      tools/querytee/proxy_endpoint_test.go

@ -167,6 +167,18 @@ ui:
# CLI flag: -ui.goldfish.cell-b-namespace
[cell_b_namespace: <string> | default = ""]
# Results storage backend (gcs, s3) for fetching stored query results.
# CLI flag: -ui.goldfish.results-backend
[results_backend: <string> | default = ""]
# The thanos_object_store_config block configures the connection to object
# storage backend using thanos-io/objstore clients. This will become the
# default way of configuring object store clients in future releases.
# Currently this is opt-in and takes effect only when `-use-thanos-objstore`
# is set to true.
# The CLI flags prefix for this block configuration is: ui.goldfish.results
[results_bucket: <thanos_object_store_config>]
ring:
kvstore:
# Backend storage to use for the ring. Supported values are: consul, etcd,
@ -7163,6 +7175,7 @@ Currently this is opt-in and takes effect only when `-use-thanos-objstore` is se
- `common.storage.object-store`
- `object-store`
- `ruler-storage`
- `ui.goldfish.results`
&nbsp;

@ -12,6 +12,7 @@ type Storage interface {
// Read operations (used by UI)
GetSampledQueries(ctx context.Context, page, pageSize int, filter QueryFilter) (*APIResponse, error)
GetQueryByCorrelationID(ctx context.Context, correlationID string) (*QuerySample, error)
// Lifecycle
Close() error

@ -352,6 +352,92 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int
}, nil
}
// GetQueryByCorrelationID retrieves a single query sample by correlation ID
func (s *MySQLStorage) GetQueryByCorrelationID(ctx context.Context, correlationID string) (*QuerySample, error) {
query := `
SELECT
correlation_id, tenant_id, user, query, query_type, start_time, end_time, step_duration,
cell_a_exec_time_ms, cell_b_exec_time_ms, cell_a_queue_time_ms, cell_b_queue_time_ms,
cell_a_bytes_processed, cell_b_bytes_processed, cell_a_lines_processed, cell_b_lines_processed,
cell_a_bytes_per_second, cell_b_bytes_per_second, cell_a_lines_per_second, cell_b_lines_per_second,
cell_a_entries_returned, cell_b_entries_returned, cell_a_splits, cell_b_splits,
cell_a_shards, cell_b_shards, cell_a_response_hash, cell_b_response_hash,
cell_a_response_size, cell_b_response_size, cell_a_status_code, cell_b_status_code,
cell_a_result_uri, cell_b_result_uri,
cell_a_result_size_bytes, cell_b_result_size_bytes,
cell_a_result_compression, cell_b_result_compression,
cell_a_trace_id, cell_b_trace_id,
cell_a_span_id, cell_b_span_id,
cell_a_used_new_engine, cell_b_used_new_engine,
sampled_at, created_at
FROM sampled_queries
WHERE correlation_id = ?
`
var q QuerySample
var stepDurationMs int64
var createdAt time.Time
var cellASpanID, cellBSpanID sql.NullString
var cellAResultURI, cellBResultURI sql.NullString
var cellAResultCompression, cellBResultCompression sql.NullString
var cellAResultSize, cellBResultSize sql.NullInt64
err := s.db.QueryRowContext(ctx, query, correlationID).Scan(
&q.CorrelationID, &q.TenantID, &q.User, &q.Query, &q.QueryType, &q.StartTime, &q.EndTime, &stepDurationMs,
&q.CellAStats.ExecTimeMs, &q.CellBStats.ExecTimeMs, &q.CellAStats.QueueTimeMs, &q.CellBStats.QueueTimeMs,
&q.CellAStats.BytesProcessed, &q.CellBStats.BytesProcessed, &q.CellAStats.LinesProcessed, &q.CellBStats.LinesProcessed,
&q.CellAStats.BytesPerSecond, &q.CellBStats.BytesPerSecond, &q.CellAStats.LinesPerSecond, &q.CellBStats.LinesPerSecond,
&q.CellAStats.TotalEntriesReturned, &q.CellBStats.TotalEntriesReturned, &q.CellAStats.Splits, &q.CellBStats.Splits,
&q.CellAStats.Shards, &q.CellBStats.Shards, &q.CellAResponseHash, &q.CellBResponseHash,
&q.CellAResponseSize, &q.CellBResponseSize, &q.CellAStatusCode, &q.CellBStatusCode,
&cellAResultURI, &cellBResultURI,
&cellAResultSize, &cellBResultSize,
&cellAResultCompression, &cellBResultCompression,
&q.CellATraceID, &q.CellBTraceID,
&cellASpanID, &cellBSpanID,
&q.CellAUsedNewEngine, &q.CellBUsedNewEngine,
&q.SampledAt, &createdAt,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, fmt.Errorf("query with correlation ID %s not found", correlationID)
}
return nil, fmt.Errorf("failed to query by correlation ID: %w", err)
}
// Convert nullable strings to regular strings
if cellASpanID.Valid {
q.CellASpanID = cellASpanID.String
}
if cellBSpanID.Valid {
q.CellBSpanID = cellBSpanID.String
}
if cellAResultURI.Valid {
q.CellAResultURI = cellAResultURI.String
}
if cellBResultURI.Valid {
q.CellBResultURI = cellBResultURI.String
}
if cellAResultSize.Valid {
q.CellAResultSize = cellAResultSize.Int64
}
if cellBResultSize.Valid {
q.CellBResultSize = cellBResultSize.Int64
}
if cellAResultCompression.Valid {
q.CellAResultCompression = cellAResultCompression.String
}
if cellBResultCompression.Valid {
q.CellBResultCompression = cellBResultCompression.String
}
// Convert step duration from milliseconds to Duration
q.Step = time.Duration(stepDurationMs) * time.Millisecond
return &q, nil
}
// Close closes the storage connection
func (s *MySQLStorage) Close() error {
return s.db.Close()

@ -28,6 +28,11 @@ func (n *NoopStorage) GetSampledQueries(_ context.Context, _, _ int, _ QueryFilt
return nil, errors.New("goldfish feature is disabled")
}
// GetQueryByCorrelationID returns an error as goldfish is disabled
func (n *NoopStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*QuerySample, error) {
return nil, errors.New("goldfish feature is disabled")
}
// Close is a no-op
func (n *NoopStorage) Close() error {
return nil

@ -3,6 +3,7 @@ package ui
import (
"flag"
"github.com/grafana/loki/v3/pkg/storage/bucket"
lokiring "github.com/grafana/loki/v3/pkg/util/ring"
)
@ -19,6 +20,10 @@ type GoldfishConfig struct {
LogsDatasourceUID string `yaml:"logs_datasource_uid"` // UID of the Loki datasource in Grafana
CellANamespace string `yaml:"cell_a_namespace"` // Namespace for Cell A logs
CellBNamespace string `yaml:"cell_b_namespace"` // Namespace for Cell B logs
// Result storage configuration for fetching raw query results from object storage
ResultsBackend string `yaml:"results_backend"` // Results storage backend (gcs, s3)
ResultsBucket bucket.Config `yaml:"results_bucket"` // Bucket configuration for accessing stored results
}
type Config struct {
@ -48,4 +53,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&cfg.Goldfish.LogsDatasourceUID, "ui.goldfish.logs-datasource-uid", "", "UID of the Loki datasource in Grafana.")
f.StringVar(&cfg.Goldfish.CellANamespace, "ui.goldfish.cell-a-namespace", "", "Namespace for Cell A logs.")
f.StringVar(&cfg.Goldfish.CellBNamespace, "ui.goldfish.cell-b-namespace", "", "Namespace for Cell B logs.")
// Result storage configuration
f.StringVar(&cfg.Goldfish.ResultsBackend, "ui.goldfish.results-backend", "", "Results storage backend (gcs, s3) for fetching stored query results.")
cfg.Goldfish.ResultsBucket.RegisterFlagsWithPrefix("ui.goldfish.results.", f)
}

@ -14,6 +14,7 @@ import (
"github.com/go-kit/log"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/goldfish"
)
@ -23,6 +24,7 @@ type mockStorage struct {
queries []goldfish.QuerySample
capturedFilter goldfish.QueryFilter // Add this to capture the full filter
error error
getQueryFunc func(ctx context.Context, correlationID string) (*goldfish.QuerySample, error)
}
func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample) error {
@ -63,10 +65,106 @@ func (m *mockStorage) GetSampledQueries(_ context.Context, page, pageSize int, f
}, nil
}
func (m *mockStorage) GetQueryByCorrelationID(ctx context.Context, correlationID string) (*goldfish.QuerySample, error) {
if m.getQueryFunc != nil {
return m.getQueryFunc(ctx, correlationID)
}
if m.error != nil {
return nil, m.error
}
// Default behavior: search through queries
for _, q := range m.queries {
if q.CorrelationID == correlationID {
return &q, nil
}
}
return nil, errors.New("query not found")
}
func (m *mockStorage) Close() error {
return nil
}
// Mock bucket client for testing
type mockBucket struct {
getFunc func(ctx context.Context, key string) (io.ReadCloser, error)
}
func (m *mockBucket) Get(ctx context.Context, key string) (io.ReadCloser, error) {
if m.getFunc != nil {
return m.getFunc(ctx, key)
}
return nil, errors.New("not implemented")
}
func (m *mockBucket) Upload(_ context.Context, _ string, _ io.Reader) error {
return nil
}
func (m *mockBucket) Delete(_ context.Context, _ string) error {
return nil
}
func (m *mockBucket) Exists(_ context.Context, _ string) (bool, error) {
return false, nil
}
func (m *mockBucket) Iter(_ context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error {
return nil
}
func (m *mockBucket) GetRange(_ context.Context, _ string, _ int64, _ int64) (io.ReadCloser, error) {
return nil, nil
}
func (m *mockBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) {
return objstore.ObjectAttributes{}, nil
}
func (m *mockBucket) ObjectSize(_ context.Context, _ string) (uint64, error) {
return 0, nil
}
func (m *mockBucket) Close() error {
return nil
}
func (m *mockBucket) Name() string {
return "mock-bucket"
}
func (m *mockBucket) WithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.Bucket {
return m
}
func (m *mockBucket) ReaderWithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.BucketReader {
return m
}
func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType {
return nil
}
func (m *mockBucket) Provider() objstore.ObjProvider {
return objstore.ObjProvider("mock")
}
func (m *mockBucket) GetAndReplace(_ context.Context, _ string, _ func(existing io.ReadCloser) (io.ReadCloser, error)) error {
return errors.New("not implemented")
}
func (m *mockBucket) IsAccessDeniedErr(_ error) bool {
return false
}
func (m *mockBucket) IsObjNotFoundErr(_ error) bool {
return false
}
func (m *mockBucket) IterWithAttributes(_ context.Context, _ string, _ func(objstore.IterObjectAttributes) error, _ ...objstore.IterOption) error {
return nil
}
// createTestService creates a Service with common test defaults
func createTestService(storage goldfish.Storage) *Service {
return createTestServiceWithConfig(storage, GoldfishConfig{

@ -2,11 +2,15 @@
package ui
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/http/httputil"
"net/url"
"strconv"
"strings"
"time"
@ -29,8 +33,15 @@ const (
goldfishPath = prefixPath + "/api/v1/goldfish/queries"
notFoundPath = prefixPath + "/api/v1/404"
contentTypeJSON = "application/json"
cellA = "cell-a"
cellB = "cell-b"
)
func goldfishResultPath(cell string) string {
return prefixPath + "/api/v1/goldfish/results/{correlationId}/" + cell
}
// Context keys for trace information
type contextKey string
@ -47,6 +58,8 @@ func (s *Service) RegisterHandler() {
s.router.Path(detailsPath).Handler(s.detailsHandler())
s.router.Path(featuresPath).Handler(s.featuresHandler())
s.router.Path(goldfishPath).Handler(s.goldfishQueriesHandler())
s.router.Path(goldfishResultPath(cellA)).Handler(s.goldfishResultHandler(cellA))
s.router.Path(goldfishResultPath(cellB)).Handler(s.goldfishResultHandler(cellB))
s.router.PathPrefix(proxyPath).Handler(s.clusterProxyHandler())
s.router.PathPrefix(notFoundPath).Handler(s.notFoundHandler())
@ -332,3 +345,120 @@ func (s *Service) goldfishQueriesHandler() http.Handler {
}
})
}
func (s *Service) goldfishResultHandler(cell string) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
vars := mux.Vars(r)
correlationID := vars["correlationId"]
if !s.cfg.Goldfish.Enable {
s.writeJSONError(w, http.StatusNotFound, "goldfish feature is disabled")
return
}
// Check if bucket client is available
if s.goldfishBucket == nil {
s.writeJSONError(w, http.StatusNotImplemented, "result storage is not configured")
return
}
// Fetch query metadata from database
query, err := s.goldfishStorage.GetQueryByCorrelationID(r.Context(), correlationID)
if err != nil {
level.Error(s.logger).Log("msg", "failed to fetch query by correlation ID", "correlation_id", correlationID, "err", err)
s.writeJSONError(w, http.StatusNotFound, fmt.Sprintf("query with correlation ID %s not found", correlationID))
return
}
// Get the appropriate result URI and compression based on cell
var resultURI, compression string
if cell == "cell-a" {
resultURI = query.CellAResultURI
compression = query.CellAResultCompression
} else {
resultURI = query.CellBResultURI
compression = query.CellBResultCompression
}
// Check if result was persisted
if resultURI == "" {
s.writeJSONError(w, http.StatusNotFound, fmt.Sprintf("result for %s was not persisted to object storage", cell))
return
}
// Parse URI to extract bucket path
// URI format: "gcs://bucket-name/path/to/object" or "s3://bucket-name/path/to/object"
objectKey, err := parseObjectKeyFromURI(resultURI)
if err != nil {
level.Error(s.logger).Log("msg", "failed to parse result URI", "uri", resultURI, "err", err)
s.writeJSONError(w, http.StatusInternalServerError, "failed to parse result URI")
return
}
// Download object from bucket
reader, err := s.goldfishBucket.Get(r.Context(), objectKey)
if err != nil {
level.Error(s.logger).Log(
"msg", "failed to fetch object from bucket",
"uri", resultURI,
"object_key", objectKey,
"err", err)
s.writeJSONError(w, http.StatusInternalServerError, "failed to fetch result from storage")
return
}
defer reader.Close()
// Read object data
data, err := io.ReadAll(reader)
if err != nil {
level.Error(s.logger).Log("msg", "failed to read object data", "object_key", objectKey, "err", err)
s.writeJSONError(w, http.StatusInternalServerError, "failed to read result data")
return
}
// Decompress if needed
if compression == "gzip" {
gzReader, err := gzip.NewReader(bytes.NewReader(data))
if err != nil {
level.Error(s.logger).Log("msg", "failed to create gzip reader", "err", err)
s.writeJSONError(w, http.StatusInternalServerError, "failed to decompress result")
return
}
defer gzReader.Close()
data, err = io.ReadAll(gzReader)
if err != nil {
level.Error(s.logger).Log("msg", "failed to decompress data", "err", err)
s.writeJSONError(w, http.StatusInternalServerError, "failed to decompress result")
return
}
}
// Return JSON response
w.Header().Set("Content-Type", contentTypeJSON)
if _, err := w.Write(data); err != nil {
level.Error(s.logger).Log("msg", "failed to write response", "err", err)
}
})
}
// parseObjectKeyFromURI extracts the object key from a URI like "gcs://bucket/path" or "s3://bucket/path"
func parseObjectKeyFromURI(uri string) (string, error) {
parsed, err := url.Parse(uri)
if err != nil {
return "", fmt.Errorf("invalid URI: %w", err)
}
// Validate scheme
if parsed.Scheme != "gcs" && parsed.Scheme != "s3" {
return "", fmt.Errorf("unsupported URI scheme: %s (expected gcs or s3)", parsed.Scheme)
}
// The path has a leading "/" which we need to trim
key := strings.TrimPrefix(parsed.Path, "/")
if key == "" {
return "", fmt.Errorf("invalid URI: missing object path")
}
return key, nil
}

@ -1,14 +1,23 @@
package ui
import (
"bytes"
"compress/gzip"
"context"
"encoding/json"
"errors"
"io"
"net/http"
"net/http/httptest"
"testing"
"github.com/go-kit/log"
"github.com/gorilla/mux"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"github.com/grafana/loki/v3/pkg/goldfish"
)
func TestFeaturesHandler_GoldfishWithNamespaces(t *testing.T) {
@ -107,3 +116,186 @@ func TestFeaturesHandler_GoldfishWithNamespaces(t *testing.T) {
assert.Nil(t, goldfishData["cellBNamespace"], "cellBNamespace should be nil when not configured")
})
}
func TestGoldfishResultHandler(t *testing.T) {
setup := func(enabled bool, storage goldfish.Storage, bucket objstore.InstrumentedBucket) *httptest.ResponseRecorder {
service := &Service{
cfg: Config{
Goldfish: GoldfishConfig{
Enable: enabled,
},
},
logger: log.NewNopLogger(),
goldfishStorage: storage,
goldfishBucket: bucket,
}
handler := service.goldfishResultHandler(cellA)
req := httptest.NewRequest("GET", "/api/v1/goldfish/results/test-id/cell-a", nil)
req = mux.SetURLVars(req, map[string]string{"correlationId": "test-id"})
rr := httptest.NewRecorder()
handler.ServeHTTP(rr, req)
return rr
}
t.Run("returns error when goldfish is disabled", func(t *testing.T) {
rr := setup(false, nil, nil)
assert.Equal(t, http.StatusNotFound, rr.Code)
assert.Contains(t, rr.Body.String(), "goldfish feature is disabled")
})
t.Run("returns error when bucket client is not configured", func(t *testing.T) {
rr := setup(true, nil, nil)
assert.Equal(t, http.StatusNotImplemented, rr.Code)
assert.Contains(t, rr.Body.String(), "result storage is not configured")
})
t.Run("returns 404 not found when correlation ID not found in database", func(t *testing.T) {
storage := &mockStorage{
getQueryFunc: func(_ context.Context, _ string) (*goldfish.QuerySample, error) {
return nil, errors.New("not found")
},
}
rr := setup(true, storage, &mockBucket{})
assert.Equal(t, http.StatusNotFound, rr.Code)
assert.Contains(t, rr.Body.String(), "not found")
})
t.Run("returns 404 not found when result URI is empty", func(t *testing.T) {
storage := &mockStorage{
getQueryFunc: func(_ context.Context, correlationID string) (*goldfish.QuerySample, error) {
return &goldfish.QuerySample{
CorrelationID: correlationID,
CellAResultURI: "", // No result URI
}, nil
},
}
rr := setup(true, storage, &mockBucket{})
assert.Equal(t, http.StatusNotFound, rr.Code)
assert.Contains(t, rr.Body.String(), "was not persisted to object storage")
})
t.Run("successfully fetches and decompresses gzipped result", func(t *testing.T) {
originalData := []byte(`{"result": "compressed test data"}`)
// Compress data with gzip
var compressedBuf bytes.Buffer
gzWriter := gzip.NewWriter(&compressedBuf)
_, err := gzWriter.Write(originalData)
require.NoError(t, err)
require.NoError(t, gzWriter.Close())
storage := &mockStorage{
getQueryFunc: func(_ context.Context, correlationID string) (*goldfish.QuerySample, error) {
return &goldfish.QuerySample{
CorrelationID: correlationID,
CellAResultURI: "s3://test-bucket/path/to/result.json.gz",
CellAResultCompression: "gzip",
}, nil
},
}
bucket := &mockBucket{
getFunc: func(_ context.Context, key string) (io.ReadCloser, error) {
assert.Equal(t, "path/to/result.json.gz", key)
return io.NopCloser(bytes.NewReader(compressedBuf.Bytes())), nil
},
}
rr := setup(true, storage, bucket)
assert.Equal(t, http.StatusOK, rr.Code)
assert.Equal(t, "application/json", rr.Header().Get("Content-Type"))
assert.JSONEq(t, string(originalData), rr.Body.String())
})
}
func TestParseObjectKeyFromURI(t *testing.T) {
tests := []struct {
name string
uri string
expectedKey string
expectError bool
}{
{
name: "valid GCS URI",
uri: "gcs://my-bucket/path/to/object.json",
expectedKey: "path/to/object.json",
expectError: false,
},
{
name: "valid S3 URI",
uri: "s3://my-bucket/path/to/object.json.gz",
expectedKey: "path/to/object.json.gz",
expectError: false,
},
{
name: "valid URI with nested path",
uri: "gcs://my-bucket/prefix/2025/01/01/test-id/cell-a.json",
expectedKey: "prefix/2025/01/01/test-id/cell-a.json",
expectError: false,
},
{
name: "valid URI with URL-encoded characters in path",
uri: "gcs://my-bucket/path/with%20spaces/object.json",
expectedKey: "path/with spaces/object.json", // url.Parse automatically decodes
expectError: false,
},
{
name: "unsupported scheme http",
uri: "http://bucket/path",
expectedKey: "",
expectError: true,
},
{
name: "unsupported scheme https",
uri: "https://bucket/path",
expectedKey: "",
expectError: true,
},
{
name: "missing path",
uri: "gcs://my-bucket",
expectedKey: "",
expectError: true,
},
{
name: "missing path with trailing slash",
uri: "gcs://my-bucket/",
expectedKey: "",
expectError: true,
},
{
name: "empty URI",
uri: "",
expectedKey: "",
expectError: true,
},
{
name: "malformed URI",
uri: "not-a-uri",
expectedKey: "",
expectError: true,
},
{
name: "realistic example",
uri: "gcs://dev-us-central-0-loki-dev-005-goldfish-results/goldfish/results/2024/10/11/fc761f29-edad-4152-bedf-331d8cf2dbd5/cell-a.json.gz",
expectedKey: "goldfish/results/2024/10/11/fc761f29-edad-4152-bedf-331d8cf2dbd5/cell-a.json.gz",
expectError: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
key, err := parseObjectKeyFromURI(tt.uri)
if tt.expectError {
assert.Error(t, err)
} else {
require.NoError(t, err)
assert.Equal(t, tt.expectedKey, key)
}
})
}
}

@ -16,12 +16,14 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/prometheus/client_golang/prometheus"
"github.com/thanos-io/objstore"
"golang.org/x/net/http2"
// This is equivalent to a main.go for the Loki UI, so the blank import is allowed
_ "github.com/go-sql-driver/mysql" //nolint:revive
"github.com/grafana/loki/v3/pkg/goldfish"
"github.com/grafana/loki/v3/pkg/storage/bucket"
)
// This allows to rate limit the number of updates when the cluster is frequently changing (e.g. during rollout).
@ -42,6 +44,7 @@ type Service struct {
reg prometheus.Registerer
goldfishStorage goldfish.Storage
goldfishMetrics *GoldfishMetrics
goldfishBucket objstore.InstrumentedBucket
now func() time.Time
}
@ -100,6 +103,9 @@ func (s *Service) stop(_ error) error {
if s.goldfishStorage != nil {
s.goldfishStorage.Close()
}
if s.goldfishBucket != nil {
s.goldfishBucket.Close()
}
return nil
}
@ -170,5 +176,17 @@ func (s *Service) initGoldfishDB() error {
s.goldfishStorage = storage
level.Info(s.logger).Log("msg", "goldfish storage initialized successfully")
// Initialize bucket client if results backend is configured
if s.cfg.Goldfish.ResultsBackend != "" {
bucketClient, err := bucket.NewClient(context.Background(), s.cfg.Goldfish.ResultsBackend, s.cfg.Goldfish.ResultsBucket, "goldfish-ui-results", s.logger)
if err != nil {
level.Warn(s.logger).Log("msg", "failed to create goldfish bucket client, result fetching will be disabled", "err", err)
} else {
s.goldfishBucket = bucketClient
level.Info(s.logger).Log("msg", "goldfish bucket client initialized successfully", "backend", s.cfg.Goldfish.ResultsBackend)
}
}
return nil
}

@ -48,6 +48,11 @@ func (m *mockStorage) Close() error {
return nil
}
func (m *mockStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*goldfish.QuerySample, error) {
// This is only used for UI, not needed in manager tests
return nil, nil
}
func TestManager_ShouldSample(t *testing.T) {
tests := []struct {
name string

@ -46,6 +46,7 @@ type StoredResult struct {
type bucketResultStore struct {
bucket objstore.InstrumentedBucket
bucketName string
prefix string
compression string
backend string
@ -59,8 +60,23 @@ func NewResultStore(ctx context.Context, cfg ResultsStorageConfig, logger log.Lo
return nil, fmt.Errorf("create bucket client: %w", err)
}
var bucketName string
switch cfg.Backend {
case "s3":
bucketName = cfg.Bucket.S3.BucketName
case "gcs":
bucketName = cfg.Bucket.GCS.BucketName
default:
return nil, fmt.Errorf("unsupported backend: %s", cfg.Backend)
}
if cfg.Bucket.StoragePrefix != "" {
bucketName = fmt.Sprintf("%s/%s", bucketName, strings.Trim(cfg.Bucket.StoragePrefix, "/"))
}
return &bucketResultStore{
bucket: bucketClient,
bucketName: bucketName,
prefix: strings.Trim(cfg.ObjectPrefix, "/"),
compression: cfg.Compression,
backend: cfg.Backend,
@ -92,13 +108,14 @@ func (s *bucketResultStore) Store(ctx context.Context, payload []byte, opts Stor
}
}
uri := fmt.Sprintf("%s://%s/%s", s.backend, s.bucket.Name(), key)
uri := fmt.Sprintf("%s://%s/%s", s.backend, s.bucketName, key)
level.Debug(s.logger).Log(
"component", "goldfish-result-store",
"object", key,
"bucket", s.bucket.Name(),
"bucket", s.bucketName,
"backend", s.backend,
"uri", uri,
"size_bytes", len(encoded),
)

@ -9,9 +9,12 @@ import (
"time"
"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/loki/v3/pkg/storage/bucket"
"github.com/prometheus/client_golang/prometheus"
"github.com/stretchr/testify/require"
"github.com/thanos-io/objstore"
"gopkg.in/yaml.v2"
)
func TestBucketResultStoreStoresCompressedPayload(t *testing.T) {
@ -90,3 +93,46 @@ func TestBucketResultStoreStoresUncompressedPayload(t *testing.T) {
require.NoError(t, err)
require.Equal(t, payload, body)
}
const configWithGCSBackend = `
storage_prefix: storage-prefix
gcs:
bucket_name: bucket
service_account: |-
{
"type": "service_account",
"project_id": "id",
"private_key_id": "id",
"private_key": "-----BEGIN PRIVATE KEY-----\nSOMETHING\n-----END PRIVATE KEY-----\n",
"client_email": "test@test.com",
"client_id": "12345",
"auth_uri": "https://accounts.google.com/o/oauth2/auth",
"token_uri": "https://oauth2.googleapis.com/token",
"auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
"client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%40test.com"
}
`
func TestNewResultsBucket_CorrectBucketName(t *testing.T) {
bktCfg := bucket.Config{}
flagext.DefaultValues(&bktCfg)
err := yaml.Unmarshal([]byte(configWithGCSBackend), &bktCfg)
require.NoError(t, err)
rsCfg := ResultsStorageConfig{
Enabled: false,
Mode: ResultsPersistenceModeMismatchOnly,
Backend: ResultsBackendGCS,
ObjectPrefix: "goldfish/results",
Compression: "gzip",
Bucket: bktCfg,
}
resultStorage, err := NewResultStore(context.Background(), rsCfg, log.NewNopLogger())
require.NoError(t, err)
rs, ok := resultStorage.(*bucketResultStore)
require.True(t, ok)
require.Equal(t, "bucket/storage-prefix", rs.bucketName)
}

@ -531,6 +531,10 @@ func (m *mockGoldfishStorage) Close() error {
return nil
}
func (m *mockGoldfishStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*goldfish.QuerySample, error) {
return nil, nil
}
func Test_extractTenant(t *testing.T) {
tests := []struct {
name string

Loading…
Cancel
Save