Like Prometheus, but for logs.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
loki/integration/client/client.go

576 lines
14 KiB

package client
import (
"bytes"
"context"
"encoding/json"
"errors"
"fmt"
"io"
"net/http"
"net/url"
"strconv"
"strings"
"time"
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)
// requestTimeout is the deadline applied to each query-style request issued by
// the client (RunRangeQuery, RunQuery, GetRules, LabelNames and LabelValues).
const requestTimeout = 30 * time.Second
// roundTripper is an http.RoundTripper that decorates every outgoing request
// with tenant and authentication headers before delegating to next.
type roundTripper struct {
	// instanceID is sent as the X-Scope-OrgID header and as the basic-auth user name.
	instanceID string
	// token is the basic-auth password; basic auth is skipped when it is empty.
	token string
	// injectHeaders holds extra header values that are appended to every request.
	injectHeaders map[string][]string
	// next performs the actual HTTP exchange.
	next http.RoundTripper
}

// RoundTrip implements http.RoundTripper. It sets X-Scope-OrgID to the
// instance ID, adds basic-auth credentials when a token is configured, appends
// all injected headers, and then forwards the request to the next transport.
//
// The http.RoundTripper contract forbids modifying the caller's request, so
// all headers are applied to a clone and the original request is left as-is.
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	out := req.Clone(req.Context())
	if out.Header == nil {
		// Clone preserves a nil header map; writing into it would panic.
		out.Header = make(http.Header)
	}

	out.Header.Set("X-Scope-OrgID", r.instanceID)
	if r.token != "" {
		out.SetBasicAuth(r.instanceID, r.token)
	}

	for key, values := range r.injectHeaders {
		for _, v := range values {
			out.Header.Add(key, v)
		}
	}

	return r.next.RoundTrip(out)
}
// Option is an optional setting that can be passed to New.
// Type returns an identifier string; New switches on it to decide how the
// option is applied.
type Option interface {
Type() string
}
// InjectHeadersOption is an Option holding extra HTTP headers (name -> values).
// When passed to New, the map is installed on the client's round tripper, which
// adds every value to each outgoing request.
type InjectHeadersOption map[string][]string
// Type identifies this option as "headerinject", the key New switches on.
func (n InjectHeadersOption) Type() string {
return "headerinject"
}
// Client is an HTTP client for a Loki instance that adds basic auth and the
// tenant scope header to its requests.
type Client struct {
// Now is the reference time of the client. New sets it to the construction
// time; it is used as the default push timestamp and to derive the time
// bounds of queries.
Now time.Time
// httpClient executes the requests; New wires it to a header-injecting transport.
httpClient *http.Client
// baseURL is the prefix that endpoint paths are appended to.
baseURL string
// instanceID is the tenant ID sent as X-Scope-OrgID.
instanceID string
}
// New creates a Client for the Loki instance reachable at baseURL, acting on
// behalf of the tenant instanceID.
//
// The returned client sends its requests through a transport that sets the
// X-Scope-OrgID header to instanceID and, when token is non-empty, adds
// basic-auth credentials (instanceID as user, token as password). The client's
// Now field is initialised to the current time.
//
// Supported options:
//   - InjectHeadersOption: extra headers added to every request. If several are
//     given, the last one wins.
//
// Nil options and options of an unknown type are ignored.
func New(instanceID, token, baseURL string, opts ...Option) *Client {
	rt := &roundTripper{
		instanceID: instanceID,
		token:      token,
		next:       http.DefaultTransport,
	}

	for _, opt := range opts {
		if opt == nil {
			// Calling Type on a nil interface would panic.
			continue
		}
		switch opt.Type() {
		case "headerinject":
			// Guard the assertion: a foreign Option implementation reporting the
			// same type string must not panic the constructor.
			if headers, ok := opt.(InjectHeadersOption); ok {
				rt.injectHeaders = headers
			}
		}
	}

	return &Client{
		Now: time.Now(),
		httpClient: &http.Client{
			Transport: rt,
		},
		baseURL:    baseURL,
		instanceID: instanceID,
	}
}
// PushLogLine creates a new logline with the current time as timestamp
func (c *Client) PushLogLine(line string, extraLabels ...map[string]string) error {
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
return c.pushLogLine(line, c.Now, nil, extraLabels...)
}
// PushLogLineWithNonIndexedLabels pushes a single log line stamped with c.Now
// and attaches logLabels to the entry itself. Every map in extraLabels is
// forwarded as additional stream labels.
func (c *Client) PushLogLineWithNonIndexedLabels(line string, logLabels map[string]string, extraLabels ...map[string]string) error {
	ts := c.Now
	return c.PushLogLineWithTimestampAndNonIndexedLabels(line, ts, logLabels, extraLabels...)
}
// PushLogLineWithTimestamp creates a new logline at the given timestamp
// The timestamp has to be a Unix timestamp (epoch seconds)
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
func (c *Client) PushLogLineWithTimestamp(line string, timestamp time.Time, extraLabels ...map[string]string) error {
return c.pushLogLine(line, timestamp, nil, extraLabels...)
}
func (c *Client) PushLogLineWithTimestampAndNonIndexedLabels(line string, timestamp time.Time, logLabels map[string]string, extraLabelList ...map[string]string) error {
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
// If the logLabels map is empty, labels.FromMap will allocate some empty slices.
// Since this code is executed for every log line we receive, as an optimization
// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
var lbls labels.Labels
if len(logLabels) > 0 {
lbls = labels.FromMap(logLabels)
}
return c.pushLogLine(line, timestamp, lbls, extraLabelList...)
}
// formatTS renders ts as a base-10 string of nanoseconds since the Unix epoch.
func formatTS(ts time.Time) string {
	nanos := ts.UnixNano()
	return strconv.FormatInt(nanos, 10)
}
// stream is one element of the "streams" array in the JSON push payload.
type stream struct {
	// Stream holds the stream labels.
	Stream map[string]string `json:"stream"`
	// Values holds the log entries. pushLogLine fills each entry with a
	// nanosecond timestamp string, the log line and the per-line labels.
	Values [][]any `json:"values"`
}
// pushLogLine creates a new logline
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
func (c *Client) pushLogLine(line string, timestamp time.Time, logLabels labels.Labels, extraLabelList ...map[string]string) error {
apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)
s := stream{
Stream: map[string]string{
"job": "varlog",
},
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
Values: [][]any{
{
formatTS(timestamp),
line,
Add metadata to push payload (#9694) **What this PR does / why we need it**: We are adding support for attaching labels to each log line. This is one of the series of the PRs broken up to make it easier to review changes. This PR updates the push payload to send labels with each log entry optionally. The log labels are supposed to be in the same format as the stream labels. Just to put it out, here is how it would look for proto and json push payload with same data: **proto(`endpoint`: `(/loki/api/v1/push|/api/prom/push)`, `Content-Type`: `application/x-protobuf`)**(payload built using [push.Stream](https://github.com/grafana/loki/blob/4cd1246b8830ccc241fa4afff85d208dc6ae2129/pkg/push/types.go#L12)): ``` push.Stream{ Entries: []logproto.Entry{ { Timestamp: time.Unix(0, 1688515200000000000), Line: "log line", Labels: `{foo="bar"}`, }, }, Labels: `{app="test"}`, } ``` **v1(`endpoint`: `/loki/api/v1/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "stream": { "app": "test" }, "values": [ ["1688515200000000000", "log line", { "foo": "bar" }] ] }] } ``` **legacy-json(`/api/prom/push`, `Content-Type`: `application/json`)**: ```json { "streams": [{ "labels": "{app=\"test\"}", "entries": [{ "ts": "2023-07-05T00:00:00.000000000Z", "line": "log line", "labels": "{foo=\"bar\"}" }] }] } ``` **Which issue(s) this PR fixes**: **Special notes for your reviewer**: We may need to add more thoughtful tests. **Checklist** - [x] Reviewed the [`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md) guide (**required**) - [ ] Documentation added - [x] Tests updated - [ ] `CHANGELOG.md` updated - [ ] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` - [ ] For Helm chart changes bump the Helm chart version in `production/helm/loki/Chart.yaml` and update `production/helm/loki/CHANGELOG.md` and `production/helm/loki/README.md`. 
[Example PR](https://github.com/grafana/loki/commit/d10549e3ece02120974929894ee333d07755d213) --------- Co-authored-by: Sandeep Sukhani <sandeep.d.sukhani@gmail.com>
2 years ago
logLabels,
},
},
}
// add extra labels
for _, labelList := range extraLabelList {
for k, v := range labelList {
s.Stream[k] = v
}
}
data, err := json.Marshal(&struct {
Streams []stream `json:"streams"`
}{
Streams: []stream{s},
})
if err != nil {
return err
}
req, err := http.NewRequest("POST", apiEndpoint, bytes.NewReader(data))
if err != nil {
return err
}
req.Header.Set("Content-Type", "application/json")
req.Header.Set("X-Scope-OrgID", c.instanceID)
// Execute HTTP request
res, err := c.httpClient.Do(req)
if err != nil {
return err
}
if res.StatusCode/100 == 2 {
defer res.Body.Close()
return nil
}
buf, err := io.ReadAll(res.Body)
if err != nil {
return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err)
}
return fmt.Errorf("request failed with status code %v: %w", res.StatusCode, errors.New(string(buf)))
}
// Get issues a GET request for path, which is appended verbatim to the
// client's base URL, and returns the raw response. The caller is responsible
// for closing the response body.
func (c *Client) Get(path string) (*http.Response, error) {
	target := fmt.Sprintf("%s%s", c.baseURL, path)

	getReq, reqErr := http.NewRequest("GET", target, nil)
	if reqErr != nil {
		return nil, reqErr
	}

	return c.httpClient.Do(getReq)
}
// Metrics fetches <baseURL>/metrics and returns the response body as a string.
// It returns an error if the request fails, the body cannot be read, or the
// status code is not 200.
//
// The request is made with http.Get (the default HTTP client), not the
// client's own transport, so no tenant or auth headers are attached.
func (c *Client) Metrics() (string, error) {
	url := fmt.Sprintf("%s/metrics", c.baseURL)
	res, err := http.Get(url)
	if err != nil {
		return "", err
	}
	// Always release the body so the connection is not leaked.
	defer res.Body.Close()

	var sb strings.Builder
	if _, err := io.Copy(&sb, res.Body); err != nil {
		return "", err
	}

	if res.StatusCode != http.StatusOK {
		return "", fmt.Errorf("request failed with status code %d", res.StatusCode)
	}
	return sb.String(), nil
}
// Flush asks the ingesters to flush all in-memory chunks to the backing store
// by POSTing to <baseURL>/flush. Any non-2xx response is reported as an error.
func (c *Client) Flush() error {
	endpoint := fmt.Sprintf("%s/flush", c.baseURL)

	req, err := c.request(context.Background(), "POST", endpoint)
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code/100 != 2 {
		return fmt.Errorf("request failed with status code %d", code)
	}
	return nil
}
// DeleteRequestParams holds the parameters of a delete request.
// AddDeleteRequest sends all three fields as URL query parameters named
// "query", "start" and "end"; the values are passed through verbatim.
type DeleteRequestParams struct {
// Query selects the data to delete.
Query string `json:"query"`
// Start is the start of the deletion range (format not validated here).
Start string `json:"start,omitempty"`
// End is the end of the deletion range (format not validated here).
End string `json:"end,omitempty"`
}
// AddDeleteRequest submits a new delete request by POSTing to
// <baseURL>/loki/api/v1/delete with params encoded as the "query", "start" and
// "end" query parameters (all three are always sent, even when empty).
//
// The final request URL is printed to standard output. Only a 204 No Content
// response counts as success; for any other status the returned error contains
// the status code and the response body.
func (c *Client) AddDeleteRequest(params DeleteRequestParams) error {
	apiEndpoint := fmt.Sprintf("%s/loki/api/v1/delete", c.baseURL)

	req, err := http.NewRequest("POST", apiEndpoint, nil)
	if err != nil {
		return err
	}

	q := req.URL.Query()
	q.Add("query", params.Query)
	q.Add("start", params.Start)
	q.Add("end", params.End)
	req.URL.RawQuery = q.Encode()
	fmt.Printf("Delete request URL: %v\n", req.URL.String())

	res, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path (success, error status and read failure)
	// so the underlying connection is not leaked.
	defer res.Body.Close()

	if res.StatusCode == http.StatusNoContent {
		return nil
	}

	buf, err := io.ReadAll(res.Body)
	if err != nil {
		return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err)
	}
	return fmt.Errorf("request failed with status code %v: %w", res.StatusCode, errors.New(string(buf)))
}
// DeleteRequests is the list of delete requests returned by GetDeleteRequests.
type DeleteRequests []DeleteRequest
// DeleteRequest is a single delete request as decoded from the JSON response of
// the delete API.
type DeleteRequest struct {
// StartTime is the start of the deletion range (unit not visible here — TODO confirm).
StartTime int64 `json:"start_time"`
// EndTime is the end of the deletion range (same unit as StartTime).
EndTime int64 `json:"end_time"`
// Query is the query the delete request was created with.
Query string `json:"query"`
// Status is the status string reported by the server.
Status string `json:"status"`
}
// GetDeleteRequests returns all delete requests reported by
// <baseURL>/loki/api/v1/delete.
//
// A non-2xx response is reported as an error containing the status code and the
// response body, rather than being fed to the JSON decoder.
func (c *Client) GetDeleteRequests() (DeleteRequests, error) {
	resp, err := c.Get("/loki/api/v1/delete")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	buf, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading request failed with status code %v: %w", resp.StatusCode, err)
	}

	// Surface server-side failures directly instead of as a confusing JSON
	// parse error on the error body.
	if resp.StatusCode/100 != 2 {
		return nil, fmt.Errorf("request failed with status code %v: %w", resp.StatusCode, errors.New(string(buf)))
	}

	var deleteReqs DeleteRequests
	if err := json.Unmarshal(buf, &deleteReqs); err != nil {
		return nil, fmt.Errorf("parsing json output failed: %w", err)
	}

	return deleteReqs, nil
}
// StreamValues holds the label key/value pairs of a stream and its list of
// values, each value itself being a list of strings.
type StreamValues struct {
// Stream holds the stream's labels.
Stream map[string]string
// Values holds the entries of the stream (presumably [timestamp, line] — TODO confirm).
Values [][]string
}
// MatrixValues holds the label key/value pairs of a metric and its list of
// values, each value itself being a list.
type MatrixValues struct {
// Metric holds the metric's labels.
Metric map[string]string
// Values holds the samples of the series (presumably [timestamp, value] — TODO confirm).
Values [][]interface{}
}
// VectorValues holds the label key/value pairs of a metric together with a
// single timestamp and value.
type VectorValues struct {
	// Metric holds the metric's labels.
	Metric map[string]string `json:"metric"`
	// Time is the sample timestamp, decoded from the first element of "value".
	Time time.Time
	// Value is the sample value, decoded from the second element of "value".
	Value string
}

