chore: refactor detected fields handler (#14288)

Co-authored-by: Paul Rogers <paul.rogers@grafana.com>
pull/14294/head^2
Trevor Whitney 2 years ago committed by GitHub
parent 9267ee3561
commit 39119c0c9f
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 90
      pkg/querier/queryrange/detected_fields.go
  2. 25
      pkg/querier/queryrange/detected_fields_test.go
  3. 3
      pkg/querier/queryrange/roundtrip.go

@ -27,55 +27,59 @@ func NewDetectedFieldsHandler(
limitedHandler base.Handler,
logHandler base.Handler,
limits Limits,
) base.Middleware {
return base.MiddlewareFunc(func(next base.Handler) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}
) base.Handler {
return base.HandlerFunc(
func(ctx context.Context, req base.Request) (base.Response, error) {
r, ok := req.(*DetectedFieldsRequest)
if !ok {
return nil, httpgrpc.Errorf(
http.StatusBadRequest,
"invalid request type, expected *DetectedFieldsRequest",
)
}
resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}
resp, err := makeDownstreamRequest(ctx, limits, limitedHandler, logHandler, r)
if err != nil {
return nil, err
}
re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
re, ok := resp.(*LokiResponse)
if !ok || re.Status != "success" {
return resp, nil
}
detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}
detectedFields := parseDetectedFields(r.FieldLimit, re.Data.Result)
fields := make([]*logproto.DetectedField, len(detectedFields))
fieldCount := 0
for k, v := range detectedFields {
p := v.parsers
if len(p) == 0 {
p = nil
}
fields[fieldCount] = &logproto.DetectedField{
Label: k,
Type: v.fieldType,
Cardinality: v.Estimate(),
Parsers: p,
}
fieldCount++
}
fieldCount++
}
dfResp := DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
},
Headers: re.Headers,
}
// Otherwise all they get is the field limit, which is a bit confusing
if len(fields) > 0 {
dfResp.Response.FieldLimit = r.GetFieldLimit()
}
return &DetectedFieldsResponse{
Response: &logproto.DetectedFieldsResponse{
Fields: fields,
FieldLimit: r.GetFieldLimit(),
},
Headers: re.Headers,
}, nil
})
})
return &dfResp, nil
})
}
func makeDownstreamRequest(

@ -1028,10 +1028,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)),
logHandler(mockLogfmtStreamWithLabels(1, 5, `{type="test", name="foo"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)
detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
@ -1058,10 +1055,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 5, `{type="test", name="bob"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)
detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
@ -1090,10 +1084,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
limitedHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)),
logHandler(mockLogfmtStreamWithLabels(1, 2, `{type="test", name="foo"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)
detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
@ -1136,10 +1127,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)
detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go
@ -1188,10 +1176,7 @@ func TestQuerier_DetectedFields(t *testing.T) {
),
logHandler(mockLogfmtStreamWithLabelsAndStructuredMetadata(1, 2, `{type="test", name="bob"}`)),
limits,
).Wrap(base.HandlerFunc(func(_ context.Context, _ base.Request) (base.Response, error) {
t.Fatal("should not be called")
return nil, nil
}))
)
detectedFields := handleRequest(handler, request)
// log lines come from querier_mock_test.go

@ -1222,7 +1222,6 @@ func NewDetectedFieldsTripperware(
limitedHandler := limitedTripperware.Wrap(next)
logHandler := logTripperware.Wrap(next)
detectedFieldsHandler := NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
return NewLimitedRoundTripper(next, limits, schema.Configs, detectedFieldsHandler)
return NewDetectedFieldsHandler(limitedHandler, logHandler, limits)
}), nil
}

Loading…
Cancel
Save