diff --git a/docs/sources/clients/lambda-promtail/_index.md b/docs/sources/clients/lambda-promtail/_index.md index 777952cc46..8c02a07060 100644 --- a/docs/sources/clients/lambda-promtail/_index.md +++ b/docs/sources/clients/lambda-promtail/_index.md @@ -94,6 +94,12 @@ For those using Cloudwatch and wishing to test out Loki in a low-risk way, this Note: Propagating logs from Cloudwatch to Loki means you'll still need to _pay_ for Cloudwatch. +### VPC Flow logs + +This workflow allows ingesting AWS VPC Flow logs from s3. + +One thing to be aware of with this is that the default flow log format doesn't have a timestamp, so the log timestamp will be set to the time the lambda starts processing the log file. + ### Loadbalancer logs This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki. diff --git a/tools/lambda-promtail/lambda-promtail/main.go b/tools/lambda-promtail/lambda-promtail/main.go index 3a8979a77d..64c5387948 100644 --- a/tools/lambda-promtail/lambda-promtail/main.go +++ b/tools/lambda-promtail/lambda-promtail/main.go @@ -34,6 +34,7 @@ var ( s3Clients map[string]*s3.Client extraLabels model.LabelSet skipTlsVerify bool + printLogLine bool ) func setupArguments() { @@ -90,6 +91,12 @@ func setupArguments() { batchSize, _ = strconv.Atoi(batch) } + print := os.Getenv("PRINT_LOG_LINE") + printLogLine = true + if strings.EqualFold(print, "false") { + printLogLine = false + } + s3Clients = make(map[string]*s3.Client) } diff --git a/tools/lambda-promtail/lambda-promtail/s3.go b/tools/lambda-promtail/lambda-promtail/s3.go index eb10c2e546..c3acc3c543 100644 --- a/tools/lambda-promtail/lambda-promtail/s3.go +++ b/tools/lambda-promtail/lambda-promtail/s3.go @@ -19,16 +19,25 @@ import ( ) var ( - // regex that parses the log file name fields + // AWS Application Load Balancers // source: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format // format: 
bucket[/prefix]/AWSLogs/aws-account-id/elasticloadbalancing/region/yyyy/mm/dd/aws-account-id_elasticloadbalancing_region_app.load-balancer-id_end-time_ip-address_random-string.log.gz // example: my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz - filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/elasticloadbalancing\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_elasticloadbalancing\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<lb>[a-zA-Z0-9\-]+)`) + // VPC Flow Logs + // source: https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3.html#flow-logs-s3-path + // format: bucket-and-optional-prefix/AWSLogs/account_id/vpcflowlogs/region/year/month/day/aws_account_id_vpcflowlogs_region_flow_log_id_YYYYMMDDTHHmmZ_hash.log.gz + // example: 123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz + filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>\w+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`) // regex that extracts the timestamp (RFC3339) from message log timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`) ) +const ( + FLOW_LOG_TYPE string = "vpcflowlogs" + LB_LOG_TYPE string = "elasticloadbalancing" +) + func getS3Object(ctx context.Context, labels map[string]string) (io.ReadCloser, error) { var s3Client *s3.Client @@ -66,21 +75,41 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
scanner := bufio.NewScanner(gzreader) + skipHeader := false + logType := labels["type"] + if labels["type"] == FLOW_LOG_TYPE { + skipHeader = true + logType = "s3_vpc_flow" + } else if labels["type"] == LB_LOG_TYPE { + logType = "s3_lb" + } + ls := model.LabelSet{ - model.LabelName("__aws_log_type"): model.LabelValue("s3_lb"), - model.LabelName("__aws_s3_log_lb"): model.LabelValue(labels["lb"]), - model.LabelName("__aws_s3_log_lb_owner"): model.LabelValue(labels["account_id"]), + model.LabelName("__aws_log_type"): model.LabelValue(logType), + model.LabelName(fmt.Sprintf("__aws_%s_lb", logType)): model.LabelValue(labels["src"]), + model.LabelName(fmt.Sprintf("__aws_%s_lb_owner", logType)): model.LabelValue(labels["account_id"]), } ls = applyExtraLabels(ls) + timestamp := time.Now() + var lineCount int for scanner.Scan() { log_line := scanner.Text() - match := timestampRegex.FindStringSubmatch(log_line) + lineCount++ + if lineCount == 1 && skipHeader { + continue + } + if printLogLine { + fmt.Println(log_line) + } - timestamp, err := time.Parse(time.RFC3339, match[1]) - if err != nil { - return err + match := timestampRegex.FindStringSubmatch(log_line) + if len(match) > 0 { + timestamp, err = time.Parse(time.RFC3339, match[1]) + if err != nil { + return err + } } if err := b.add(ctx, entry{ls, logproto.Entry{ diff --git a/tools/lambda-promtail/lambda-promtail/s3_test.go b/tools/lambda-promtail/lambda-promtail/s3_test.go new file mode 100644 index 0000000000..9a54510330 --- /dev/null +++ b/tools/lambda-promtail/lambda-promtail/s3_test.go @@ -0,0 +1,161 @@ +package main + +import ( + "context" + "io" + "os" + "reflect" + "testing" + + "github.com/aws/aws-lambda-go/events" + "github.com/grafana/loki/pkg/logproto" +) + +func Test_getLabels(t *testing.T) { + type args struct { + record events.S3EventRecord + } + tests := []struct { + name string + args args + want map[string]string + wantErr bool + }{ + { + name: "s3_lb", + args: args{ + record: events.S3EventRecord{ + 
AWSRegion: "us-east-1", + S3: events.S3Entity{ + Bucket: events.S3Bucket{ + Name: "elb_logs_test", + OwnerIdentity: events.S3UserIdentity{ + PrincipalID: "test", + }, + }, + Object: events.S3Object{ + Key: "my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz", + }, + }, + }, + }, + want: map[string]string{ + "account_id": "123456789012", + "bucket": "elb_logs_test", + "bucket_owner": "test", + "bucket_region": "us-east-1", + "day": "24", + "key": "my-bucket/AWSLogs/123456789012/elasticloadbalancing/us-east-1/2022/01/24/123456789012_elasticloadbalancing_us-east-1_app.my-loadbalancer.b13ea9d19f16d015_20220124T0000Z_0.0.0.0_2et2e1mx.log.gz", + "month": "01", + "region": "us-east-1", + "src": "my-loadbalancer", + "type": "elasticloadbalancing", + "year": "2022", + }, + wantErr: false, + }, + { + name: "s3_flow_logs", + args: args{ + record: events.S3EventRecord{ + AWSRegion: "us-east-1", + S3: events.S3Entity{ + Bucket: events.S3Bucket{ + Name: "elb_logs_test", + OwnerIdentity: events.S3UserIdentity{ + PrincipalID: "test", + }, + }, + Object: events.S3Object{ + Key: "my-bucket/AWSLogs/123456789012/vpcflowlogs/us-east-1/2022/01/24/123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz", + }, + }, + }, + }, + want: map[string]string{ + "account_id": "123456789012", + "bucket": "elb_logs_test", + "bucket_owner": "test", + "bucket_region": "us-east-1", + "day": "24", + "key": "my-bucket/AWSLogs/123456789012/vpcflowlogs/us-east-1/2022/01/24/123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz", + "month": "01", + "region": "us-east-1", + "src": "fl-1234abcd", + "type": "vpcflowlogs", + "year": "2022", + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + got, err := getLabels(tt.args.record) + if (err != nil) != tt.wantErr { + 
t.Errorf("getLabels() error = %v, wantErr %v", err, tt.wantErr) + return + } + if !reflect.DeepEqual(got, tt.want) { + t.Errorf("getLabels() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_parseS3Log(t *testing.T) { + type args struct { + b *batch + labels map[string]string + obj io.ReadCloser + filename string + batchSize int + } + tests := []struct { + name string + args args + wantErr bool + }{ + { + name: "vpcflowlogs", + args: args{ + batchSize: 1024, // Set large enough we don't try and send to promtail + filename: "../testdata/vpcflowlog.log.gz", + b: &batch{ + streams: map[string]*logproto.Stream{}, + }, + labels: map[string]string{ + "type": FLOW_LOG_TYPE, + }, + }, + wantErr: false, + }, + { + name: "albaccesslogs", + args: args{ + batchSize: 1024, // Set large enough we don't try and send to promtail + filename: "../testdata/albaccesslog.log.gz", + b: &batch{ + streams: map[string]*logproto.Stream{}, + }, + labels: map[string]string{ + "type": LB_LOG_TYPE, + }, + }, + wantErr: false, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var err error + batchSize = tt.args.batchSize + tt.args.obj, err = os.Open(tt.args.filename) + if err != nil { + t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err) + } + + if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr { + t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr) + } + }) + } +} diff --git a/tools/lambda-promtail/main.tf b/tools/lambda-promtail/main.tf index f0a75bcedd..4a4ff8405c 100644 --- a/tools/lambda-promtail/main.tf +++ b/tools/lambda-promtail/main.tf @@ -52,7 +52,7 @@ resource "aws_iam_role_policy" "logs" { "Resource" : "arn:aws:kms:*:*:*", }, { - "Action": [ + "Action" : [ "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterface", "ec2:DeleteNetworkInterface", @@ -60,7 +60,7 @@ resource "aws_iam_role_policy" "logs" { "ec2:AttachNetworkInterface" ], "Effect" 
: "Allow", - "Resource": "*", + "Resource" : "*", }, { "Action" : [ @@ -115,6 +115,7 @@ resource "aws_lambda_function" "lambda_promtail" { EXTRA_LABELS = var.extra_labels TENANT_ID = var.tenant_id SKIP_TLS_VERIFY = var.skip_tls_verify + PRINT_LOG_LINE = var.print_log_line } } @@ -158,7 +160,7 @@ resource "aws_lambda_permission" "allow-s3-invoke-lambda-promtail" { } resource "aws_kinesis_stream" "kinesis_stream" { - for_each = toset(var.kinesis_stream_name) + for_each = toset(var.kinesis_stream_name) name = each.value shard_count = 1 retention_period = 48 diff --git a/tools/lambda-promtail/testdata/albaccesslog.log.gz b/tools/lambda-promtail/testdata/albaccesslog.log.gz new file mode 100644 index 0000000000..c17b04f0da Binary files /dev/null and b/tools/lambda-promtail/testdata/albaccesslog.log.gz differ diff --git a/tools/lambda-promtail/testdata/vpcflowlog.log.gz b/tools/lambda-promtail/testdata/vpcflowlog.log.gz new file mode 100644 index 0000000000..2a48134779 Binary files /dev/null and b/tools/lambda-promtail/testdata/vpcflowlog.log.gz differ diff --git a/tools/lambda-promtail/variables.tf b/tools/lambda-promtail/variables.tf index 5547432a98..6772de98e2 100644 --- a/tools/lambda-promtail/variables.tf +++ b/tools/lambda-promtail/variables.tf @@ -54,10 +54,16 @@ variable "keep_stream" { default = "false" } +variable "print_log_line" { + type = string + description = "Determines whether we want the lambda to output the parsed log line before sending it on to promtail. Value needed to disable is the string 'false'" + default = "true" +} + variable "extra_labels" { - type = string + type = string description = "Comma separated list of extra labels, in the format 'name1,value1,name2,value2,...,nameN,valueN' to add to entries forwarded by lambda-promtail."
- default = "" + default = "" } variable "batch_size" { @@ -94,4 +100,4 @@ variable "kinesis_stream_name" { type = list(string) description = "Enter kinesis name if kinesis stream is configured as event source in lambda." default = [] -} \ No newline at end of file +}