mirror of https://github.com/grafana/loki
feat(loki-canary): Add support to push logs directly to Loki. (#7063)
Add additional `push` mode to loki-canary, which pushes the logs directly to given Loki URL as it generates logs. The real function of Loki Canary is to act like a tenant and help us know the whether Loki installation is working as perceived by a real tenant. Main rationale for this additional push mode is to make canary more standalone without needing for `promtail` (or `grafana-agent`) to scrape it's logs and send to loki, with this change, Loki canary happily tests Loki behavior without needing any other dependencies. **NOTES**: 1. If you run Loki behind any proxy that has different authorization policies to READ or WRITE to Loki, then important change that canary operator need to be aware of it, now the user credentials they pass via `-user` and `-pass` to access loki endpoints need to have both `read' and `write` permissions (previously canary just query the logs where as promtail is the one pushes the logs, so just READ permissions was sufficient). 2. There will be follow up PR(s) to cleanup, particularly `reader` and `comparitor` component in terms of logging with proper logger. Rationale is to keep the changes small per PR to make it easy to review. 3. This changes were tested it in one of the internal Loki dev cell. 4. **This PR is a no-op if this new `push` flag is disabled (it's disabled by default)** Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> Co-authored-by: Danny Kopping <danny.kopping@grafana.com>pull/7114/head
parent
3d4788fb51
commit
2c9fa05c29
@ -0,0 +1,236 @@ |
||||
package writer |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"crypto/tls" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"net/url" |
||||
"strconv" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/go-kit/log/level" |
||||
"github.com/gogo/protobuf/proto" |
||||
"github.com/golang/snappy" |
||||
"github.com/grafana/dskit/backoff" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/util/build" |
||||
) |
||||
|
||||
const ( |
||||
defaultContentType = "application/x-protobuf" |
||||
defaultMaxReponseBufferLen = 1024 |
||||
|
||||
pushEndpoint = "/loki/api/v1/push" |
||||
) |
||||
|
||||
var ( |
||||
defaultUserAgent = fmt.Sprintf("canary-push/%s", build.GetVersion().Version) |
||||
) |
||||
|
||||
// Push is a io.Writer, that writes given log entries by pushing
|
||||
// directly to the given loki server URL. Each `Push` instance handles for a single tenant.
|
||||
// No batching of log lines happens when sending to Loki.
|
||||
type Push struct { |
||||
lokiURL string |
||||
tenantID string |
||||
httpClient *http.Client |
||||
userAgent string |
||||
contentType string |
||||
logger log.Logger |
||||
|
||||
// auth
|
||||
username, password string |
||||
|
||||
// Will add these label to the logs pushed to loki
|
||||
labelName, labelValue, streamName, streamValue string |
||||
|
||||
// push retry and backoff
|
||||
backoff *backoff.Config |
||||
} |
||||
|
||||
// NewPush creates an instance of `Push` which writes logs directly to given `lokiAddr`
|
||||
func NewPush( |
||||
lokiAddr, tenantID string, |
||||
timeout time.Duration, |
||||
cfg config.HTTPClientConfig, |
||||
labelName, labelValue string, |
||||
streamName, streamValue string, |
||||
tlsCfg *tls.Config, |
||||
caFile string, |
||||
username, password string, |
||||
backoffCfg *backoff.Config, |
||||
logger log.Logger, |
||||
) (*Push, error) { |
||||
|
||||
client, err := config.NewClientFromConfig(cfg, "canary-push", config.WithHTTP2Disabled()) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
client.Timeout = timeout |
||||
scheme := "http" |
||||
|
||||
// setup tls transport
|
||||
if tlsCfg != nil { |
||||
rt, err := config.NewTLSRoundTripper(tlsCfg, caFile, func(tls *tls.Config) (http.RoundTripper, error) { |
||||
return &http.Transport{TLSClientConfig: tls}, nil |
||||
}) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to create TLS config for transport: %w", err) |
||||
} |
||||
client.Transport = rt |
||||
scheme = "https" |
||||
} |
||||
|
||||
u := url.URL{ |
||||
Scheme: scheme, |
||||
Host: lokiAddr, |
||||
Path: pushEndpoint, |
||||
} |
||||
|
||||
return &Push{ |
||||
lokiURL: u.String(), |
||||
tenantID: tenantID, |
||||
httpClient: client, |
||||
userAgent: defaultUserAgent, |
||||
contentType: defaultContentType, |
||||
logger: logger, |
||||
labelName: labelName, |
||||
labelValue: labelValue, |
||||
streamName: streamName, |
||||
streamValue: streamValue, |
||||
username: username, |
||||
password: password, |
||||
backoff: backoffCfg, |
||||
}, nil |
||||
} |
||||
|
||||
// Write implements the io.Writer.
|
||||
func (p *Push) Write(payload []byte) (int, error) { |
||||
ctx, cancel := context.WithTimeout(context.Background(), p.httpClient.Timeout) |
||||
defer cancel() |
||||
if err := p.send(ctx, payload); err != nil { |
||||
return 0, err |
||||
} |
||||
return len(payload), nil |
||||
} |
||||
|
||||
func (p *Push) parsePayload(payload []byte) (*logproto.PushRequest, error) { |
||||
// payload that is sent by the `writer` will be in format `LogEntry`
|
||||
var ( |
||||
tsStr, logLine string |
||||
) |
||||
if _, err := fmt.Sscanf(string(payload), LogEntry, &tsStr, &logLine); err != nil { |
||||
return nil, fmt.Errorf("failed to parse payload written sent by writer: %w", err) |
||||
} |
||||
|
||||
ts, err := strconv.ParseInt(tsStr, 10, 64) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to parse unix nano timestamp: %w", err) |
||||
} |
||||
|
||||
labels := model.LabelSet{ |
||||
model.LabelName(p.labelName): model.LabelValue(p.labelValue), |
||||
model.LabelName(p.streamName): model.LabelValue(p.streamValue), |
||||
} |
||||
|
||||
return &logproto.PushRequest{ |
||||
Streams: []logproto.Stream{ |
||||
{ |
||||
Labels: labels.String(), |
||||
Entries: []logproto.Entry{ |
||||
{ |
||||
Timestamp: time.Unix(0, ts), |
||||
Line: string(payload), |
||||
}, |
||||
}, |
||||
Hash: uint64(labels.Fingerprint()), |
||||
}, |
||||
}, |
||||
}, nil |
||||
} |
||||
|
||||
// send does the heavy lifting of sending the generated logs into the Loki server.
|
||||
// It won't batch.
|
||||
func (p *Push) send(ctx context.Context, payload []byte) error { |
||||
var ( |
||||
resp *http.Response |
||||
err error |
||||
) |
||||
|
||||
preq, err := p.parsePayload(payload) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
payload, err = proto.Marshal(preq) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to marshal payload to json: %w", err) |
||||
} |
||||
|
||||
payload = snappy.Encode(nil, payload) |
||||
|
||||
req, err := http.NewRequest("POST", p.lokiURL, bytes.NewReader(payload)) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to create push request: %w", err) |
||||
} |
||||
req = req.WithContext(ctx) |
||||
req.Header.Set("Content-Type", p.contentType) |
||||
req.Header.Set("User-Agent", p.userAgent) |
||||
|
||||
// set org-id
|
||||
if p.tenantID != "" { |
||||
req.Header.Set("X-Scope-OrgID", p.tenantID) |
||||
} |
||||
|
||||
// basic auth if provided
|
||||
if p.username != "" { |
||||
req.SetBasicAuth(p.username, p.password) |
||||
} |
||||
|
||||
backoff := backoff.New(ctx, *p.backoff) |
||||
|
||||
// send log with retry
|
||||
for { |
||||
resp, err = p.httpClient.Do(req) |
||||
if err != nil { |
||||
return fmt.Errorf("failed to push payload: %w", err) |
||||
} |
||||
status := resp.StatusCode |
||||
|
||||
if status/100 != 2 { |
||||
scanner := bufio.NewScanner(io.LimitReader(resp.Body, defaultMaxReponseBufferLen)) |
||||
line := "" |
||||
if scanner.Scan() { |
||||
line = scanner.Text() |
||||
} |
||||
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, status, line) |
||||
|
||||
} |
||||
|
||||
if err := resp.Body.Close(); err != nil { |
||||
level.Error(p.logger).Log("msg", "failed to close response body", "error", err) |
||||
} |
||||
|
||||
if status > 0 && status != 429 && status/100 != 5 { |
||||
break |
||||
} |
||||
|
||||
if !backoff.Ongoing() { |
||||
break |
||||
} |
||||
|
||||
level.Info(p.logger).Log("msg", "retrying as server returned non successful error", "status", status, "error", err) |
||||
|
||||
} |
||||
|
||||
return err |
||||
} |
||||
@ -0,0 +1,179 @@ |
||||
package writer |
||||
|
||||
import ( |
||||
"encoding/base64" |
||||
"fmt" |
||||
"math" |
||||
"net/http" |
||||
"net/http/httptest" |
||||
"strings" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/go-kit/log" |
||||
"github.com/grafana/dskit/backoff" |
||||
"github.com/prometheus/common/config" |
||||
"github.com/prometheus/common/model" |
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
|
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/util" |
||||
) |
||||
|
||||
const ( |
||||
testTenant = "test1" |
||||
testUsername = "canary" |
||||
testPassword = "secret" |
||||
) |
||||
|
||||
func Test_Push(t *testing.T) { |
||||
// create dummy loki server
|
||||
responses := make(chan response, 1) // buffered not to block the response handler
|
||||
backoff := backoff.Config{ |
||||
MinBackoff: 300 * time.Millisecond, |
||||
MaxBackoff: 5 * time.Minute, |
||||
MaxRetries: 10, |
||||
} |
||||
|
||||
// mock loki server
|
||||
mock := httptest.NewServer(createServerHandler(responses)) |
||||
require.NotNil(t, mock) |
||||
defer mock.Close() |
||||
|
||||
// without TLS
|
||||
push, err := NewPush(mock.Listener.Addr().String(), "test1", 2*time.Second, config.DefaultHTTPClientConfig, "name", "loki-canary", "stream", "stdout", nil, "", "", "", &backoff, log.NewNopLogger()) |
||||
require.NoError(t, err) |
||||
ts, payload := testPayload() |
||||
n, err := push.Write([]byte(payload)) |
||||
require.NoError(t, err) |
||||
assert.Equal(t, len(payload), n) |
||||
resp := <-responses |
||||
assertResponse(t, resp, false, labelSet("name", "loki-canary", "stream", "stdout"), ts, payload) |
||||
|
||||
// with basic Auth
|
||||
push, err = NewPush(mock.Listener.Addr().String(), "test1", 2*time.Second, config.DefaultHTTPClientConfig, "name", "loki-canary", "stream", "stdout", nil, "", testUsername, testPassword, &backoff, log.NewNopLogger()) |
||||
require.NoError(t, err) |
||||
ts, payload = testPayload() |
||||
n, err = push.Write([]byte(payload)) |
||||
require.NoError(t, err) |
||||
assert.Equal(t, len(payload), n) |
||||
resp = <-responses |
||||
assertResponse(t, resp, true, labelSet("name", "loki-canary", "stream", "stdout"), ts, payload) |
||||
|
||||
// with custom labels
|
||||
push, err = NewPush(mock.Listener.Addr().String(), "test1", 2*time.Second, config.DefaultHTTPClientConfig, "name", "loki-canary", "pod", "abc", nil, "", testUsername, testPassword, &backoff, log.NewNopLogger()) |
||||
require.NoError(t, err) |
||||
ts, payload = testPayload() |
||||
n, err = push.Write([]byte(payload)) |
||||
require.NoError(t, err) |
||||
assert.Equal(t, len(payload), n) |
||||
resp = <-responses |
||||
assertResponse(t, resp, true, labelSet("name", "loki-canary", "pod", "abc"), ts, payload) |
||||
} |
||||
|
||||
// Test helpers
|
||||
|
||||
func assertResponse(t *testing.T, resp response, testAuth bool, labels model.LabelSet, ts time.Time, payload string) { |
||||
t.Helper() |
||||
|
||||
// assert metadata
|
||||
assert.Equal(t, testTenant, resp.tenantID) |
||||
|
||||
var expUser, expPass string |
||||
|
||||
if testAuth { |
||||
expUser = testUsername |
||||
expPass = testPassword |
||||
} |
||||
|
||||
assert.Equal(t, expUser, resp.username) |
||||
assert.Equal(t, expPass, resp.password) |
||||
assert.Equal(t, defaultContentType, resp.contentType) |
||||
assert.Equal(t, defaultUserAgent, resp.userAgent) |
||||
|
||||
// assert stream labels
|
||||
require.Len(t, resp.pushReq.Streams, 1) |
||||
assert.Equal(t, labels.String(), resp.pushReq.Streams[0].Labels) |
||||
assert.Equal(t, uint64(labels.Fingerprint()), resp.pushReq.Streams[0].Hash) |
||||
|
||||
// assert log entry
|
||||
require.Len(t, resp.pushReq.Streams, 1) |
||||
require.Len(t, resp.pushReq.Streams[0].Entries, 1) |
||||
assert.Equal(t, payload, resp.pushReq.Streams[0].Entries[0].Line) |
||||
assert.Equal(t, ts, resp.pushReq.Streams[0].Entries[0].Timestamp) |
||||
} |
||||
|
||||
type response struct { |
||||
tenantID string |
||||
pushReq logproto.PushRequest |
||||
contentType string |
||||
userAgent string |
||||
username, password string |
||||
} |
||||
|
||||
func createServerHandler(responses chan response) http.HandlerFunc { |
||||
return http.HandlerFunc(func(rw http.ResponseWriter, req *http.Request) { |
||||
// Parse the request
|
||||
var pushReq logproto.PushRequest |
||||
if err := util.ParseProtoReader(req.Context(), req.Body, int(req.ContentLength), math.MaxInt32, &pushReq, util.RawSnappy); err != nil { |
||||
rw.WriteHeader(500) |
||||
return |
||||
} |
||||
|
||||
var ( |
||||
username, password string |
||||
) |
||||
|
||||
basicAuth := req.Header.Get("Authorization") |
||||
if basicAuth != "" { |
||||
encoded := strings.TrimPrefix(basicAuth, "Basic ") // now we have just encoded `username:password`
|
||||
decoded, err := base64.StdEncoding.DecodeString(encoded) |
||||
if err != nil { |
||||
rw.WriteHeader(500) |
||||
return |
||||
} |
||||
fmt.Println("decoded", decoded) |
||||
toks := strings.FieldsFunc(string(decoded), func(r rune) bool { |
||||
return r == ':' |
||||
}) |
||||
username, password = toks[0], toks[1] |
||||
} |
||||
|
||||
responses <- response{ |
||||
tenantID: req.Header.Get("X-Scope-OrgID"), |
||||
contentType: req.Header.Get("Content-Type"), |
||||
userAgent: req.Header.Get("User-Agent"), |
||||
username: username, |
||||
password: password, |
||||
pushReq: pushReq, |
||||
} |
||||
|
||||
rw.WriteHeader(http.StatusOK) |
||||
}) |
||||
} |
||||
|
||||
func labelSet(keyVals ...string) model.LabelSet { |
||||
if len(keyVals)%2 != 0 { |
||||
panic("not matching key-value pairs") |
||||
} |
||||
|
||||
lbs := model.LabelSet{} |
||||
|
||||
i := 0 |
||||
j := i + 1 |
||||
for i < len(keyVals)-1 { |
||||
lbs[model.LabelName(keyVals[i])] = model.LabelValue(keyVals[i+1]) |
||||
i += 2 |
||||
j += 2 |
||||
} |
||||
|
||||
return lbs |
||||
} |
||||
|
||||
func testPayload() (time.Time, string) { |
||||
ts := time.Now().UTC() |
||||
payload := fmt.Sprintf(LogEntry, fmt.Sprint(ts.UnixNano()), "pppppp") |
||||
|
||||
return ts, payload |
||||
} |
||||
Loading…
Reference in new issue