diff --git a/CHANGELOG.md b/CHANGELOG.md
index 69b9352f5d..5101bf8acd 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -6,6 +6,7 @@

 ##### Enhancements

+* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
 * [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.
 * [8067](https://github.com/grafana/loki/pull/8067) **DylanGuedes**: Distributor: Add auto-forget unhealthy members support.
 * [9175](https://github.com/grafana/loki/pull/9175) **MichelHollands**: Ingester: update the `prepare_shutdown` endpoint so it supports GET and DELETE and stores the state on disk.
diff --git a/docs/sources/clients/lambda-promtail/_index.md b/docs/sources/clients/lambda-promtail/_index.md
index 19e7a8e907..a42494700c 100644
--- a/docs/sources/clients/lambda-promtail/_index.md
+++ b/docs/sources/clients/lambda-promtail/_index.md
@@ -5,7 +5,7 @@ weight: 20
 ---

 # Lambda Promtail

-Grafana Loki includes [Terraform](https://www.terraform.io/) and [CloudFormation](https://aws.amazon.com/cloudformation/) for shipping Cloudwatch and loadbalancer logs to Loki via a [lambda function](https://aws.amazon.com/lambda/). This is done via [lambda-promtail](https://github.com/grafana/loki/blob/main/tools/lambda-promtail) which processes cloudwatch events and propagates them to Loki (or a Promtail instance) via the push-api [scrape config]({{}}).
+Grafana Loki includes [Terraform](https://www.terraform.io/) and [CloudFormation](https://aws.amazon.com/cloudformation/) for shipping Cloudwatch, Cloudtrail, VPC Flow Logs, and loadbalancer logs to Loki via a [lambda function](https://aws.amazon.com/lambda/). This is done via [lambda-promtail](https://github.com/grafana/loki/blob/main/tools/lambda-promtail) which processes cloudwatch events and propagates them to Loki (or a Promtail instance) via the push-api [scrape config]({{}}).

 ## Deployment

@@ -105,6 +105,10 @@ One thing to be aware of with this is that the default flow log format doesn't h

 This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.

+### Cloudtrail logs
+
+This workflow allows ingesting AWS Cloudtrail logs stored on S3 to Loki.
+
 ### Cloudfront real-time logs

 Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.
diff --git a/tools/lambda-promtail/lambda-promtail/cloudtrail.go b/tools/lambda-promtail/lambda-promtail/cloudtrail.go
new file mode 100644
index 0000000000..500b232036
--- /dev/null
+++ b/tools/lambda-promtail/lambda-promtail/cloudtrail.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"encoding/json"
+	"time"
+
+	"github.com/grafana/loki/pkg/logproto"
+)
+
+// parseCloudtrailRecord parses a CloudTrail Record and returns a logproto.Entry.
+func parseCloudtrailRecord(record Record) (logproto.Entry, error) {
+	timestamp := time.Now()
+	if record.Error != nil {
+		return logproto.Entry{}, record.Error
+	}
+	document, err := json.Marshal(record.Content)
+	if err != nil {
+		return logproto.Entry{}, err
+	}
+	if val, ok := record.Content["eventTime"]; ok {
+		eventTime, err := time.Parse(time.RFC3339, val.(string))
+		if err != nil {
+			return logproto.Entry{}, err
+		}
+		timestamp = eventTime
+	}
+	return logproto.Entry{
+		Line:      string(document),
+		Timestamp: timestamp,
+	}, nil
+}
diff --git a/tools/lambda-promtail/lambda-promtail/cloudtrail_test.go b/tools/lambda-promtail/lambda-promtail/cloudtrail_test.go
new file mode 100644
index 0000000000..e96c7b5bba
--- /dev/null
+++ b/tools/lambda-promtail/lambda-promtail/cloudtrail_test.go
@@ -0,0 +1,31 @@
+package main
+
+import (
+	"compress/gzip"
+	"os"
+	"testing"
+)
+
+func TestParseJson(t *testing.T) {
+	records := make(chan Record)
+	jsonStream := NewJSONStream(records)
+	file, err := os.Open("../testdata/cloudtrail-log-file.json.gz")
+	if err != nil {
+		t.Fatal(err)
+	}
+	gzipReader, err := gzip.NewReader(file)
+	if err != nil {
+		t.Fatal(err)
+	}
+	go jsonStream.Start(gzipReader, 3)
+
+	for record := range jsonStream.records {
+		if record.Error != nil {
+			t.Error(record.Error)
+		}
+		_, err := parseCloudtrailRecord(record)
+		if err != nil {
+			t.Error(err)
+		}
+	}
+}
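Note (not part of the patch): the fixture `../testdata/cloudtrail-log-file.json.gz` used by `TestParseJson` is a binary blob, so its contents are invisible in this diff. Below is a minimal sketch of a generator that produces a file of the assumed shape, a gzipped JSON object whose `Records` array holds the individual events, which is the standard CloudTrail log file layout. The event field values are hypothetical; only the standard library is used.

```go
package main

import (
	"compress/gzip"
	"os"
)

func main() {
	// CloudTrail delivers each log file as a gzipped JSON object whose
	// "Records" array contains the individual events. Field values here
	// are made up for illustration.
	body := []byte(`{"Records":[{"eventVersion":"1.08","eventTime":"2022-01-24T00:00:00Z","eventSource":"s3.amazonaws.com","eventName":"GetObject"}]}`)

	f, err := os.Create("cloudtrail-log-file.json.gz")
	if err != nil {
		panic(err)
	}
	defer f.Close()

	// Compress the JSON document exactly as CloudTrail would store it on S3.
	w := gzip.NewWriter(f)
	if _, err := w.Write(body); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil {
		panic(err)
	}
}
```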
diff --git a/tools/lambda-promtail/lambda-promtail/json_stream.go b/tools/lambda-promtail/lambda-promtail/json_stream.go
new file mode 100644
index 0000000000..140ebd338d
--- /dev/null
+++ b/tools/lambda-promtail/lambda-promtail/json_stream.go
@@ -0,0 +1,53 @@
+package main
+
+import (
+	"encoding/json"
+	"fmt"
+	"io"
+)
+
+// Stream helps transmit each record within a channel.
+type Stream struct {
+	records chan Record
+}
+
+// Record represents a record in the JSON stream. If there is no error, Error is nil.
+type Record struct {
+	Error   error
+	Content map[string]any
+}
+
+// NewJSONStream creates a new Stream of records.
+func NewJSONStream(recordChan chan Record) Stream {
+	return Stream{
+		records: recordChan,
+	}
+}
+
+// Start streams the JSON file starting from the target token, record by record.
+func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) {
+	defer r.Close()
+	defer close(s.records)
+	decoder := json.NewDecoder(r)
+
+	// Skip the provided count of JSON tokens to reach the target array, e.g. "{" and "Records".
+	for i := 0; i < tokenCountToTarget; i++ {
+		_, err := decoder.Token()
+		if err != nil {
+			s.records <- Record{Error: fmt.Errorf("failed decoding beginning token: %w", err)}
+			return
+		}
+	}
+
+	// Decode each record of the target array.
+	i := 1
+	for decoder.More() {
+		var content map[string]any
+		if err := decoder.Decode(&content); err != nil {
+			s.records <- Record{Error: fmt.Errorf("failed decoding record %d: %w", i, err)}
+			return
+		}
+		s.records <- Record{Content: content}
+		i++
+	}
+}
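Why `Start` is called with `tokenCountToTarget = 3` for CloudTrail files: the decoder must consume `{`, `"Records"`, and `[` before it sits on the first event. A standalone sketch (not part of the patch) of the mechanics, using only the standard library; the sample payload is a hypothetical minimal CloudTrail document.

```go
package main

import (
	"encoding/json"
	"fmt"
	"strings"
)

func main() {
	// Hypothetical minimal CloudTrail payload: one JSON object whose
	// "Records" key holds the array of events.
	sample := `{"Records":[{"eventName":"GetObject"},{"eventName":"PutObject"}]}`
	decoder := json.NewDecoder(strings.NewReader(sample))

	// The three tokens preceding the first record: {, "Records", [.
	for i := 0; i < 3; i++ {
		token, err := decoder.Token()
		if err != nil {
			panic(err)
		}
		fmt.Printf("token %d: %v\n", i+1, token)
	}

	// decoder.More() now iterates the array one record at a time,
	// without loading the whole file into memory.
	for decoder.More() {
		var content map[string]any
		if err := decoder.Decode(&content); err != nil {
			panic(err)
		}
		fmt.Println("record:", content)
	}
}
```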
diff --git a/tools/lambda-promtail/lambda-promtail/s3.go b/tools/lambda-promtail/lambda-promtail/s3.go
index 44ea4cc534..60eb3a13b7 100644
--- a/tools/lambda-promtail/lambda-promtail/s3.go
+++ b/tools/lambda-promtail/lambda-promtail/s3.go
@@ -20,6 +20,13 @@ import (
 	"github.com/aws/aws-sdk-go-v2/service/s3"
 )

+const (
+	FLOW_LOG_TYPE              string = "vpcflowlogs"
+	LB_LOG_TYPE                string = "elasticloadbalancing"
+	CLOUDTRAIL_LOG_TYPE        string = "CloudTrail"
+	CLOUDTRAIL_DIGEST_LOG_TYPE string = "CloudTrail-Digest"
+)
+
 var (
 	// AWS Application Load Balancers
 	// source: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format
@@ -29,17 +36,20 @@ var (
 	// source: https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3.html#flow-logs-s3-path
 	// format: bucket-and-optional-prefix/AWSLogs/account_id/vpcflowlogs/region/year/month/day/aws_account_id_vpcflowlogs_region_flow_log_id_YYYYMMDDTHHmmZ_hash.log.gz
 	// example: 123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz
-	filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>\w+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
-
+	// CloudTrail
+	// source: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html#cloudtrail-log-filename-format
+	// example: 111122223333_CloudTrail_us-east-2_20150801T0210Z_Mu0KsOhtH1ar15ZZ.json.gz
+	defaultFilenameRegex    = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
+	cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
+	filenameRegexes = map[string]*regexp.Regexp{
+		FLOW_LOG_TYPE:       defaultFilenameRegex,
+		LB_LOG_TYPE:         defaultFilenameRegex,
+		CLOUDTRAIL_LOG_TYPE: cloudtrailFilenameRegex,
+	}
 	// regex that extracts the timestamp (RFC3339) from message log
 	timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
 )

-const (
-	FLOW_LOG_TYPE string = "vpcflowlogs"
-	LB_LOG_TYPE   string = "elasticloadbalancing"
-)
-
 func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
 	var s3Client *s3.Client

@@ -65,12 +75,18 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 	scanner := bufio.NewScanner(gzreader)

 	skipHeader := false
-	logType := labels["type"]
-	if labels["type"] == FLOW_LOG_TYPE {
+	logType := ""
+	switch labels["type"] {
+	case FLOW_LOG_TYPE:
 		skipHeader = true
 		logType = "s3_vpc_flow"
-	} else if labels["type"] == LB_LOG_TYPE {
+	case LB_LOG_TYPE:
 		logType = "s3_lb"
+	case CLOUDTRAIL_LOG_TYPE:
+		logType = "s3_cloudtrail"
+	case CLOUDTRAIL_DIGEST_LOG_TYPE:
+		// do not ingest digest files' content
+		return nil
 	}

 	ls := model.LabelSet{
@@ -82,6 +98,27 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 	ls = applyExtraLabels(ls)

 	timestamp := time.Now()
+	// For CloudTrail logs, extract the timestamp of each nested event and send the rest as raw JSON.
+	if labels["type"] == CLOUDTRAIL_LOG_TYPE {
+		records := make(chan Record)
+		jsonStream := NewJSONStream(records)
+		go jsonStream.Start(gzreader, 3)
+		// Stream the JSON file record by record.
+		for record := range jsonStream.records {
+			if record.Error != nil {
+				return record.Error
+			}
+			trailEntry, err := parseCloudtrailRecord(record)
+			if err != nil {
+				return err
+			}
+			if err := b.add(ctx, entry{ls, trailEntry}); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
 	var lineCount int
 	for scanner.Scan() {
 		log_line := scanner.Text()
@@ -113,21 +150,29 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
 }

 func getLabels(record events.S3EventRecord) (map[string]string, error) {
-
 	labels := make(map[string]string)

 	labels["key"] = record.S3.Object.Key
 	labels["bucket"] = record.S3.Bucket.Name
 	labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
 	labels["bucket_region"] = record.AWSRegion
-
-	match := filenameRegex.FindStringSubmatch(labels["key"])
-	for i, name := range filenameRegex.SubexpNames() {
+	var matchingExp *regexp.Regexp
+	var matchingType *string
+	for key, exp := range filenameRegexes {
+		if exp.MatchString(labels["key"]) {
+			matchingExp = exp
+			matchingType = aws.String(key)
+		}
+	}
+	match := matchingExp.FindStringSubmatch(labels["key"])
+	for i, name := range matchingExp.SubexpNames() {
 		if i != 0 && name != "" {
 			labels[name] = match[i]
 		}
 	}
-
+	if labels["type"] == "" {
+		labels["type"] = *matchingType
+	}
 	return labels, nil
 }

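For reference, a standalone sketch (not part of the patch) of how `getLabels` derives labels from a CloudTrail object key: the regular expression and the example key are copied verbatim from this patch and its tests, and the loop mirrors the `SubexpNames` iteration above.

```go
package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Copied from this patch: the CloudTrail filename regex.
	cloudtrailFilenameRegex := regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)

	// Copied from the test case below.
	key := "my-bucket/AWSLogs/123456789012/CloudTrail/us-east-1/2022/01/24/123456789012_CloudTrail_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz"

	// Mirror getLabels: copy every named capture group into the label map.
	labels := make(map[string]string)
	match := cloudtrailFilenameRegex.FindStringSubmatch(key)
	for i, name := range cloudtrailFilenameRegex.SubexpNames() {
		if i != 0 && name != "" {
			labels[name] = match[i]
		}
	}
	fmt.Println(labels)
	// map[account_id:123456789012 day:24 month:01 region:us-east-1
	//     src:4jhzXFO2Jlvu2b3y type:CloudTrail year:2022]
}
```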
diff --git a/tools/lambda-promtail/lambda-promtail/s3_test.go b/tools/lambda-promtail/lambda-promtail/s3_test.go
index c8ffc1f2e6..ad778c6757 100644
--- a/tools/lambda-promtail/lambda-promtail/s3_test.go
+++ b/tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -50,7 +50,7 @@ func Test_getLabels(t *testing.T) {
 				"month":  "01",
 				"region": "us-east-1",
 				"src":    "my-loadbalancer",
-				"type":   "elasticloadbalancing",
+				"type":   LB_LOG_TYPE,
 				"year":   "2022",
 			},
 			wantErr: false,
@@ -62,7 +62,7 @@ func Test_getLabels(t *testing.T) {
 				AWSRegion: "us-east-1",
 				S3: events.S3Entity{
 					Bucket: events.S3Bucket{
-						Name: "elb_logs_test",
+						Name: "vpc_logs_test",
 						OwnerIdentity: events.S3UserIdentity{
 							PrincipalID: "test",
 						},
@@ -75,7 +75,7 @@ func Test_getLabels(t *testing.T) {
 			},
 			want: map[string]string{
 				"account_id":    "123456789012",
-				"bucket":        "elb_logs_test",
+				"bucket":        "vpc_logs_test",
 				"bucket_owner":  "test",
 				"bucket_region": "us-east-1",
 				"day":           "24",
@@ -83,7 +83,40 @@ func Test_getLabels(t *testing.T) {
 				"month":  "01",
 				"region": "us-east-1",
 				"src":    "fl-1234abcd",
-				"type":   "vpcflowlogs",
+				"type":   FLOW_LOG_TYPE,
+				"year":   "2022",
+			},
+			wantErr: false,
+		},
+		{
+			name: "cloudtrail_logs",
+			args: args{
+				record: events.S3EventRecord{
+					AWSRegion: "us-east-1",
+					S3: events.S3Entity{
+						Bucket: events.S3Bucket{
+							Name: "cloudtrail_logs_test",
+							OwnerIdentity: events.S3UserIdentity{
+								PrincipalID: "test",
+							},
+						},
+						Object: events.S3Object{
+							Key: "my-bucket/AWSLogs/123456789012/CloudTrail/us-east-1/2022/01/24/123456789012_CloudTrail_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+						},
+					},
+				},
+			},
+			want: map[string]string{
+				"account_id":    "123456789012",
+				"bucket":        "cloudtrail_logs_test",
+				"bucket_owner":  "test",
+				"bucket_region": "us-east-1",
+				"day":           "24",
+				"key":           "my-bucket/AWSLogs/123456789012/CloudTrail/us-east-1/2022/01/24/123456789012_CloudTrail_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
+				"month":         "01",
+				"region":        "us-east-1",
+				"src":           "4jhzXFO2Jlvu2b3y",
+				"type":          CLOUDTRAIL_LOG_TYPE,
 				"year":          "2022",
 			},
 			wantErr: false,
@@ -151,6 +184,23 @@ func Test_parseS3Log(t *testing.T) {
 			expectedStream: `{__aws_log_type="s3_lb", __aws_s3_lb="source", __aws_s3_lb_owner="123456789"}`,
 			wantErr:        false,
 		},
+		{
+			name: "cloudtraillogs",
+			args: args{
+				batchSize: 131072, // Set large enough we don't try and send to promtail
+				filename:  "../testdata/cloudtrail-log-file.json.gz",
+				b: &batch{
+					streams: map[string]*logproto.Stream{},
+				},
+				labels: map[string]string{
+					"type":       CLOUDTRAIL_LOG_TYPE,
+					"src":        "source",
+					"account_id": "123456789",
+				},
+			},
+			expectedStream: `{__aws_log_type="s3_cloudtrail", __aws_s3_cloudtrail="source", __aws_s3_cloudtrail_owner="123456789"}`,
+			wantErr:        false,
+		},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -160,11 +210,9 @@ func Test_parseS3Log(t *testing.T) {
 			if err != nil {
 				t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
 			}
-
 			if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
 				t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
 			}
-
 			require.Len(t, tt.args.b.streams, 1)
 			stream, ok := tt.args.b.streams[tt.expectedStream]
 			require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
diff --git a/tools/lambda-promtail/testdata/cloudtrail-log-file.json.gz b/tools/lambda-promtail/testdata/cloudtrail-log-file.json.gz
new file mode 100644
index 0000000000..b5f1fbf33f
Binary files /dev/null and b/tools/lambda-promtail/testdata/cloudtrail-log-file.json.gz differ