feat(lambda-promtail): add cloudtrail log ingestion support (#9497)

**What this PR does / why we need it**:
### Add support for AWS CloudTrail audit log ingestion using lambda-promtail

Calls to AWS APIs are logged by AWS CloudTrail and are helpful for
security auditing and debugging. However, I've experienced difficulties
with it:
+ The AWS CloudTrail service is not well integrated with Prometheus (no
metrics, no alerts), and I don't want to manage alerts in CloudWatch
Alarms.
+ The search experience for CloudTrail in the AWS Console is painful
(I will not elaborate 😅).

This PR allows ingesting CloudTrail audit logs sent to an S3 bucket,
using the same approach as for VPC Flow Logs or load balancer logs.

**Special notes for your reviewer**:

+ Because the CloudTrail file format is JSON rather than line-delimited
text, we stream the JSON CloudTrail records instead of using the existing
line scanner.
+ Because the CloudTrail filename format differs from the VPC Flow Log and
load balancer log filenames, we need to split the regexes by service
(although many AWS services seem to share the same
`defaultFilenameRegex`).
+ In the `getLabels` function, we expect the `type` label to be extracted
from the filename by the regex. For some log files (ex: CloudFront log
files), the filename contains no reference to the service name, so as a
default, when no type is found in the filename, I set it to the key of
the matching regex expression (see the sketch after this list).
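To make that last fallback concrete, here is a minimal, hypothetical sketch
(the helper name, the CloudFront regex, and the sample filename are
illustrative assumptions, not the PR's code) of choosing a per-service regex
and falling back to the map key when the filename carries no `type`:

```go
package main

import (
	"fmt"
	"regexp"
)

// resolveServiceType is a hypothetical helper: try each per-service regex,
// prefer the <type> capture from the filename, and fall back to the map key
// when the filename has no service name (e.g. CloudFront).
func resolveServiceType(key string, regexes map[string]*regexp.Regexp) (string, bool) {
	for svc, exp := range regexes {
		match := exp.FindStringSubmatch(key)
		if match == nil {
			continue
		}
		for i, name := range exp.SubexpNames() {
			if name == "type" && match[i] != "" {
				return match[i], true // type found in the filename itself
			}
		}
		return svc, true // no <type> capture: use the regex map key
	}
	return "", false // no regex matched
}

func main() {
	regexes := map[string]*regexp.Regexp{
		// Illustrative regex for CloudFront-style names; note it has no <type> group.
		"CloudFront": regexp.MustCompile(`(?P<src>\w+)\.\d{4}-\d{2}-\d{2}`),
	}
	fmt.Println(resolveServiceType("EMLARXS9EXAMPLE.2019-11-14-20.RT.gz", regexes))
	// Output: CloudFront true
}
```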

**Checklist**
- [x] Reviewed the
[`CONTRIBUTING.md`](https://github.com/grafana/loki/blob/main/CONTRIBUTING.md)
guide (**required**)
- [x] Documentation added
- [x] Tests updated
- [x] `CHANGELOG.md` updated
- [x] Changes that require user attention or interaction to upgrade are
documented in `docs/sources/upgrading/_index.md`
Commit b709b32d6a (parent a7976b584f) by Christophe Collot, committed via GitHub.
Files changed:
1. CHANGELOG.md (1)
2. docs/sources/clients/lambda-promtail/_index.md (6)
3. tools/lambda-promtail/lambda-promtail/cloudtrail.go (32)
4. tools/lambda-promtail/lambda-promtail/cloudtrail_test.go (31)
5. tools/lambda-promtail/lambda-promtail/json_stream.go (54)
6. tools/lambda-promtail/lambda-promtail/s3.go (75)
7. tools/lambda-promtail/lambda-promtail/s3_test.go (60)
8. tools/lambda-promtail/testdata/cloudtrail-log-file.json.gz (BIN)

CHANGELOG.md
@@ -6,6 +6,7 @@
##### Enhancements
* [9497](https://github.com/grafana/loki/pull/9497) **CCOLLOT**: Lambda-Promtail: Add support for AWS CloudTrail log ingestion.
* [9515](https://github.com/grafana/loki/pull/9515) **MichelHollands**: Fix String() on vector aggregation LogQL expressions that contain `without ()`.
* [8067](https://github.com/grafana/loki/pull/8067) **DylanGuedes**: Distributor: Add auto-forget unhealthy members support.
* [9175](https://github.com/grafana/loki/pull/9175) **MichelHollands**: Ingester: update the `prepare_shutdown` endpoint so it supports GET and DELETE and stores the state on disk.

docs/sources/clients/lambda-promtail/_index.md
@@ -5,7 +5,7 @@ weight: 20
---
# Lambda Promtail
- Grafana Loki includes [Terraform](https://www.terraform.io/) and [CloudFormation](https://aws.amazon.com/cloudformation/) for shipping Cloudwatch and loadbalancer logs to Loki via a [lambda function](https://aws.amazon.com/lambda/). This is done via [lambda-promtail](https://github.com/grafana/loki/blob/main/tools/lambda-promtail) which processes cloudwatch events and propagates them to Loki (or a Promtail instance) via the push-api [scrape config]({{<relref "../promtail/configuration#loki_push_api">}}).
+ Grafana Loki includes [Terraform](https://www.terraform.io/) and [CloudFormation](https://aws.amazon.com/cloudformation/) for shipping Cloudwatch, Cloudtrail, VPC Flow Logs and loadbalancer logs to Loki via a [lambda function](https://aws.amazon.com/lambda/). This is done via [lambda-promtail](https://github.com/grafana/loki/blob/main/tools/lambda-promtail) which processes cloudwatch events and propagates them to Loki (or a Promtail instance) via the push-api [scrape config]({{<relref "../promtail/configuration#loki_push_api">}}).
## Deployment
@@ -105,6 +105,10 @@ One thing to be aware of with this is that the default flow log format doesn't h
This workflow allows ingesting AWS loadbalancer logs stored on S3 to Loki.
### Cloudtrail logs
This workflow allows ingesting AWS Cloudtrail logs stored on S3 to Loki.
### Cloudfront real-time logs
Cloudfront [real-time logs](https://docs.aws.amazon.com/AmazonCloudFront/latest/DeveloperGuide/real-time-logs.html) can be sent to a Kinesis data stream. The data stream can be mapped to be an [event source](https://docs.aws.amazon.com/lambda/latest/dg/invocation-eventsourcemapping.html) for lambda-promtail to deliver the logs to Loki.

tools/lambda-promtail/lambda-promtail/cloudtrail.go (new file)
@@ -0,0 +1,32 @@
package main
import (
"encoding/json"
"time"
"github.com/grafana/loki/pkg/logproto"
)
// parseCloudtrailRecord parses a CloudTrail Record and returns a logproto.Entry.
func parseCloudtrailRecord(record Record) (logproto.Entry, error) {
timestamp := time.Now()
if record.Error != nil {
return logproto.Entry{}, record.Error
}
document, err := json.Marshal(record.Content)
if err != nil {
return logproto.Entry{}, err
}
if val, ok := record.Content["eventTime"]; ok {
ts, err := time.Parse(time.RFC3339, val.(string))
if err != nil {
return logproto.Entry{}, err
}
timestamp = ts
}
}
return logproto.Entry{
Line: string(document),
Timestamp: timestamp,
}, nil
}
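A hedged usage sketch follows (an editor's illustration, not part of the
diff): it assumes this same package, an extra `fmt` import, and a hypothetical
payload, and shows that `eventTime` drives the returned entry's timestamp
while the whole record is re-marshaled as the log line.

```go
// Hypothetical example, not in the PR: exercise parseCloudtrailRecord
// with a hand-built Record.
func exampleParseCloudtrailRecord() {
	rec := Record{Content: map[string]any{
		"eventTime":   "2022-01-24T00:00:00Z", // RFC3339, as CloudTrail emits
		"eventSource": "s3.amazonaws.com",     // hypothetical field
		"eventName":   "GetObject",            // hypothetical field
	}}
	entry, err := parseCloudtrailRecord(rec)
	if err != nil {
		panic(err)
	}
	fmt.Println(entry.Timestamp.UTC().Format(time.RFC3339)) // 2022-01-24T00:00:00Z
	fmt.Println(entry.Line)                                 // the full record as raw JSON
}
```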

tools/lambda-promtail/lambda-promtail/cloudtrail_test.go (new file)
@@ -0,0 +1,31 @@
package main
import (
"compress/gzip"
"os"
"testing"
)
func TestParseJson(t *testing.T) {
records := make(chan Record)
jsonStream := NewJSONStream(records)
file, err := os.Open("../testdata/cloudtrail-log-file.json.gz")
if err != nil {
t.Error(err)
}
gzipReader, err := gzip.NewReader(file)
if err != nil {
t.Error(err)
}
go jsonStream.Start(gzipReader, 3)
for record := range jsonStream.records {
if record.Error != nil {
t.Error(record.Error)
}
_, err := parseCloudtrailRecord(record)
if err != nil {
t.Error(err)
}
}
}

tools/lambda-promtail/lambda-promtail/json_stream.go (new file)
@@ -0,0 +1,54 @@
package main
import (
"encoding/json"
"fmt"
"io"
)
// Stream helps transmit each record within a channel.
type Stream struct {
records chan Record
}
// Record represents a record in the JSON stream. If there is no error, Error is nil.
type Record struct {
Error error
Content map[string]any
}
// NewJSONStream creates a new Stream of records.
func NewJSONStream(recordChan chan Record) Stream {
return Stream{
records: recordChan,
}
}
// Start streams the JSON document record by record, starting after the target token.
func (s Stream) Start(r io.ReadCloser, tokenCountToTarget int) {
defer r.Close()
defer close(s.records)
decoder := json.NewDecoder(r)
// Skip the provided count of JSON tokens to get to the target array, e.g. `{`, `"Records"`, `[`
for i := 0; i < tokenCountToTarget; i++ {
_, err := decoder.Token()
if err != nil {
s.records <- Record{Error: fmt.Errorf("failed decoding beginning token: %w", err)}
return
}
}
// Decode each element of the target array
i := 1
for decoder.More() {
var content map[string]any
if err := decoder.Decode(&content); err != nil {
s.records <- Record{Error: fmt.Errorf("failed decoding record %d: %w", i, err)}
return
}
s.records <- Record{Content: content}
i++
}
}
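A hedged usage sketch (editor's illustration, not part of the diff): it drives
the stream from an in-memory document instead of a gzipped S3 object;
`io.NopCloser`, `strings.NewReader`, `fmt`, and `log` are assumed imports, and
3 is the token count to skip to reach the array (`{`, `"Records"`, `[`).

```go
// Hypothetical example, not in the PR: stream records out of an inline document.
func exampleJSONStream() {
	doc := `{"Records":[{"eventTime":"2022-01-24T00:00:00Z"},{"eventTime":"2022-01-24T00:01:00Z"}]}`
	records := make(chan Record)
	s := NewJSONStream(records)
	go s.Start(io.NopCloser(strings.NewReader(doc)), 3)
	for r := range records { // loop ends when Start closes the channel
		if r.Error != nil {
			log.Fatal(r.Error)
		}
		fmt.Println(r.Content["eventTime"])
	}
}
```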

tools/lambda-promtail/lambda-promtail/s3.go
@@ -20,6 +20,13 @@ import (
"github.com/aws/aws-sdk-go-v2/service/s3"
)
const (
FLOW_LOG_TYPE string = "vpcflowlogs"
LB_LOG_TYPE string = "elasticloadbalancing"
CLOUDTRAIL_LOG_TYPE string = "CloudTrail"
CLOUDTRAIL_DIGEST_LOG_TYPE string = "CloudTrail-Digest"
)
var (
// AWS Application Load Balancers
// source: https://docs.aws.amazon.com/elasticloadbalancing/latest/application/load-balancer-access-logs.html#access-log-file-format
@@ -29,17 +36,20 @@ var (
// source: https://docs.aws.amazon.com/vpc/latest/userguide/flow-logs-s3.html#flow-logs-s3-path
// format: bucket-and-optional-prefix/AWSLogs/account_id/vpcflowlogs/region/year/month/day/aws_account_id_vpcflowlogs_region_flow_log_id_YYYYMMDDTHHmmZ_hash.log.gz
// example: 123456789012_vpcflowlogs_us-east-1_fl-1234abcd_20180620T1620Z_fe123456.log.gz
- filenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>\w+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
+ // CloudTrail
+ // source: https://docs.aws.amazon.com/awscloudtrail/latest/userguide/cloudtrail-log-file-examples.html#cloudtrail-log-filename-format
+ // example: 111122223333_CloudTrail_us-east-2_20150801T0210Z_Mu0KsOhtH1ar15ZZ.json.gz
+ defaultFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:elasticloadbalancing|vpcflowlogs)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?(?P<src>[a-zA-Z0-9\-]+)`)
+ cloudtrailFilenameRegex = regexp.MustCompile(`AWSLogs\/(?P<account_id>\d+)\/(?P<type>[a-zA-Z0-9_\-]+)\/(?P<region>[\w-]+)\/(?P<year>\d+)\/(?P<month>\d+)\/(?P<day>\d+)\/\d+\_(?:CloudTrail|CloudTrail-Digest)\_\w+-\w+-\d_(?:(?:app|nlb|net)\.*?)?.+_(?P<src>[a-zA-Z0-9\-]+)`)
+ filenameRegexes = map[string]*regexp.Regexp{
+ FLOW_LOG_TYPE: defaultFilenameRegex,
+ LB_LOG_TYPE: defaultFilenameRegex,
+ CLOUDTRAIL_LOG_TYPE: cloudtrailFilenameRegex,
+ }
// regex that extracts the timestamp (RFC3339) from message log
timestampRegex = regexp.MustCompile(`\w+ (?P<timestamp>\d+-\d+-\d+T\d+:\d+:\d+\.\d+Z)`)
)
- const (
- FLOW_LOG_TYPE string = "vpcflowlogs"
- LB_LOG_TYPE string = "elasticloadbalancing"
- )
func getS3Client(ctx context.Context, region string) (*s3.Client, error) {
var s3Client *s3.Client
@@ -65,12 +75,18 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
scanner := bufio.NewScanner(gzreader)
skipHeader := false
- logType := labels["type"]
- if labels["type"] == FLOW_LOG_TYPE {
+ logType := ""
+ switch labels["type"] {
+ case FLOW_LOG_TYPE:
skipHeader = true
logType = "s3_vpc_flow"
- } else if labels["type"] == LB_LOG_TYPE {
+ case LB_LOG_TYPE:
logType = "s3_lb"
+ case CLOUDTRAIL_LOG_TYPE:
+ logType = "s3_cloudtrail"
+ case CLOUDTRAIL_DIGEST_LOG_TYPE:
+ // do not ingest digest files' content
+ return nil
}
ls := model.LabelSet{
@@ -82,6 +98,27 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
ls = applyExtraLabels(ls)
timestamp := time.Now()
// Extract the timestamp of the nested event and send the rest as raw JSON
if labels["type"] == CLOUDTRAIL_LOG_TYPE {
records := make(chan Record)
jsonStream := NewJSONStream(records)
go jsonStream.Start(gzreader, 3)
// Stream the JSON file
for record := range jsonStream.records {
if record.Error != nil {
return record.Error
}
trailEntry, err := parseCloudtrailRecord(record)
if err != nil {
return err
}
if err := b.add(ctx, entry{ls, trailEntry}); err != nil {
return err
}
}
return nil
}
var lineCount int
for scanner.Scan() {
log_line := scanner.Text()
@@ -113,21 +150,29 @@ func parseS3Log(ctx context.Context, b *batch, labels map[string]string, obj io.
}
func getLabels(record events.S3EventRecord) (map[string]string, error) {
labels := make(map[string]string)
labels["key"] = record.S3.Object.Key
labels["bucket"] = record.S3.Bucket.Name
labels["bucket_owner"] = record.S3.Bucket.OwnerIdentity.PrincipalID
labels["bucket_region"] = record.AWSRegion
- match := filenameRegex.FindStringSubmatch(labels["key"])
- for i, name := range filenameRegex.SubexpNames() {
+ var matchingExp *regexp.Regexp
+ var matchingType *string
+ for key, exp := range filenameRegexes {
+ if exp.MatchString(labels["key"]) {
+ matchingExp = exp
+ matchingType = aws.String(key)
+ }
+ }
+ match := matchingExp.FindStringSubmatch(labels["key"])
+ for i, name := range matchingExp.SubexpNames() {
if i != 0 && name != "" {
labels[name] = match[i]
}
}
+ if labels["type"] == "" {
+ labels["type"] = *matchingType
+ }
return labels, nil
}

tools/lambda-promtail/lambda-promtail/s3_test.go
@@ -50,7 +50,7 @@ func Test_getLabels(t *testing.T) {
"month": "01",
"region": "us-east-1",
"src": "my-loadbalancer",
"type": "elasticloadbalancing",
"type": LB_LOG_TYPE,
"year": "2022",
},
wantErr: false,
@@ -62,7 +62,7 @@ func Test_getLabels(t *testing.T) {
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "elb_logs_test",
Name: "vpc_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
@@ -75,7 +75,7 @@ func Test_getLabels(t *testing.T) {
},
want: map[string]string{
"account_id": "123456789012",
"bucket": "elb_logs_test",
"bucket": "vpc_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "24",
@@ -83,7 +83,40 @@ func Test_getLabels(t *testing.T) {
"month": "01",
"region": "us-east-1",
"src": "fl-1234abcd",
"type": "vpcflowlogs",
"type": FLOW_LOG_TYPE,
"year": "2022",
},
wantErr: false,
},
{
name: "cloudtrail_logs",
args: args{
record: events.S3EventRecord{
AWSRegion: "us-east-1",
S3: events.S3Entity{
Bucket: events.S3Bucket{
Name: "cloudtrail_logs_test",
OwnerIdentity: events.S3UserIdentity{
PrincipalID: "test",
},
},
Object: events.S3Object{
Key: "my-bucket/AWSLogs/123456789012/CloudTrail/us-east-1/2022/01/24/123456789012_CloudTrail_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
},
},
},
},
want: map[string]string{
"account_id": "123456789012",
"bucket": "cloudtrail_logs_test",
"bucket_owner": "test",
"bucket_region": "us-east-1",
"day": "24",
"key": "my-bucket/AWSLogs/123456789012/CloudTrail/us-east-1/2022/01/24/123456789012_CloudTrail_us-east-1_20220124T0000Z_4jhzXFO2Jlvu2b3y.json.gz",
"month": "01",
"region": "us-east-1",
"src": "4jhzXFO2Jlvu2b3y",
"type": CLOUDTRAIL_LOG_TYPE,
"year": "2022",
},
wantErr: false,
@@ -151,6 +184,23 @@ func Test_parseS3Log(t *testing.T) {
expectedStream: `{__aws_log_type="s3_lb", __aws_s3_lb="source", __aws_s3_lb_owner="123456789"}`,
wantErr: false,
},
{
name: "cloudtraillogs",
args: args{
batchSize: 131072, // Set large enough we don't try and send to promtail
filename: "../testdata/cloudtrail-log-file.json.gz",
b: &batch{
streams: map[string]*logproto.Stream{},
},
labels: map[string]string{
"type": CLOUDTRAIL_LOG_TYPE,
"src": "source",
"account_id": "123456789",
},
},
expectedStream: `{__aws_log_type="s3_cloudtrail", __aws_s3_cloudtrail="source", __aws_s3_cloudtrail_owner="123456789"}`,
wantErr: false,
},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -160,11 +210,9 @@ func Test_parseS3Log(t *testing.T) {
if err != nil {
t.Errorf("parseS3Log() failed to open test file: %s - %v", tt.args.filename, err)
}
if err := parseS3Log(context.Background(), tt.args.b, tt.args.labels, tt.args.obj); (err != nil) != tt.wantErr {
t.Errorf("parseS3Log() error = %v, wantErr %v", err, tt.wantErr)
}
require.Len(t, tt.args.b.streams, 1)
stream, ok := tt.args.b.streams[tt.expectedStream]
require.True(t, ok, "batch does not contain stream: %s", tt.expectedStream)