// UnmarshalJSON decodes a vector sample of the form
//
//	{"metric": {...}, "value": [<unix seconds>, "<value>"]}
//
// It returns an error if the JSON is malformed or "value" does not have
// exactly two elements. An element of an unexpected type is ignored and the
// corresponding field is left untouched.
func (a *VectorValues) UnmarshalJSON(b []byte) error {
	var s struct {
		Metric map[string]string `json:"metric"`
		Value  []interface{}     `json:"value"`
	}
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	a.Metric = s.Metric
	if len(s.Value) != 2 {
		return fmt.Errorf("unexpected value length %d", len(s.Value))
	}

	// encoding/json decodes every JSON number stored in an interface value as
	// float64, so that is the type to assert on; an int64 assertion would never
	// match and Time would stay at its zero value.
	if ts, ok := s.Value[0].(float64); ok {
		sec := int64(ts)
		nsec := int64((ts - float64(sec)) * float64(time.Second))
		a.Time = time.Unix(sec, nsec)
	}
	if val, ok := s.Value[1].(string); ok {
		a.Value = val
	}
	return nil
}
// DataType holds the result type of a query response together with the decoded
// result. Its UnmarshalJSON fills exactly one of Stream, Matrix or Vector,
// depending on the result type.
type DataType struct {
// ResultType is "streams", "matrix" or "vector" after a successful decode.
ResultType string
// Stream is filled for result type "streams".
Stream []StreamValues
// Matrix is filled for result type "matrix".
Matrix []MatrixValues
// Vector is filled for result type "vector".
Vector []VectorValues
}
// UnmarshalJSON decodes a query result envelope of the form
// {"resultType": ..., "result": ...}. The result is decoded into Stream, Matrix
// or Vector according to resultType ("streams", "matrix" or "vector"); any
// other result type is an error. ResultType is only set once decoding succeeded.
func (a *DataType) UnmarshalJSON(b []byte) error {
	// Decode the envelope first; the payload stays raw until its type is known.
	var envelope struct {
		ResultType string          `json:"resultType"`
		Result     json.RawMessage `json:"result"`
	}
	if err := json.Unmarshal(b, &envelope); err != nil {
		return err
	}

	// Pick the destination that matches the announced result type.
	var dst any
	switch envelope.ResultType {
	case "streams":
		dst = &a.Stream
	case "matrix":
		dst = &a.Matrix
	case "vector":
		dst = &a.Vector
	default:
		return fmt.Errorf("unknown result type %s", envelope.ResultType)
	}

	if err := json.Unmarshal(envelope.Result, dst); err != nil {
		return err
	}
	a.ResultType = envelope.ResultType
	return nil
}
// Response holds the status and data of a query response, as decoded by
// parseResponse.
type Response struct {
// Status is the status string of the response.
Status string
// Data is the typed query result.
Data DataType
}
// RulesResponse is the response decoded by GetRules.
type RulesResponse struct {
// Status is the status string of the response.
Status string
// Data holds the returned rule groups.
Data RulesData
}
// RulesData is the data section of a RulesResponse.
type RulesData struct {
// Groups lists the rule groups.
Groups []Rules
}
// Rules describes one rule group in a RulesResponse.
type Rules struct {
// Name is the name of the group.
Name string
// File is the file reported for the group.
File string
// Rules holds the group's rules in their generic decoded JSON form.
Rules []interface{}
}
// RunRangeQuery executes query as a range query, bounded by requestTimeout,
// and returns the parsed response. The time range is derived from c.Now by
// rangeQueryURL. Transport failures, non-2xx responses and undecodable bodies
// are all reported as errors.
func (c *Client) RunRangeQuery(ctx context.Context, query string) (*Response, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	target := c.rangeQueryURL(query)
	body, code, err := c.run(timeoutCtx, target)
	if err != nil {
		return nil, err
	}

	return c.parseResponse(body, code)
}
// RunQuery executes query as an instant query evaluated at c.Now plus one
// second, bounded by requestTimeout, and returns the parsed response.
// Transport failures, non-2xx responses and undecodable bodies are all
// reported as errors.
func (c *Client) RunQuery(ctx context.Context, query string) (*Response, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	params := url.Values{
		"query": []string{query},
		"time":  []string{formatTS(c.Now.Add(time.Second))},
	}

	target, err := url.Parse(c.baseURL)
	if err != nil {
		return nil, err
	}
	target.Path = "/loki/api/v1/query"
	target.RawQuery = params.Encode()

	body, code, err := c.run(timeoutCtx, target.String())
	if err != nil {
		return nil, err
	}

	return c.parseResponse(body, code)
}
// GetRules fetches the ruler's rules from /prometheus/api/v1/rules, bounded by
// requestTimeout, and returns the decoded response.
//
// Like the query methods, it reports a non-2xx response as an error containing
// the status code and the response body.
func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		return nil, err
	}
	u.Path = "/prometheus/api/v1/rules"

	buf, statusCode, err := c.run(ctx, u.String())
	if err != nil {
		return nil, err
	}
	if statusCode/100 != 2 {
		return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
	}

	var resp RulesResponse
	if err := json.Unmarshal(buf, &resp); err != nil {
		return nil, fmt.Errorf("error parsing response data %q: %w", buf, err)
	}

	return &resp, nil
}
// parseResponse turns the raw body and status code of a query request into a
// Response. A non-2xx status yields an error carrying the body text; a body
// that cannot be decoded as JSON yields a parse error that quotes the body.
func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
	if ok := statusCode/100 == 2; !ok {
		return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
	}

	parsed := new(Response)
	if err := json.Unmarshal(buf, parsed); err != nil {
		return nil, fmt.Errorf("error parsing response data '%s': %w", string(buf), err)
	}
	return parsed, nil
}
func (c *Client) rangeQueryURL(query string) string {
v := url.Values{}
v.Set("query", query)
index-shipper: add support for multiple stores (#7754) Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com> **What this PR does / why we need it**: Currently loki initializes a single instance of index-shipper to [handle all the table ranges](https://github.com/grafana/loki/blob/ff7b46297345b215fbf49c2cd4c364d125b6290b/pkg/storage/factory.go#L188) (from across periods) for a given index type `boltdb-shipper, tsdb`. Since index-shipper only has the object client handle to the store defined by `shared_store_type`, it limits the index uploads to a single store. Setting `shared_store_type` to a different store at a later point in time would mean losing access to the indexes stored in the previously configured store. With this PR, we initialize a separate index-shipper & table manager for each period if `shared_store_type` is not explicity configured. This offers the flexibility to store index in multiple stores (across providers). **Note**: - usage of `shared_store_type` in this commit text refers to one of these config options depending on the index in use: `-boltdb.shipper.shared-store`, `-tsdb.shipper.shared-store` - `shared_store_type` used to default to the `object_store` from the latest `period_config` if not explicitly configured. This PR removes these defaults in favor of supporting index uploads to multiple stores. **Which issue(s) this PR fixes**: Fixes #7276 **Special notes for your reviewer**: All the instances of downloads table manager operate on the same cacheDir. But it shouldn't be a problem as the tableRanges do not overlap across periods. **Checklist** - [X] Reviewed the `CONTRIBUTING.md` guide - [ ] Documentation added - [X] Tests updated - [x] `CHANGELOG.md` updated - [x] Changes that require user attention or interaction to upgrade are documented in `docs/sources/upgrading/_index.md` --------- Signed-off-by: Ashwanth Goli <iamashwanth@gmail.com> Co-authored-by: J Stickler <julie.stickler@grafana.com>
2 years ago
v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))
u, err := url.Parse(c.baseURL)
if err != nil {
panic(err)
}
u.Path = "/loki/api/v1/query_range"
u.RawQuery = v.Encode()
return u.String()
}
// LabelNames returns the label names reported by <baseURL>/loki/api/v1/labels.
// The request is bounded by requestTimeout; a non-2xx response is returned as
// an error containing the status code and the response body.
func (c *Client) LabelNames(ctx context.Context) ([]string, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	endpoint := fmt.Sprintf("%s/loki/api/v1/labels", c.baseURL)
	body, code, err := c.run(timeoutCtx, endpoint)
	switch {
	case err != nil:
		return nil, err
	case code/100 != 2:
		return nil, fmt.Errorf("request failed with status code %d: %w", code, errors.New(string(body)))
	}

	var payload struct {
		Data []string `json:"data"`
	}
	if err := json.Unmarshal(body, &payload); err != nil {
		return nil, err
	}

	return payload.Data, nil
}
// LabelValues returns the values of the label labelName as reported by
// <baseURL>/loki/api/v1/label/<labelName>/values. The label name is
// path-escaped, the request is bounded by requestTimeout, and a non-2xx
// response is returned as an error naming the status code.
func (c *Client) LabelValues(ctx context.Context, labelName string) ([]string, error) {
	timeoutCtx, cancel := context.WithTimeout(ctx, requestTimeout)
	defer cancel()

	escaped := url.PathEscape(labelName)
	endpoint := fmt.Sprintf("%s/loki/api/v1/label/%s/values", c.baseURL, escaped)

	req, err := c.request(timeoutCtx, "GET", endpoint)
	if err != nil {
		return nil, err
	}

	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	if code := resp.StatusCode; code/100 != 2 {
		return nil, fmt.Errorf("unexpected status code of %d", code)
	}

	var payload struct {
		Data []string `json:"data"`
	}
	decoder := json.NewDecoder(resp.Body)
	if err := decoder.Decode(&payload); err != nil {
		return nil, err
	}

	return payload.Data, nil
}
// request builds a body-less HTTP request for method and url. The client's
// instance ID is injected into the request context as the org ID and also set
// as the X-Scope-OrgID header.
func (c *Client) request(ctx context.Context, method string, url string) (*http.Request, error) {
	tenantCtx := user.InjectOrgID(ctx, c.instanceID)

	built, buildErr := http.NewRequestWithContext(tenantCtx, method, url, nil)
	if buildErr != nil {
		return nil, buildErr
	}

	built.Header.Set("X-Scope-OrgID", c.instanceID)
	return built, nil
}
// run performs a GET request against u and returns the complete response body
// together with the HTTP status code. The status code is not interpreted here;
// callers decide what counts as success. On any error the body is nil and the
// status code is 0.
func (c *Client) run(ctx context.Context, u string) ([]byte, int, error) {
	req, err := c.request(ctx, "GET", u)
	if err != nil {
		return nil, 0, err
	}

	// Execute the HTTP request.
	resp, err := c.httpClient.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer resp.Body.Close()

	code := resp.StatusCode
	body, readErr := io.ReadAll(resp.Body)
	if readErr != nil {
		return nil, 0, fmt.Errorf("request failed with status code %v: %w", code, readErr)
	}

	return body, code, nil
}