|
|
|
|
@ -26,7 +26,6 @@ import ( |
|
|
|
|
|
|
|
|
|
"github.com/gogo/protobuf/proto" |
|
|
|
|
"github.com/golang/snappy" |
|
|
|
|
"github.com/pkg/errors" |
|
|
|
|
"github.com/prometheus/client_golang/prometheus" |
|
|
|
|
config_util "github.com/prometheus/common/config" |
|
|
|
|
"github.com/prometheus/common/model" |
|
|
|
|
@ -222,7 +221,7 @@ func (c *Client) Store(ctx context.Context, req []byte) error { |
|
|
|
|
if scanner.Scan() { |
|
|
|
|
line = scanner.Text() |
|
|
|
|
} |
|
|
|
|
err = errors.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) |
|
|
|
|
err = fmt.Errorf("server returned HTTP status %s: %s", httpResp.Status, line) |
|
|
|
|
} |
|
|
|
|
if httpResp.StatusCode/100 == 5 { |
|
|
|
|
return RecoverableError{err, defaultBackoff} |
|
|
|
|
@ -273,13 +272,13 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe |
|
|
|
|
} |
|
|
|
|
data, err := proto.Marshal(req) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrapf(err, "unable to marshal read request") |
|
|
|
|
return nil, fmt.Errorf("unable to marshal read request: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
compressed := snappy.Encode(nil, data) |
|
|
|
|
httpReq, err := http.NewRequest("POST", c.url.String(), bytes.NewReader(compressed)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "unable to create request") |
|
|
|
|
return nil, fmt.Errorf("unable to create request: %w", err) |
|
|
|
|
} |
|
|
|
|
httpReq.Header.Add("Content-Encoding", "snappy") |
|
|
|
|
httpReq.Header.Add("Accept-Encoding", "snappy") |
|
|
|
|
@ -296,7 +295,7 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe |
|
|
|
|
start := time.Now() |
|
|
|
|
httpResp, err := c.Client.Do(httpReq.WithContext(ctx)) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "error sending request") |
|
|
|
|
return nil, fmt.Errorf("error sending request: %w", err) |
|
|
|
|
} |
|
|
|
|
defer func() { |
|
|
|
|
io.Copy(io.Discard, httpResp.Body) |
|
|
|
|
@ -307,26 +306,26 @@ func (c *Client) Read(ctx context.Context, query *prompb.Query) (*prompb.QueryRe |
|
|
|
|
|
|
|
|
|
compressed, err = io.ReadAll(httpResp.Body) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, fmt.Sprintf("error reading response. HTTP status code: %s", httpResp.Status)) |
|
|
|
|
return nil, fmt.Errorf("error reading response. HTTP status code: %s: %w", httpResp.Status, err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if httpResp.StatusCode/100 != 2 { |
|
|
|
|
return nil, errors.Errorf("remote server %s returned HTTP status %s: %s", c.url.String(), httpResp.Status, strings.TrimSpace(string(compressed))) |
|
|
|
|
return nil, fmt.Errorf("remote server %s returned HTTP status %s: %s", c.url.String(), httpResp.Status, strings.TrimSpace(string(compressed))) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
uncompressed, err := snappy.Decode(nil, compressed) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "error reading response") |
|
|
|
|
return nil, fmt.Errorf("error reading response: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
var resp prompb.ReadResponse |
|
|
|
|
err = proto.Unmarshal(uncompressed, &resp) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, errors.Wrap(err, "unable to unmarshal response body") |
|
|
|
|
return nil, fmt.Errorf("unable to unmarshal response body: %w", err) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
if len(resp.Results) != len(req.Queries) { |
|
|
|
|
return nil, errors.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) |
|
|
|
|
return nil, fmt.Errorf("responses: want %d, got %d", len(req.Queries), len(resp.Results)) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return resp.Results[0], nil |
|
|
|
|
|