fix: pattern ingester pushes detected level as structured metadata (#16924)

pull/16939/head
Trevor Whitney 10 months ago committed by GitHub
parent c01d9c7512
commit 66033bb967
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 18
      pkg/pattern/aggregation/push.go
  2. 15
      pkg/pattern/aggregation/push_test.go
  3. 20
      pkg/pattern/ingester_test.go
  4. 6
      pkg/pattern/instance.go

@ -44,7 +44,7 @@ var defaultUserAgent = fmt.Sprintf("pattern-ingester-push/%s", build.GetVersion(
type EntryWriter interface {
// WriteEntry handles sending the log to the output
// To maintain consistent log timing, Write is expected to be non-blocking
WriteEntry(ts time.Time, entry string, lbls labels.Labels)
WriteEntry(ts time.Time, e string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter)
Stop()
}
@ -78,9 +78,10 @@ type Push struct {
}
type entry struct {
ts time.Time
entry string
labels labels.Labels
ts time.Time
entry string
labels labels.Labels
structuredMetadata []logproto.LabelAdapter
}
type entries struct {
@ -156,8 +157,8 @@ func NewPush(
}
// WriteEntry implements EntryWriter
func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels) {
p.entries.add(entry{ts: ts, entry: e, labels: lbls})
func (p *Push) WriteEntry(ts time.Time, e string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) {
p.entries.add(entry{ts: ts, entry: e, labels: lbls, structuredMetadata: structuredMetadata})
}
// Stop will cancel any ongoing requests and stop the goroutine listening for requests
@ -190,8 +191,9 @@ func (p *Push) buildPayload(ctx context.Context) ([]byte, error) {
}
entries = append(entries, logproto.Entry{
Timestamp: e.ts,
Line: e.entry,
Timestamp: e.ts,
Line: e.entry,
StructuredMetadata: e.structuredMetadata,
})
entriesByStream[stream] = entries
}

@ -21,6 +21,7 @@ import (
"github.com/grafana/loki/v3/pkg/logproto"
"github.com/grafana/loki/v3/pkg/util"
"github.com/grafana/loki/v3/pkg/util/constants"
)
const (
@ -32,6 +33,9 @@ const (
func Test_Push(t *testing.T) {
lbls := labels.New(labels.Label{Name: "test", Value: "test"})
structuredMetadata := []logproto.LabelAdapter{
{Name: constants.LevelLabel, Value: "info"},
}
// create dummy loki server
responses := make(chan response, 1) // buffered not to block the response handler
@ -62,7 +66,7 @@ func Test_Push(t *testing.T) {
)
require.NoError(t, err)
ts, payload := testPayload()
push.WriteEntry(ts, payload, lbls)
push.WriteEntry(ts, payload, lbls, structuredMetadata)
resp := <-responses
assertResponse(t, resp, false, labelSet("test", "test"), ts, payload)
})
@ -87,7 +91,7 @@ func Test_Push(t *testing.T) {
)
require.NoError(t, err)
ts, payload := testPayload()
push.WriteEntry(ts, payload, lbls)
push.WriteEntry(ts, payload, lbls, structuredMetadata)
resp := <-responses
assertResponse(t, resp, true, labelSet("test", "test"), ts, payload)
})
@ -142,32 +146,37 @@ func Test_Push(t *testing.T) {
wayBack,
AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test_service", lbls1),
lbls1,
structuredMetadata,
)
p.WriteEntry(
then,
AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test_service", lbls1),
lbls1,
structuredMetadata,
)
p.WriteEntry(
now,
AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test_service", lbls1),
lbls1,
structuredMetadata,
)
p.WriteEntry(
wayBack,
AggregatedMetricEntry(model.TimeFromUnix(wayBack.Unix()), 1, 1, "test2_service", lbls2),
lbls2,
structuredMetadata,
)
p.WriteEntry(
then,
AggregatedMetricEntry(model.TimeFromUnix(then.Unix()), 2, 2, "test2_service", lbls2),
lbls2,
structuredMetadata,
)
p.WriteEntry(
now,
AggregatedMetricEntry(model.TimeFromUnix(now.Unix()), 3, 3, "test2_service", lbls2),
lbls2,
structuredMetadata,
)
p.running.Add(1)

@ -47,7 +47,7 @@ func TestInstancePushQuery(t *testing.T) {
}
mockWriter := &mockEntryWriter{}
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
inst, err := newInstance(
"foo",
@ -135,7 +135,7 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
}
mockWriter := &mockEntryWriter{}
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything)
mockWriter.On("WriteEntry", mock.Anything, mock.Anything, mock.Anything, mock.Anything)
inst, err := newInstance(
"foo",
@ -285,8 +285,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "test_service"},
labels.Label{Name: "level", Value: "info"},
),
[]logproto.LabelAdapter{
{Name: constants.LevelLabel, Value: constants.LogLevelInfo},
},
)
mockWriter.AssertCalled(
@ -302,8 +304,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "foo_service"},
labels.Label{Name: "level", Value: "error"},
),
[]logproto.LabelAdapter{
{Name: constants.LevelLabel, Value: constants.LogLevelError},
},
)
mockWriter.AssertCalled(
@ -319,8 +323,10 @@ func TestInstancePushAggregateMetrics(t *testing.T) {
),
labels.New(
labels.Label{Name: loghttp_push.AggregatedMetricLabel, Value: "baz_service"},
labels.Label{Name: "level", Value: "error"},
),
[]logproto.LabelAdapter{
{Name: constants.LevelLabel, Value: constants.LogLevelError},
},
)
require.Equal(t, 0, len(inst.aggMetricsByStreamAndLevel))
@ -331,8 +337,8 @@ type mockEntryWriter struct {
mock.Mock
}
func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels) {
_ = m.Called(ts, entry, lbls)
func (m *mockEntryWriter) WriteEntry(ts time.Time, entry string, lbls labels.Labels, structuredMetadata []logproto.LabelAdapter) {
_ = m.Called(ts, entry, lbls, structuredMetadata)
}
func (m *mockEntryWriter) Stop() {

@ -328,7 +328,10 @@ func (i *instance) writeAggregatedMetrics(
newLbls := labels.Labels{
labels.Label{Name: push.AggregatedMetricLabel, Value: service},
labels.Label{Name: "level", Value: level},
}
sturcturedMetadata := []logproto.LabelAdapter{
{Name: constants.LevelLabel, Value: level},
}
if i.writer != nil {
@ -336,6 +339,7 @@ func (i *instance) writeAggregatedMetrics(
now.Time(),
aggregation.AggregatedMetricEntry(now, totalBytes, totalCount, service, streamLbls),
newLbls,
sturcturedMetadata,
)
i.metrics.samples.WithLabelValues(service).Inc()

Loading…
Cancel
Save