From fba0d8ddb06c0acc2a5a66be6a0abab1e024462f Mon Sep 17 00:00:00 2001 From: Ivana Huckova <30407135+ivanahuckova@users.noreply.github.com> Date: Fri, 13 Jun 2025 10:49:55 +0200 Subject: [PATCH] Loki: Ensure that streaming is behind feature toggle in backend (#106657) * Loki: Ensure that streaming is behind feature toggle in backend * Update getLiveStreamKey to include orgId and datasourceUIS --- pkg/tsdb/loki/streaming.go | 7 +++ pkg/tsdb/loki/streaming_test.go | 61 +++++++++++++++++++ .../app/plugins/datasource/loki/streaming.ts | 2 +- 3 files changed, 69 insertions(+), 1 deletion(-) create mode 100644 pkg/tsdb/loki/streaming_test.go diff --git a/pkg/tsdb/loki/streaming.go b/pkg/tsdb/loki/streaming.go index 01dba69ed7c..5996161b554 100644 --- a/pkg/tsdb/loki/streaming.go +++ b/pkg/tsdb/loki/streaming.go @@ -13,9 +13,16 @@ import ( "github.com/gorilla/websocket" "github.com/grafana/grafana-plugin-sdk-go/backend" "github.com/grafana/grafana-plugin-sdk-go/data" + "github.com/grafana/grafana/pkg/services/featuremgmt" ) func (s *Service) SubscribeStream(ctx context.Context, req *backend.SubscribeStreamRequest) (*backend.SubscribeStreamResponse, error) { + if !isFeatureEnabled(ctx, featuremgmt.FlagLokiExperimentalStreaming) { + return &backend.SubscribeStreamResponse{ + Status: backend.SubscribeStreamStatusPermissionDenied, + }, fmt.Errorf("streaming is not supported") + } + dsInfo, err := s.getDSInfo(ctx, req.PluginContext) if err != nil { return &backend.SubscribeStreamResponse{ diff --git a/pkg/tsdb/loki/streaming_test.go b/pkg/tsdb/loki/streaming_test.go new file mode 100644 index 00000000000..2492eb79dd7 --- /dev/null +++ b/pkg/tsdb/loki/streaming_test.go @@ -0,0 +1,61 @@ +package loki + +import ( + "context" + "testing" + + "github.com/grafana/grafana-plugin-sdk-go/backend" + "github.com/grafana/grafana-plugin-sdk-go/backend/datasource" + "github.com/grafana/grafana-plugin-sdk-go/backend/httpclient" + "github.com/grafana/grafana-plugin-sdk-go/experimental/featuretoggles" + "github.com/grafana/grafana/pkg/infra/tracing" + "github.com/grafana/grafana/pkg/services/featuremgmt" + "github.com/stretchr/testify/require" +) + +func TestSubscribeStream(t *testing.T) { + // Create a service instance with required dependencies + service := &Service{ + im: datasource.NewInstanceManager(newInstanceSettings(httpclient.NewProvider())), + tracer: tracing.InitializeTracerForTest(), + logger: backend.NewLoggerWith("logger", "loki test"), + } + + // Create a test request + req := &backend.SubscribeStreamRequest{ + PluginContext: backend.PluginContext{ + DataSourceInstanceSettings: &backend.DataSourceInstanceSettings{ + ID: 1, + UID: "test", + Type: "loki", + URL: "http://localhost:3100", + }, + }, + Path: "tail/test", + Data: []byte(`{"expr": "test"}`), + } + + t.Run("when feature toggle is disabled", func(t *testing.T) { + // Create a context without the feature toggle enabled + ctx := context.Background() + + resp, err := service.SubscribeStream(ctx, req) + + require.Error(t, err) + require.Equal(t, "streaming is not supported", err.Error()) + require.Equal(t, backend.SubscribeStreamStatusPermissionDenied, resp.Status) + }) + + t.Run("when feature toggle is enabled", func(t *testing.T) { + // Create a context with the feature toggle enabled + cfg := backend.NewGrafanaCfg(map[string]string{ + featuretoggles.EnabledFeatures: featuremgmt.FlagLokiExperimentalStreaming, + }) + ctx := backend.WithGrafanaConfig(context.Background(), cfg) + + resp, err := service.SubscribeStream(ctx, req) + + require.NoError(t, err) + require.Equal(t, backend.SubscribeStreamStatusOK, resp.Status) + }) +} diff --git a/public/app/plugins/datasource/loki/streaming.ts b/public/app/plugins/datasource/loki/streaming.ts index 1c3f9f77927..0ee41959324 100644 --- a/public/app/plugins/datasource/loki/streaming.ts +++ b/public/app/plugins/datasource/loki/streaming.ts @@ -25,7 +25,7 @@ export async function getLiveStreamKey(query: LokiQuery): Promise { const msgUint8 = new TextEncoder().encode(str); // encode as (utf-8) Uint8Array const hashBuffer = await crypto.subtle.digest('SHA-1', msgUint8); // hash the message const hashArray = Array.from(new Uint8Array(hashBuffer.slice(0, 8))); // first 8 bytes - return hashArray.map((b) => b.toString(16).padStart(2, '0')).join(''); + return `${query.datasource?.uid}/${hashArray.map((b) => b.toString(16).padStart(2, '0')).join('')}/${config.bootData.user.orgId}`; } // This will get both v1 and v2 result formats