diff --git a/docs/sources/shared/configuration.md b/docs/sources/shared/configuration.md index 4498c7e75d..c07ca5abf5 100644 --- a/docs/sources/shared/configuration.md +++ b/docs/sources/shared/configuration.md @@ -167,6 +167,18 @@ ui: # CLI flag: -ui.goldfish.cell-b-namespace [cell_b_namespace: | default = ""] + # Results storage backend (gcs, s3) for fetching stored query results. + # CLI flag: -ui.goldfish.results-backend + [results_backend: | default = ""] + + # The thanos_object_store_config block configures the connection to object + # storage backend using thanos-io/objstore clients. This will become the + # default way of configuring object store clients in future releases. + # Currently this is opt-in and takes effect only when `-use-thanos-objstore` + # is set to true. + # The CLI flags prefix for this block configuration is: ui.goldfish.results + [results_bucket: ] + ring: kvstore: # Backend storage to use for the ring. Supported values are: consul, etcd, @@ -7163,6 +7175,7 @@ Currently this is opt-in and takes effect only when `-use-thanos-objstore` is se - `common.storage.object-store` - `object-store` - `ruler-storage` +- `ui.goldfish.results`   diff --git a/pkg/goldfish/storage.go b/pkg/goldfish/storage.go index 08089fe55b..7a74273382 100644 --- a/pkg/goldfish/storage.go +++ b/pkg/goldfish/storage.go @@ -12,6 +12,7 @@ type Storage interface { // Read operations (used by UI) GetSampledQueries(ctx context.Context, page, pageSize int, filter QueryFilter) (*APIResponse, error) + GetQueryByCorrelationID(ctx context.Context, correlationID string) (*QuerySample, error) // Lifecycle Close() error diff --git a/pkg/goldfish/storage_mysql.go b/pkg/goldfish/storage_mysql.go index a19ada55fe..df087ddce2 100644 --- a/pkg/goldfish/storage_mysql.go +++ b/pkg/goldfish/storage_mysql.go @@ -352,6 +352,92 @@ func (s *MySQLStorage) GetSampledQueries(ctx context.Context, page, pageSize int }, nil } +// GetQueryByCorrelationID retrieves a single query sample by correlation ID +func (s *MySQLStorage) GetQueryByCorrelationID(ctx context.Context, correlationID string) (*QuerySample, error) { + query := ` + SELECT + correlation_id, tenant_id, user, query, query_type, start_time, end_time, step_duration, + cell_a_exec_time_ms, cell_b_exec_time_ms, cell_a_queue_time_ms, cell_b_queue_time_ms, + cell_a_bytes_processed, cell_b_bytes_processed, cell_a_lines_processed, cell_b_lines_processed, + cell_a_bytes_per_second, cell_b_bytes_per_second, cell_a_lines_per_second, cell_b_lines_per_second, + cell_a_entries_returned, cell_b_entries_returned, cell_a_splits, cell_b_splits, + cell_a_shards, cell_b_shards, cell_a_response_hash, cell_b_response_hash, + cell_a_response_size, cell_b_response_size, cell_a_status_code, cell_b_status_code, + cell_a_result_uri, cell_b_result_uri, + cell_a_result_size_bytes, cell_b_result_size_bytes, + cell_a_result_compression, cell_b_result_compression, + cell_a_trace_id, cell_b_trace_id, + cell_a_span_id, cell_b_span_id, + cell_a_used_new_engine, cell_b_used_new_engine, + sampled_at, created_at + FROM sampled_queries + WHERE correlation_id = ? + ` + + var q QuerySample + var stepDurationMs int64 + var createdAt time.Time + var cellASpanID, cellBSpanID sql.NullString + var cellAResultURI, cellBResultURI sql.NullString + var cellAResultCompression, cellBResultCompression sql.NullString + var cellAResultSize, cellBResultSize sql.NullInt64 + + err := s.db.QueryRowContext(ctx, query, correlationID).Scan( + &q.CorrelationID, &q.TenantID, &q.User, &q.Query, &q.QueryType, &q.StartTime, &q.EndTime, &stepDurationMs, + &q.CellAStats.ExecTimeMs, &q.CellBStats.ExecTimeMs, &q.CellAStats.QueueTimeMs, &q.CellBStats.QueueTimeMs, + &q.CellAStats.BytesProcessed, &q.CellBStats.BytesProcessed, &q.CellAStats.LinesProcessed, &q.CellBStats.LinesProcessed, + &q.CellAStats.BytesPerSecond, &q.CellBStats.BytesPerSecond, &q.CellAStats.LinesPerSecond, &q.CellBStats.LinesPerSecond, + &q.CellAStats.TotalEntriesReturned, &q.CellBStats.TotalEntriesReturned, &q.CellAStats.Splits, &q.CellBStats.Splits, + &q.CellAStats.Shards, &q.CellBStats.Shards, &q.CellAResponseHash, &q.CellBResponseHash, + &q.CellAResponseSize, &q.CellBResponseSize, &q.CellAStatusCode, &q.CellBStatusCode, + &cellAResultURI, &cellBResultURI, + &cellAResultSize, &cellBResultSize, + &cellAResultCompression, &cellBResultCompression, + &q.CellATraceID, &q.CellBTraceID, + &cellASpanID, &cellBSpanID, + &q.CellAUsedNewEngine, &q.CellBUsedNewEngine, + &q.SampledAt, &createdAt, + ) + + if err != nil { + if err == sql.ErrNoRows { + return nil, fmt.Errorf("query with correlation ID %s not found", correlationID) + } + return nil, fmt.Errorf("failed to query by correlation ID: %w", err) + } + + // Convert nullable strings to regular strings + if cellASpanID.Valid { + q.CellASpanID = cellASpanID.String + } + if cellBSpanID.Valid { + q.CellBSpanID = cellBSpanID.String + } + if cellAResultURI.Valid { + q.CellAResultURI = cellAResultURI.String + } + if cellBResultURI.Valid { + q.CellBResultURI = cellBResultURI.String + } + if cellAResultSize.Valid { + q.CellAResultSize = cellAResultSize.Int64 + } + if cellBResultSize.Valid { + q.CellBResultSize = cellBResultSize.Int64 + } + if cellAResultCompression.Valid { + q.CellAResultCompression = cellAResultCompression.String + } + if cellBResultCompression.Valid { + q.CellBResultCompression = cellBResultCompression.String + } + + // Convert step duration from milliseconds to Duration + q.Step = time.Duration(stepDurationMs) * time.Millisecond + + return &q, nil +} + // Close closes the storage connection func (s *MySQLStorage) Close() error { return s.db.Close() diff --git a/pkg/goldfish/storage_noop.go b/pkg/goldfish/storage_noop.go index 63eef135d2..50c8f5023d 100644 --- a/pkg/goldfish/storage_noop.go +++ b/pkg/goldfish/storage_noop.go @@ -28,6 +28,11 @@ func (n *NoopStorage) GetSampledQueries(_ context.Context, _, _ int, _ QueryFilt return nil, errors.New("goldfish feature is disabled") } +// GetQueryByCorrelationID returns an error as goldfish is disabled +func (n *NoopStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*QuerySample, error) { + return nil, errors.New("goldfish feature is disabled") +} + // Close is a no-op func (n *NoopStorage) Close() error { return nil diff --git a/pkg/ui/config.go b/pkg/ui/config.go index 3b333e75be..236a997dc9 100644 --- a/pkg/ui/config.go +++ b/pkg/ui/config.go @@ -3,6 +3,7 @@ package ui import ( "flag" + "github.com/grafana/loki/v3/pkg/storage/bucket" lokiring "github.com/grafana/loki/v3/pkg/util/ring" ) @@ -19,6 +20,10 @@ type GoldfishConfig struct { LogsDatasourceUID string `yaml:"logs_datasource_uid"` // UID of the Loki datasource in Grafana CellANamespace string `yaml:"cell_a_namespace"` // Namespace for Cell A logs CellBNamespace string `yaml:"cell_b_namespace"` // Namespace for Cell B logs + + // Result storage configuration for fetching raw query results from object storage + ResultsBackend string `yaml:"results_backend"` // Results storage backend (gcs, s3) + ResultsBucket bucket.Config `yaml:"results_bucket"` // Bucket configuration for accessing stored results } type Config struct { @@ -48,4 +53,8 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.StringVar(&cfg.Goldfish.LogsDatasourceUID, "ui.goldfish.logs-datasource-uid", "", "UID of the Loki datasource in Grafana.") f.StringVar(&cfg.Goldfish.CellANamespace, "ui.goldfish.cell-a-namespace", "", "Namespace for Cell A logs.") f.StringVar(&cfg.Goldfish.CellBNamespace, "ui.goldfish.cell-b-namespace", "", "Namespace for Cell B logs.") + + // Result storage configuration + f.StringVar(&cfg.Goldfish.ResultsBackend, "ui.goldfish.results-backend", "", "Results storage backend (gcs, s3) for fetching stored query results.") + cfg.Goldfish.ResultsBucket.RegisterFlagsWithPrefix("ui.goldfish.results.", f) } diff --git a/pkg/ui/goldfish_test.go b/pkg/ui/goldfish_test.go index 03b9e0a5b8..4f7d179f36 100644 --- a/pkg/ui/goldfish_test.go +++ b/pkg/ui/goldfish_test.go @@ -14,6 +14,7 @@ import ( "github.com/go-kit/log" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" "github.com/grafana/loki/v3/pkg/goldfish" ) @@ -23,6 +24,7 @@ type mockStorage struct { queries []goldfish.QuerySample capturedFilter goldfish.QueryFilter // Add this to capture the full filter error error + getQueryFunc func(ctx context.Context, correlationID string) (*goldfish.QuerySample, error) } func (m *mockStorage) StoreQuerySample(_ context.Context, _ *goldfish.QuerySample) error { @@ -63,10 +65,106 @@ func (m *mockStorage) GetSampledQueries(_ context.Context, page, pageSize int, f }, nil } +func (m *mockStorage) GetQueryByCorrelationID(ctx context.Context, correlationID string) (*goldfish.QuerySample, error) { + if m.getQueryFunc != nil { + return m.getQueryFunc(ctx, correlationID) + } + if m.error != nil { + return nil, m.error + } + // Default behavior: search through queries + for _, q := range m.queries { + if q.CorrelationID == correlationID { + return &q, nil + } + } + return nil, errors.New("query not found") +} + func (m *mockStorage) Close() error { return nil } +// Mock bucket client for testing +type mockBucket struct { + getFunc func(ctx context.Context, key string) (io.ReadCloser, error) +} + +func (m *mockBucket) Get(ctx context.Context, key string) (io.ReadCloser, error) { + if m.getFunc != nil { + return m.getFunc(ctx, key) + } + return nil, errors.New("not implemented") +} + +func (m *mockBucket) Upload(_ context.Context, _ string, _ io.Reader) error { + return nil +} + +func (m *mockBucket) Delete(_ context.Context, _ string) error { + return nil +} + +func (m *mockBucket) Exists(_ context.Context, _ string) (bool, error) { + return false, nil +} + +func (m *mockBucket) Iter(_ context.Context, _ string, _ func(string) error, _ ...objstore.IterOption) error { + return nil +} + +func (m *mockBucket) GetRange(_ context.Context, _ string, _ int64, _ int64) (io.ReadCloser, error) { + return nil, nil +} + +func (m *mockBucket) Attributes(_ context.Context, _ string) (objstore.ObjectAttributes, error) { + return objstore.ObjectAttributes{}, nil +} + +func (m *mockBucket) ObjectSize(_ context.Context, _ string) (uint64, error) { + return 0, nil +} + +func (m *mockBucket) Close() error { + return nil +} + +func (m *mockBucket) Name() string { + return "mock-bucket" +} + +func (m *mockBucket) WithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.Bucket { + return m +} + +func (m *mockBucket) ReaderWithExpectedErrs(_ objstore.IsOpFailureExpectedFunc) objstore.BucketReader { + return m +} + +func (m *mockBucket) SupportedIterOptions() []objstore.IterOptionType { + return nil +} + +func (m *mockBucket) Provider() objstore.ObjProvider { + return objstore.ObjProvider("mock") +} + +func (m *mockBucket) GetAndReplace(_ context.Context, _ string, _ func(existing io.ReadCloser) (io.ReadCloser, error)) error { + return errors.New("not implemented") +} + +func (m *mockBucket) IsAccessDeniedErr(_ error) bool { + return false +} + +func (m *mockBucket) IsObjNotFoundErr(_ error) bool { + return false +} + +func (m *mockBucket) IterWithAttributes(_ context.Context, _ string, _ func(objstore.IterObjectAttributes) error, _ ...objstore.IterOption) error { + return nil +} + // createTestService creates a Service with common test defaults func createTestService(storage goldfish.Storage) *Service { return createTestServiceWithConfig(storage, GoldfishConfig{ diff --git a/pkg/ui/handler.go b/pkg/ui/handler.go index 16eb8dc7ec..9bba03ffa8 100644 --- a/pkg/ui/handler.go +++ b/pkg/ui/handler.go @@ -2,11 +2,15 @@ package ui import ( + "bytes" + "compress/gzip" "context" "encoding/json" "fmt" + "io" "net/http" "net/http/httputil" + "net/url" "strconv" "strings" "time" @@ -29,8 +33,15 @@ const ( goldfishPath = prefixPath + "/api/v1/goldfish/queries" notFoundPath = prefixPath + "/api/v1/404" contentTypeJSON = "application/json" + + cellA = "cell-a" + cellB = "cell-b" ) +func goldfishResultPath(cell string) string { + return prefixPath + "/api/v1/goldfish/results/{correlationId}/" + cell +} + // Context keys for trace information type contextKey string @@ -47,6 +58,8 @@ func (s *Service) RegisterHandler() { s.router.Path(detailsPath).Handler(s.detailsHandler()) s.router.Path(featuresPath).Handler(s.featuresHandler()) s.router.Path(goldfishPath).Handler(s.goldfishQueriesHandler()) + s.router.Path(goldfishResultPath(cellA)).Handler(s.goldfishResultHandler(cellA)) + s.router.Path(goldfishResultPath(cellB)).Handler(s.goldfishResultHandler(cellB)) s.router.PathPrefix(proxyPath).Handler(s.clusterProxyHandler()) s.router.PathPrefix(notFoundPath).Handler(s.notFoundHandler()) @@ -332,3 +345,120 @@ func (s *Service) goldfishQueriesHandler() http.Handler { } }) } + +func (s *Service) goldfishResultHandler(cell string) http.Handler { + return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + vars := mux.Vars(r) + correlationID := vars["correlationId"] + + if !s.cfg.Goldfish.Enable { + s.writeJSONError(w, http.StatusNotFound, "goldfish feature is disabled") + return + } + + // Check if bucket client is available + if s.goldfishBucket == nil { + s.writeJSONError(w, http.StatusNotImplemented, "result storage is not configured") + return + } + + // Fetch query metadata from database + query, err := s.goldfishStorage.GetQueryByCorrelationID(r.Context(), correlationID) + if err != nil { + level.Error(s.logger).Log("msg", "failed to fetch query by correlation ID", "correlation_id", correlationID, "err", err) + s.writeJSONError(w, http.StatusNotFound, fmt.Sprintf("query with correlation ID %s not found", correlationID)) + return + } + + // Get the appropriate result URI and compression based on cell + var resultURI, compression string + if cell == "cell-a" { + resultURI = query.CellAResultURI + compression = query.CellAResultCompression + } else { + resultURI = query.CellBResultURI + compression = query.CellBResultCompression + } + + // Check if result was persisted + if resultURI == "" { + s.writeJSONError(w, http.StatusNotFound, fmt.Sprintf("result for %s was not persisted to object storage", cell)) + return + } + + // Parse URI to extract bucket path + // URI format: "gcs://bucket-name/path/to/object" or "s3://bucket-name/path/to/object" + objectKey, err := parseObjectKeyFromURI(resultURI) + if err != nil { + level.Error(s.logger).Log("msg", "failed to parse result URI", "uri", resultURI, "err", err) + s.writeJSONError(w, http.StatusInternalServerError, "failed to parse result URI") + return + } + + // Download object from bucket + reader, err := s.goldfishBucket.Get(r.Context(), objectKey) + if err != nil { + level.Error(s.logger).Log( + "msg", "failed to fetch object from bucket", + "uri", resultURI, + "object_key", objectKey, + "err", err) + s.writeJSONError(w, http.StatusInternalServerError, "failed to fetch result from storage") + return + } + defer reader.Close() + + // Read object data + data, err := io.ReadAll(reader) + if err != nil { + level.Error(s.logger).Log("msg", "failed to read object data", "object_key", objectKey, "err", err) + s.writeJSONError(w, http.StatusInternalServerError, "failed to read result data") + return + } + + // Decompress if needed + if compression == "gzip" { + gzReader, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + level.Error(s.logger).Log("msg", "failed to create gzip reader", "err", err) + s.writeJSONError(w, http.StatusInternalServerError, "failed to decompress result") + return + } + defer gzReader.Close() + + data, err = io.ReadAll(gzReader) + if err != nil { + level.Error(s.logger).Log("msg", "failed to decompress data", "err", err) + s.writeJSONError(w, http.StatusInternalServerError, "failed to decompress result") + return + } + } + + // Return JSON response + w.Header().Set("Content-Type", contentTypeJSON) + if _, err := w.Write(data); err != nil { + level.Error(s.logger).Log("msg", "failed to write response", "err", err) + } + }) +} + +// parseObjectKeyFromURI extracts the object key from a URI like "gcs://bucket/path" or "s3://bucket/path" +func parseObjectKeyFromURI(uri string) (string, error) { + parsed, err := url.Parse(uri) + if err != nil { + return "", fmt.Errorf("invalid URI: %w", err) + } + + // Validate scheme + if parsed.Scheme != "gcs" && parsed.Scheme != "s3" { + return "", fmt.Errorf("unsupported URI scheme: %s (expected gcs or s3)", parsed.Scheme) + } + + // The path has a leading "/" which we need to trim + key := strings.TrimPrefix(parsed.Path, "/") + if key == "" { + return "", fmt.Errorf("invalid URI: missing object path") + } + + return key, nil +} diff --git a/pkg/ui/handler_test.go b/pkg/ui/handler_test.go index 6c8ad081f8..8e5681ce5e 100644 --- a/pkg/ui/handler_test.go +++ b/pkg/ui/handler_test.go @@ -1,14 +1,23 @@ package ui import ( + "bytes" + "compress/gzip" + "context" "encoding/json" + "errors" + "io" "net/http" "net/http/httptest" "testing" "github.com/go-kit/log" + "github.com/gorilla/mux" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/thanos-io/objstore" + + "github.com/grafana/loki/v3/pkg/goldfish" ) func TestFeaturesHandler_GoldfishWithNamespaces(t *testing.T) { @@ -107,3 +116,186 @@ func TestFeaturesHandler_GoldfishWithNamespaces(t *testing.T) { assert.Nil(t, goldfishData["cellBNamespace"], "cellBNamespace should be nil when not configured") }) } + +func TestGoldfishResultHandler(t *testing.T) { + setup := func(enabled bool, storage goldfish.Storage, bucket objstore.InstrumentedBucket) *httptest.ResponseRecorder { + service := &Service{ + cfg: Config{ + Goldfish: GoldfishConfig{ + Enable: enabled, + }, + }, + logger: log.NewNopLogger(), + goldfishStorage: storage, + goldfishBucket: bucket, + } + + handler := service.goldfishResultHandler(cellA) + req := httptest.NewRequest("GET", "/api/v1/goldfish/results/test-id/cell-a", nil) + req = mux.SetURLVars(req, map[string]string{"correlationId": "test-id"}) + rr := httptest.NewRecorder() + handler.ServeHTTP(rr, req) + + return rr + } + + t.Run("returns error when goldfish is disabled", func(t *testing.T) { + rr := setup(false, nil, nil) + assert.Equal(t, http.StatusNotFound, rr.Code) + assert.Contains(t, rr.Body.String(), "goldfish feature is disabled") + }) + + t.Run("returns error when bucket client is not configured", func(t *testing.T) { + rr := setup(true, nil, nil) + assert.Equal(t, http.StatusNotImplemented, rr.Code) + assert.Contains(t, rr.Body.String(), "result storage is not configured") + }) + + t.Run("returns 404 not found when correlation ID not found in database", func(t *testing.T) { + storage := &mockStorage{ + getQueryFunc: func(_ context.Context, _ string) (*goldfish.QuerySample, error) { + return nil, errors.New("not found") + }, + } + + rr := setup(true, storage, &mockBucket{}) + assert.Equal(t, http.StatusNotFound, rr.Code) + assert.Contains(t, rr.Body.String(), "not found") + }) + + t.Run("returns 404 not found when result URI is empty", func(t *testing.T) { + storage := &mockStorage{ + getQueryFunc: func(_ context.Context, correlationID string) (*goldfish.QuerySample, error) { + return &goldfish.QuerySample{ + CorrelationID: correlationID, + CellAResultURI: "", // No result URI + }, nil + }, + } + + rr := setup(true, storage, &mockBucket{}) + assert.Equal(t, http.StatusNotFound, rr.Code) + assert.Contains(t, rr.Body.String(), "was not persisted to object storage") + }) + + t.Run("successfully fetches and decompresses gzipped result", func(t *testing.T) { + originalData := []byte(`{"result": "compressed test data"}`) + + // Compress data with gzip + var compressedBuf bytes.Buffer + gzWriter := gzip.NewWriter(&compressedBuf) + _, err := gzWriter.Write(originalData) + require.NoError(t, err) + require.NoError(t, gzWriter.Close()) + + storage := &mockStorage{ + getQueryFunc: func(_ context.Context, correlationID string) (*goldfish.QuerySample, error) { + return &goldfish.QuerySample{ + CorrelationID: correlationID, + CellAResultURI: "s3://test-bucket/path/to/result.json.gz", + CellAResultCompression: "gzip", + }, nil + }, + } + + bucket := &mockBucket{ + getFunc: func(_ context.Context, key string) (io.ReadCloser, error) { + assert.Equal(t, "path/to/result.json.gz", key) + return io.NopCloser(bytes.NewReader(compressedBuf.Bytes())), nil + }, + } + + rr := setup(true, storage, bucket) + assert.Equal(t, http.StatusOK, rr.Code) + assert.Equal(t, "application/json", rr.Header().Get("Content-Type")) + assert.JSONEq(t, string(originalData), rr.Body.String()) + }) +} + +func TestParseObjectKeyFromURI(t *testing.T) { + tests := []struct { + name string + uri string + expectedKey string + expectError bool + }{ + { + name: "valid GCS URI", + uri: "gcs://my-bucket/path/to/object.json", + expectedKey: "path/to/object.json", + expectError: false, + }, + { + name: "valid S3 URI", + uri: "s3://my-bucket/path/to/object.json.gz", + expectedKey: "path/to/object.json.gz", + expectError: false, + }, + { + name: "valid URI with nested path", + uri: "gcs://my-bucket/prefix/2025/01/01/test-id/cell-a.json", + expectedKey: "prefix/2025/01/01/test-id/cell-a.json", + expectError: false, + }, + { + name: "valid URI with URL-encoded characters in path", + uri: "gcs://my-bucket/path/with%20spaces/object.json", + expectedKey: "path/with spaces/object.json", // url.Parse automatically decodes + expectError: false, + }, + { + name: "unsupported scheme http", + uri: "http://bucket/path", + expectedKey: "", + expectError: true, + }, + { + name: "unsupported scheme https", + uri: "https://bucket/path", + expectedKey: "", + expectError: true, + }, + { + name: "missing path", + uri: "gcs://my-bucket", + expectedKey: "", + expectError: true, + }, + { + name: "missing path with trailing slash", + uri: "gcs://my-bucket/", + expectedKey: "", + expectError: true, + }, + { + name: "empty URI", + uri: "", + expectedKey: "", + expectError: true, + }, + { + name: "malformed URI", + uri: "not-a-uri", + expectedKey: "", + expectError: true, + }, + { + name: "realistic example", + uri: "gcs://dev-us-central-0-loki-dev-005-goldfish-results/goldfish/results/2024/10/11/fc761f29-edad-4152-bedf-331d8cf2dbd5/cell-a.json.gz", + expectedKey: "goldfish/results/2024/10/11/fc761f29-edad-4152-bedf-331d8cf2dbd5/cell-a.json.gz", + expectError: false, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + key, err := parseObjectKeyFromURI(tt.uri) + if tt.expectError { + assert.Error(t, err) + } else { + require.NoError(t, err) + assert.Equal(t, tt.expectedKey, key) + } + }) + } +} diff --git a/pkg/ui/service.go b/pkg/ui/service.go index 95386163fa..d04d745cf1 100644 --- a/pkg/ui/service.go +++ b/pkg/ui/service.go @@ -16,12 +16,14 @@ import ( "github.com/grafana/dskit/ring" "github.com/grafana/dskit/services" "github.com/prometheus/client_golang/prometheus" + "github.com/thanos-io/objstore" "golang.org/x/net/http2" // This is equivalent to a main.go for the Loki UI, so the blank import is allowed _ "github.com/go-sql-driver/mysql" //nolint:revive "github.com/grafana/loki/v3/pkg/goldfish" + "github.com/grafana/loki/v3/pkg/storage/bucket" ) // This allows to rate limit the number of updates when the cluster is frequently changing (e.g. during rollout). @@ -42,6 +44,7 @@ type Service struct { reg prometheus.Registerer goldfishStorage goldfish.Storage goldfishMetrics *GoldfishMetrics + goldfishBucket objstore.InstrumentedBucket now func() time.Time } @@ -100,6 +103,9 @@ func (s *Service) stop(_ error) error { if s.goldfishStorage != nil { s.goldfishStorage.Close() } + if s.goldfishBucket != nil { + s.goldfishBucket.Close() + } return nil } @@ -170,5 +176,17 @@ func (s *Service) initGoldfishDB() error { s.goldfishStorage = storage level.Info(s.logger).Log("msg", "goldfish storage initialized successfully") + + // Initialize bucket client if results backend is configured + if s.cfg.Goldfish.ResultsBackend != "" { + bucketClient, err := bucket.NewClient(context.Background(), s.cfg.Goldfish.ResultsBackend, s.cfg.Goldfish.ResultsBucket, "goldfish-ui-results", s.logger) + if err != nil { + level.Warn(s.logger).Log("msg", "failed to create goldfish bucket client, result fetching will be disabled", "err", err) + } else { + s.goldfishBucket = bucketClient + level.Info(s.logger).Log("msg", "goldfish bucket client initialized successfully", "backend", s.cfg.Goldfish.ResultsBackend) + } + } + return nil } diff --git a/tools/querytee/goldfish/manager_test.go b/tools/querytee/goldfish/manager_test.go index 6fa477fe66..3c9baf0d72 100644 --- a/tools/querytee/goldfish/manager_test.go +++ b/tools/querytee/goldfish/manager_test.go @@ -48,6 +48,11 @@ func (m *mockStorage) Close() error { return nil } +func (m *mockStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*goldfish.QuerySample, error) { + // This is only used for UI, not needed in manager tests + return nil, nil +} + func TestManager_ShouldSample(t *testing.T) { tests := []struct { name string diff --git a/tools/querytee/goldfish/result_store.go b/tools/querytee/goldfish/result_store.go index b278b3f9a4..a06295814e 100644 --- a/tools/querytee/goldfish/result_store.go +++ b/tools/querytee/goldfish/result_store.go @@ -46,6 +46,7 @@ type StoredResult struct { type bucketResultStore struct { bucket objstore.InstrumentedBucket + bucketName string prefix string compression string backend string @@ -59,8 +60,23 @@ func NewResultStore(ctx context.Context, cfg ResultsStorageConfig, logger log.Lo return nil, fmt.Errorf("create bucket client: %w", err) } + var bucketName string + switch cfg.Backend { + case "s3": + bucketName = cfg.Bucket.S3.BucketName + case "gcs": + bucketName = cfg.Bucket.GCS.BucketName + default: + return nil, fmt.Errorf("unsupported backend: %s", cfg.Backend) + } + + if cfg.Bucket.StoragePrefix != "" { + bucketName = fmt.Sprintf("%s/%s", bucketName, strings.Trim(cfg.Bucket.StoragePrefix, "/")) + } + return &bucketResultStore{ bucket: bucketClient, + bucketName: bucketName, prefix: strings.Trim(cfg.ObjectPrefix, "/"), compression: cfg.Compression, backend: cfg.Backend, @@ -92,13 +108,14 @@ func (s *bucketResultStore) Store(ctx context.Context, payload []byte, opts Stor } } - uri := fmt.Sprintf("%s://%s/%s", s.backend, s.bucket.Name(), key) + uri := fmt.Sprintf("%s://%s/%s", s.backend, s.bucketName, key) level.Debug(s.logger).Log( "component", "goldfish-result-store", "object", key, - "bucket", s.bucket.Name(), + "bucket", s.bucketName, "backend", s.backend, + "uri", uri, "size_bytes", len(encoded), ) diff --git a/tools/querytee/goldfish/result_store_test.go b/tools/querytee/goldfish/result_store_test.go index 7f75807380..35559ace91 100644 --- a/tools/querytee/goldfish/result_store_test.go +++ b/tools/querytee/goldfish/result_store_test.go @@ -9,9 +9,12 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/flagext" + "github.com/grafana/loki/v3/pkg/storage/bucket" "github.com/prometheus/client_golang/prometheus" "github.com/stretchr/testify/require" "github.com/thanos-io/objstore" + "gopkg.in/yaml.v2" ) func TestBucketResultStoreStoresCompressedPayload(t *testing.T) { @@ -90,3 +93,46 @@ func TestBucketResultStoreStoresUncompressedPayload(t *testing.T) { require.NoError(t, err) require.Equal(t, payload, body) } + +const configWithGCSBackend = ` +storage_prefix: storage-prefix +gcs: + bucket_name: bucket + service_account: |- + { + "type": "service_account", + "project_id": "id", + "private_key_id": "id", + "private_key": "-----BEGIN PRIVATE KEY-----\nSOMETHING\n-----END PRIVATE KEY-----\n", + "client_email": "test@test.com", + "client_id": "12345", + "auth_uri": "https://accounts.google.com/o/oauth2/auth", + "token_uri": "https://oauth2.googleapis.com/token", + "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs", + "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/test%40test.com" + } +` + +func TestNewResultsBucket_CorrectBucketName(t *testing.T) { + bktCfg := bucket.Config{} + flagext.DefaultValues(&bktCfg) + + err := yaml.Unmarshal([]byte(configWithGCSBackend), &bktCfg) + require.NoError(t, err) + + rsCfg := ResultsStorageConfig{ + Enabled: false, + Mode: ResultsPersistenceModeMismatchOnly, + Backend: ResultsBackendGCS, + ObjectPrefix: "goldfish/results", + Compression: "gzip", + Bucket: bktCfg, + } + resultStorage, err := NewResultStore(context.Background(), rsCfg, log.NewNopLogger()) + require.NoError(t, err) + + rs, ok := resultStorage.(*bucketResultStore) + require.True(t, ok) + + require.Equal(t, "bucket/storage-prefix", rs.bucketName) +} diff --git a/tools/querytee/proxy_endpoint_test.go b/tools/querytee/proxy_endpoint_test.go index 66ce31e5da..0e7c9f98da 100644 --- a/tools/querytee/proxy_endpoint_test.go +++ b/tools/querytee/proxy_endpoint_test.go @@ -531,6 +531,10 @@ func (m *mockGoldfishStorage) Close() error { return nil } +func (m *mockGoldfishStorage) GetQueryByCorrelationID(_ context.Context, _ string) (*goldfish.QuerySample, error) { + return nil, nil +} + func Test_extractTenant(t *testing.T) { tests := []struct { name string