fix: detected fields incorrect type bug (#13515)

pull/13529/head
Trevor Whitney 10 months ago committed by GitHub
parent e506995e59
commit f6a94d3034
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 6
      pkg/querier/querier.go
  2. 82
      pkg/querier/querier_mock_test.go
  3. 172
      pkg/querier/querier_test.go

@ -1118,9 +1118,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
detectedFields := parseDetectedFields(ctx, req.FieldLimit, streams)
//TODO: detected field needs to contain the sketch
// make sure response to frontend is GRPC
//only want cardinality in JSON
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
@ -1141,7 +1138,6 @@ func (q *SingleTenantQuerier) DetectedFields(ctx context.Context, req *logproto.
fieldCount++
}
//TODO: detected fields response needs to include the sketch
return &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: req.GetFieldLimit(),
@ -1218,7 +1214,6 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
fieldCount := uint32(0)
for _, stream := range streams {
detectType := true
level.Debug(spanlogger.FromContext(ctx)).Log(
"detected_fields", "true",
"msg", fmt.Sprintf("looking for detected fields in stream %d with %d lines", stream.Hash, len(stream.Entries)))
@ -1241,6 +1236,7 @@ func parseDetectedFields(ctx context.Context, limit uint32, streams logqlmodel.S
df.parsers = append(df.parsers, *parser)
}
detectType := true
for _, v := range vals {
parsedFields := detectedFields[k]
if detectType {

@ -9,6 +9,8 @@ import (
"github.com/grafana/loki/v3/pkg/logql/log"
"github.com/grafana/loki/pkg/push"
"github.com/grafana/loki/v3/pkg/loghttp"
"github.com/grafana/dskit/grpcclient"
@ -118,7 +120,6 @@ func (c *querierClientMock) GetDetectedLabels(ctx context.Context, in *logproto.
return (*logproto.LabelToValuesResponse)(nil), args.Error(1)
}
return res.(*logproto.LabelToValuesResponse), args.Error(1)
}
func (c *querierClientMock) GetVolume(ctx context.Context, in *logproto.VolumeRequest, opts ...grpc.CallOption) (*logproto.VolumeResponse, error) {
@ -517,6 +518,20 @@ func mockStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockStream(from, quantity))
}
// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from, and the line is in logfmt format with the fields message, count and fake
func mockLogfmtStreamIterator(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockLogfmtStream(from, quantity))
}
// mockLogfmtStreamIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from, and the line is in logfmt format with the fields message, count and fake
func mockLogfmtStreamIteratorWithStructuredMetadata(from int, quantity int) iter.EntryIterator {
return iter.NewStreamIterator(mockLogfmtStreamWithStructuredMetadata(from, quantity))
}
// mockSampleIterator returns an iterator with 1 stream and quantity entries,
// where entries timestamp and line string are constructed as sequential numbers
// starting at from
@ -546,6 +561,71 @@ func mockStreamWithLabels(from int, quantity int, labels string) logproto.Stream
}
}
func mockLogfmtStream(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabels(from, quantity, `{type="test"}`)
}
func mockLogfmtStreamWithLabels(_ int, quantity int, labels string) logproto.Stream {
entries := make([]logproto.Entry, 0, quantity)
// used for detected fields queries which are always BACKWARD
for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(
`message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t`,
i,
i,
(i * 10),
(i * 256),
float32(i*10.0),
(i%2 == 0)),
})
}
return logproto.Stream{
Entries: entries,
Labels: labels,
}
}
func mockLogfmtStreamWithStructuredMetadata(from int, quantity int) logproto.Stream {
return mockLogfmtStreamWithLabelsAndStructuredMetadata(from, quantity, `{type="test"}`)
}
func mockLogfmtStreamWithLabelsAndStructuredMetadata(
from int,
quantity int,
labels string,
) logproto.Stream {
var entries []logproto.Entry
metadata := push.LabelsAdapter{
{
Name: "constant",
Value: "constant",
},
}
for i := from; i < from+quantity; i++ {
metadata = append(metadata, push.LabelAdapter{
Name: "variable",
Value: fmt.Sprintf("value%d", i),
})
}
for i := quantity; i > 0; i-- {
entries = append(entries, logproto.Entry{
Timestamp: time.Unix(int64(i), 0),
Line: fmt.Sprintf(`message="line %d" count=%d fake=true`, i, i),
StructuredMetadata: metadata,
})
}
return logproto.Stream{
Labels: labels,
Entries: entries,
}
}
type querierMock struct {
util.ExtendedMock
}

