mirror of https://github.com/grafana/loki
lambda-promtail: Add ability to ingest logs from S3 (#5065)
* Add ability to ingest logs from S3 on lambda-promtail * fix ci * fix typo * bump golang and alpine version * update changelog * add s3 permissions on terraform * use for_each instead of count * fix typo * improve function naming * add documentation and an example of a s3 file path * refact logic to identify event type * add missing iam permission to allow lambda to run inside a vpc * fix typo * allow lambda to access only specified s3 buckets * configure a default log retention policy on log group * add missing depends_on to make sure iam role is created before lambda function * update docs * fix label naming convention * fix merge conflict * use new backoff lib and update dependencies * add option to limit batch size * cache s3 client * update docs and terraform * address some feedback on PR * fix typopull/5325/head
parent
d787a0fe28
commit
699fffe9e5
@ -1,7 +1,7 @@ |
||||
all: build docker |
||||
|
||||
build: |
||||
GOOS=linux CGO_ENABLED=0 go build lambda-promtail/main.go
|
||||
GOOS=linux CGO_ENABLED=0 go build -o ./main lambda-promtail/*.go
|
||||
|
||||
clean: |
||||
rm main
|
||||
|
||||
@ -0,0 +1,53 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"context" |
||||
"fmt" |
||||
"time" |
||||
|
||||
"github.com/aws/aws-lambda-go/events" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/prometheus/common/model" |
||||
) |
||||
|
||||
func parseCWEvent(ctx context.Context, b *batch, ev *events.CloudwatchLogsEvent) error { |
||||
data, err := ev.AWSLogs.Parse() |
||||
if err != nil { |
||||
fmt.Println("error parsing log event: ", err) |
||||
return err |
||||
} |
||||
|
||||
labels := model.LabelSet{ |
||||
model.LabelName("__aws_cloudwatch_log_group"): model.LabelValue(data.LogGroup), |
||||
model.LabelName("__aws_cloudwatch_owner"): model.LabelValue(data.Owner), |
||||
} |
||||
if keepStream { |
||||
labels[model.LabelName("__aws_cloudwatch_log_stream")] = model.LabelValue(data.LogStream) |
||||
} |
||||
|
||||
for _, event := range data.LogEvents { |
||||
timestamp := time.UnixMilli(event.Timestamp) |
||||
|
||||
b.add(ctx, entry{labels, logproto.Entry{ |
||||
Line: event.Message, |
||||
Timestamp: timestamp, |
||||
}}) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func processCWEvent(ctx context.Context, ev *events.CloudwatchLogsEvent) error { |
||||
batch, _ := newBatch(ctx) |
||||
|
||||
err := parseCWEvent(ctx, batch, ev) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = sendToPromtail(ctx, batch) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
return nil |
||||
} |
||||
@ -0,0 +1,187 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"sort" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/gogo/protobuf/proto" |
||||
"github.com/golang/snappy" |
||||
"github.com/grafana/dskit/backoff" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/prometheus/common/model" |
||||
) |
||||
|
||||
const (
	// timeout bounds each individual push request in send().
	timeout = 5 * time.Second

	// Backoff bounds and attempt cap for the retry loop in sendToPromtail.
	minBackoff = 100 * time.Millisecond
	maxBackoff = 30 * time.Second
	maxRetries = 10

	// reservedLabelTenantID is stripped from the serialized stream labels
	// (see batch.add / labelsMapToString).
	reservedLabelTenantID = "__tenant_id__"

	// userAgent is sent on every push request.
	userAgent = "lambda-promtail"
)
||||
|
||||
// entry pairs a single log line with the label set it should be recorded under.
type entry struct {
	labels model.LabelSet
	entry  logproto.Entry
}

// batch accumulates entries grouped into streams keyed by their serialized
// label set; size tracks the total byte length of all buffered lines and is
// used to decide when to flush (see add).
type batch struct {
	streams map[string]*logproto.Stream
	size    int
}
||||
|
||||
func newBatch(ctx context.Context, entries ...entry) (*batch, error) { |
||||
b := &batch{ |
||||
streams: map[string]*logproto.Stream{}, |
||||
} |
||||
|
||||
for _, entry := range entries { |
||||
err := b.add(ctx, entry) |
||||
return b, err |
||||
} |
||||
|
||||
return b, nil |
||||
} |
||||
|
||||
func (b *batch) add(ctx context.Context, e entry) error { |
||||
labels := labelsMapToString(e.labels, reservedLabelTenantID) |
||||
stream, ok := b.streams[labels] |
||||
if !ok { |
||||
b.streams[labels] = &logproto.Stream{ |
||||
Labels: labels, |
||||
Entries: []logproto.Entry{}, |
||||
} |
||||
stream = b.streams[labels] |
||||
} |
||||
|
||||
stream.Entries = append(stream.Entries, e.entry) |
||||
b.size += len(e.entry.Line) |
||||
|
||||
if b.size > batchSize { |
||||
return b.flushBatch(ctx) |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string { |
||||
lstrs := make([]string, 0, len(ls)) |
||||
Outer: |
||||
for l, v := range ls { |
||||
for _, w := range without { |
||||
if l == w { |
||||
continue Outer |
||||
} |
||||
} |
||||
lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v)) |
||||
} |
||||
|
||||
sort.Strings(lstrs) |
||||
return fmt.Sprintf("{%s}", strings.Join(lstrs, ", ")) |
||||
} |
||||
|
||||
func (b *batch) encode() ([]byte, int, error) { |
||||
req, entriesCount := b.createPushRequest() |
||||
buf, err := proto.Marshal(req) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
|
||||
buf = snappy.Encode(nil, buf) |
||||
return buf, entriesCount, nil |
||||
} |
||||
|
||||
func (b *batch) createPushRequest() (*logproto.PushRequest, int) { |
||||
req := logproto.PushRequest{ |
||||
Streams: make([]logproto.Stream, 0, len(b.streams)), |
||||
} |
||||
|
||||
entriesCount := 0 |
||||
for _, stream := range b.streams { |
||||
req.Streams = append(req.Streams, *stream) |
||||
entriesCount += len(stream.Entries) |
||||
} |
||||
return &req, entriesCount |
||||
} |
||||
|
||||
func (b *batch) flushBatch(ctx context.Context) error { |
||||
err := sendToPromtail(ctx, b) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
b.streams = make(map[string]*logproto.Stream) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
// sendToPromtail encodes the batch and pushes it to the configured write
// address, retrying transient failures (429s, 5xx, connection errors) with
// exponential backoff up to maxRetries attempts.
func sendToPromtail(ctx context.Context, b *batch) error {
	buf, _, err := b.encode()
	if err != nil {
		return err
	}

	backoff := backoff.New(ctx, backoff.Config{minBackoff, maxBackoff, maxRetries})
	var status int
	for {
		// send uses `timeout` internally, so `context.Background` is good enough.
		status, err = send(context.Background(), buf)

		// Only retry 429s, 500s and connection-level errors.
		// (status <= 0 means the request never reached the server.)
		if status > 0 && status != 429 && status/100 != 5 {
			break
		}

		fmt.Printf("error sending batch, will retry, status: %d error: %s\n", status, err)
		backoff.Wait()

		// Make sure it sends at least once before checking for retry.
		if !backoff.Ongoing() {
			break
		}
	}

	// err holds the outcome of the final attempt.
	if err != nil {
		fmt.Printf("Failed to send logs! %s\n", err)
		return err
	}

	return nil
}
||||
|
||||
func send(ctx context.Context, buf []byte) (int, error) { |
||||
ctx, cancel := context.WithTimeout(ctx, timeout) |
||||
defer cancel() |
||||
|
||||
req, err := http.NewRequest("POST", writeAddress.String(), bytes.NewReader(buf)) |
||||
if err != nil { |
||||
return -1, err |
||||
} |
||||
req.Header.Set("Content-Type", contentType) |
||||
req.Header.Set("User-Agent", userAgent) |
||||
|
||||
resp, err := http.DefaultClient.Do(req.WithContext(ctx)) |
||||
if err != nil { |
||||
return -1, err |
||||
} |
||||
|
||||
if resp.StatusCode/100 != 2 { |
||||
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) |
||||
line := "" |
||||
if scanner.Scan() { |
||||
line = scanner.Text() |
||||
} |
||||
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) |
||||
} |
||||
|
||||
return resp.StatusCode, err |
||||
} |
||||
@ -0,0 +1,142 @@ |
||||
package main |
||||
|
||||
import ( |
||||
"bufio" |
||||
"compress/gzip" |
||||
"context" |
||||
"fmt" |
||||
"io" |
||||
"regexp" |
||||
"time" |
||||
|
||||
"github.com/aws/aws-lambda-go/events" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/aws/aws-sdk-go-v2/aws" |
||||
"github.com/aws/aws-sdk-go-v2/config" |
||||
"github.com/aws/aws-sdk-go-v2/service/s3" |
||||
) |
||||
|
||||
var (
	// filenameRegex parses the log file name fields out of an ALB access-log
	// S3 object key (account id, region, date parts, load balancer name).
	// source: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format
	// format: bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_app.load-balancer-id_end-time_ip-address_random-string.log.gz
	// example: my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz
	filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/elasticloadbalancing\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_elasticloadbalancing\_\w+-\w+-\d_(?:(?:app|nlb)\.*?)?(?P<lb>[a-zA-Z\-]+)`)

	// timestampRegex extracts the RFC3339 timestamp from a log message line
	// (the second whitespace-separated field in ALB access-log lines).
	timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
)
||||
|
||||
func getS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser, error) { |
||||
var s3Client *s3.Client |
||||
|
||||
if c, ok := s3Clients[labels["bucket_region"]]; ok { |
||||
s3Client = c |
||||
} else { |
||||
cfg, err := config.LoadDefaultConfig(ctx, config.WithRegion(labels["bucket_region"])) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
s3Client = s3.NewFromConfig(cfg) |
||||
s3Clients[labels["bucket_region"]] = s3Client |
||||
} |
||||
|
||||
obj, err := s3Client.GetObject(ctx, |
||||
&s3.GetObjectInput{ |
||||
Bucket: aws.String(labels["bucket"]), |
||||
Key: aws.String(labels["key"]), |
||||
ExpectedBucketOwner: aws.String(labels["bucketOwner"]), |
||||
}) |
||||
|
||||
if err != nil { |
||||
fmt.Println("Failed to get object %s from bucket %s on account %s", labels["key"], labels["bucket"], labels["bucketOwner"]) |
||||
return nil, err |
||||
} |
||||
|
||||
return obj.Body, nil |
||||
} |
||||
|
||||
func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error { |
||||
gzreader, err := gzip.NewReader(obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
scanner := bufio.NewScanner(gzreader) |
||||
|
||||
ls := model.LabelSet{ |
||||
model.LabelName("__aws_log_type"): model.LabelValue("s3_lb"), |
||||
model.LabelName("__aws_s3_log_lb"): model.LabelValue(labels["lb"]), |
||||
model.LabelName("__aws_s3_log_lb_owner"): model.LabelValue(labels["account_id"]), |
||||
} |
||||
|
||||
for scanner.Scan() { |
||||
i := 0 |
||||
log_line := scanner.Text() |
||||
match := timestampRegex.FindStringSubmatch(log_line) |
||||
|
||||
timestamp, err := time.Parse(time.RFC3339, match[1]) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
b.add(ctx, entry{ls, logproto.Entry{ |
||||
Line: log_line, |
||||
Timestamp: timestamp, |
||||
}}) |
||||
i++ |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func getLabels(record events.S3EventRecord) (map[string]string, error) { |
||||
|
||||
labels := make(map[string]string) |
||||
|
||||
labels["key"] = record.S3.Object.Key |
||||
labels["bucket"] = record.S3.Bucket.Name |
||||
labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID |
||||
labels["bucket_region"] = record.AWSRegion |
||||
|
||||
match := filenameRegex.FindStringSubmatch(labels["key"]) |
||||
for i, name := range filenameRegex.SubexpNames() { |
||||
if i != 0 && name != "" { |
||||
labels[name] = match[i] |
||||
} |
||||
} |
||||
|
||||
return labels, nil |
||||
} |
||||
|
||||
func processS3Event(ctx context.Context, ev *events.S3Event) error { |
||||
|
||||
batch, _ := newBatch(ctx) |
||||
|
||||
for _, record := range ev.Records { |
||||
labels, err := getLabels(record) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
obj, err := getS3Object(ctx, labels) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
err = parseS3Log(ctx, batch, labels, obj) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
} |
||||
|
||||
err := sendToPromtail(ctx, batch) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
Loading…
Reference in new issue