@ -2,6 +2,7 @@ package cloudwatch
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
@ -13,13 +14,13 @@ import (
"github.com/aws/aws-sdk-go/service/cloudwatchlogs/cloudwatchlogsiface"
"github.com/grafana/grafana-plugin-sdk-go/backend"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana/pkg/components/simplejson"
"golang.org/x/sync/errgroup"
)
const (
LimitExceededException = "LimitExceededException"
defaultLimit = 10
limitExceededException = "LimitExceededException"
defaultLimit = int64 ( 10 )
logGroupDefaultLimit = int64 ( 50 )
)
type AWSError struct {
@ -28,6 +29,26 @@ type AWSError struct {
Payload map [ string ] string
}
type LogQueryJson struct {
LogType string ` json:"type" `
SubType string
Limit * int64
Time int64
StartTime int64
EndTime int64
LogGroupName string
LogGroupNames [ ] string
LogGroupNamePrefix string
LogStreamName string
StartFromHead bool
Region string
QueryString string
QueryId string
StatsGroups [ ] string
Subtype string
Expression string
}
func ( e * AWSError ) Error ( ) string {
return fmt . Sprintf ( "%s: %s" , e . Code , e . Message )
}
@ -39,7 +60,8 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
eg , ectx := errgroup . WithContext ( ctx )
for _ , query := range req . Queries {
model , err := simplejson . NewJson ( query . JSON )
var model LogQueryJson
err := json . Unmarshal ( query . JSON , & model )
if err != nil {
return nil , err
}
@ -58,7 +80,7 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
return err
}
groupedFrames , err := groupResponseFrame ( dataframe , model . Get ( "statsGroups" ) . MustStringArray ( ) )
groupedFrames , err := groupResponseFrame ( dataframe , model . StatsGroups )
if err != nil {
return err
}
@ -86,25 +108,24 @@ func (e *cloudWatchExecutor) executeLogActions(ctx context.Context, req *backend
return resp , nil
}
func ( e * cloudWatchExecutor ) executeLogAction ( ctx context . Context , model * simplejson . Json , query backend . DataQuery , pluginCtx backend . PluginContext ) ( * data . Frame , error ) {
subType := model . Get ( "subtype" ) . MustString ( )
func ( e * cloudWatchExecutor ) executeLogAction ( ctx context . Context , model LogQueryJson , query backend . DataQuery , pluginCtx backend . PluginContext ) ( * data . Frame , error ) {
dsInfo , err := e . getDSInfo ( pluginCtx )
if err != nil {
return nil , err
}
defaultRegion := dsInfo . region
region := dsInfo . region
if model . Region != "" {
region = model . Region
}
region := model . Get ( "region" ) . MustString ( defaultRegion )
logsClient , err := e . getCWLogsClient ( pluginCtx , region )
if err != nil {
return nil , err
}
var data * data . Frame = nil
switch subType {
switch model . SubType {
case "DescribeLogGroups" :
data , err = e . handleDescribeLogGroups ( ctx , logsClient , model )
case "GetLogGroupFields" :
@ -119,38 +140,36 @@ func (e *cloudWatchExecutor) executeLogAction(ctx context.Context, model *simple
data , err = e . handleGetLogEvents ( ctx , logsClient , model )
}
if err != nil {
return nil , fmt . Errorf ( "failed to execute log action with subtype: %s: %w" , s ubType, err )
return nil , fmt . Errorf ( "failed to execute log action with subtype: %s: %w" , model . S ubType, err )
}
return data , nil
}
func ( e * cloudWatchExecutor ) handleGetLogEvents ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json ) ( * data . Frame , error ) {
queryRequest := & cloudwatchlogs . GetLogEventsInput {
Limit : aws . Int64 ( parameters . Get ( "limit" ) . MustInt64 ( defaultLimit ) ) ,
StartFromHead : aws . Bool ( parameters . Get ( "startFromHead" ) . MustBool ( false ) ) ,
parameters LogQuery Json) ( * data . Frame , error ) {
limit := defaultLimit
if parameters . Limit != nil && * parameters . Limit > 0 {
limit = * parameters . Limit
}
logGroupName , err := parameters . Get ( "logGroupName" ) . String ( )
if err != nil {
return nil , fmt . Errorf ( "Error: Parameter 'logGroupName' is required" )
queryRequest := & cloudwatchlogs . GetLogEventsInput {
Limit : aws . Int64 ( limit ) ,
StartFromHead : aws . Bool ( parameters . StartFromHead ) ,
}
queryRequest . SetLogGroupName ( logGroupName )
logStreamName , err := parameters . Get ( "logStreamName" ) . String ( )
if err != nil {
return nil , fmt . Errorf ( "Error: Parameter 'logStream' is required" )
if parameters . LogGroupName == "" {
return nil , fmt . Errorf ( "Error: Parameter 'logGroupName' is required" )
}
queryRequest . SetLogStreamName ( logStream Name )
queryRequest . SetLogGroupName ( parameters . LogGroup Name )
if startTime , err := parameters . Get ( "startTime" ) . Int64 ( ) ; err == nil {
queryRequest . SetStartTime ( startTime )
if parameters . LogStreamName == "" {
return nil , fmt . Errorf ( "Error: Parameter 'logStreamName' is required" )
}
queryRequest . SetLogStreamName ( parameters . LogStreamName )
if endTime , err := parameters . Get ( "endTime" ) . Int64 ( ) ; err == nil {
queryRequest . SetEndTime ( endTime )
}
queryRequest . SetStartTime ( parameters . StartTime )
queryRequest . SetEndTime ( parameters . EndTime )
logEvents , err := logsClient . GetLogEventsWithContext ( ctx , queryRequest )
if err != nil {
@ -178,19 +197,22 @@ func (e *cloudWatchExecutor) handleGetLogEvents(ctx context.Context, logsClient
}
func ( e * cloudWatchExecutor ) handleDescribeLogGroups ( ctx context . Context ,
logsClient cloudwatchlogsiface . CloudWatchLogsAPI , parameters * simplejson . Json ) ( * data . Frame , error ) {
logGroupNamePrefix := parameters . Get ( "logGroupNamePrefix" ) . MustString ( "" )
logsClient cloudwatchlogsiface . CloudWatchLogsAPI , parameters LogQueryJson ) ( * data . Frame , error ) {
logGroupLimit := logGroupDefaultLimit
if parameters . Limit != nil && * parameters . Limit != 0 {
logGroupLimit = * parameters . Limit
}
var response * cloudwatchlogs . DescribeLogGroupsOutput = nil
var err error
if len ( l ogGroupNamePrefix) == 0 {
if len ( parameters . L ogGroupNamePrefix) == 0 {
response , err = logsClient . DescribeLogGroupsWithContext ( ctx , & cloudwatchlogs . DescribeLogGroupsInput {
Limit : aws . Int64 ( parameters . Get ( "limit" ) . MustInt64 ( 50 ) ) ,
Limit : aws . Int64 ( logGroupLimit ) ,
} )
} else {
response , err = logsClient . DescribeLogGroupsWithContext ( ctx , & cloudwatchlogs . DescribeLogGroupsInput {
Limit : aws . Int64 ( parameters . Get ( "limit" ) . MustInt64 ( 50 ) ) ,
LogGroupNamePrefix : aws . String ( l ogGroupNamePrefix) ,
Limit : aws . Int64 ( logGroupLimit ) ,
LogGroupNamePrefix : aws . String ( parameters . L ogGroupNamePrefix) ,
} )
}
if err != nil || response == nil {
@ -209,7 +231,7 @@ func (e *cloudWatchExecutor) handleDescribeLogGroups(ctx context.Context,
}
func ( e * cloudWatchExecutor ) executeStartQuery ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json , timeRange backend . TimeRange ) ( * cloudwatchlogs . StartQueryOutput , error ) {
parameters LogQuery Json, timeRange backend . TimeRange ) ( * cloudwatchlogs . StartQueryOutput , error ) {
startTime := timeRange . From
endTime := timeRange . To
@ -222,7 +244,7 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
// The usage of ltrim around the @log/@logStream fields is a necessary workaround, as without it,
// CloudWatch wouldn't consider a query using a non-alised @log/@logStream valid.
modifiedQueryString := "fields @timestamp,ltrim(@log) as " + logIdentifierInternal + ",ltrim(@logStream) as " +
logStreamIdentifierInternal + "|" + parameters . Get ( "queryString" ) . MustString ( "" )
logStreamIdentifierInternal + "|" + parameters . QueryString
startQueryInput := & cloudwatchlogs . StartQueryInput {
StartTime : aws . Int64 ( startTime . Unix ( ) ) ,
@ -232,25 +254,25 @@ func (e *cloudWatchExecutor) executeStartQuery(ctx context.Context, logsClient c
// and also a little bit more but as CW logs accept only seconds as integers there is not much to do about
// that.
EndTime : aws . Int64 ( int64 ( math . Ceil ( float64 ( endTime . UnixNano ( ) ) / 1e9 ) ) ) ,
LogGroupNames : aws . StringSlice ( parameters . Get ( "logGroupNames" ) . MustStringArray ( ) ) ,
LogGroupNames : aws . StringSlice ( parameters . LogGroupNames ) ,
QueryString : aws . String ( modifiedQueryString ) ,
}
if resultsLimit , err := parameters . Get ( "limit" ) . Int64 ( ) ; err = = nil {
startQueryInput . Limit = aws . Int64 ( results Limit)
if parameters . Limit ! = nil {
startQueryInput . Limit = aws . Int64 ( * parameters . Limit )
}
return logsClient . StartQueryWithContext ( ctx , startQueryInput )
}
func ( e * cloudWatchExecutor ) handleStartQuery ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
model * simplejson . Json , timeRange backend . TimeRange , refID string ) ( * data . Frame , error ) {
model LogQuery Json, timeRange backend . TimeRange , refID string ) ( * data . Frame , error ) {
startQueryResponse , err := e . executeStartQuery ( ctx , logsClient , model , timeRange )
if err != nil {
var awsErr awserr . Error
if errors . As ( err , & awsErr ) && awsErr . Code ( ) == "LimitExceededException" {
plog . Debug ( "executeStartQuery limit exceeded" , "err" , awsErr )
return nil , & AWSError { Code : L imitExceededException, Message : err . Error ( ) }
return nil , & AWSError { Code : l imitExceededException, Message : err . Error ( ) }
}
return nil , err
}
@ -258,11 +280,14 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl
dataFrame := data . NewFrame ( refID , data . NewField ( "queryId" , nil , [ ] string { * startQueryResponse . QueryId } ) )
dataFrame . RefID = refID
clientRegion := model . Get ( "region" ) . MustString ( "default" )
region := "default"
if model . Region != "" {
region = model . Region
}
dataFrame . Meta = & data . FrameMeta {
Custom : map [ string ] interface { } {
"Region" : clientR egion,
"Region" : r egion,
} ,
}
@ -270,9 +295,9 @@ func (e *cloudWatchExecutor) handleStartQuery(ctx context.Context, logsClient cl
}
func ( e * cloudWatchExecutor ) executeStopQuery ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json ) ( * cloudwatchlogs . StopQueryOutput , error ) {
parameters LogQuery Json) ( * cloudwatchlogs . StopQueryOutput , error ) {
queryInput := & cloudwatchlogs . StopQueryInput {
QueryId : aws . String ( parameters . Get ( "queryId" ) . MustString ( ) ) ,
QueryId : aws . String ( parameters . QueryId ) ,
}
response , err := logsClient . StopQueryWithContext ( ctx , queryInput )
@ -291,7 +316,7 @@ func (e *cloudWatchExecutor) executeStopQuery(ctx context.Context, logsClient cl
}
func ( e * cloudWatchExecutor ) handleStopQuery ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json ) ( * data . Frame , error ) {
parameters LogQuery Json) ( * data . Frame , error ) {
response , err := e . executeStopQuery ( ctx , logsClient , parameters )
if err != nil {
return nil , err
@ -302,16 +327,16 @@ func (e *cloudWatchExecutor) handleStopQuery(ctx context.Context, logsClient clo
}
func ( e * cloudWatchExecutor ) executeGetQueryResults ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json ) ( * cloudwatchlogs . GetQueryResultsOutput , error ) {
parameters LogQuery Json) ( * cloudwatchlogs . GetQueryResultsOutput , error ) {
queryInput := & cloudwatchlogs . GetQueryResultsInput {
QueryId : aws . String ( parameters . Get ( "queryId" ) . MustString ( ) ) ,
QueryId : aws . String ( parameters . QueryId ) ,
}
return logsClient . GetQueryResultsWithContext ( ctx , queryInput )
}
func ( e * cloudWatchExecutor ) handleGetQueryResults ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json , refID string ) ( * data . Frame , error ) {
parameters LogQuery Json, refID string ) ( * data . Frame , error ) {
getQueryResultsOutput , err := e . executeGetQueryResults ( ctx , logsClient , parameters )
if err != nil {
return nil , err
@ -329,10 +354,10 @@ func (e *cloudWatchExecutor) handleGetQueryResults(ctx context.Context, logsClie
}
func ( e * cloudWatchExecutor ) handleGetLogGroupFields ( ctx context . Context , logsClient cloudwatchlogsiface . CloudWatchLogsAPI ,
parameters * simplejson . Json , refID string ) ( * data . Frame , error ) {
parameters LogQuery Json, refID string ) ( * data . Frame , error ) {
queryInput := & cloudwatchlogs . GetLogGroupFieldsInput {
LogGroupName : aws . String ( parameters . Get ( "logGroupName" ) . MustString ( ) ) ,
Time : aws . Int64 ( parameters . Get ( "time" ) . MustInt64 ( ) ) ,
LogGroupName : aws . String ( parameters . LogGroupName ) ,
Time : aws . Int64 ( parameters . Time ) ,
}
getLogGroupFieldsOutput , err := logsClient . GetLogGroupFieldsWithContext ( ctx , queryInput )