@ -324,9 +324,11 @@ func TestQuerier_SeriesAPI(t *testing.T) {
{Key: "a", Value: "1"},
{Key: "b", Value: "2"},
}},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"}},
{
Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
{Key: "b", Value: "3"},
},
},
{Labels: []logproto.SeriesIdentifier_LabelsEntry{
{Key: "a", Value: "1"},
@ -994,7 +996,6 @@ func TestQuerier_RequestingIngesters(t *testing.T) {
for _, tc := range tests {
t.Run(tc.desc, func(t *testing.T) {
conf := mockQuerierConfig()
conf.QueryIngestersWithin = time.Minute * 30
if tc.setIngesterQueryStoreMaxLookback {
@ -1175,7 +1176,6 @@ func setupIngesterQuerierMocks(conf Config, limits *validation.Overrides) (*quer
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
if err != nil {
return nil, nil, nil, err
}
@ -1191,6 +1191,7 @@ type fakeTimeLimits struct {
func (f fakeTimeLimits) MaxQueryLookback(_ context.Context, _ string) time.Duration {
return f.maxQueryLookback
}
func (f fakeTimeLimits) MaxQueryLength(_ context.Context, _ string) time.Duration {
return f.maxQueryLength
}
@ -1697,3 +1698,164 @@ func BenchmarkQuerierDetectedLabels(b *testing.B) {
assert.NoError(b, err)
}
}
func TestQuerier_DetectedFields(t *testing.T) {
limits, err := validation.NewOverrides(defaultLimitsTestConfig(), nil)
require.NoError(t, err)
ctx := user.InjectOrgID(context.Background(), "test")
conf := mockQuerierConfig()
conf.IngesterQueryStoreMaxLookback = 0
request := logproto.DetectedFieldsRequest{
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Query: `{type="test"}`,
LineLimit: 1000,
FieldLimit: 1000,
}
t.Run("returns detected fields from queried logs", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 5), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 5)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)
resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
expectedCardinality := map[string]uint64{
"message": 5,
"count": 5,
"fake": 1,
"bytes": 5,
"duration": 5,
"percent": 5,
"even": 2,
}
for _, d := range detectedFields {
card := expectedCardinality[d.Label]
assert.Equal(t, card, d.Cardinality, "Expected cardinality mismatch for: %s", d.Label)
}
})
t.Run("correctly identifies different field types", func(t *testing.T) {
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)
querier, err := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
require.NoError(t, err)
resp, err := querier.DetectedFields(ctx, &request)
require.NoError(t, err)
detectedFields := resp.Fields
// log lines come from querier_mock_test.go
// message="line %d" count=%d fake=true bytes=%dMB duration=%dms percent=%f even=%t
assert.Len(t, detectedFields, 7)
var messageField, countField, bytesField, durationField, floatField, evenField *logproto.DetectedField
for _, field := range detectedFields {
switch field.Label {
case "message":
messageField = field
case "count":
countField = field
case "bytes":
bytesField = field
case "duration":
durationField = field
case "percent":
floatField = field
case "even":
evenField = field
}
}
assert.Equal(t, logproto.DetectedFieldString, messageField.Type)
assert.Equal(t, logproto.DetectedFieldInt, countField.Type)
assert.Equal(t, logproto.DetectedFieldBytes, bytesField.Type)
assert.Equal(t, logproto.DetectedFieldDuration, durationField.Type)
assert.Equal(t, logproto.DetectedFieldFloat, floatField.Type)
assert.Equal(t, logproto.DetectedFieldBoolean, evenField.Type)
})
}
func BenchmarkQuerierDetectedFields(b *testing.B) {
limits, _ := validation.NewOverrides(defaultLimitsTestConfig(), nil)
ctx := user.InjectOrgID(context.Background(), "test")
conf := mockQuerierConfig()
conf.IngesterQueryStoreMaxLookback = 0
request := logproto.DetectedFieldsRequest{
Start: time.Now().Add(-1 * time.Minute),
End: time.Now(),
Query: `{type="test"}`,
LineLimit: 1000,
FieldLimit: 1000,
}
store := newStoreMock()
store.On("SelectLogs", mock.Anything, mock.Anything).
Return(mockLogfmtStreamIterator(1, 2), nil)
queryClient := newQueryClientMock()
queryClient.On("Recv").
Return(mockQueryResponse([]logproto.Stream{mockLogfmtStream(1, 2)}), nil)
ingesterClient := newQuerierClientMock()
ingesterClient.On("Query", mock.Anything, mock.Anything, mock.Anything).
Return(queryClient, nil)
querier, _ := newQuerier(
conf,
mockIngesterClientConfig(),
newIngesterClientMockFactory(ingesterClient),
mockReadRingWithOneActiveIngester(),
&mockDeleteGettter{},
store, limits)
b.ReportAllocs()
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := querier.DetectedFields(ctx, &request)
assert.NoError(b, err)
}
}

Loading…
Cancel
Save