loki/integration/client/client.go


package client

import (
	"bytes"
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"
	"time"

	"github.com/grafana/dskit/user"
"github.com/prometheus/prometheus/model/labels"
"go.opentelemetry.io/collector/pdata/pcommon"
"go.opentelemetry.io/collector/pdata/plog"
"go.opentelemetry.io/collector/pdata/plog/plogotlp"
)
const requestTimeout = 30 * time.Second
type roundTripper struct {
	instanceID    string
	token         string
	injectHeaders map[string][]string
	next          http.RoundTripper
}
func (r *roundTripper) RoundTrip(req *http.Request) (*http.Response, error) {
	req.Header.Set("X-Scope-OrgID", r.instanceID)
	if r.token != "" {
		req.SetBasicAuth(r.instanceID, r.token)
	}

	for key, values := range r.injectHeaders {
		for _, v := range values {
			req.Header.Add(key, v)
		}
	}

	return r.next.RoundTrip(req)
}
type Option interface {
	Type() string
}

type InjectHeadersOption map[string][]string

func (n InjectHeadersOption) Type() string {
	return "headerinject"
}
// Client is an HTTP client that adds basic auth and scope headers to every request
type Client struct {
	Now        time.Time
	httpClient *http.Client
	baseURL    string
	instanceID string
}
// New creates a new client
func New(instanceID, token, baseURL string, opts ...Option) *Client {
	rt := &roundTripper{
		instanceID: instanceID,
		token:      token,
		next:       http.DefaultTransport,
	}

	for _, opt := range opts {
		switch opt.Type() {
		case "headerinject":
			rt.injectHeaders = opt.(InjectHeadersOption)
		}
	}

	return &Client{
		Now: time.Now(),
		httpClient: &http.Client{
			Transport: rt,
		},
		baseURL:    baseURL,
		instanceID: instanceID,
	}
}
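// Usage sketch (illustrative only, not part of the original file; the tenant ID,
// header, and base URL below are hypothetical):
//
//	c := New("tenant-1", "", "http://localhost:3100",
//		InjectHeadersOption{"X-Custom-Header": {"value"}})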
func (c *Client) PushLogLine(line string, timestamp time.Time, structuredMetadata map[string]string, extraLabelList ...map[string]string) error {
	// If the structuredMetadata map is empty, labels.FromMap will allocate some empty slices.
	// Since this code is executed for every log line we receive, as an optimization
	// to avoid those allocations we'll call labels.FromMap only if the map is not empty.
	var metadata labels.Labels
	if len(structuredMetadata) > 0 {
		metadata = labels.FromMap(structuredMetadata)
	}
	return c.pushLogLine(line, timestamp, metadata, extraLabelList...)
}
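// Example (illustrative sketch; the line, metadata, and labels mirror the v1 push
// payload documented for this endpoint and are hypothetical):
//
//	err := c.PushLogLine("log line", c.Now,
//		map[string]string{"foo": "bar"},  // structured metadata
//		map[string]string{"app": "test"}) // extra stream labels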
func (c *Client) PushOTLPLogLine(line string, timestamp time.Time, logAttributes map[string]any) error {
	return c.pushOTLPLogLine(line, timestamp, logAttributes)
}
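// Example (illustrative sketch; the attribute values are hypothetical):
//
//	err := c.PushOTLPLogLine("log line", c.Now,
//		map[string]any{"severity": "info", "trace_id": "abc123"})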
func formatTS(ts time.Time) string {
	return strconv.FormatInt(ts.UnixNano(), 10)
}
type stream struct {
	Stream map[string]string `json:"stream"`
	Values [][]any           `json:"values"`
}
// pushLogLine pushes a single log line to the JSON push endpoint, with optional
// structured metadata and extra stream labels
func (c *Client) pushLogLine(line string, timestamp time.Time, structuredMetadata labels.Labels, extraLabelList ...map[string]string) error {
	apiEndpoint := fmt.Sprintf("%s/loki/api/v1/push", c.baseURL)

	s := stream{
		Stream: map[string]string{
			"job": "varlog",
		},
		Values: [][]any{
			{
				formatTS(timestamp),
				line,
				structuredMetadata,
			},
		},
	}
	// add extra labels
	for _, labelList := range extraLabelList {
		for k, v := range labelList {
			s.Stream[k] = v
		}
	}

	data, err := json.Marshal(&struct {
		Streams []stream `json:"streams"`
	}{
		Streams: []stream{s},
	})
	if err != nil {
		return err
	}
	req, err := http.NewRequest("POST", apiEndpoint, bytes.NewReader(data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Scope-OrgID", c.instanceID)

	// Execute HTTP request
	res, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, not only on success.
	defer res.Body.Close()

	if res.StatusCode/100 == 2 {
		return nil
	}
	buf, err := io.ReadAll(res.Body)
	if err != nil {
return fmt.Errorf("reading response failed with status code %v: %v", res.StatusCode, err)
}
return fmt.Errorf("request failed with status code %v: %s", res.StatusCode, buf)
}
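// For reference, the v1 JSON payload built above has roughly this shape, with the
// optional structured metadata as the third element of each value tuple (the
// stream labels and values here are hypothetical):
//
//	{
//	  "streams": [{
//	    "stream": { "job": "varlog", "app": "test" },
//	    "values": [
//	      ["1688515200000000000", "log line", { "foo": "bar" }]
//	    ]
//	  }]
//	}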
// pushOTLPLogLine pushes a single log line to the native OTLP logs endpoint, with
// optional log attributes
func (c *Client) pushOTLPLogLine(line string, timestamp time.Time, logAttributes map[string]any) error {
	apiEndpoint := fmt.Sprintf("%s/otlp/v1/logs", c.baseURL)

	logs := plog.NewLogs()
	logs.ResourceLogs().AppendEmpty().Resource().Attributes().PutStr("service.name", "varlog")
	logRecord := logs.ResourceLogs().At(0).ScopeLogs().AppendEmpty().LogRecords().AppendEmpty()
	logRecord.SetTimestamp(pcommon.Timestamp(timestamp.UnixNano()))
	logRecord.Body().SetStr(line)
	if len(logAttributes) > 0 {
		if err := logRecord.Attributes().FromRaw(logAttributes); err != nil {
			return err
		}
	}

	ereq := plogotlp.NewExportRequestFromLogs(logs)
	data, err := ereq.MarshalJSON()
	if err != nil {
		return err
	}
	req, err := http.NewRequest("POST", apiEndpoint, bytes.NewReader(data))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")
	req.Header.Set("X-Scope-OrgID", c.instanceID)

	// Execute HTTP request
	res, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, not only on success.
	defer res.Body.Close()

	if res.StatusCode/100 == 2 {
		return nil
	}
	buf, err := io.ReadAll(res.Body)
	if err != nil {
		return fmt.Errorf("reading response failed with status code %v: %v", res.StatusCode, err)
	}
return fmt.Errorf("request failed with status code %v: %s", res.StatusCode, buf)
}
func (c *Client) Get(path string) (*http.Response, error) {
	url := fmt.Sprintf("%s%s", c.baseURL, path)
	req, err := http.NewRequest("GET", url, nil)
	if err != nil {
		return nil, err
	}
	return c.httpClient.Do(req)
}
// Metrics returns all the metrics exposed on the /metrics endpoint
func (c *Client) Metrics() (string, error) {
	url := fmt.Sprintf("%s/metrics", c.baseURL)
	res, err := http.Get(url)
	if err != nil {
		return "", err
	}
	// Close the body so the connection is not leaked.
	defer res.Body.Close()

	var sb strings.Builder
	if _, err := io.Copy(&sb, res.Body); err != nil {
		return "", err
	}
	if res.StatusCode != http.StatusOK {
		return "", fmt.Errorf("request failed with status code %d", res.StatusCode)
	}
	return sb.String(), nil
}
// Flush all in-memory chunks held by the ingesters to the backing store
func (c *Client) Flush() error {
	req, err := c.request(context.Background(), "POST", fmt.Sprintf("%s/flush", c.baseURL))
	if err != nil {
		return err
	}
	req.Header.Set("Content-Type", "application/json")

	res, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	defer res.Body.Close()

	if res.StatusCode/100 == 2 {
		return nil
	}
	return fmt.Errorf("request failed with status code %d", res.StatusCode)
}
type DeleteRequestParams struct {
	Query string `json:"query"`
	Start string `json:"start,omitempty"`
	End   string `json:"end,omitempty"`
}
// AddDeleteRequest adds a new delete request
func (c *Client) AddDeleteRequest(params DeleteRequestParams) error {
	apiEndpoint := fmt.Sprintf("%s/loki/api/v1/delete", c.baseURL)
	req, err := http.NewRequest("POST", apiEndpoint, nil)
	if err != nil {
		return err
	}

	q := req.URL.Query()
	q.Add("query", params.Query)
	q.Add("start", params.Start)
	q.Add("end", params.End)
	req.URL.RawQuery = q.Encode()
	fmt.Printf("Delete request URL: %v\n", req.URL.String())

	res, err := c.httpClient.Do(req)
	if err != nil {
		return err
	}
	// Close the body on every path, not only when the request fails.
	defer res.Body.Close()

	if res.StatusCode != http.StatusNoContent {
		buf, err := io.ReadAll(res.Body)
		if err != nil {
			return fmt.Errorf("reading request failed with status code %v: %w", res.StatusCode, err)
		}
		return fmt.Errorf("request failed with status code %v: %w", res.StatusCode, errors.New(string(buf)))
	}
	return nil
}
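// Example (illustrative sketch; the LogQL query and unix-second timestamps are
// hypothetical):
//
//	err := c.AddDeleteRequest(DeleteRequestParams{
//		Query: `{job="varlog"}`,
//		Start: "1688515200",
//		End:   "1688601600",
//	})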
type DeleteRequests []DeleteRequest

type DeleteRequest struct {
	StartTime int64  `json:"start_time"`
	EndTime   int64  `json:"end_time"`
	Query     string `json:"query"`
	Status    string `json:"status"`
}
// GetDeleteRequests returns all delete requests
func (c *Client) GetDeleteRequests() (DeleteRequests, error) {
	resp, err := c.Get("/loki/api/v1/delete")
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	buf, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("reading request failed with status code %v: %w", resp.StatusCode, err)
	}

	var deleteReqs DeleteRequests
	err = json.Unmarshal(buf, &deleteReqs)
	if err != nil {
		return nil, fmt.Errorf("parsing json output failed: %w", err)
	}

	return deleteReqs, nil
}
// StreamValues holds the label key-value pairs for a stream and its list of values
type StreamValues struct {
	Stream map[string]string
	Values [][]string
}
// MatrixValues holds the label key-value pairs for a metric and its list of values
type MatrixValues struct {
	Metric map[string]string
	Values [][]interface{}
}
// VectorValues holds the label key-value pairs for a metric and a single timestamp and value
type VectorValues struct {
	Metric map[string]string `json:"metric"`
	Time   time.Time
	Value  string
}
func (a *VectorValues) UnmarshalJSON(b []byte) error {
	var s struct {
		Metric map[string]string `json:"metric"`
		Value  []interface{}     `json:"value"`
	}
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}
	a.Metric = s.Metric

	if len(s.Value) != 2 {
		return fmt.Errorf("unexpected value length %d", len(s.Value))
	}
	// encoding/json decodes JSON numbers in an interface{} as float64, so assert
	// float64 (not int64) when extracting the unix timestamp.
	if ts, ok := s.Value[0].(float64); ok {
		a.Time = time.Unix(int64(ts), 0)
	}
	if val, ok := s.Value[1].(string); ok {
		a.Value = val
	}
	return nil
}
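// For reference, a single vector element in the Prometheus-style response decoded
// above looks roughly like this (metric labels and value are hypothetical):
//
//	{ "metric": { "app": "test" }, "value": [ 1688515200, "1" ] }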
// DataType holds the result type and the corresponding stream, matrix, or vector values
type DataType struct {
	ResultType string
	Stream     []StreamValues
	Matrix     []MatrixValues
	Vector     []VectorValues
}
func (a *DataType) UnmarshalJSON(b []byte) error {
	// get the result type
	var s struct {
		ResultType string          `json:"resultType"`
		Result     json.RawMessage `json:"result"`
	}
	if err := json.Unmarshal(b, &s); err != nil {
		return err
	}

	switch s.ResultType {
	case "streams":
		if err := json.Unmarshal(s.Result, &a.Stream); err != nil {
			return err
		}
	case "matrix":
		if err := json.Unmarshal(s.Result, &a.Matrix); err != nil {
			return err
		}
	case "vector":
		if err := json.Unmarshal(s.Result, &a.Vector); err != nil {
			return err
		}
	default:
		return fmt.Errorf("unknown result type %s", s.ResultType)
	}

	a.ResultType = s.ResultType
	return nil
}
// Response holds the status and data
type Response struct {
	Status string
	Data   DataType
}
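// For reference, a query response that decodes into Response has roughly this
// shape (the stream labels and values below are hypothetical):
//
//	{
//	  "status": "success",
//	  "data": {
//	    "resultType": "streams",
//	    "result": [
//	      { "stream": { "job": "varlog" }, "values": [["1688515200000000000", "log line"]] }
//	    ]
//	  }
//	}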
type RulesResponse struct {
	Status string
	Data   RulesData
}

type RulesData struct {
	Groups []Rules
}

type Rules struct {
	Name  string
	File  string
	Rules []interface{}
}
// RunRangeQuery runs a query and returns an error if anything went wrong
func (c *Client) RunRangeQuery(ctx context.Context, query string) (*Response, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	buf, statusCode, err := c.run(ctx, c.rangeQueryURL(query))
	if err != nil {
		return nil, err
	}

	return c.parseResponse(buf, statusCode)
}
// RunQuery runs a query and returns an error if anything went wrong
func (c *Client) RunQuery(ctx context.Context, query string) (*Response, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	v := url.Values{}
	v.Set("query", query)
	v.Set("time", formatTS(c.Now.Add(time.Second)))

	u, err := url.Parse(c.baseURL)
	if err != nil {
		return nil, err
	}
	u.Path = "/loki/api/v1/query"
	u.RawQuery = v.Encode()

	buf, statusCode, err := c.run(ctx, u.String())
	if err != nil {
		return nil, err
	}

	return c.parseResponse(buf, statusCode)
}
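// Example (illustrative sketch; the LogQL query is hypothetical):
//
//	resp, err := c.RunRangeQuery(context.Background(), `{job="varlog"} |= "log"`)
//	if err == nil && resp.Data.ResultType == "streams" {
//		for _, s := range resp.Data.Stream {
//			fmt.Println(s.Stream, len(s.Values))
//		}
//	}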
// GetRules returns the loki ruler rules
func (c *Client) GetRules(ctx context.Context) (*RulesResponse, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	u, err := url.Parse(c.baseURL)
	if err != nil {
		return nil, err
	}
	u.Path = "/prometheus/api/v1/rules"

	buf, _, err := c.run(ctx, u.String())
	if err != nil {
		return nil, err
	}

	resp := RulesResponse{}
	err = json.Unmarshal(buf, &resp)
	if err != nil {
		return nil, fmt.Errorf("error parsing response data %q: %w", buf, err)
	}

	return &resp, err
}
func (c *Client) parseResponse(buf []byte, statusCode int) (*Response, error) {
	if statusCode/100 != 2 {
		return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
	}

	lokiResp := Response{}
	err := json.Unmarshal(buf, &lokiResp)
	if err != nil {
		return nil, fmt.Errorf("error parsing response data '%s': %w", string(buf), err)
	}

	return &lokiResp, nil
}
func (c *Client) rangeQueryURL(query string) string {
	v := url.Values{}
	v.Set("query", query)
v.Set("start", formatTS(c.Now.Add(-7*24*time.Hour)))
v.Set("end", formatTS(c.Now.Add(time.Second)))
u, err := url.Parse(c.baseURL)
if err != nil {
panic(err)
}
u.Path = "/loki/api/v1/query_range"
u.RawQuery = v.Encode()
return u.String()
}
func (c *Client) LabelNames(ctx context.Context) ([]string, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	url := fmt.Sprintf("%s/loki/api/v1/labels", c.baseURL)

	buf, statusCode, err := c.run(ctx, url)
	if err != nil {
		return nil, err
	}

	if statusCode/100 != 2 {
		return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
	}

	var values struct {
		Data []string `json:"data"`
	}
	if err := json.Unmarshal(buf, &values); err != nil {
		return nil, err
	}

	return values.Data, nil
}
// LabelValues returns a label values query result
func (c *Client) LabelValues(ctx context.Context, labelName string) ([]string, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	url := fmt.Sprintf("%s/loki/api/v1/label/%s/values", c.baseURL, url.PathEscape(labelName))

	req, err := c.request(ctx, "GET", url)
	if err != nil {
		return nil, err
	}

	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, err
	}
	defer res.Body.Close()

	if res.StatusCode/100 != 2 {
		return nil, fmt.Errorf("unexpected status code of %d", res.StatusCode)
	}

	var values struct {
		Data []string `json:"data"`
	}
	if err := json.NewDecoder(res.Body).Decode(&values); err != nil {
		return nil, err
	}

	return values.Data, nil
}
// Series returns a series query result
func (c *Client) Series(ctx context.Context, matcher string) ([]map[string]string, error) {
	ctx, cancelFunc := context.WithTimeout(ctx, requestTimeout)
	defer cancelFunc()

	v := url.Values{}
	v.Set("match[]", matcher)

	u, err := url.Parse(c.baseURL)
	if err != nil {
		panic(err)
	}
	u.Path = "/loki/api/v1/series"
	u.RawQuery = v.Encode()

	buf, statusCode, err := c.run(ctx, u.String())
	if err != nil {
		return nil, err
	}

	if statusCode/100 != 2 {
		return nil, fmt.Errorf("request failed with status code %d: %w", statusCode, errors.New(string(buf)))
	}

	var values struct {
		Data []map[string]string `json:"data"`
	}
	if err := json.Unmarshal(buf, &values); err != nil {
		return nil, err
	}

	return values.Data, nil
}
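// Example (illustrative sketch; the matcher is hypothetical):
//
//	series, err := c.Series(context.Background(), `{job="varlog"}`)
//	if err == nil {
//		fmt.Println(len(series))
//	}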
func (c *Client) request(ctx context.Context, method string, url string) (*http.Request, error) {
	ctx = user.InjectOrgID(ctx, c.instanceID)
	req, err := http.NewRequestWithContext(ctx, method, url, nil)
	if err != nil {
		return nil, err
	}
	req.Header.Set("X-Scope-OrgID", c.instanceID)
	return req, nil
}
func (c *Client) run(ctx context.Context, u string) ([]byte, int, error) {
	req, err := c.request(ctx, "GET", u)
	if err != nil {
		return nil, 0, err
	}

	// Execute HTTP request
	res, err := c.httpClient.Do(req)
	if err != nil {
		return nil, 0, err
	}
	defer res.Body.Close()

	buf, err := io.ReadAll(res.Body)
	if err != nil {
		return nil, 0, fmt.Errorf("request failed with status code %v: %w", res.StatusCode, err)
	}

	return buf, res.StatusCode, nil
}