mirror of https://github.com/grafana/grafana
Merge pull request #11624 from mtanda/cw_get_metric_data
CloudWatch GetMetricData supportpull/12661/merge
commit
5129ea8f87
@ -0,0 +1,46 @@ |
||||
// Package csm provides Client Side Monitoring (CSM) which enables sending metrics
|
||||
// via UDP connection. Using the Start function will enable the reporting of
|
||||
// metrics on a given port. If Start is called, with different parameters, again,
|
||||
// a panic will occur.
|
||||
//
|
||||
// Pause can be called to pause any metrics publishing on a given port. Sessions
|
||||
// that have had their handlers modified via InjectHandlers may still be used.
|
||||
// However, the handlers will act as a no-op meaning no metrics will be published.
|
||||
//
|
||||
// Example:
|
||||
// r, err := csm.Start("clientID", ":31000")
|
||||
// if err != nil {
|
||||
// panic(fmt.Errorf("failed starting CSM: %v", err))
|
||||
// }
|
||||
//
|
||||
// sess, err := session.NewSession(&aws.Config{})
|
||||
// if err != nil {
|
||||
// panic(fmt.Errorf("failed loading session: %v", err))
|
||||
// }
|
||||
//
|
||||
// r.InjectHandlers(&sess.Handlers)
|
||||
//
|
||||
// client := s3.New(sess)
|
||||
// resp, err := client.GetObject(&s3.GetObjectInput{
|
||||
// Bucket: aws.String("bucket"),
|
||||
// Key: aws.String("key"),
|
||||
// })
|
||||
//
|
||||
// // Will pause monitoring
|
||||
// r.Pause()
|
||||
// resp, err = client.GetObject(&s3.GetObjectInput{
|
||||
// Bucket: aws.String("bucket"),
|
||||
// Key: aws.String("key"),
|
||||
// })
|
||||
//
|
||||
// // Resume monitoring
|
||||
// r.Continue()
|
||||
//
|
||||
// Start returns a Reporter that is used to enable or disable monitoring. If
|
||||
// access to the Reporter is required later, calling Get will return the Reporter
|
||||
// singleton.
|
||||
//
|
||||
// Example:
|
||||
// r := csm.Get()
|
||||
// r.Continue()
|
||||
package csm |
@ -0,0 +1,67 @@ |
||||
package csm |
||||
|
||||
import ( |
||||
"fmt" |
||||
"sync" |
||||
) |
||||
|
||||
var ( |
||||
lock sync.Mutex |
||||
) |
||||
|
||||
// Client side metric handler names
|
||||
const ( |
||||
APICallMetricHandlerName = "awscsm.SendAPICallMetric" |
||||
APICallAttemptMetricHandlerName = "awscsm.SendAPICallAttemptMetric" |
||||
) |
||||
|
||||
// Start will start the a long running go routine to capture
|
||||
// client side metrics. Calling start multiple time will only
|
||||
// start the metric listener once and will panic if a different
|
||||
// client ID or port is passed in.
|
||||
//
|
||||
// Example:
|
||||
// r, err := csm.Start("clientID", "127.0.0.1:8094")
|
||||
// if err != nil {
|
||||
// panic(fmt.Errorf("expected no error, but received %v", err))
|
||||
// }
|
||||
// sess := session.NewSession()
|
||||
// r.InjectHandlers(sess.Handlers)
|
||||
//
|
||||
// svc := s3.New(sess)
|
||||
// out, err := svc.GetObject(&s3.GetObjectInput{
|
||||
// Bucket: aws.String("bucket"),
|
||||
// Key: aws.String("key"),
|
||||
// })
|
||||
func Start(clientID string, url string) (*Reporter, error) { |
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
|
||||
if sender == nil { |
||||
sender = newReporter(clientID, url) |
||||
} else { |
||||
if sender.clientID != clientID { |
||||
panic(fmt.Errorf("inconsistent client IDs. %q was expected, but received %q", sender.clientID, clientID)) |
||||
} |
||||
|
||||
if sender.url != url { |
||||
panic(fmt.Errorf("inconsistent URLs. %q was expected, but received %q", sender.url, url)) |
||||
} |
||||
} |
||||
|
||||
if err := connect(url); err != nil { |
||||
sender = nil |
||||
return nil, err |
||||
} |
||||
|
||||
return sender, nil |
||||
} |
||||
|
||||
// Get will return a reporter if one exists, if one does not exist, nil will
|
||||
// be returned.
|
||||
func Get() *Reporter { |
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
|
||||
return sender |
||||
} |
@ -0,0 +1,51 @@ |
||||
package csm |
||||
|
||||
import ( |
||||
"strconv" |
||||
"time" |
||||
) |
||||
|
||||
type metricTime time.Time |
||||
|
||||
func (t metricTime) MarshalJSON() ([]byte, error) { |
||||
ns := time.Duration(time.Time(t).UnixNano()) |
||||
return []byte(strconv.FormatInt(int64(ns/time.Millisecond), 10)), nil |
||||
} |
||||
|
||||
type metric struct { |
||||
ClientID *string `json:"ClientId,omitempty"` |
||||
API *string `json:"Api,omitempty"` |
||||
Service *string `json:"Service,omitempty"` |
||||
Timestamp *metricTime `json:"Timestamp,omitempty"` |
||||
Type *string `json:"Type,omitempty"` |
||||
Version *int `json:"Version,omitempty"` |
||||
|
||||
AttemptCount *int `json:"AttemptCount,omitempty"` |
||||
Latency *int `json:"Latency,omitempty"` |
||||
|
||||
Fqdn *string `json:"Fqdn,omitempty"` |
||||
UserAgent *string `json:"UserAgent,omitempty"` |
||||
AttemptLatency *int `json:"AttemptLatency,omitempty"` |
||||
|
||||
SessionToken *string `json:"SessionToken,omitempty"` |
||||
Region *string `json:"Region,omitempty"` |
||||
AccessKey *string `json:"AccessKey,omitempty"` |
||||
HTTPStatusCode *int `json:"HttpStatusCode,omitempty"` |
||||
XAmzID2 *string `json:"XAmzId2,omitempty"` |
||||
XAmzRequestID *string `json:"XAmznRequestId,omitempty"` |
||||
|
||||
AWSException *string `json:"AwsException,omitempty"` |
||||
AWSExceptionMessage *string `json:"AwsExceptionMessage,omitempty"` |
||||
SDKException *string `json:"SdkException,omitempty"` |
||||
SDKExceptionMessage *string `json:"SdkExceptionMessage,omitempty"` |
||||
|
||||
DestinationIP *string `json:"DestinationIp,omitempty"` |
||||
ConnectionReused *int `json:"ConnectionReused,omitempty"` |
||||
|
||||
AcquireConnectionLatency *int `json:"AcquireConnectionLatency,omitempty"` |
||||
ConnectLatency *int `json:"ConnectLatency,omitempty"` |
||||
RequestLatency *int `json:"RequestLatency,omitempty"` |
||||
DNSLatency *int `json:"DnsLatency,omitempty"` |
||||
TCPLatency *int `json:"TcpLatency,omitempty"` |
||||
SSLLatency *int `json:"SslLatency,omitempty"` |
||||
} |
@ -0,0 +1,54 @@ |
||||
package csm |
||||
|
||||
import ( |
||||
"sync/atomic" |
||||
) |
||||
|
||||
const ( |
||||
runningEnum = iota |
||||
pausedEnum |
||||
) |
||||
|
||||
var ( |
||||
// MetricsChannelSize of metrics to hold in the channel
|
||||
MetricsChannelSize = 100 |
||||
) |
||||
|
||||
type metricChan struct { |
||||
ch chan metric |
||||
paused int64 |
||||
} |
||||
|
||||
func newMetricChan(size int) metricChan { |
||||
return metricChan{ |
||||
ch: make(chan metric, size), |
||||
} |
||||
} |
||||
|
||||
func (ch *metricChan) Pause() { |
||||
atomic.StoreInt64(&ch.paused, pausedEnum) |
||||
} |
||||
|
||||
func (ch *metricChan) Continue() { |
||||
atomic.StoreInt64(&ch.paused, runningEnum) |
||||
} |
||||
|
||||
func (ch *metricChan) IsPaused() bool { |
||||
v := atomic.LoadInt64(&ch.paused) |
||||
return v == pausedEnum |
||||
} |
||||
|
||||
// Push will push metrics to the metric channel if the channel
|
||||
// is not paused
|
||||
func (ch *metricChan) Push(m metric) bool { |
||||
if ch.IsPaused() { |
||||
return false |
||||
} |
||||
|
||||
select { |
||||
case ch.ch <- m: |
||||
return true |
||||
default: |
||||
return false |
||||
} |
||||
} |
@ -0,0 +1,230 @@ |
||||
package csm |
||||
|
||||
import ( |
||||
"encoding/json" |
||||
"net" |
||||
"time" |
||||
|
||||
"github.com/aws/aws-sdk-go/aws" |
||||
"github.com/aws/aws-sdk-go/aws/awserr" |
||||
"github.com/aws/aws-sdk-go/aws/request" |
||||
) |
||||
|
||||
const ( |
||||
// DefaultPort is used when no port is specified
|
||||
DefaultPort = "31000" |
||||
) |
||||
|
||||
// Reporter will gather metrics of API requests made and
|
||||
// send those metrics to the CSM endpoint.
|
||||
type Reporter struct { |
||||
clientID string |
||||
url string |
||||
conn net.Conn |
||||
metricsCh metricChan |
||||
done chan struct{} |
||||
} |
||||
|
||||
var ( |
||||
sender *Reporter |
||||
) |
||||
|
||||
func connect(url string) error { |
||||
const network = "udp" |
||||
if err := sender.connect(network, url); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if sender.done == nil { |
||||
sender.done = make(chan struct{}) |
||||
go sender.start() |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func newReporter(clientID, url string) *Reporter { |
||||
return &Reporter{ |
||||
clientID: clientID, |
||||
url: url, |
||||
metricsCh: newMetricChan(MetricsChannelSize), |
||||
} |
||||
} |
||||
|
||||
func (rep *Reporter) sendAPICallAttemptMetric(r *request.Request) { |
||||
if rep == nil { |
||||
return |
||||
} |
||||
|
||||
now := time.Now() |
||||
creds, _ := r.Config.Credentials.Get() |
||||
|
||||
m := metric{ |
||||
ClientID: aws.String(rep.clientID), |
||||
API: aws.String(r.Operation.Name), |
||||
Service: aws.String(r.ClientInfo.ServiceID), |
||||
Timestamp: (*metricTime)(&now), |
||||
UserAgent: aws.String(r.HTTPRequest.Header.Get("User-Agent")), |
||||
Region: r.Config.Region, |
||||
Type: aws.String("ApiCallAttempt"), |
||||
Version: aws.Int(1), |
||||
|
||||
XAmzRequestID: aws.String(r.RequestID), |
||||
|
||||
AttemptCount: aws.Int(r.RetryCount + 1), |
||||
AttemptLatency: aws.Int(int(now.Sub(r.AttemptTime).Nanoseconds() / int64(time.Millisecond))), |
||||
AccessKey: aws.String(creds.AccessKeyID), |
||||
} |
||||
|
||||
if r.HTTPResponse != nil { |
||||
m.HTTPStatusCode = aws.Int(r.HTTPResponse.StatusCode) |
||||
} |
||||
|
||||
if r.Error != nil { |
||||
if awserr, ok := r.Error.(awserr.Error); ok { |
||||
setError(&m, awserr) |
||||
} |
||||
} |
||||
|
||||
rep.metricsCh.Push(m) |
||||
} |
||||
|
||||
func setError(m *metric, err awserr.Error) { |
||||
msg := err.Message() |
||||
code := err.Code() |
||||
|
||||
switch code { |
||||
case "RequestError", |
||||
"SerializationError", |
||||
request.CanceledErrorCode: |
||||
|
||||
m.SDKException = &code |
||||
m.SDKExceptionMessage = &msg |
||||
default: |
||||
m.AWSException = &code |
||||
m.AWSExceptionMessage = &msg |
||||
} |
||||
} |
||||
|
||||
func (rep *Reporter) sendAPICallMetric(r *request.Request) { |
||||
if rep == nil { |
||||
return |
||||
} |
||||
|
||||
now := time.Now() |
||||
m := metric{ |
||||
ClientID: aws.String(rep.clientID), |
||||
API: aws.String(r.Operation.Name), |
||||
Service: aws.String(r.ClientInfo.ServiceID), |
||||
Timestamp: (*metricTime)(&now), |
||||
Type: aws.String("ApiCall"), |
||||
AttemptCount: aws.Int(r.RetryCount + 1), |
||||
Latency: aws.Int(int(time.Now().Sub(r.Time) / time.Millisecond)), |
||||
XAmzRequestID: aws.String(r.RequestID), |
||||
} |
||||
|
||||
// TODO: Probably want to figure something out for logging dropped
|
||||
// metrics
|
||||
rep.metricsCh.Push(m) |
||||
} |
||||
|
||||
func (rep *Reporter) connect(network, url string) error { |
||||
if rep.conn != nil { |
||||
rep.conn.Close() |
||||
} |
||||
|
||||
conn, err := net.Dial(network, url) |
||||
if err != nil { |
||||
return awserr.New("UDPError", "Could not connect", err) |
||||
} |
||||
|
||||
rep.conn = conn |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (rep *Reporter) close() { |
||||
if rep.done != nil { |
||||
close(rep.done) |
||||
} |
||||
|
||||
rep.metricsCh.Pause() |
||||
} |
||||
|
||||
func (rep *Reporter) start() { |
||||
defer func() { |
||||
rep.metricsCh.Pause() |
||||
}() |
||||
|
||||
for { |
||||
select { |
||||
case <-rep.done: |
||||
rep.done = nil |
||||
return |
||||
case m := <-rep.metricsCh.ch: |
||||
// TODO: What to do with this error? Probably should just log
|
||||
b, err := json.Marshal(m) |
||||
if err != nil { |
||||
continue |
||||
} |
||||
|
||||
rep.conn.Write(b) |
||||
} |
||||
} |
||||
} |
||||
|
||||
// Pause will pause the metric channel preventing any new metrics from
|
||||
// being added.
|
||||
func (rep *Reporter) Pause() { |
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
|
||||
if rep == nil { |
||||
return |
||||
} |
||||
|
||||
rep.close() |
||||
} |
||||
|
||||
// Continue will reopen the metric channel and allow for monitoring
|
||||
// to be resumed.
|
||||
func (rep *Reporter) Continue() { |
||||
lock.Lock() |
||||
defer lock.Unlock() |
||||
if rep == nil { |
||||
return |
||||
} |
||||
|
||||
if !rep.metricsCh.IsPaused() { |
||||
return |
||||
} |
||||
|
||||
rep.metricsCh.Continue() |
||||
} |
||||
|
||||
// InjectHandlers will will enable client side metrics and inject the proper
|
||||
// handlers to handle how metrics are sent.
|
||||
//
|
||||
// Example:
|
||||
// // Start must be called in order to inject the correct handlers
|
||||
// r, err := csm.Start("clientID", "127.0.0.1:8094")
|
||||
// if err != nil {
|
||||
// panic(fmt.Errorf("expected no error, but received %v", err))
|
||||
// }
|
||||
//
|
||||
// sess := session.NewSession()
|
||||
// r.InjectHandlers(&sess.Handlers)
|
||||
//
|
||||
// // create a new service client with our client side metric session
|
||||
// svc := s3.New(sess)
|
||||
func (rep *Reporter) InjectHandlers(handlers *request.Handlers) { |
||||
if rep == nil { |
||||
return |
||||
} |
||||
|
||||
apiCallHandler := request.NamedHandler{Name: APICallMetricHandlerName, Fn: rep.sendAPICallMetric} |
||||
handlers.Complete.PushFrontNamed(apiCallHandler) |
||||
|
||||
apiCallAttemptHandler := request.NamedHandler{Name: APICallAttemptMetricHandlerName, Fn: rep.sendAPICallAttemptMetric} |
||||
handlers.AfterRetry.PushFrontNamed(apiCallAttemptHandler) |
||||
} |
@ -0,0 +1,144 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/base64" |
||||
"encoding/json" |
||||
"fmt" |
||||
"strconv" |
||||
) |
||||
|
||||
type decodedMessage struct { |
||||
rawMessage |
||||
Headers decodedHeaders `json:"headers"` |
||||
} |
||||
type jsonMessage struct { |
||||
Length json.Number `json:"total_length"` |
||||
HeadersLen json.Number `json:"headers_length"` |
||||
PreludeCRC json.Number `json:"prelude_crc"` |
||||
Headers decodedHeaders `json:"headers"` |
||||
Payload []byte `json:"payload"` |
||||
CRC json.Number `json:"message_crc"` |
||||
} |
||||
|
||||
func (d *decodedMessage) UnmarshalJSON(b []byte) (err error) { |
||||
var jsonMsg jsonMessage |
||||
if err = json.Unmarshal(b, &jsonMsg); err != nil { |
||||
return err |
||||
} |
||||
|
||||
d.Length, err = numAsUint32(jsonMsg.Length) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
d.HeadersLen, err = numAsUint32(jsonMsg.HeadersLen) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
d.PreludeCRC, err = numAsUint32(jsonMsg.PreludeCRC) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
d.Headers = jsonMsg.Headers |
||||
d.Payload = jsonMsg.Payload |
||||
d.CRC, err = numAsUint32(jsonMsg.CRC) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func (d *decodedMessage) MarshalJSON() ([]byte, error) { |
||||
jsonMsg := jsonMessage{ |
||||
Length: json.Number(strconv.Itoa(int(d.Length))), |
||||
HeadersLen: json.Number(strconv.Itoa(int(d.HeadersLen))), |
||||
PreludeCRC: json.Number(strconv.Itoa(int(d.PreludeCRC))), |
||||
Headers: d.Headers, |
||||
Payload: d.Payload, |
||||
CRC: json.Number(strconv.Itoa(int(d.CRC))), |
||||
} |
||||
|
||||
return json.Marshal(jsonMsg) |
||||
} |
||||
|
||||
func numAsUint32(n json.Number) (uint32, error) { |
||||
v, err := n.Int64() |
||||
if err != nil { |
||||
return 0, fmt.Errorf("failed to get int64 json number, %v", err) |
||||
} |
||||
|
||||
return uint32(v), nil |
||||
} |
||||
|
||||
func (d decodedMessage) Message() Message { |
||||
return Message{ |
||||
Headers: Headers(d.Headers), |
||||
Payload: d.Payload, |
||||
} |
||||
} |
||||
|
||||
type decodedHeaders Headers |
||||
|
||||
func (hs *decodedHeaders) UnmarshalJSON(b []byte) error { |
||||
var jsonHeaders []struct { |
||||
Name string `json:"name"` |
||||
Type valueType `json:"type"` |
||||
Value interface{} `json:"value"` |
||||
} |
||||
|
||||
decoder := json.NewDecoder(bytes.NewReader(b)) |
||||
decoder.UseNumber() |
||||
if err := decoder.Decode(&jsonHeaders); err != nil { |
||||
return err |
||||
} |
||||
|
||||
var headers Headers |
||||
for _, h := range jsonHeaders { |
||||
value, err := valueFromType(h.Type, h.Value) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
headers.Set(h.Name, value) |
||||
} |
||||
(*hs) = decodedHeaders(headers) |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func valueFromType(typ valueType, val interface{}) (Value, error) { |
||||
switch typ { |
||||
case trueValueType: |
||||
return BoolValue(true), nil |
||||
case falseValueType: |
||||
return BoolValue(false), nil |
||||
case int8ValueType: |
||||
v, err := val.(json.Number).Int64() |
||||
return Int8Value(int8(v)), err |
||||
case int16ValueType: |
||||
v, err := val.(json.Number).Int64() |
||||
return Int16Value(int16(v)), err |
||||
case int32ValueType: |
||||
v, err := val.(json.Number).Int64() |
||||
return Int32Value(int32(v)), err |
||||
case int64ValueType: |
||||
v, err := val.(json.Number).Int64() |
||||
return Int64Value(v), err |
||||
case bytesValueType: |
||||
v, err := base64.StdEncoding.DecodeString(val.(string)) |
||||
return BytesValue(v), err |
||||
case stringValueType: |
||||
v, err := base64.StdEncoding.DecodeString(val.(string)) |
||||
return StringValue(string(v)), err |
||||
case timestampValueType: |
||||
v, err := val.(json.Number).Int64() |
||||
return TimestampValue(timeFromEpochMilli(v)), err |
||||
case uuidValueType: |
||||
v, err := base64.StdEncoding.DecodeString(val.(string)) |
||||
var tv UUIDValue |
||||
copy(tv[:], v) |
||||
return tv, err |
||||
default: |
||||
panic(fmt.Sprintf("unknown type, %s, %T", typ.String(), val)) |
||||
} |
||||
} |
@ -0,0 +1,199 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/binary" |
||||
"encoding/hex" |
||||
"encoding/json" |
||||
"fmt" |
||||
"hash" |
||||
"hash/crc32" |
||||
"io" |
||||
|
||||
"github.com/aws/aws-sdk-go/aws" |
||||
) |
||||
|
||||
// Decoder provides decoding of an Event Stream messages.
|
||||
type Decoder struct { |
||||
r io.Reader |
||||
logger aws.Logger |
||||
} |
||||
|
||||
// NewDecoder initializes and returns a Decoder for decoding event
|
||||
// stream messages from the reader provided.
|
||||
func NewDecoder(r io.Reader) *Decoder { |
||||
return &Decoder{ |
||||
r: r, |
||||
} |
||||
} |
||||
|
||||
// Decode attempts to decode a single message from the event stream reader.
|
||||
// Will return the event stream message, or error if Decode fails to read
|
||||
// the message from the stream.
|
||||
func (d *Decoder) Decode(payloadBuf []byte) (m Message, err error) { |
||||
reader := d.r |
||||
if d.logger != nil { |
||||
debugMsgBuf := bytes.NewBuffer(nil) |
||||
reader = io.TeeReader(reader, debugMsgBuf) |
||||
defer func() { |
||||
logMessageDecode(d.logger, debugMsgBuf, m, err) |
||||
}() |
||||
} |
||||
|
||||
crc := crc32.New(crc32IEEETable) |
||||
hashReader := io.TeeReader(reader, crc) |
||||
|
||||
prelude, err := decodePrelude(hashReader, crc) |
||||
if err != nil { |
||||
return Message{}, err |
||||
} |
||||
|
||||
if prelude.HeadersLen > 0 { |
||||
lr := io.LimitReader(hashReader, int64(prelude.HeadersLen)) |
||||
m.Headers, err = decodeHeaders(lr) |
||||
if err != nil { |
||||
return Message{}, err |
||||
} |
||||
} |
||||
|
||||
if payloadLen := prelude.PayloadLen(); payloadLen > 0 { |
||||
buf, err := decodePayload(payloadBuf, io.LimitReader(hashReader, int64(payloadLen))) |
||||
if err != nil { |
||||
return Message{}, err |
||||
} |
||||
m.Payload = buf |
||||
} |
||||
|
||||
msgCRC := crc.Sum32() |
||||
if err := validateCRC(reader, msgCRC); err != nil { |
||||
return Message{}, err |
||||
} |
||||
|
||||
return m, nil |
||||
} |
||||
|
||||
// UseLogger specifies the Logger that that the decoder should use to log the
|
||||
// message decode to.
|
||||
func (d *Decoder) UseLogger(logger aws.Logger) { |
||||
d.logger = logger |
||||
} |
||||
|
||||
func logMessageDecode(logger aws.Logger, msgBuf *bytes.Buffer, msg Message, decodeErr error) { |
||||
w := bytes.NewBuffer(nil) |
||||
defer func() { logger.Log(w.String()) }() |
||||
|
||||
fmt.Fprintf(w, "Raw message:\n%s\n", |
||||
hex.Dump(msgBuf.Bytes())) |
||||
|
||||
if decodeErr != nil { |
||||
fmt.Fprintf(w, "Decode error: %v\n", decodeErr) |
||||
return |
||||
} |
||||
|
||||
rawMsg, err := msg.rawMessage() |
||||
if err != nil { |
||||
fmt.Fprintf(w, "failed to create raw message, %v\n", err) |
||||
return |
||||
} |
||||
|
||||
decodedMsg := decodedMessage{ |
||||
rawMessage: rawMsg, |
||||
Headers: decodedHeaders(msg.Headers), |
||||
} |
||||
|
||||
fmt.Fprintf(w, "Decoded message:\n") |
||||
encoder := json.NewEncoder(w) |
||||
if err := encoder.Encode(decodedMsg); err != nil { |
||||
fmt.Fprintf(w, "failed to generate decoded message, %v\n", err) |
||||
} |
||||
} |
||||
|
||||
func decodePrelude(r io.Reader, crc hash.Hash32) (messagePrelude, error) { |
||||
var p messagePrelude |
||||
|
||||
var err error |
||||
p.Length, err = decodeUint32(r) |
||||
if err != nil { |
||||
return messagePrelude{}, err |
||||
} |
||||
|
||||
p.HeadersLen, err = decodeUint32(r) |
||||
if err != nil { |
||||
return messagePrelude{}, err |
||||
} |
||||
|
||||
if err := p.ValidateLens(); err != nil { |
||||
return messagePrelude{}, err |
||||
} |
||||
|
||||
preludeCRC := crc.Sum32() |
||||
if err := validateCRC(r, preludeCRC); err != nil { |
||||
return messagePrelude{}, err |
||||
} |
||||
|
||||
p.PreludeCRC = preludeCRC |
||||
|
||||
return p, nil |
||||
} |
||||
|
||||
func decodePayload(buf []byte, r io.Reader) ([]byte, error) { |
||||
w := bytes.NewBuffer(buf[0:0]) |
||||
|
||||
_, err := io.Copy(w, r) |
||||
return w.Bytes(), err |
||||
} |
||||
|
||||
func decodeUint8(r io.Reader) (uint8, error) { |
||||
type byteReader interface { |
||||
ReadByte() (byte, error) |
||||
} |
||||
|
||||
if br, ok := r.(byteReader); ok { |
||||
v, err := br.ReadByte() |
||||
return uint8(v), err |
||||
} |
||||
|
||||
var b [1]byte |
||||
_, err := io.ReadFull(r, b[:]) |
||||
return uint8(b[0]), err |
||||
} |
||||
func decodeUint16(r io.Reader) (uint16, error) { |
||||
var b [2]byte |
||||
bs := b[:] |
||||
_, err := io.ReadFull(r, bs) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return binary.BigEndian.Uint16(bs), nil |
||||
} |
||||
func decodeUint32(r io.Reader) (uint32, error) { |
||||
var b [4]byte |
||||
bs := b[:] |
||||
_, err := io.ReadFull(r, bs) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return binary.BigEndian.Uint32(bs), nil |
||||
} |
||||
func decodeUint64(r io.Reader) (uint64, error) { |
||||
var b [8]byte |
||||
bs := b[:] |
||||
_, err := io.ReadFull(r, bs) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return binary.BigEndian.Uint64(bs), nil |
||||
} |
||||
|
||||
func validateCRC(r io.Reader, expect uint32) error { |
||||
msgCRC, err := decodeUint32(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if msgCRC != expect { |
||||
return ChecksumError{} |
||||
} |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,114 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/binary" |
||||
"hash" |
||||
"hash/crc32" |
||||
"io" |
||||
) |
||||
|
||||
// Encoder provides EventStream message encoding.
|
||||
type Encoder struct { |
||||
w io.Writer |
||||
|
||||
headersBuf *bytes.Buffer |
||||
} |
||||
|
||||
// NewEncoder initializes and returns an Encoder to encode Event Stream
|
||||
// messages to an io.Writer.
|
||||
func NewEncoder(w io.Writer) *Encoder { |
||||
return &Encoder{ |
||||
w: w, |
||||
headersBuf: bytes.NewBuffer(nil), |
||||
} |
||||
} |
||||
|
||||
// Encode encodes a single EventStream message to the io.Writer the Encoder
|
||||
// was created with. An error is returned if writing the message fails.
|
||||
func (e *Encoder) Encode(msg Message) error { |
||||
e.headersBuf.Reset() |
||||
|
||||
err := encodeHeaders(e.headersBuf, msg.Headers) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
crc := crc32.New(crc32IEEETable) |
||||
hashWriter := io.MultiWriter(e.w, crc) |
||||
|
||||
headersLen := uint32(e.headersBuf.Len()) |
||||
payloadLen := uint32(len(msg.Payload)) |
||||
|
||||
if err := encodePrelude(hashWriter, crc, headersLen, payloadLen); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if headersLen > 0 { |
||||
if _, err := io.Copy(hashWriter, e.headersBuf); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
if payloadLen > 0 { |
||||
if _, err := hashWriter.Write(msg.Payload); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
msgCRC := crc.Sum32() |
||||
return binary.Write(e.w, binary.BigEndian, msgCRC) |
||||
} |
||||
|
||||
func encodePrelude(w io.Writer, crc hash.Hash32, headersLen, payloadLen uint32) error { |
||||
p := messagePrelude{ |
||||
Length: minMsgLen + headersLen + payloadLen, |
||||
HeadersLen: headersLen, |
||||
} |
||||
if err := p.ValidateLens(); err != nil { |
||||
return err |
||||
} |
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian, |
||||
p.Length, |
||||
p.HeadersLen, |
||||
) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
p.PreludeCRC = crc.Sum32() |
||||
err = binary.Write(w, binary.BigEndian, p.PreludeCRC) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func encodeHeaders(w io.Writer, headers Headers) error { |
||||
for _, h := range headers { |
||||
hn := headerName{ |
||||
Len: uint8(len(h.Name)), |
||||
} |
||||
copy(hn.Name[:hn.Len], h.Name) |
||||
if err := hn.encode(w); err != nil { |
||||
return err |
||||
} |
||||
|
||||
if err := h.Value.encode(w); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
func binaryWriteFields(w io.Writer, order binary.ByteOrder, vs ...interface{}) error { |
||||
for _, v := range vs { |
||||
if err := binary.Write(w, order, v); err != nil { |
||||
return err |
||||
} |
||||
} |
||||
return nil |
||||
} |
@ -0,0 +1,23 @@ |
||||
package eventstream |
||||
|
||||
import "fmt" |
||||
|
||||
// LengthError provides the error for items being larger than a maximum length.
|
||||
type LengthError struct { |
||||
Part string |
||||
Want int |
||||
Have int |
||||
Value interface{} |
||||
} |
||||
|
||||
func (e LengthError) Error() string { |
||||
return fmt.Sprintf("%s length invalid, %d/%d, %v", |
||||
e.Part, e.Want, e.Have, e.Value) |
||||
} |
||||
|
||||
// ChecksumError provides the error for message checksum invalidation errors.
|
||||
type ChecksumError struct{} |
||||
|
||||
func (e ChecksumError) Error() string { |
||||
return "message checksum mismatch" |
||||
} |
160
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/api.go
generated
vendored
160
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/api.go
generated
vendored
@ -0,0 +1,160 @@ |
||||
package eventstreamapi |
||||
|
||||
import ( |
||||
"fmt" |
||||
"io" |
||||
|
||||
"github.com/aws/aws-sdk-go/aws" |
||||
"github.com/aws/aws-sdk-go/private/protocol" |
||||
"github.com/aws/aws-sdk-go/private/protocol/eventstream" |
||||
) |
||||
|
||||
// Unmarshaler provides the interface for unmarshaling a EventStream
|
||||
// message into a SDK type.
|
||||
type Unmarshaler interface { |
||||
UnmarshalEvent(protocol.PayloadUnmarshaler, eventstream.Message) error |
||||
} |
||||
|
||||
// EventStream headers with specific meaning to async API functionality.
|
||||
const ( |
||||
MessageTypeHeader = `:message-type` // Identifies type of message.
|
||||
EventMessageType = `event` |
||||
ErrorMessageType = `error` |
||||
ExceptionMessageType = `exception` |
||||
|
||||
// Message Events
|
||||
EventTypeHeader = `:event-type` // Identifies message event type e.g. "Stats".
|
||||
|
||||
// Message Error
|
||||
ErrorCodeHeader = `:error-code` |
||||
ErrorMessageHeader = `:error-message` |
||||
|
||||
// Message Exception
|
||||
ExceptionTypeHeader = `:exception-type` |
||||
) |
||||
|
||||
// EventReader provides reading from the EventStream of an reader.
|
||||
type EventReader struct { |
||||
reader io.ReadCloser |
||||
decoder *eventstream.Decoder |
||||
|
||||
unmarshalerForEventType func(string) (Unmarshaler, error) |
||||
payloadUnmarshaler protocol.PayloadUnmarshaler |
||||
|
||||
payloadBuf []byte |
||||
} |
||||
|
||||
// NewEventReader returns a EventReader built from the reader and unmarshaler
|
||||
// provided. Use ReadStream method to start reading from the EventStream.
|
||||
func NewEventReader( |
||||
reader io.ReadCloser, |
||||
payloadUnmarshaler protocol.PayloadUnmarshaler, |
||||
unmarshalerForEventType func(string) (Unmarshaler, error), |
||||
) *EventReader { |
||||
return &EventReader{ |
||||
reader: reader, |
||||
decoder: eventstream.NewDecoder(reader), |
||||
payloadUnmarshaler: payloadUnmarshaler, |
||||
unmarshalerForEventType: unmarshalerForEventType, |
||||
payloadBuf: make([]byte, 10*1024), |
||||
} |
||||
} |
||||
|
||||
// UseLogger instructs the EventReader to use the logger and log level
|
||||
// specified.
|
||||
func (r *EventReader) UseLogger(logger aws.Logger, logLevel aws.LogLevelType) { |
||||
if logger != nil && logLevel.Matches(aws.LogDebugWithEventStreamBody) { |
||||
r.decoder.UseLogger(logger) |
||||
} |
||||
} |
||||
|
||||
// ReadEvent attempts to read a message from the EventStream and return the
|
||||
// unmarshaled event value that the message is for.
|
||||
//
|
||||
// For EventStream API errors check if the returned error satisfies the
|
||||
// awserr.Error interface to get the error's Code and Message components.
|
||||
//
|
||||
// EventUnmarshalers called with EventStream messages must take copies of the
|
||||
// message's Payload. The payload will is reused between events read.
|
||||
func (r *EventReader) ReadEvent() (event interface{}, err error) { |
||||
msg, err := r.decoder.Decode(r.payloadBuf) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
defer func() { |
||||
// Reclaim payload buffer for next message read.
|
||||
r.payloadBuf = msg.Payload[0:0] |
||||
}() |
||||
|
||||
typ, err := GetHeaderString(msg, MessageTypeHeader) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
switch typ { |
||||
case EventMessageType: |
||||
return r.unmarshalEventMessage(msg) |
||||
case ErrorMessageType: |
||||
return nil, r.unmarshalErrorMessage(msg) |
||||
default: |
||||
return nil, fmt.Errorf("unknown eventstream message type, %v", typ) |
||||
} |
||||
} |
||||
|
||||
func (r *EventReader) unmarshalEventMessage( |
||||
msg eventstream.Message, |
||||
) (event interface{}, err error) { |
||||
eventType, err := GetHeaderString(msg, EventTypeHeader) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
ev, err := r.unmarshalerForEventType(eventType) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
err = ev.UnmarshalEvent(r.payloadUnmarshaler, msg) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return ev, nil |
||||
} |
||||
|
||||
func (r *EventReader) unmarshalErrorMessage(msg eventstream.Message) (err error) { |
||||
var msgErr messageError |
||||
|
||||
msgErr.code, err = GetHeaderString(msg, ErrorCodeHeader) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
msgErr.msg, err = GetHeaderString(msg, ErrorMessageHeader) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
return msgErr |
||||
} |
||||
|
||||
// Close closes the EventReader's EventStream reader.
|
||||
func (r *EventReader) Close() error { |
||||
return r.reader.Close() |
||||
} |
||||
|
||||
// GetHeaderString returns the value of the header as a string. If the header
|
||||
// is not set or the value is not a string an error will be returned.
|
||||
func GetHeaderString(msg eventstream.Message, headerName string) (string, error) { |
||||
headerVal := msg.Headers.Get(headerName) |
||||
if headerVal == nil { |
||||
return "", fmt.Errorf("error header %s not present", headerName) |
||||
} |
||||
|
||||
v, ok := headerVal.Get().(string) |
||||
if !ok { |
||||
return "", fmt.Errorf("error header value is not a string, %T", headerVal) |
||||
} |
||||
|
||||
return v, nil |
||||
} |
24
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/error.go
generated
vendored
24
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/eventstreamapi/error.go
generated
vendored
@ -0,0 +1,24 @@ |
||||
package eventstreamapi |
||||
|
||||
import "fmt" |
||||
|
||||
type messageError struct { |
||||
code string |
||||
msg string |
||||
} |
||||
|
||||
func (e messageError) Code() string { |
||||
return e.code |
||||
} |
||||
|
||||
func (e messageError) Message() string { |
||||
return e.msg |
||||
} |
||||
|
||||
func (e messageError) Error() string { |
||||
return fmt.Sprintf("%s: %s", e.code, e.msg) |
||||
} |
||||
|
||||
func (e messageError) OrigErr() error { |
||||
return nil |
||||
} |
@ -0,0 +1,166 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
) |
||||
|
||||
// Headers are a collection of EventStream header values.
|
||||
type Headers []Header |
||||
|
||||
// Header is a single EventStream Key Value header pair.
|
||||
type Header struct { |
||||
Name string |
||||
Value Value |
||||
} |
||||
|
||||
// Set associates the name with a value. If the header name already exists in
|
||||
// the Headers the value will be replaced with the new one.
|
||||
func (hs *Headers) Set(name string, value Value) { |
||||
var i int |
||||
for ; i < len(*hs); i++ { |
||||
if (*hs)[i].Name == name { |
||||
(*hs)[i].Value = value |
||||
return |
||||
} |
||||
} |
||||
|
||||
*hs = append(*hs, Header{ |
||||
Name: name, Value: value, |
||||
}) |
||||
} |
||||
|
||||
// Get returns the Value associated with the header. Nil is returned if the
|
||||
// value does not exist.
|
||||
func (hs Headers) Get(name string) Value { |
||||
for i := 0; i < len(hs); i++ { |
||||
if h := hs[i]; h.Name == name { |
||||
return h.Value |
||||
} |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// Del deletes the value in the Headers if it exists.
|
||||
func (hs *Headers) Del(name string) { |
||||
for i := 0; i < len(*hs); i++ { |
||||
if (*hs)[i].Name == name { |
||||
copy((*hs)[i:], (*hs)[i+1:]) |
||||
(*hs) = (*hs)[:len(*hs)-1] |
||||
} |
||||
} |
||||
} |
||||
|
||||
func decodeHeaders(r io.Reader) (Headers, error) { |
||||
hs := Headers{} |
||||
|
||||
for { |
||||
name, err := decodeHeaderName(r) |
||||
if err != nil { |
||||
if err == io.EOF { |
||||
// EOF while getting header name means no more headers
|
||||
break |
||||
} |
||||
return nil, err |
||||
} |
||||
|
||||
value, err := decodeHeaderValue(r) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
hs.Set(name, value) |
||||
} |
||||
|
||||
return hs, nil |
||||
} |
||||
|
||||
func decodeHeaderName(r io.Reader) (string, error) { |
||||
var n headerName |
||||
|
||||
var err error |
||||
n.Len, err = decodeUint8(r) |
||||
if err != nil { |
||||
return "", err |
||||
} |
||||
|
||||
name := n.Name[:n.Len] |
||||
if _, err := io.ReadFull(r, name); err != nil { |
||||
return "", err |
||||
} |
||||
|
||||
return string(name), nil |
||||
} |
||||
|
||||
func decodeHeaderValue(r io.Reader) (Value, error) { |
||||
var raw rawValue |
||||
|
||||
typ, err := decodeUint8(r) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
raw.Type = valueType(typ) |
||||
|
||||
var v Value |
||||
|
||||
switch raw.Type { |
||||
case trueValueType: |
||||
v = BoolValue(true) |
||||
case falseValueType: |
||||
v = BoolValue(false) |
||||
case int8ValueType: |
||||
var tv Int8Value |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case int16ValueType: |
||||
var tv Int16Value |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case int32ValueType: |
||||
var tv Int32Value |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case int64ValueType: |
||||
var tv Int64Value |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case bytesValueType: |
||||
var tv BytesValue |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case stringValueType: |
||||
var tv StringValue |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case timestampValueType: |
||||
var tv TimestampValue |
||||
err = tv.decode(r) |
||||
v = tv |
||||
case uuidValueType: |
||||
var tv UUIDValue |
||||
err = tv.decode(r) |
||||
v = tv |
||||
default: |
||||
panic(fmt.Sprintf("unknown value type %d", raw.Type)) |
||||
} |
||||
|
||||
// Error could be EOF, let caller deal with it
|
||||
return v, err |
||||
} |
||||
|
||||
const maxHeaderNameLen = 255 |
||||
|
||||
type headerName struct { |
||||
Len uint8 |
||||
Name [maxHeaderNameLen]byte |
||||
} |
||||
|
||||
func (v headerName) encode(w io.Writer) error { |
||||
if err := binary.Write(w, binary.BigEndian, v.Len); err != nil { |
||||
return err |
||||
} |
||||
|
||||
_, err := w.Write(v.Name[:v.Len]) |
||||
return err |
||||
} |
501
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header_value.go
generated
vendored
501
vendor/github.com/aws/aws-sdk-go/private/protocol/eventstream/header_value.go
generated
vendored
@ -0,0 +1,501 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"encoding/base64" |
||||
"encoding/binary" |
||||
"fmt" |
||||
"io" |
||||
"strconv" |
||||
"time" |
||||
) |
||||
|
||||
const maxHeaderValueLen = 1<<15 - 1 // 2^15-1 or 32KB - 1
|
||||
|
||||
// valueType is the EventStream header value type.
|
||||
type valueType uint8 |
||||
|
||||
// Header value types
|
||||
const ( |
||||
trueValueType valueType = iota |
||||
falseValueType |
||||
int8ValueType // Byte
|
||||
int16ValueType // Short
|
||||
int32ValueType // Integer
|
||||
int64ValueType // Long
|
||||
bytesValueType |
||||
stringValueType |
||||
timestampValueType |
||||
uuidValueType |
||||
) |
||||
|
||||
func (t valueType) String() string { |
||||
switch t { |
||||
case trueValueType: |
||||
return "bool" |
||||
case falseValueType: |
||||
return "bool" |
||||
case int8ValueType: |
||||
return "int8" |
||||
case int16ValueType: |
||||
return "int16" |
||||
case int32ValueType: |
||||
return "int32" |
||||
case int64ValueType: |
||||
return "int64" |
||||
case bytesValueType: |
||||
return "byte_array" |
||||
case stringValueType: |
||||
return "string" |
||||
case timestampValueType: |
||||
return "timestamp" |
||||
case uuidValueType: |
||||
return "uuid" |
||||
default: |
||||
return fmt.Sprintf("unknown value type %d", uint8(t)) |
||||
} |
||||
} |
||||
|
||||
type rawValue struct { |
||||
Type valueType |
||||
Len uint16 // Only set for variable length slices
|
||||
Value []byte // byte representation of value, BigEndian encoding.
|
||||
} |
||||
|
||||
func (r rawValue) encodeScalar(w io.Writer, v interface{}) error { |
||||
return binaryWriteFields(w, binary.BigEndian, |
||||
r.Type, |
||||
v, |
||||
) |
||||
} |
||||
|
||||
func (r rawValue) encodeFixedSlice(w io.Writer, v []byte) error { |
||||
binary.Write(w, binary.BigEndian, r.Type) |
||||
|
||||
_, err := w.Write(v) |
||||
return err |
||||
} |
||||
|
||||
func (r rawValue) encodeBytes(w io.Writer, v []byte) error { |
||||
if len(v) > maxHeaderValueLen { |
||||
return LengthError{ |
||||
Part: "header value", |
||||
Want: maxHeaderValueLen, Have: len(v), |
||||
Value: v, |
||||
} |
||||
} |
||||
r.Len = uint16(len(v)) |
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian, |
||||
r.Type, |
||||
r.Len, |
||||
) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
_, err = w.Write(v) |
||||
return err |
||||
} |
||||
|
||||
func (r rawValue) encodeString(w io.Writer, v string) error { |
||||
if len(v) > maxHeaderValueLen { |
||||
return LengthError{ |
||||
Part: "header value", |
||||
Want: maxHeaderValueLen, Have: len(v), |
||||
Value: v, |
||||
} |
||||
} |
||||
r.Len = uint16(len(v)) |
||||
|
||||
type stringWriter interface { |
||||
WriteString(string) (int, error) |
||||
} |
||||
|
||||
err := binaryWriteFields(w, binary.BigEndian, |
||||
r.Type, |
||||
r.Len, |
||||
) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
if sw, ok := w.(stringWriter); ok { |
||||
_, err = sw.WriteString(v) |
||||
} else { |
||||
_, err = w.Write([]byte(v)) |
||||
} |
||||
|
||||
return err |
||||
} |
||||
|
||||
func decodeFixedBytesValue(r io.Reader, buf []byte) error { |
||||
_, err := io.ReadFull(r, buf) |
||||
return err |
||||
} |
||||
|
||||
func decodeBytesValue(r io.Reader) ([]byte, error) { |
||||
var raw rawValue |
||||
var err error |
||||
raw.Len, err = decodeUint16(r) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
buf := make([]byte, raw.Len) |
||||
_, err = io.ReadFull(r, buf) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return buf, nil |
||||
} |
||||
|
||||
func decodeStringValue(r io.Reader) (string, error) { |
||||
v, err := decodeBytesValue(r) |
||||
return string(v), err |
||||
} |
||||
|
||||
// Value represents the abstract header value.
|
||||
type Value interface { |
||||
Get() interface{} |
||||
String() string |
||||
valueType() valueType |
||||
encode(io.Writer) error |
||||
} |
||||
|
||||
// An BoolValue provides eventstream encoding, and representation
|
||||
// of a Go bool value.
|
||||
type BoolValue bool |
||||
|
||||
// Get returns the underlying type
|
||||
func (v BoolValue) Get() interface{} { |
||||
return bool(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (v BoolValue) valueType() valueType { |
||||
if v { |
||||
return trueValueType |
||||
} |
||||
return falseValueType |
||||
} |
||||
|
||||
func (v BoolValue) String() string { |
||||
return strconv.FormatBool(bool(v)) |
||||
} |
||||
|
||||
// encode encodes the BoolValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v BoolValue) encode(w io.Writer) error { |
||||
return binary.Write(w, binary.BigEndian, v.valueType()) |
||||
} |
||||
|
||||
// An Int8Value provides eventstream encoding, and representation of a Go
|
||||
// int8 value.
|
||||
type Int8Value int8 |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int8Value) Get() interface{} { |
||||
return int8(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int8Value) valueType() valueType { |
||||
return int8ValueType |
||||
} |
||||
|
||||
func (v Int8Value) String() string { |
||||
return fmt.Sprintf("0x%02x", int8(v)) |
||||
} |
||||
|
||||
// encode encodes the Int8Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int8Value) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
|
||||
return raw.encodeScalar(w, v) |
||||
} |
||||
|
||||
func (v *Int8Value) decode(r io.Reader) error { |
||||
n, err := decodeUint8(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = Int8Value(n) |
||||
return nil |
||||
} |
||||
|
||||
// An Int16Value provides eventstream encoding, and representation of a Go
|
||||
// int16 value.
|
||||
type Int16Value int16 |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int16Value) Get() interface{} { |
||||
return int16(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int16Value) valueType() valueType { |
||||
return int16ValueType |
||||
} |
||||
|
||||
func (v Int16Value) String() string { |
||||
return fmt.Sprintf("0x%04x", int16(v)) |
||||
} |
||||
|
||||
// encode encodes the Int16Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int16Value) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
return raw.encodeScalar(w, v) |
||||
} |
||||
|
||||
func (v *Int16Value) decode(r io.Reader) error { |
||||
n, err := decodeUint16(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = Int16Value(n) |
||||
return nil |
||||
} |
||||
|
||||
// An Int32Value provides eventstream encoding, and representation of a Go
|
||||
// int32 value.
|
||||
type Int32Value int32 |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int32Value) Get() interface{} { |
||||
return int32(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int32Value) valueType() valueType { |
||||
return int32ValueType |
||||
} |
||||
|
||||
func (v Int32Value) String() string { |
||||
return fmt.Sprintf("0x%08x", int32(v)) |
||||
} |
||||
|
||||
// encode encodes the Int32Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int32Value) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
return raw.encodeScalar(w, v) |
||||
} |
||||
|
||||
func (v *Int32Value) decode(r io.Reader) error { |
||||
n, err := decodeUint32(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = Int32Value(n) |
||||
return nil |
||||
} |
||||
|
||||
// An Int64Value provides eventstream encoding, and representation of a Go
|
||||
// int64 value.
|
||||
type Int64Value int64 |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v Int64Value) Get() interface{} { |
||||
return int64(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (Int64Value) valueType() valueType { |
||||
return int64ValueType |
||||
} |
||||
|
||||
func (v Int64Value) String() string { |
||||
return fmt.Sprintf("0x%016x", int64(v)) |
||||
} |
||||
|
||||
// encode encodes the Int64Value into an eventstream binary value
|
||||
// representation.
|
||||
func (v Int64Value) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
return raw.encodeScalar(w, v) |
||||
} |
||||
|
||||
func (v *Int64Value) decode(r io.Reader) error { |
||||
n, err := decodeUint64(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = Int64Value(n) |
||||
return nil |
||||
} |
||||
|
||||
// An BytesValue provides eventstream encoding, and representation of a Go
|
||||
// byte slice.
|
||||
type BytesValue []byte |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v BytesValue) Get() interface{} { |
||||
return []byte(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (BytesValue) valueType() valueType { |
||||
return bytesValueType |
||||
} |
||||
|
||||
func (v BytesValue) String() string { |
||||
return base64.StdEncoding.EncodeToString([]byte(v)) |
||||
} |
||||
|
||||
// encode encodes the BytesValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v BytesValue) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
|
||||
return raw.encodeBytes(w, []byte(v)) |
||||
} |
||||
|
||||
func (v *BytesValue) decode(r io.Reader) error { |
||||
buf, err := decodeBytesValue(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = BytesValue(buf) |
||||
return nil |
||||
} |
||||
|
||||
// An StringValue provides eventstream encoding, and representation of a Go
|
||||
// string.
|
||||
type StringValue string |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v StringValue) Get() interface{} { |
||||
return string(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (StringValue) valueType() valueType { |
||||
return stringValueType |
||||
} |
||||
|
||||
func (v StringValue) String() string { |
||||
return string(v) |
||||
} |
||||
|
||||
// encode encodes the StringValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v StringValue) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
|
||||
return raw.encodeString(w, string(v)) |
||||
} |
||||
|
||||
func (v *StringValue) decode(r io.Reader) error { |
||||
s, err := decodeStringValue(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = StringValue(s) |
||||
return nil |
||||
} |
||||
|
||||
// An TimestampValue provides eventstream encoding, and representation of a Go
|
||||
// timestamp.
|
||||
type TimestampValue time.Time |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v TimestampValue) Get() interface{} { |
||||
return time.Time(v) |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (TimestampValue) valueType() valueType { |
||||
return timestampValueType |
||||
} |
||||
|
||||
func (v TimestampValue) epochMilli() int64 { |
||||
nano := time.Time(v).UnixNano() |
||||
msec := nano / int64(time.Millisecond) |
||||
return msec |
||||
} |
||||
|
||||
func (v TimestampValue) String() string { |
||||
msec := v.epochMilli() |
||||
return strconv.FormatInt(msec, 10) |
||||
} |
||||
|
||||
// encode encodes the TimestampValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v TimestampValue) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
|
||||
msec := v.epochMilli() |
||||
return raw.encodeScalar(w, msec) |
||||
} |
||||
|
||||
func (v *TimestampValue) decode(r io.Reader) error { |
||||
n, err := decodeUint64(r) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
|
||||
*v = TimestampValue(timeFromEpochMilli(int64(n))) |
||||
return nil |
||||
} |
||||
|
||||
func timeFromEpochMilli(t int64) time.Time { |
||||
secs := t / 1e3 |
||||
msec := t % 1e3 |
||||
return time.Unix(secs, msec*int64(time.Millisecond)) |
||||
} |
||||
|
||||
// An UUIDValue provides eventstream encoding, and representation of a UUID
|
||||
// value.
|
||||
type UUIDValue [16]byte |
||||
|
||||
// Get returns the underlying value.
|
||||
func (v UUIDValue) Get() interface{} { |
||||
return v[:] |
||||
} |
||||
|
||||
// valueType returns the EventStream header value type value.
|
||||
func (UUIDValue) valueType() valueType { |
||||
return uuidValueType |
||||
} |
||||
|
||||
func (v UUIDValue) String() string { |
||||
return fmt.Sprintf(`%X-%X-%X-%X-%X`, v[0:4], v[4:6], v[6:8], v[8:10], v[10:]) |
||||
} |
||||
|
||||
// encode encodes the UUIDValue into an eventstream binary value
|
||||
// representation.
|
||||
func (v UUIDValue) encode(w io.Writer) error { |
||||
raw := rawValue{ |
||||
Type: v.valueType(), |
||||
} |
||||
|
||||
return raw.encodeFixedSlice(w, v[:]) |
||||
} |
||||
|
||||
func (v *UUIDValue) decode(r io.Reader) error { |
||||
tv := (*v)[:] |
||||
return decodeFixedBytesValue(r, tv) |
||||
} |
@ -0,0 +1,103 @@ |
||||
package eventstream |
||||
|
||||
import ( |
||||
"bytes" |
||||
"encoding/binary" |
||||
"hash/crc32" |
||||
) |
||||
|
||||
const preludeLen = 8 |
||||
const preludeCRCLen = 4 |
||||
const msgCRCLen = 4 |
||||
const minMsgLen = preludeLen + preludeCRCLen + msgCRCLen |
||||
const maxPayloadLen = 1024 * 1024 * 16 // 16MB
|
||||
const maxHeadersLen = 1024 * 128 // 128KB
|
||||
const maxMsgLen = minMsgLen + maxHeadersLen + maxPayloadLen |
||||
|
||||
var crc32IEEETable = crc32.MakeTable(crc32.IEEE) |
||||
|
||||
// A Message provides the eventstream message representation.
|
||||
type Message struct { |
||||
Headers Headers |
||||
Payload []byte |
||||
} |
||||
|
||||
func (m *Message) rawMessage() (rawMessage, error) { |
||||
var raw rawMessage |
||||
|
||||
if len(m.Headers) > 0 { |
||||
var headers bytes.Buffer |
||||
if err := encodeHeaders(&headers, m.Headers); err != nil { |
||||
return rawMessage{}, err |
||||
} |
||||
raw.Headers = headers.Bytes() |
||||
raw.HeadersLen = uint32(len(raw.Headers)) |
||||
} |
||||
|
||||
raw.Length = raw.HeadersLen + uint32(len(m.Payload)) + minMsgLen |
||||
|
||||
hash := crc32.New(crc32IEEETable) |
||||
binaryWriteFields(hash, binary.BigEndian, raw.Length, raw.HeadersLen) |
||||
raw.PreludeCRC = hash.Sum32() |
||||
|
||||
binaryWriteFields(hash, binary.BigEndian, raw.PreludeCRC) |
||||
|
||||
if raw.HeadersLen > 0 { |
||||
hash.Write(raw.Headers) |
||||
} |
||||
|
||||
// Read payload bytes and update hash for it as well.
|
||||
if len(m.Payload) > 0 { |
||||
raw.Payload = m.Payload |
||||
hash.Write(raw.Payload) |
||||
} |
||||
|
||||
raw.CRC = hash.Sum32() |
||||
|
||||
return raw, nil |
||||
} |
||||
|
||||
type messagePrelude struct { |
||||
Length uint32 |
||||
HeadersLen uint32 |
||||
PreludeCRC uint32 |
||||
} |
||||
|
||||
func (p messagePrelude) PayloadLen() uint32 { |
||||
return p.Length - p.HeadersLen - minMsgLen |
||||
} |
||||
|
||||
func (p messagePrelude) ValidateLens() error { |
||||
if p.Length == 0 || p.Length > maxMsgLen { |
||||
return LengthError{ |
||||
Part: "message prelude", |
||||
Want: maxMsgLen, |
||||
Have: int(p.Length), |
||||
} |
||||
} |
||||
if p.HeadersLen > maxHeadersLen { |
||||
return LengthError{ |
||||
Part: "message headers", |
||||
Want: maxHeadersLen, |
||||
Have: int(p.HeadersLen), |
||||
} |
||||
} |
||||
if payloadLen := p.PayloadLen(); payloadLen > maxPayloadLen { |
||||
return LengthError{ |
||||
Part: "message payload", |
||||
Want: maxPayloadLen, |
||||
Have: int(payloadLen), |
||||
} |
||||
} |
||||
|
||||
return nil |
||||
} |
||||
|
||||
type rawMessage struct { |
||||
messagePrelude |
||||
|
||||
Headers []byte |
||||
Payload []byte |
||||
|
||||
CRC uint32 |
||||
} |
@ -0,0 +1,81 @@ |
||||
package protocol |
||||
|
||||
import ( |
||||
"io" |
||||
"io/ioutil" |
||||
"net/http" |
||||
|
||||
"github.com/aws/aws-sdk-go/aws" |
||||
"github.com/aws/aws-sdk-go/aws/client/metadata" |
||||
"github.com/aws/aws-sdk-go/aws/request" |
||||
) |
||||
|
||||
// PayloadUnmarshaler provides the interface for unmarshaling a payload's
|
||||
// reader into a SDK shape.
|
||||
type PayloadUnmarshaler interface { |
||||
UnmarshalPayload(io.Reader, interface{}) error |
||||
} |
||||
|
||||
// HandlerPayloadUnmarshal implements the PayloadUnmarshaler from a
|
||||
// HandlerList. This provides the support for unmarshaling a payload reader to
|
||||
// a shape without needing a SDK request first.
|
||||
type HandlerPayloadUnmarshal struct { |
||||
Unmarshalers request.HandlerList |
||||
} |
||||
|
||||
// UnmarshalPayload unmarshals the io.Reader payload into the SDK shape using
|
||||
// the Unmarshalers HandlerList provided. Returns an error if unable
|
||||
// unmarshaling fails.
|
||||
func (h HandlerPayloadUnmarshal) UnmarshalPayload(r io.Reader, v interface{}) error { |
||||
req := &request.Request{ |
||||
HTTPRequest: &http.Request{}, |
||||
HTTPResponse: &http.Response{ |
||||
StatusCode: 200, |
||||
Header: http.Header{}, |
||||
Body: ioutil.NopCloser(r), |
||||
}, |
||||
Data: v, |
||||
} |
||||
|
||||
h.Unmarshalers.Run(req) |
||||
|
||||
return req.Error |
||||
} |
||||
|
||||
// PayloadMarshaler provides the interface for marshaling a SDK shape into and
|
||||
// io.Writer.
|
||||
type PayloadMarshaler interface { |
||||
MarshalPayload(io.Writer, interface{}) error |
||||
} |
||||
|
||||
// HandlerPayloadMarshal implements the PayloadMarshaler from a HandlerList.
|
||||
// This provides support for marshaling a SDK shape into an io.Writer without
|
||||
// needing a SDK request first.
|
||||
type HandlerPayloadMarshal struct { |
||||
Marshalers request.HandlerList |
||||
} |
||||
|
||||
// MarshalPayload marshals the SDK shape into the io.Writer using the
|
||||
// Marshalers HandlerList provided. Returns an error if unable if marshal
|
||||
// fails.
|
||||
func (h HandlerPayloadMarshal) MarshalPayload(w io.Writer, v interface{}) error { |
||||
req := request.New( |
||||
aws.Config{}, |
||||
metadata.ClientInfo{}, |
||||
request.Handlers{}, |
||||
nil, |
||||
&request.Operation{HTTPMethod: "GET"}, |
||||
v, |
||||
nil, |
||||
) |
||||
|
||||
h.Marshalers.Run(req) |
||||
|
||||
if req.Error != nil { |
||||
return req.Error |
||||
} |
||||
|
||||
io.Copy(w, req.GetBody()) |
||||
|
||||
return nil |
||||
} |
@ -0,0 +1,21 @@ |
||||
MIT License |
||||
|
||||
Copyright (c) 2015 Dmitri Shuralyov |
||||
|
||||
Permission is hereby granted, free of charge, to any person obtaining a copy |
||||
of this software and associated documentation files (the "Software"), to deal |
||||
in the Software without restriction, including without limitation the rights |
||||
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell |
||||
copies of the Software, and to permit persons to whom the Software is |
||||
furnished to do so, subject to the following conditions: |
||||
|
||||
The above copyright notice and this permission notice shall be included in all |
||||
copies or substantial portions of the Software. |
||||
|
||||
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR |
||||
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, |
||||
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE |
||||
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER |
||||
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, |
||||
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE |
||||
SOFTWARE. |
@ -0,0 +1,29 @@ |
||||
// Package sanitized_anchor_name provides a func to create sanitized anchor names.
|
||||
//
|
||||
// Its logic can be reused by multiple packages to create interoperable anchor names
|
||||
// and links to those anchors.
|
||||
//
|
||||
// At this time, it does not try to ensure that generated anchor names
|
||||
// are unique, that responsibility falls on the caller.
|
||||
package sanitized_anchor_name // import "github.com/shurcooL/sanitized_anchor_name"
|
||||
|
||||
import "unicode" |
||||
|
||||
// Create returns a sanitized anchor name for the given text.
|
||||
func Create(text string) string { |
||||
var anchorName []rune |
||||
var futureDash = false |
||||
for _, r := range text { |
||||
switch { |
||||
case unicode.IsLetter(r) || unicode.IsNumber(r): |
||||
if futureDash && len(anchorName) > 0 { |
||||
anchorName = append(anchorName, '-') |
||||
} |
||||
futureDash = false |
||||
anchorName = append(anchorName, unicode.ToLower(r)) |
||||
default: |
||||
futureDash = true |
||||
} |
||||
} |
||||
return string(anchorName) |
||||
} |
Loading…
Reference in new issue