diff --git a/CHANGELOG.md b/CHANGELOG.md
index 93f4818827..3add654155 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,8 @@ ##### Enhancements
 
+* [9573](https://github.com/grafana/loki/pull/9573) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudFront log ingestion.
+* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
 * [8886](https://github.com/grafana/loki/pull/8886) **MichelHollands**: Add new logql template function `unixToTime`
 * [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.
diff --git a/docs/sources/clients/lambda-promtail/_index.md b/docs/sources/clients/lambda-promtail/_index.md
index a42494700c..f3fff1bcea 100644
--- a/docs/sources/clients/lambda-promtail/_index.md
+++ b/docs/sources/clients/lambda-promtail/_index.md
@@ -109,9 +109,10 @@ This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.
 
 This workflow allows ingesting AWS Cloudtrail logs stored on S3 to Loki.
 
-### Cloudfront real-time logs
-
-Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
+### Cloudfront logs
+Cloudfront logs can be either batched or streamed in real time to Loki:
++ Logging can be activated on a Cloudfront distribution with an S3 bucket as the destination. In this case, the workflow is the same as for other services (VPC Flow logs, Loadbalancer logs, Cloudtrail logs).
++ Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
 
 ### Triggering Lambda-Promtail via SQS
 For AWS services supporting sending messages to SQS (for example, S3 with an S3 Notification to SQS), events can be processed through an [SQS queue using a lambda trigger](https://docs.aws.amazon.com/lambda/latest/dg/with-sqs.html) instead of directly configuring the source service to trigger lambda. Lambda-promtail will retrieve the nested events from the SQS messages' body and process them as if they came directly from the source service.
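Reviewer note (not part of the patch): the docs above describe CloudFront batching its access logs to an S3 bucket under a configurable prefix. The snippet below is a minimal, self-contained sketch of how the `cloudfrontFilenameRegex` introduced in s3.go further down derives labels from such an object key; the example key is the one cited in the AWS documentation referenced by the code comments.

```go
package main

import (
	"fmt"
	"regexp"
)

// Same expression as the cloudfrontFilenameRegex added in s3.go below. CloudFront
// access-log objects are named <prefix>/<distribution-id>.<YYYY>-<MM>-<DD>-<HH>.<hash>.gz.
var cloudfrontFilenameRegex = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)

func main() {
	// Example object key from the AWS CloudFront access-log documentation.
	key := "example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz"

	match := cloudfrontFilenameRegex.FindStringSubmatch(key)
	if match == nil {
		panic("key does not match the CloudFront naming convention")
	}

	// Mirror the SubexpNames loop in getLabels: every named group becomes a label.
	labels := map[string]string{}
	for i, name := range cloudfrontFilenameRegex.SubexpNames() {
		if i != 0 && name != "" {
			labels[name] = match[i]
		}
	}
	fmt.Println(labels)
	// map[day:14 month:11 prefix:example-prefix src:EMLARXS9EXAMPLE year:2019]
}
```

Note that, unlike the other filename regexes, there is no `type` capture group here, and the `prefix` group is what the parser table below maps to the `__aws_s3_cloudfront_owner` label (`ownerLabelKey: "prefix"`).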
diff --git a/tools/lambda-promtail/lambda-promtail/s3.go b/tools/lambda-promtail/lambda-promtail/s3.go
index 93f3d8e9d8..bb7a94d032 100644
--- a/tools/lambda-promtail/lambda-promtail/s3.go
+++ b/tools/lambda-promtail/lambda-promtail/s3.go
@@ -21,11 +21,27 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 )
 
+type parserConfig struct {
+	// value to use for the __aws_log_type label
+	logTypeLabel string
+	// regex matching the filename and exporting labels from it
+	filenameRegex *regexp.Regexp
+	// regex that extracts the timestamp from the log sample
+	timestampRegex *regexp.Regexp
+	// time format to use to convert the timestamp to time.Time
+	timestampFormat string
+	// how many lines (or JSON tokens) to skip at the beginning of the file
+	skipHeaderCount int
+	// key of the metadata label to use as the value for the __aws_<logTypeLabel>_owner label
+	ownerLabelKey string
+}
+
 const (
 	FLOW_LOG_TYPE              string = "vpcflowlogs"
 	LB_LOG_TYPE                string = "elasticloadbalancing"
 	CLOUDTRAIL_LOG_TYPE        string = "CloudTrail"
 	CLOUDTRAIL_DIGEST_LOG_TYPE string = "CloudTrail-Digest"
+	CLOUDFRONT_LOG_TYPE        string = "cloudfront"
 )
 
 var (
@@ -40,15 +56,45 @@ var (
 	// CloudTrail
 	// source: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html#cloudtrail-log-filename-format
 	// example: 111122223333_CloudTrail_us-east-2_20150801T0210Z_Mu0KsOhtH1ar15ZZ.json.gz
-	defaultFilenameRegex    = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
-	cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
-	filenameRegexes         = map[string]*regexp.Regexp{
-		FLOW_LOG_TYPE:       defaultFilenameRegex,
-		LB_LOG_TYPE:         defaultFilenameRegex,
-		CLOUDTRAIL_LOG_TYPE: cloudtrailFilenameRegex,
+	// CloudFront
+	// source: https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/AccessLogs.html#AccessLogsFileNaming
+	// example: example-prefix/EMLARXS9EXAMPLE.2019-11-14-20.RT4KCN4SGK9.gz
+	defaultFilenameRegex     = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
+	defaultTimestampRegex    = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
+	cloudtrailFilenameRegex  = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
+	cloudfrontFilenameRegex  = regexp.MustCompile(`(?P<prefix>.*)\/(?P<src>[A-Z0-9]+)\.(?P<year>\d+)-(?P<month>\d+)-(?P<day>\d+)-(.+)`)
+	cloudfrontTimestampRegex = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
+	parsers                  = map[string]parserConfig{
+		FLOW_LOG_TYPE: {
+			logTypeLabel:    "s3_vpc_flow",
+			filenameRegex:   defaultFilenameRegex,
+			ownerLabelKey:   "account_id",
+			timestampRegex:  defaultTimestampRegex,
+			timestampFormat: time.RFC3339,
+			skipHeaderCount: 1,
+		},
+		LB_LOG_TYPE: {
+			logTypeLabel:    "s3_lb",
+			filenameRegex:   defaultFilenameRegex,
+			ownerLabelKey:   "account_id",
+			timestampFormat: time.RFC3339,
+			timestampRegex:  defaultTimestampRegex,
+		},
+		CLOUDTRAIL_LOG_TYPE: {
+			logTypeLabel:    "s3_cloudtrail",
+			ownerLabelKey:   "account_id",
+			skipHeaderCount: 3,
+			filenameRegex:   cloudtrailFilenameRegex,
+		},
+		CLOUDFRONT_LOG_TYPE: {
+			logTypeLabel:    "s3_cloudfront",
+			filenameRegex:   cloudfrontFilenameRegex,
+			ownerLabelKey:   "prefix",
+			timestampRegex:  cloudfrontTimestampRegex,
+			timestampFormat: "2006-01-02\x0915:04:05",
+			skipHeaderCount: 2,
+		},
 	}
-	// regex that extracts the timestamp (RFC3339) from message log
-	timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
 )
 
 func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
@@ -68,6 +114,13 @@ func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
 }
 
 func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.ReadCloser) error {
+	parser, ok := parsers[labels["type"]]
+	if !ok {
+		if labels["type"] == CLOUDTRAIL_DIGEST_LOG_TYPE {
+			return nil
+		}
+		return fmt.Errorf("could not find parser for type %s", labels["type"])
+	}
 	gzreader, err := gzip.NewReader(obj)
 	if err != nil {
 		return err
@@ -75,25 +128,10 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 
 	scanner := bufio.NewScanner(gzreader)
 
-	skipHeader := false
-	logType := ""
-	switch labels["type"] {
-	case FLOW_LOG_TYPE:
-		skipHeader = true
-		logType = "s3_vpc_flow"
-	case LB_LOG_TYPE:
-		logType = "s3_lb"
-	case CLOUDTRAIL_LOG_TYPE:
-		logType = "s3_cloudtrail"
-	case CLOUDTRAIL_DIGEST_LOG_TYPE:
-		// do not ingest digest files' content
-		return nil
-	}
-
 	ls := model.LabelSet{
-		model.LabelName("__aws_log_type"):                       model.LabelValue(logType),
-		model.LabelName(fmt.Sprintf("__aws_%s", logType)):       model.LabelValue(labels["src"]),
-		model.LabelName(fmt.Sprintf("__aws_%s_owner", logType)): model.LabelValue(labels["account_id"]),
+		model.LabelName("__aws_log_type"):                                   model.LabelValue(parser.logTypeLabel),
+		model.LabelName(fmt.Sprintf("__aws_%s", parser.logTypeLabel)):       model.LabelValue(labels["src"]),
+		model.LabelName(fmt.Sprintf("__aws_%s_owner", parser.logTypeLabel)): model.LabelValue(labels[parser.ownerLabelKey]),
 	}
 
 	ls = applyExtraLabels(ls)
@@ -102,7 +140,7 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 	if labels["type"] == CLOUDTRAIL_LOG_TYPE {
 		records := make(chan Record)
 		jsonStream := NewJSONStream(records)
-		go jsonStream.Start(gzreader, 3)
+		go jsonStream.Start(gzreader, parser.skipHeaderCount)
 		// Stream json file
 		for record := range jsonStream.records {
 			if record.Error != nil {
@@ -123,17 +161,17 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 	for scanner.Scan() {
 		log_line := scanner.Text()
 		lineCount++
-		if lineCount == 1 && skipHeader {
+		if lineCount <= parser.skipHeaderCount {
 			continue
 		}
 		if printLogLine {
 			fmt.Println(log_line)
 		}
 
-		match := timestampRegex.FindStringSubmatch(log_line)
 		timestamp := time.Now()
+		match := parser.timestampRegex.FindStringSubmatch(log_line)
 		if len(match) > 0 {
-			timestamp, err = time.Parse(time.RFC3339, match[1])
+			timestamp, err = time.Parse(parser.timestampFormat, match[1])
 			if err != nil {
 				return err
 			}
@@ -151,24 +189,23 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 }
 
 func getLabels(record events.S3EventRecord) (map[string]string, error) {
+	labels := make(map[string]string)
 	labels["key"] = record.S3.Object.Key
 	labels["bucket"] = record.S3.Bucket.Name
 	labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
 	labels["bucket_region"] = record.AWSRegion
-	var matchingExp *regexp.Regexp
 	var matchingType *string
-	for key, exp := range filenameRegexes {
-		if exp.MatchString(labels["key"]) {
-			matchingExp = exp
+	for key, p := range parsers {
+		if p.filenameRegex.MatchString(labels["key"]) {
 			matchingType = aws.String(key)
-		}
-	}
-	match := matchingExp.FindStringSubmatch(labels["key"])
-	for i, name := range matchingExp.SubexpNames() {
-		if i != 0 && name != "" {
-			labels[name] = match[i]
+			match := p.filenameRegex.FindStringSubmatch(labels["key"])
+			for i, name := range p.filenameRegex.SubexpNames() {
+				if i != 0 && name != "" {
+					labels[name] = match[i]
+				}
+			}
 		}
 	}
 	if labels["type"] == "" {
diff --git a/tools/lambda-promtail/lambda-promtail/s3_test.go b/tools/lambda-promtail/lambda-promtail/s3_test.go
index 83f1161b6d..18f8088257 100644
--- a/tools/lambda-promtail/lambda-promtail/s3_test.go
+++ b/tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -89,6 +89,39 @@ func Test_getLabels(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "cloudtrail_digest_logs",
+			args: args{
+				record: events.S3EventRecord{
+					AWSRegion: "us-east-1",
+					S3: events.S3Entity{
+						Bucket: events.S3Bucket{
+							Name: "cloudtrail_digest_logs_test",
+							OwnerIdentity: events.S3UserIdentity{
+								PrincipalID: "test",
+							},
+						},
+						Object: events.S3Object{
+							Key: "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+						},
+					},
+				},
+			},
+			want: map[string]string{
+				"account_id":    "123456789012",
+				"bucket":        "cloudtrail_digest_logs_test",
+				"bucket_owner":  "test",
+				"bucket_region": "us-east-1",
+				"day":           "24",
+				"key":           "my-bucket/AWSLogs/123456789012/CloudTrail-Digest/us-east-1/2022/01/24/123456789012_CloudTrail-Digest_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+				"month":         "01",
+				"region":        "us-east-1",
+				"src":           "4jhzXFO2Jlvu2b3y",
+				"type":          CLOUDTRAIL_DIGEST_LOG_TYPE,
+				"year":          "2022",
+			},
+			wantErr: false,
+		},
 		{
 			name: "cloudtrail_logs",
 			args: args{
@@ -122,6 +155,38 @@ func Test_getLabels(t *testing.T) {
 			},
 			wantErr: false,
 		},
+		{
+			name: "s3_cloudfront",
+			args: args{
+				record: events.S3EventRecord{
+					AWSRegion: "us-east-1",
+					S3: events.S3Entity{
+						Bucket: events.S3Bucket{
+							Name: "cloudfront_logs_test",
+							OwnerIdentity: events.S3UserIdentity{
+								PrincipalID: "test",
+							},
+						},
+						Object: events.S3Object{
+							Key: "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
+						},
+					},
+				},
+			},
+			want: map[string]string{
+				"bucket":        "cloudfront_logs_test",
+				"bucket_owner":  "test",
+				"bucket_region": "us-east-1",
+				"day":           "18",
+				"key":           "my/bucket/prefix/E2K2LNL5N3WR51.2022-07-18-12.a10a8496.gz",
+				"month":         "07",
+				"prefix":        "my/bucket/prefix",
+				"src":           "E2K2LNL5N3WR51",
+				"type":          CLOUDFRONT_LOG_TYPE,
+				"year":          "2022",
+			},
+			wantErr: false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -149,6 +214,7 @@ func Test_parseS3Log(t *testing.T) {
 		name           string
 		args           args
 		wantErr        bool
+		expectedLen    int
 		expectedStream string
 	}{
 		{
@@ -165,6 +231,7 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_vpc_flow", __aws_s3_vpc_flow="source", __aws_s3_vpc_flow_owner="123456789"}`,
 			wantErr:        false,
 		},
@@ -182,6 +249,7 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_lb", __aws_s3_lb="source", __aws_s3_lb_owner="123456789"}`,
 			wantErr:        false,
 		},
@@ -199,9 +267,46 @@ func Test_parseS3Log(t *testing.T) {
 					"account_id": "123456789",
 				},
 			},
+			expectedLen:    1,
 			expectedStream: `{__aws_log_type="s3_cloudtrail", __aws_s3_cloudtrail="source", __aws_s3_cloudtrail_owner="123456789"}`,
 			wantErr:        false,
 		},
+		{
+			name: "cloudtrail_digest_logs",
+			args: args{
+				batchSize: 131072, // Set large enough we don't try and send to promtail
+				filename:  "../testdata/cloudtrail-log-file.json.gz",
+				b: &batch{
+					streams: map[string]*logproto.Stream{},
+				},
+				labels: map[string]string{
+					"type":       CLOUDTRAIL_DIGEST_LOG_TYPE,
+					"src":        "source",
+					"account_id": "123456789",
+				},
+			},
+			expectedLen:    0,
+			expectedStream: ``,
+			wantErr:        false,
+		},
+		{
+			name: "cloudfrontlogs",
+			args: args{
+				batchSize: 131072, // Set large enough we don't try and send to promtail
+				filename:  "../testdata/cloudfront.log.gz",
+				b: &batch{
+					streams: map[string]*logproto.Stream{},
+				},
+				labels: map[string]string{
+					"type":   CLOUDFRONT_LOG_TYPE,
+					"src":    "DISTRIBUTIONID",
+					"prefix": "path/to/file",
+				},
+			},
+			expectedLen:    1,
+			expectedStream: `{__aws_log_type="s3_cloudfront", __aws_s3_cloudfront="DISTRIBUTIONID", __aws_s3_cloudfront_owner="path/to/file"}`,
+			wantErr:        false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -214,10 +319,12 @@ func Test_parseS3Log(t *testing.T) {
 			if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
 				t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
 			}
-			require.Len(t, tt.args.b.streams, 1)
-			stream, ok := tt.args.b.streams[tt.expectedStream]
-			require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
-			require.NotNil(t, stream)
+			require.Len(t, tt.args.b.streams, tt.expectedLen)
+			if tt.expectedStream != "" {
+				stream, ok := tt.args.b.streams[tt.expectedStream]
+				require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
+				require.NotNil(t, stream)
+			}
 		})
 	}
 }
diff --git a/tools/lambda-promtail/testdata/cloudfront.log.gz b/tools/lambda-promtail/testdata/cloudfront.log.gz
new file mode 100644
index 0000000000..c37a63d55a
Binary files /dev/null and b/tools/lambda-promtail/testdata/cloudfront.log.gz differ
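Reviewer note (not part of the patch): a quick way to sanity-check the CloudFront timestamp handling above. CloudFront standard log lines are tab-separated, with the date and time in two separate fields (and each file starts with `#Version` and `#Fields` header lines, which is presumably what `skipHeaderCount: 2` accounts for). The sketch below uses the same regex and layout string as the `CLOUDFRONT_LOG_TYPE` entry in the parsers table; the log line itself is a hypothetical example, and `cloudfrontTimestampFormat` is just a local name for the `"2006-01-02\x0915:04:05"` literal that the patch stores inline in `timestampFormat`.

```go
package main

import (
	"fmt"
	"regexp"
	"time"
)

var (
	// Same expressions as the CLOUDFRONT_LOG_TYPE entry in the parsers table:
	// \s matches the tab separating the date and time fields, and \x09 is that
	// same tab embedded in the Go reference layout.
	cloudfrontTimestampRegex  = regexp.MustCompile(`(?P<timestamp>\d+-\d+-\d+\s\d+:\d+:\d+)`)
	cloudfrontTimestampFormat = "2006-01-02\x0915:04:05"
)

func main() {
	// Hypothetical CloudFront standard-log line: date <tab> time <tab> edge location <tab> ...
	logLine := "2019-12-04\t21:02:31\tLAX1\t392\t192.0.2.100\tGET\td111111abcdef8.cloudfront.net\t/index.html\t200"

	match := cloudfrontTimestampRegex.FindStringSubmatch(logLine)
	if len(match) > 0 {
		// The captured value keeps the tab between date and time, so it parses
		// cleanly against the tab-joined layout.
		ts, err := time.Parse(cloudfrontTimestampFormat, match[1])
		if err != nil {
			panic(err)
		}
		fmt.Println(ts) // 2019-12-04 21:02:31 +0000 UTC
	}
}
```

This mirrors the generic scanner loop in parseS3Log: when the per-type regex finds a match, the parsed value replaces the `time.Now()` default before the entry is appended to the batch.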