Datasource: grafana-pyroscope-datasource/parca: Fix the passing of response headers (#106293)

pkgs/tsdb/[grafana-pyroscope-datasource|parca]: Fix use of request headers in responses

In the parca and the grafana-pyroscope-datasource we were wrongly using the request headers instead of the response
header when communication the results to the backend.

This PR fixes this bug.

Was reported by an user via community slack, who faced issues, with a request header of `content-length: 0` being
inserted by a intermediate proxy.
pull/106429/head
Christian Simon 1 month ago committed by GitHub
parent 90c4868c8c
commit 549511597c
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 26
      pkg/tsdb/grafana-pyroscope-datasource/instance.go
  2. 50
      pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient.go
  3. 6
      pkg/tsdb/grafana-pyroscope-datasource/pyroscopeClient_test.go
  4. 6
      pkg/tsdb/grafana-pyroscope-datasource/query.go
  5. 21
      pkg/tsdb/grafana-pyroscope-datasource/query_test.go
  6. 6
      pkg/tsdb/parca/resources.go

@ -29,12 +29,12 @@ var (
)
type ProfilingClient interface {
ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, error)
LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, error)
LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, error)
GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, error)
GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error)
GetSpanProfile(ctx context.Context, profileTypeID string, labelSelector string, spanSelector []string, start int64, end int64, maxNodes *int64) (*ProfileResponse, error)
ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, http.Header, error)
LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, http.Header, error)
LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, http.Header, error)
GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, http.Header, error)
GetProfile(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error)
GetSpanProfile(ctx context.Context, profileTypeID string, labelSelector string, spanSelector []string, start int64, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error)
}
// PyroscopeDatasource is a datasource for querying application performance profiles.
@ -109,7 +109,7 @@ func (d *PyroscopeDatasource) profileTypes(ctx context.Context, req *backend.Cal
}
}
types, err := d.client.ProfileTypes(ctx, start, end)
types, respHeaders, err := d.client.ProfileTypes(ctx, start, end)
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return err
@ -119,7 +119,7 @@ func (d *PyroscopeDatasource) profileTypes(ctx context.Context, req *backend.Cal
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: bodyData, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: bodyData, Headers: respHeaders, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
@ -146,7 +146,7 @@ func (d *PyroscopeDatasource) labelNames(ctx context.Context, req *backend.CallR
return fmt.Errorf("failed parsing label selector: %v", err)
}
labelNames, err := d.client.LabelNames(ctx, labelSelector, start, end)
labelNames, respHeaders, err := d.client.LabelNames(ctx, labelSelector, start, end)
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return fmt.Errorf("error calling LabelNames: %v", err)
@ -167,7 +167,7 @@ func (d *PyroscopeDatasource) labelNames(ctx context.Context, req *backend.CallR
ctxLogger.Error("Failed to marshal response", "error", err, "function", logEntrypoint())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: jsonResponse, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: jsonResponse, Headers: respHeaders, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
@ -195,7 +195,7 @@ func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.Call
end, _ := strconv.ParseInt(query.Get("end"), 10, 64)
label := query.Get("label")
res, err := d.client.LabelValues(ctx, label, query.Get("query"), start, end)
res, respHeaders, err := d.client.LabelValues(ctx, label, query.Get("query"), start, end)
if err != nil {
ctxLogger.Error("Received error from client", "error", err, "function", logEntrypoint())
return fmt.Errorf("error calling LabelValues: %v", err)
@ -207,7 +207,7 @@ func (d *PyroscopeDatasource) labelValues(ctx context.Context, req *backend.Call
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: respHeaders, Status: 200})
if err != nil {
ctxLogger.Error("Failed to send response", "error", err, "function", logEntrypoint())
return err
@ -255,7 +255,7 @@ func (d *PyroscopeDatasource) CheckHealth(ctx context.Context, _ *backend.CheckH
// request succeeded or failed, we set the window to be small.
start := time.Unix(1, 0).UnixMilli()
end := time.Unix(4, 0).UnixMilli()
if _, err := d.client.ProfileTypes(ctx, start, end); err != nil {
if _, _, err := d.client.ProfileTypes(ctx, start, end); err != nil {
status = backend.HealthStatusError
message = err.Error()
}

@ -71,7 +71,7 @@ func NewPyroscopeClient(httpClient *http.Client, url string) *PyroscopeClient {
}
}
func (c *PyroscopeClient) ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, error) {
func (c *PyroscopeClient) ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.ProfileTypes")
defer span.End()
res, err := c.connectClient.ProfileTypes(ctx, connect.NewRequest(&querierv1.ProfileTypesRequest{
@ -82,11 +82,11 @@ func (c *PyroscopeClient) ProfileTypes(ctx context.Context, start int64, end int
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, nil, err
}
if res.Msg.ProfileTypes == nil {
// Let's make sure we send at least empty array if we don't have any types
return []*ProfileType{}, nil
return []*ProfileType{}, nil, nil
} else {
pTypes := make([]*ProfileType, len(res.Msg.ProfileTypes))
for i, pType := range res.Msg.ProfileTypes {
@ -95,11 +95,11 @@ func (c *PyroscopeClient) ProfileTypes(ctx context.Context, start int64, end int
Label: pType.Name + " - " + pType.SampleType,
}
}
return pTypes, nil
return pTypes, res.Header(), nil
}
}
func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, error) {
func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, labelSelector string, start int64, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.GetSeries", trace.WithAttributes(attribute.String("profileTypeID", profileTypeID), attribute.String("labelSelector", labelSelector)))
defer span.End()
req := connect.NewRequest(&querierv1.SelectSeriesRequest{
@ -117,7 +117,7 @@ func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, l
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, nil, err
}
series := make([]*Series, len(resp.Msg.Series))
@ -152,10 +152,10 @@ func (c *PyroscopeClient) GetSeries(ctx context.Context, profileTypeID string, l
Series: series,
Units: getUnits(profileTypeID),
Label: parts[1],
}, nil
}, resp.Header(), nil
}
func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.GetProfile", trace.WithAttributes(attribute.String("profileTypeID", profileTypeID), attribute.String("labelSelector", labelSelector)))
defer span.End()
req := &connect.Request[querierv1.SelectMergeStacktracesRequest]{
@ -173,18 +173,18 @@ func (c *PyroscopeClient) GetProfile(ctx context.Context, profileTypeID, labelSe
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, nil, err
}
if resp.Msg.Flamegraph == nil {
// Not an error, can happen when querying data oout of range.
return nil, nil
return nil, nil, nil
}
return profileQuery(ctx, err, span, resp.Msg.Flamegraph, profileTypeID)
return profileQuery(resp.Msg.Flamegraph, profileTypeID), resp.Header(), nil
}
func (c *PyroscopeClient) GetSpanProfile(ctx context.Context, profileTypeID, labelSelector string, spanSelector []string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
func (c *PyroscopeClient) GetSpanProfile(ctx context.Context, profileTypeID, labelSelector string, spanSelector []string, start, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.GetSpanProfile", trace.WithAttributes(attribute.String("profileTypeID", profileTypeID), attribute.String("labelSelector", labelSelector), attribute.String("spanSelector", strings.Join(spanSelector, ","))))
defer span.End()
req := &connect.Request[querierv1.SelectMergeSpanProfileRequest]{
@ -202,18 +202,18 @@ func (c *PyroscopeClient) GetSpanProfile(ctx context.Context, profileTypeID, lab
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, nil, err
}
if resp.Msg.Flamegraph == nil {
// Not an error, can happen when querying data oout of range.
return nil, nil
return nil, nil, nil
}
return profileQuery(ctx, err, span, resp.Msg.Flamegraph, profileTypeID)
return profileQuery(resp.Msg.Flamegraph, profileTypeID), resp.Header(), nil
}
func profileQuery(ctx context.Context, err error, span trace.Span, flamegraph *querierv1.FlameGraph, profileTypeID string) (*ProfileResponse, error) {
func profileQuery(flamegraph *querierv1.FlameGraph, profileTypeID string) *ProfileResponse {
levels := make([]*Level, len(flamegraph.Levels))
for i, level := range flamegraph.Levels {
levels[i] = &Level{
@ -229,7 +229,7 @@ func profileQuery(ctx context.Context, err error, span trace.Span, flamegraph *q
MaxSelf: flamegraph.MaxSelf,
},
Units: getUnits(profileTypeID),
}, nil
}
}
func getUnits(profileTypeID string) string {
@ -244,7 +244,7 @@ func getUnits(profileTypeID string) string {
return unit
}
func (c *PyroscopeClient) LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, error) {
func (c *PyroscopeClient) LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.LabelNames")
defer span.End()
resp, err := c.connectClient.LabelNames(ctx, connect.NewRequest(&typesv1.LabelNamesRequest{
@ -256,11 +256,11 @@ func (c *PyroscopeClient) LabelNames(ctx context.Context, labelSelector string,
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, fmt.Errorf("error sending LabelNames request %v", err)
return nil, nil, fmt.Errorf("error sending LabelNames request %v", err)
}
if resp.Msg.Names == nil {
return []string{}, nil
return []string{}, nil, nil
}
var filtered []string
@ -270,10 +270,10 @@ func (c *PyroscopeClient) LabelNames(ctx context.Context, labelSelector string,
}
}
return filtered, nil
return filtered, resp.Header(), nil
}
func (c *PyroscopeClient) LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, error) {
func (c *PyroscopeClient) LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, http.Header, error) {
ctx, span := tracing.DefaultTracer().Start(ctx, "datasource.pyroscope.LabelValues")
defer span.End()
resp, err := c.connectClient.LabelValues(ctx, connect.NewRequest(&typesv1.LabelValuesRequest{
@ -286,12 +286,12 @@ func (c *PyroscopeClient) LabelValues(ctx context.Context, label string, labelSe
logger.Error("Received error from client", "error", err, "function", logEntrypoint())
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
return nil, err
return nil, nil, err
}
if resp.Msg.Names == nil {
return []string{}, nil
return []string{}, nil, nil
}
return resp.Msg.Names, nil
return resp.Msg.Names, resp.Header(), nil
}
func isPrivateLabel(label string) bool {

@ -19,7 +19,7 @@ func Test_PyroscopeClient(t *testing.T) {
t.Run("GetSeries", func(t *testing.T) {
limit := int64(42)
resp, err := client.GetSeries(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, []string{}, &limit, 15)
resp, _, err := client.GetSeries(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, []string{}, &limit, 15)
require.Nil(t, err)
series := &SeriesResponse{
@ -34,7 +34,7 @@ func Test_PyroscopeClient(t *testing.T) {
t.Run("GetProfile", func(t *testing.T) {
maxNodes := int64(-1)
resp, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes)
resp, _, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes)
require.Nil(t, err)
series := &ProfileResponse{
@ -56,7 +56,7 @@ func Test_PyroscopeClient(t *testing.T) {
t.Run("GetProfile with empty response", func(t *testing.T) {
connectClient.SendEmptyProfileResponse = true
maxNodes := int64(-1)
resp, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes)
resp, _, err := client.GetProfile(context.Background(), "memory:alloc_objects:count:space:bytes", "{}", 0, 100, &maxNodes)
require.Nil(t, err)
// Mainly ensuring this does not panic like before
require.Nil(t, resp)

@ -77,7 +77,7 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
}
}
logger.Debug("Sending SelectSeriesRequest", "queryModel", qm, "function", logEntrypoint())
seriesResp, err := d.client.GetSeries(
seriesResp, _, err := d.client.GetSeries(
gCtx,
profileTypeId,
labelSelector,
@ -114,7 +114,7 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
var profileResp *ProfileResponse
if len(qm.SpanSelector) > 0 {
logger.Debug("Calling GetSpanProfile", "queryModel", qm, "function", logEntrypoint())
prof, err := d.client.GetSpanProfile(gCtx, profileTypeId, labelSelector, qm.SpanSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
prof, _, err := d.client.GetSpanProfile(gCtx, profileTypeId, labelSelector, qm.SpanSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
@ -124,7 +124,7 @@ func (d *PyroscopeDatasource) query(ctx context.Context, pCtx backend.PluginCont
profileResp = prof
} else {
logger.Debug("Calling GetProfile", "queryModel", qm, "function", logEntrypoint())
prof, err := d.client.GetProfile(gCtx, profileTypeId, labelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
prof, _, err := d.client.GetProfile(gCtx, profileTypeId, labelSelector, query.TimeRange.From.UnixMilli(), query.TimeRange.To.UnixMilli(), qm.MaxNodes)
if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())

@ -2,6 +2,7 @@ package pyroscope
import (
"context"
"net/http"
"testing"
"time"
@ -418,7 +419,7 @@ type FakeClient struct {
Args []any
}
func (f *FakeClient) ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, error) {
func (f *FakeClient) ProfileTypes(ctx context.Context, start int64, end int64) ([]*ProfileType, http.Header, error) {
return []*ProfileType{
{
ID: "type:1",
@ -428,18 +429,18 @@ func (f *FakeClient) ProfileTypes(ctx context.Context, start int64, end int64) (
ID: "type:2",
Label: "memory",
},
}, nil
}, http.Header{}, nil
}
func (f *FakeClient) LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, error) {
func (f *FakeClient) LabelValues(ctx context.Context, label string, labelSelector string, start int64, end int64) ([]string, http.Header, error) {
panic("implement me")
}
func (f *FakeClient) LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, error) {
func (f *FakeClient) LabelNames(ctx context.Context, labelSelector string, start int64, end int64) ([]string, http.Header, error) {
panic("implement me")
}
func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID, labelSelector string, start, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error) {
return &ProfileResponse{
Flamebearer: &Flamebearer{
Names: []string{"foo", "bar", "baz"},
@ -452,10 +453,10 @@ func (f *FakeClient) GetProfile(ctx context.Context, profileTypeID, labelSelecto
MaxSelf: 56,
},
Units: "count",
}, nil
}, http.Header{}, nil
}
func (f *FakeClient) GetSpanProfile(ctx context.Context, profileTypeID, labelSelector string, spanSelector []string, start, end int64, maxNodes *int64) (*ProfileResponse, error) {
func (f *FakeClient) GetSpanProfile(ctx context.Context, profileTypeID, labelSelector string, spanSelector []string, start, end int64, maxNodes *int64) (*ProfileResponse, http.Header, error) {
return &ProfileResponse{
Flamebearer: &Flamebearer{
Names: []string{"foo", "bar", "baz"},
@ -468,10 +469,10 @@ func (f *FakeClient) GetSpanProfile(ctx context.Context, profileTypeID, labelSel
MaxSelf: 56,
},
Units: "count",
}, nil
}, http.Header{}, nil
}
func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID, labelSelector string, start, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, error) {
func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID, labelSelector string, start, end int64, groupBy []string, limit *int64, step float64) (*SeriesResponse, http.Header, error) {
f.Args = []any{profileTypeID, labelSelector, start, end, groupBy, step}
return &SeriesResponse{
Series: []*Series{
@ -482,5 +483,5 @@ func (f *FakeClient) GetSeries(ctx context.Context, profileTypeID, labelSelector
},
Units: "count",
Label: "test",
}, nil
}, http.Header{}, nil
}

@ -65,7 +65,7 @@ func (d *ParcaDatasource) callProfileTypes(ctx context.Context, req *backend.Cal
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: res.Header(), Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)
@ -98,7 +98,7 @@ func (d *ParcaDatasource) callLabelNames(ctx context.Context, req *backend.CallR
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: res.Header(), Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)
@ -142,7 +142,7 @@ func (d *ParcaDatasource) callLabelValues(ctx context.Context, req *backend.Call
span.SetStatus(codes.Error, err.Error())
return err
}
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: req.Headers, Status: 200})
err = sender.Send(&backend.CallResourceResponse{Body: data, Headers: res.Header(), Status: 200})
if err != nil {
ctxLogger.Error("Failed to send data to Parca", "error", err, "function", logEntrypoint())
span.RecordError(err)

Loading…
Cancel
Save