lambda-promtail: Add support for VPC flow Logs to lambda-promtail (#7868)

This adds support to the S3 parsing logic for AWS VPC Flow logs.

It also makes a small change to allow not printing log lines during
processing. It leaves the default of printing the log lines in place.

Signed-off-by: Thomas Belian <thomas.belian@bt909.de>
Co-authored-by: Thomas Belián <72987757+bt909@users.noreply.github.com>
pull/8111/head
chriskuchin 3 years ago committed by GitHub
parent 9322ed0a68
commit 4d5678aa17
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 6
      docs/sources/clients/lambda-promtail/_index.md
  2. 7
      tools/lambda-promtail/lambda-promtail/main.go
  3. 47
      tools/lambda-promtail/lambda-promtail/s3.go
  4. 161
      tools/lambda-promtail/lambda-promtail/s3_test.go
  5. 8
      tools/lambda-promtail/main.tf
  6. BIN
      tools/lambda-promtail/testdata/albaccesslog.log.gz
  7. BIN
      tools/lambda-promtail/testdata/vpcflowlog.log.gz
  8. 12
      tools/lambda-promtail/variables.tf

@ -94,6 +94,12 @@ For those using Cloudwatch and wishing to test out Loki in a low-risk way, this
Note: Propagating logs from Cloudwatch to Loki means you'll still need to _pay_ for Cloudwatch.
### VPC Flow logs
This workflow allows ingesting AWS VPC Flow logs from S3.
Be aware that the default flow log format does not include a parseable timestamp, so the log timestamp will be set to the time the lambda starts processing the log file.
### Loadbalancer logs
This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.

@ -34,6 +34,7 @@ var (
s3Clients map[string]*s3.Client
extraLabels model.LabelSet
skipTlsVerify bool
printLogLine bool
)
func setupArguments() {
@ -90,6 +91,12 @@ func setupArguments() {
batchSize, _ = strconv.Atoi(batch)
}
print := os.Getenv("PRINT_LOG_LINE")
printLogLine = true
if strings.EqualFold(print, "false") {
printLogLine = false
}
s3Clients = make(map[string]*s3.Client)
}

@ -19,16 +19,25 @@ import (
)
var (
// regex that parses the log file name fields
// AWS Application Load Balancers
// source: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format
// format: bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_app.load-balancer-id_end-time_ip-address_random-string.log.gz
// example: my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz
filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/elasticloadbalancing\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_elasticloadbalancing\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<lb>[a-zA-Z0-9\-]+)`)
// VPC Flow Logs
// source: https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3.html#flow-logs-s3-path
// format: bucket-and-optional-prefix/AWSLogs/account_id/vpcflowlogs/region/year/month/day/aws_account_id_vpcflowlogs_region_flow_log_id_YYYYMMDDTHHmmZ_hash.log.gz
// example: 123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz
filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>\w+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
// regex that extracts the timestamp (RFC3339) from message log
timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
)
// Log-type discriminators compared against the "type" label that
// filenameRegex captures from the S3 object key (the path segment
// following the AWS account ID, e.g. "vpcflowlogs" or
// "elasticloadbalancing").
const (
FLOW_LOG_TYPE string = "vpcflowlogs"
LB_LOG_TYPE string = "elasticloadbalancing"
)
func getS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser, error) {
var s3Client *s3.Client
@ -66,21 +75,41 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
scanner := bufio.NewScanner(gzreader)
skipHeader := false
logType := labels["type"]
if labels["type"] == FLOW_LOG_TYPE {
skipHeader = true
logType = "s3_vpc_flow"
} else if labels["type"] == LB_LOG_TYPE {
logType = "s3_lb"
}
ls := model.LabelSet{
model.LabelName("__aws_log_type"): model.LabelValue("s3_lb"),
model.LabelName("__aws_s3_log_lb"): model.LabelValue(labels["lb"]),
model.LabelName("__aws_s3_log_lb_owner"): model.LabelValue(labels["account_id"]),
model.LabelName("__aws_log_type"): model.LabelValue(logType),
model.LabelName(fmt.Sprintf("__aws_%s_lb", logType)): model.LabelValue(labels["src"]),
model.LabelName(fmt.Sprintf("__aws_%s_lb_owner", logType)): model.LabelValue(labels["account_id"]),
}
ls = applyExtraLabels(ls)
timestamp := time.Now()
var lineCount int
for scanner.Scan() {
log_line := scanner.Text()
match := timestampRegex.FindStringSubmatch(log_line)
lineCount++
if lineCount == 1 && skipHeader {
continue
}
if printLogLine {
fmt.Println(log_line)
}
timestamp, err := time.Parse(time.RFC3339, match[1])
if err != nil {
return err
match := timestampRegex.FindStringSubmatch(log_line)
if len(match) > 0 {
timestamp, err = time.Parse(time.RFC3339, match[1])
if err != nil {
return err
}
}
if err := b.add(ctx, entry{ls, logproto.Entry{

@ -0,0 +1,161 @@
package main
import (
"context"
"io"
"os"
"reflect"
"testing"
"github.com/aws/aws-lambda-go/events"
"github.com/grafana/loki/pkg/logproto"
)
// Test_getLabels verifies that getLabels extracts the expected label map
// (account_id, region, year/month/day, src, type, bucket metadata, ...)
// from an S3 event record's object key, for both ALB access logs and
// VPC flow logs.
func Test_getLabels(t *testing.T) {
type args struct {
record events.S3EventRecord
}
// Table-driven cases: one per supported S3 log-file naming scheme.
tests := []struct {
name string
args args
want map[string]string
wantErr bool
}{
{
// ALB access log object key, per the AWS documented format:
// .../elasticloadbalancing/region/yyyy/mm/dd/<acct>_elasticloadbalancing_<region>_app.<lb-name>...
name: "s3_lb",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "elb_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz",
},
},
},
},
want: map[string]string{
"account_id": "123456789012",
"bucket": "elb_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "24",
"key": "my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz",
"month": "01",
"region": "us-east-1",
// "src" is the load balancer name for elasticloadbalancing keys.
"src": "my-loadbalancer",
"type": "elasticloadbalancing",
"year": "2022",
},
wantErr: false,
},
{
// VPC flow log object key, per the AWS documented format:
// .../vpcflowlogs/region/yyyy/mm/dd/<acct>_vpcflowlogs_<region>_<flow-log-id>...
name: "s3_flow_logs",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "elb_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "my-bucket/AWSLogs/123456789012/vpcflowlogs/us-east-1/2022/01/24/123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz",
},
},
},
},
want: map[string]string{
"account_id": "123456789012",
"bucket": "elb_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "24",
"key": "my-bucket/AWSLogs/123456789012/vpcflowlogs/us-east-1/2022/01/24/123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz",
"month": "01",
"region": "us-east-1",
// "src" is the flow-log ID for vpcflowlogs keys.
"src": "fl-1234abcd",
"type": "vpcflowlogs",
"year": "2022",
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
got, err := getLabels(tt.args.record)
if (err != nil) != tt.wantErr {
t.Errorf("getLabels() error = %v, wantErr %v", err, tt.wantErr)
return
}
// Full-map comparison so unexpected extra or missing labels fail too.
if !reflect.DeepEqual(got, tt.want) {
t.Errorf("getLabels() = %v, want %v", got, tt.want)
}
})
}
}
// Test_parseS3Log runs parseS3Log over gzipped fixture files for both
// supported log types (VPC flow logs and ALB access logs) and checks that
// parsing completes without error. It asserts on error behavior only, not
// on the batch contents.
func Test_parseS3Log(t *testing.T) {
type args struct {
b *batch
labels map[string]string
obj io.ReadCloser
filename string
batchSize int
}
tests := []struct {
name string
args args
wantErr bool
}{
{
name: "vpcflowlogs",
args: args{
batchSize: 1024, // Set large enough we don't try and send to promtail
filename: "../testdata/vpcflowlog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
// Drives the header-skip and label-naming branch in parseS3Log.
"type": FLOW_LOG_TYPE,
},
},
wantErr: false,
},
{
name: "albaccesslogs",
args: args{
batchSize: 1024, // Set large enough we don't try and send to promtail
filename: "../testdata/albaccesslog.log.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"type": LB_LOG_TYPE,
},
},
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
var err error
// parseS3Log reads the package-level batchSize; override it per case.
batchSize = tt.args.batchSize
tt.args.obj, err = os.Open(tt.args.filename)
if err != nil {
t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
}
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
})
}
}

@ -52,7 +52,7 @@ resource "aws_iam_role_policy" "logs" {
"Resource" : "arn:aws:kms:*:*:*",
},
{
"Action": [
"Action" : [
"ec2:DescribeNetworkInterfaces",
"ec2:CreateNetworkInterface",
"ec2:DeleteNetworkInterface",
@ -60,7 +60,7 @@ resource "aws_iam_role_policy" "logs" {
"ec2:AttachNetworkInterface"
],
"Effect" : "Allow",
"Resource": "*",
"Resource" : "*",
},
{
"Action" : [
@ -115,6 +115,8 @@ resource "aws_lambda_function" "lambda_promtail" {
EXTRA_LABELS = var.extra_labels
TENANT_ID = var.tenant_id
SKIP_TLS_VERIFY = var.skip_tls_verify
PRINT_LOG_LINE = var.print_log_line
SKIP_TLS_VERIFY = var.skip_tls_verify
}
}
@ -158,7 +160,7 @@ resource "aws_lambda_permission" "allow-s3-invoke-lambda-promtail" {
}
resource "aws_kinesis_stream" "kinesis_stream" {
for_each = toset(var.kinesis_stream_name)
for_each = toset(var.kinesis_stream_name)
name = each.value
shard_count = 1
retention_period = 48

@ -54,10 +54,16 @@ variable "keep_stream" {
default = "false"
}
variable "print_log_line" {
type = string
description = "Determines whether we want the lambda to output the parsed log line before sending it on to promtail. Value needed to disable is the string 'false'"
default = "true"
}
variable "extra_labels" {
type = string
type = string
description = "Comma separated list of extra labels, in the format 'name1,value1,name2,value2,...,nameN,valueN' to add to entries forwarded by lambda-promtail."
default = ""
default = ""
}
variable "batch_size" {
@ -94,4 +100,4 @@ variable "kinesis_stream_name" {
type = list(string)
description = "Enter kinesis name if kinesis stream is configured as event source in lambda."
default = []
}
}

Loading…
Cancel
Save