From 66033bb967ffbf0bbadb463b09e4509ce8c2242d Mon Sep 17 00:00:00 2001 From: Trevor Whitney Date: Thu, 27 Mar 2025 08:57:30 -0600 Subject: [PATCH] fix: pattern ingester pushes detected level as structured metadata (#16924) --- pkg/pattern/aggregation/push.go | 18 ++++++++++-------- pkg/pattern/aggregation/push_test.go | 15 ++++++++++++--- pkg/pattern/ingester_test.go | 20 +++++++++++++------- pkg/pattern/instance.go | 6 +++++- 4 files changed, 40 insertions(+), 19 deletions(-) diff --git a/pkg/pattern/aggregation/push.go b/pkg/pattern/aggregation/push.go index d5f755b07d..55d266b86a 100644 --- a/pkg/pattern/aggregation/push.go +++ b/pkg/pattern/aggregation/push.go @@ -44,7 +44,7 @@ var defaultUserAgent = fmt.Sprintf("pattern-ingester-push/%s", build.GetVersion( type EntryWriter interface { // WriteEntry handles sending the log to the output // To maintain consistent log timing, Write is expected to be non-blocking - WriteEntry(ts time.Time, entry string, lbls labels.Labels) + WriteEntry(ts time.Time, e string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) Stop() } @@ -78,9 +78,10 @@ type Push struct { } type entry struct { - ts time.Time - entry string - labels labels.Labels + ts time.Time + entry string + labels labels.Labels + structuredMetadata []logproto.LabelAdapter } type entries struct { @@ -156,8 +157,8 @@ func NewPush( } // WriteEntry implements EntryWriter -func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) { - p.entries.add(entry{ts: ts, entry: e, labels: lbls}) +func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) { + p.entries.add(entry{ts: ts, entry: e, labels: lbls, structuredMetadata: structuredMetadata}) } // Stop will cancel any ongoing requests and stop the goroutine listening for requests @@ -190,8 +191,9 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) { } entries = append(entries, logproto.Entry{ - Timestamp: e.ts, - Line: e.entry, + Timestamp: e.ts, + Line: e.entry, + StructuredMetadata: e.structuredMetadata, }) entriesByStream[stream] = entries } diff --git a/pkg/pattern/aggregation/push_test.go b/pkg/pattern/aggregation/push_test.go index 08810272da..f222418720 100644 --- a/pkg/pattern/aggregation/push_test.go +++ b/pkg/pattern/aggregation/push_test.go @@ -21,6 +21,7 @@ import ( "github.com/grafana/loki/v3/pkg/logproto" "github.com/grafana/loki/v3/pkg/util" + "github.com/grafana/loki/v3/pkg/util/constants" ) const ( @@ -32,6 +33,9 @@ const ( func Test_Push(t *testing.T) { lbls := labels.New(labels.Label{Name: "test", Value: "test"}) + structuredMetadata := []logproto.LabelAdapter{ + {Name: constants.LevelLabel, Value: "info"}, + } // create dummy loki server responses := make(chan response, 1) // buffered not to block the response handler @@ -62,7 +66,7 @@ func Test_Push(t *testing.T) { ) require.NoError(t, err) ts, payload := testPayload() - push.WriteEntry(ts, payload, lbls) + push.WriteEntry(ts, payload, lbls, structuredMetadata) resp := <-responses assertResponse(t, resp, false, labelSet("test", "test"), ts, payload) }) @@ -87,7 +91,7 @@ func Test_Push(t *testing.T) { ) require.NoError(t, err) ts, payload := testPayload() - push.WriteEntry(ts, payload, lbls) + push.WriteEntry(ts, payload, lbls, structuredMetadata) resp := <-responses assertResponse(t, resp, true, labelSet("test", "test"), ts, payload) }) @@ -142,32 +146,37 @@ func Test_Push(t *testing.T) { wayBack, AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1), lbls1, + structuredMetadata, ) p.WriteEntry( then, AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1), lbls1, + structuredMetadata, ) p.WriteEntry( now, AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1), lbls1, + structuredMetadata, ) - p.WriteEntry( wayBack, AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2), lbls2, + structuredMetadata, ) p.WriteEntry( then, AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2), lbls2, + structuredMetadata, ) p.WriteEntry( now, AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2), lbls2, + structuredMetadata, ) p.running.Add(1) diff --git a/pkg/pattern/ingester_test.go b/pkg/pattern/ingester_test.go index 0b1404fe54..96d708db5d 100644 --- a/pkg/pattern/ingester_test.go +++ b/pkg/pattern/ingester_test.go @@ -47,7 +47,7 @@ func TestInstancePushQuery(t *testing.T) { } mockWriter := &mockEntryWriter{} - mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything) + mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything) inst, err := newInstance( "foo", @@ -135,7 +135,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) { } mockWriter := &mockEntryWriter{} - mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything) + mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything) inst, err := newInstance( "foo", @@ -285,8 +285,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) { ), labels.New( labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "test_service"}, - labels.Label{Name: "level", Value: "info"}, ), + []logproto.LabelAdapter{ + {Name: constants.LevelLabel, Value: constants.LogLevelInfo}, + }, ) mockWriter.AssertCalled( @@ -302,8 +304,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) { ), labels.New( labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "foo_service"}, - labels.Label{Name: "level", Value: "error"}, ), + []logproto.LabelAdapter{ + {Name: constants.LevelLabel, Value: constants.LogLevelError}, + }, ) mockWriter.AssertCalled( @@ -319,8 +323,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) { ), labels.New( labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "baz_service"}, - labels.Label{Name: "level", Value: "error"}, ), + []logproto.LabelAdapter{ + {Name: constants.LevelLabel, Value: constants.LogLevelError}, + }, ) require.Equal(t, 0, len(inst.aggMetricsByStreamAndLevel)) @@ -331,8 +337,8 @@ type mockEntryWriter struct { mock.Mock } -func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) { - _ = m.Called(ts, entry, lbls) +func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) { + _ = m.Called(ts, entry, lbls, structuredMetadata) } func (m *mockEntryWriter) Stop() { diff --git a/pkg/pattern/instance.go b/pkg/pattern/instance.go index 7ec5fd3bdc..085b4967b3 100644 --- a/pkg/pattern/instance.go +++ b/pkg/pattern/instance.go @@ -328,7 +328,10 @@ func (i *instance) writeAggregatedMetrics( newLbls := labels.Labels{ labels.Label{Name: push.AggregatedMetricLabel, Value: service}, - labels.Label{Name: "level", Value: level}, + } + + sturcturedMetadata := []logproto.LabelAdapter{ + {Name: constants.LevelLabel, Value: level}, } if i.writer != nil { @@ -336,6 +339,7 @@ func (i *instance) writeAggregatedMetrics( now.Time(), aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls), newLbls, + sturcturedMetadata, ) i.metrics.samples.WithLabelValues(service).Inc()