|
|
|
@ -1,6 +1,8 @@ |
|
|
|
|
package distributor |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"compress/gzip" |
|
|
|
|
"fmt" |
|
|
|
|
"math" |
|
|
|
|
"net/http" |
|
|
|
|
"time" |
|
|
|
@ -23,6 +25,7 @@ import ( |
|
|
|
|
|
|
|
|
|
var ( |
|
|
|
|
contentType = http.CanonicalHeaderKey("Content-Type") |
|
|
|
|
contentEnc = http.CanonicalHeaderKey("Content-Encoding") |
|
|
|
|
|
|
|
|
|
bytesIngested = promauto.NewCounterVec(prometheus.CounterOpts{ |
|
|
|
|
Namespace: "loki", |
|
|
|
@ -63,7 +66,24 @@ func (d *Distributor) PushHandler(w http.ResponseWriter, r *http.Request) { |
|
|
|
|
func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { |
|
|
|
|
userID, _ := user.ExtractOrgID(r.Context()) |
|
|
|
|
logger := util_log.WithContext(r.Context(), util_log.Logger) |
|
|
|
|
body := lokiutil.NewSizeReader(r.Body) |
|
|
|
|
|
|
|
|
|
var body lokiutil.SizeReader |
|
|
|
|
|
|
|
|
|
contentEncoding := r.Header.Get(contentEnc) |
|
|
|
|
switch contentEncoding { |
|
|
|
|
case "": |
|
|
|
|
body = lokiutil.NewSizeReader(r.Body) |
|
|
|
|
case "gzip": |
|
|
|
|
gzipReader, err := gzip.NewReader(r.Body) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
defer gzipReader.Close() |
|
|
|
|
body = lokiutil.NewSizeReader(gzipReader) |
|
|
|
|
default: |
|
|
|
|
return nil, fmt.Errorf("Content-Encoding %q not supported", contentEncoding) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
contentType := r.Header.Get(contentType) |
|
|
|
|
var req logproto.PushRequest |
|
|
|
|
|
|
|
|
@ -97,6 +117,7 @@ func ParseRequest(r *http.Request) (*logproto.PushRequest, error) { |
|
|
|
|
"msg", "push request parsed", |
|
|
|
|
"path", r.URL.Path, |
|
|
|
|
"contentType", contentType, |
|
|
|
|
"contentEncoding", contentEncoding, |
|
|
|
|
"bodySize", humanize.Bytes(uint64(body.Size())), |
|
|
|
|
"streams", len(req.Streams), |
|
|
|
|
"entries", totalEntries, |
|
|
|
|