mirror of https://github.com/grafana/grafana
Loki: Create component for sending logs to Loki (#51500)
parent
a5924315f8
commit
b45c71c9ca
@ -0,0 +1,5 @@ |
||||
The Loki package is derived from the Grafana Loki project's logproto and |
||||
promtail packages with removals to support Grafana's needs. |
||||
|
||||
- https://github.com/grafana/loki/tree/2fc20af9a4e55483afe0f09f8a8119ab380daea9/clients/pkg/promtail/client |
||||
- https://github.com/grafana/loki/tree/2fc20af9a4e55483afe0f09f8a8119ab380daea9/pkg/logproto |
||||
@ -0,0 +1,202 @@ |
||||
|
||||
Apache License |
||||
Version 2.0, January 2004 |
||||
http://www.apache.org/licenses/ |
||||
|
||||
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION |
||||
|
||||
1. Definitions. |
||||
|
||||
"License" shall mean the terms and conditions for use, reproduction, |
||||
and distribution as defined by Sections 1 through 9 of this document. |
||||
|
||||
"Licensor" shall mean the copyright owner or entity authorized by |
||||
the copyright owner that is granting the License. |
||||
|
||||
"Legal Entity" shall mean the union of the acting entity and all |
||||
other entities that control, are controlled by, or are under common |
||||
control with that entity. For the purposes of this definition, |
||||
"control" means (i) the power, direct or indirect, to cause the |
||||
direction or management of such entity, whether by contract or |
||||
otherwise, or (ii) ownership of fifty percent (50%) or more of the |
||||
outstanding shares, or (iii) beneficial ownership of such entity. |
||||
|
||||
"You" (or "Your") shall mean an individual or Legal Entity |
||||
exercising permissions granted by this License. |
||||
|
||||
"Source" form shall mean the preferred form for making modifications, |
||||
including but not limited to software source code, documentation |
||||
source, and configuration files. |
||||
|
||||
"Object" form shall mean any form resulting from mechanical |
||||
transformation or translation of a Source form, including but |
||||
not limited to compiled object code, generated documentation, |
||||
and conversions to other media types. |
||||
|
||||
"Work" shall mean the work of authorship, whether in Source or |
||||
Object form, made available under the License, as indicated by a |
||||
copyright notice that is included in or attached to the work |
||||
(an example is provided in the Appendix below). |
||||
|
||||
"Derivative Works" shall mean any work, whether in Source or Object |
||||
form, that is based on (or derived from) the Work and for which the |
||||
editorial revisions, annotations, elaborations, or other modifications |
||||
represent, as a whole, an original work of authorship. For the purposes |
||||
of this License, Derivative Works shall not include works that remain |
||||
separable from, or merely link (or bind by name) to the interfaces of, |
||||
the Work and Derivative Works thereof. |
||||
|
||||
"Contribution" shall mean any work of authorship, including |
||||
the original version of the Work and any modifications or additions |
||||
to that Work or Derivative Works thereof, that is intentionally |
||||
submitted to Licensor for inclusion in the Work by the copyright owner |
||||
or by an individual or Legal Entity authorized to submit on behalf of |
||||
the copyright owner. For the purposes of this definition, "submitted" |
||||
means any form of electronic, verbal, or written communication sent |
||||
to the Licensor or its representatives, including but not limited to |
||||
communication on electronic mailing lists, source code control systems, |
||||
and issue tracking systems that are managed by, or on behalf of, the |
||||
Licensor for the purpose of discussing and improving the Work, but |
||||
excluding communication that is conspicuously marked or otherwise |
||||
designated in writing by the copyright owner as "Not a Contribution." |
||||
|
||||
"Contributor" shall mean Licensor and any individual or Legal Entity |
||||
on behalf of whom a Contribution has been received by Licensor and |
||||
subsequently incorporated within the Work. |
||||
|
||||
2. Grant of Copyright License. Subject to the terms and conditions of |
||||
this License, each Contributor hereby grants to You a perpetual, |
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
||||
copyright license to reproduce, prepare Derivative Works of, |
||||
publicly display, publicly perform, sublicense, and distribute the |
||||
Work and such Derivative Works in Source or Object form. |
||||
|
||||
3. Grant of Patent License. Subject to the terms and conditions of |
||||
this License, each Contributor hereby grants to You a perpetual, |
||||
worldwide, non-exclusive, no-charge, royalty-free, irrevocable |
||||
(except as stated in this section) patent license to make, have made, |
||||
use, offer to sell, sell, import, and otherwise transfer the Work, |
||||
where such license applies only to those patent claims licensable |
||||
by such Contributor that are necessarily infringed by their |
||||
Contribution(s) alone or by combination of their Contribution(s) |
||||
with the Work to which such Contribution(s) was submitted. If You |
||||
institute patent litigation against any entity (including a |
||||
cross-claim or counterclaim in a lawsuit) alleging that the Work |
||||
or a Contribution incorporated within the Work constitutes direct |
||||
or contributory patent infringement, then any patent licenses |
||||
granted to You under this License for that Work shall terminate |
||||
as of the date such litigation is filed. |
||||
|
||||
4. Redistribution. You may reproduce and distribute copies of the |
||||
Work or Derivative Works thereof in any medium, with or without |
||||
modifications, and in Source or Object form, provided that You |
||||
meet the following conditions: |
||||
|
||||
(a) You must give any other recipients of the Work or |
||||
Derivative Works a copy of this License; and |
||||
|
||||
(b) You must cause any modified files to carry prominent notices |
||||
stating that You changed the files; and |
||||
|
||||
(c) You must retain, in the Source form of any Derivative Works |
||||
that You distribute, all copyright, patent, trademark, and |
||||
attribution notices from the Source form of the Work, |
||||
excluding those notices that do not pertain to any part of |
||||
the Derivative Works; and |
||||
|
||||
(d) If the Work includes a "NOTICE" text file as part of its |
||||
distribution, then any Derivative Works that You distribute must |
||||
include a readable copy of the attribution notices contained |
||||
within such NOTICE file, excluding those notices that do not |
||||
pertain to any part of the Derivative Works, in at least one |
||||
of the following places: within a NOTICE text file distributed |
||||
as part of the Derivative Works; within the Source form or |
||||
documentation, if provided along with the Derivative Works; or, |
||||
within a display generated by the Derivative Works, if and |
||||
wherever such third-party notices normally appear. The contents |
||||
of the NOTICE file are for informational purposes only and |
||||
do not modify the License. You may add Your own attribution |
||||
notices within Derivative Works that You distribute, alongside |
||||
or as an addendum to the NOTICE text from the Work, provided |
||||
that such additional attribution notices cannot be construed |
||||
as modifying the License. |
||||
|
||||
You may add Your own copyright statement to Your modifications and |
||||
may provide additional or different license terms and conditions |
||||
for use, reproduction, or distribution of Your modifications, or |
||||
for any such Derivative Works as a whole, provided Your use, |
||||
reproduction, and distribution of the Work otherwise complies with |
||||
the conditions stated in this License. |
||||
|
||||
5. Submission of Contributions. Unless You explicitly state otherwise, |
||||
any Contribution intentionally submitted for inclusion in the Work |
||||
by You to the Licensor shall be under the terms and conditions of |
||||
this License, without any additional terms or conditions. |
||||
Notwithstanding the above, nothing herein shall supersede or modify |
||||
the terms of any separate license agreement you may have executed |
||||
with Licensor regarding such Contributions. |
||||
|
||||
6. Trademarks. This License does not grant permission to use the trade |
||||
names, trademarks, service marks, or product names of the Licensor, |
||||
except as required for reasonable and customary use in describing the |
||||
origin of the Work and reproducing the content of the NOTICE file. |
||||
|
||||
7. Disclaimer of Warranty. Unless required by applicable law or |
||||
agreed to in writing, Licensor provides the Work (and each |
||||
Contributor provides its Contributions) on an "AS IS" BASIS, |
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or |
||||
implied, including, without limitation, any warranties or conditions |
||||
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A |
||||
PARTICULAR PURPOSE. You are solely responsible for determining the |
||||
appropriateness of using or redistributing the Work and assume any |
||||
risks associated with Your exercise of permissions under this License. |
||||
|
||||
8. Limitation of Liability. In no event and under no legal theory, |
||||
whether in tort (including negligence), contract, or otherwise, |
||||
unless required by applicable law (such as deliberate and grossly |
||||
negligent acts) or agreed to in writing, shall any Contributor be |
||||
liable to You for damages, including any direct, indirect, special, |
||||
incidental, or consequential damages of any character arising as a |
||||
result of this License or out of the use or inability to use the |
||||
Work (including but not limited to damages for loss of goodwill, |
||||
work stoppage, computer failure or malfunction, or any and all |
||||
other commercial damages or losses), even if such Contributor |
||||
has been advised of the possibility of such damages. |
||||
|
||||
9. Accepting Warranty or Additional Liability. While redistributing |
||||
the Work or Derivative Works thereof, You may choose to offer, |
||||
and charge a fee for, acceptance of support, warranty, indemnity, |
||||
or other liability obligations and/or rights consistent with this |
||||
License. However, in accepting such obligations, You may act only |
||||
on Your own behalf and on Your sole responsibility, not on behalf |
||||
of any other Contributor, and only if You agree to indemnify, |
||||
defend, and hold each Contributor harmless for any liability |
||||
incurred by, or claims asserted against, such Contributor by reason |
||||
of your accepting any such warranty or additional liability. |
||||
|
||||
END OF TERMS AND CONDITIONS |
||||
|
||||
APPENDIX: How to apply the Apache License to your work. |
||||
|
||||
To apply the Apache License to your work, attach the following |
||||
boilerplate notice, with the fields enclosed by brackets "[]" |
||||
replaced with your own identifying information. (Don't include |
||||
the brackets!) The text should be enclosed in the appropriate |
||||
comment syntax for the file format. We also recommend that a |
||||
file or class name and description of purpose be included on the |
||||
same "printed page" as the copyright notice for easier |
||||
identification within third-party archives. |
||||
|
||||
Copyright [yyyy] [name of copyright owner] |
||||
|
||||
Licensed under the Apache License, Version 2.0 (the "License"); |
||||
you may not use this file except in compliance with the License. |
||||
You may obtain a copy of the License at |
||||
|
||||
http://www.apache.org/licenses/LICENSE-2.0 |
||||
|
||||
Unless required by applicable law or agreed to in writing, software |
||||
distributed under the License is distributed on an "AS IS" BASIS, |
||||
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
||||
See the License for the specific language governing permissions and |
||||
limitations under the License. |
||||
@ -0,0 +1,7 @@ |
||||
package logproto |
||||
|
||||
// Streams is a collection of Stream values that can be sorted by label
// set, giving deterministic ordering of marshaled output.
type Streams []Stream

// Len, Swap and Less implement sort.Interface, ordering streams
// lexicographically by their Labels string.
func (xs Streams) Len() int           { return len(xs) }
func (xs Streams) Swap(i, j int)      { xs[i], xs[j] = xs[j], xs[i] }
func (xs Streams) Less(i, j int) bool { return xs[i].Labels <= xs[j].Labels }
||||
File diff suppressed because it is too large
Load Diff
@ -0,0 +1,47 @@ |
||||
syntax = "proto3";

// COPIED FROM LOKI
// If you modify this file, you need to manually generate the
// logproto.pb.go file as well.
//
// Install gogoslick from https://github.com/gogo/protobuf
// protoc --proto_path=$GOPATH/src/ --proto_path=. --gogoslick_out=plugins=grpc:. logproto.proto

package logproto;

import "github.com/gogo/protobuf/gogoproto/gogo.proto";
import "google/protobuf/timestamp.proto";

option go_package = "logproto";

// Pusher is the Loki push (ingestion) service.
service Pusher {
  rpc Push(PushRequest) returns (PushResponse) {}
}

message PushRequest {
  // streams are decoded into the hand-written Stream Go type
  // ((gogoproto.customtype) = "Stream") rather than the generated
  // StreamAdapter, to reduce allocations during serialization.
  repeated StreamAdapter streams = 1 [
    (gogoproto.jsontag) = "streams",
    (gogoproto.customtype) = "Stream"
  ];
}

message PushResponse {}

message StreamAdapter {
  string labels = 1 [(gogoproto.jsontag) = "labels"];
  repeated EntryAdapter entries = 2 [
    (gogoproto.nullable) = false,
    (gogoproto.jsontag) = "entries"
  ];
  // hash contains the original hash of the stream.
  uint64 hash = 3 [(gogoproto.jsontag) = "-"];
}

message EntryAdapter {
  google.protobuf.Timestamp timestamp = 1 [
    (gogoproto.stdtime) = true,
    (gogoproto.nullable) = false,
    (gogoproto.jsontag) = "ts"
  ];
  string line = 2 [(gogoproto.jsontag) = "line"];
}
||||
@ -0,0 +1,106 @@ |
||||
package logproto |
||||
|
||||
import ( |
||||
"errors" |
||||
"strconv" |
||||
"time" |
||||
|
||||
"github.com/gogo/protobuf/types" |
||||
) |
||||
|
||||
// Bounds on the Seconds field of a protobuf Timestamp, matching the
// well-known-type specification.
const (
	// Seconds field of the earliest valid Timestamp.
	// This is time.Date(1, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
	minValidSeconds = -62135596800
	// Seconds field just after the latest valid Timestamp.
	// This is time.Date(10000, 1, 1, 0, 0, 0, 0, time.UTC).Unix().
	maxValidSeconds = 253402300800
)
||||
|
||||
// validateTimestamp determines whether a Timestamp is valid.
|
||||
// A valid timestamp represents a time in the range
|
||||
// [0001-01-01, 10000-01-01) and has a Nanos field
|
||||
// in the range [0, 1e9).
|
||||
//
|
||||
// If the Timestamp is valid, validateTimestamp returns nil.
|
||||
// Otherwise, it returns an error that describes
|
||||
// the problem.
|
||||
//
|
||||
// Every valid Timestamp can be represented by a time.Time, but the converse is not true.
|
||||
func validateTimestamp(ts *types.Timestamp) error { |
||||
if ts == nil { |
||||
return errors.New("timestamp: nil Timestamp") |
||||
} |
||||
if ts.Seconds < minValidSeconds { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + " before 0001-01-01") |
||||
} |
||||
if ts.Seconds >= maxValidSeconds { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + " after 10000-01-01") |
||||
} |
||||
if ts.Nanos < 0 || ts.Nanos >= 1e9 { |
||||
return errors.New("timestamp: " + formatTimestamp(ts) + ": nanos not in range [0, 1e9)") |
||||
} |
||||
return nil |
||||
} |
||||
|
||||
// formatTimestamp is equivalent to fmt.Sprintf("%#v", ts)
|
||||
// but avoids the escape incurred by using fmt.Sprintf, eliminating
|
||||
// unnecessary heap allocations.
|
||||
func formatTimestamp(ts *types.Timestamp) string { |
||||
if ts == nil { |
||||
return "nil" |
||||
} |
||||
|
||||
seconds := strconv.FormatInt(ts.Seconds, 10) |
||||
nanos := strconv.FormatInt(int64(ts.Nanos), 10) |
||||
return "&types.Timestamp{Seconds: " + seconds + ",\nNanos: " + nanos + ",\n}" |
||||
} |
||||
|
||||
func SizeOfStdTime(t time.Time) int { |
||||
ts, err := timestampProto(t) |
||||
if err != nil { |
||||
return 0 |
||||
} |
||||
return ts.Size() |
||||
} |
||||
|
||||
func StdTimeMarshalTo(t time.Time, data []byte) (int, error) { |
||||
ts, err := timestampProto(t) |
||||
if err != nil { |
||||
return 0, err |
||||
} |
||||
return ts.MarshalTo(data) |
||||
} |
||||
|
||||
func StdTimeUnmarshal(t *time.Time, data []byte) error { |
||||
ts := &types.Timestamp{} |
||||
if err := ts.Unmarshal(data); err != nil { |
||||
return err |
||||
} |
||||
tt, err := timestampFromProto(ts) |
||||
if err != nil { |
||||
return err |
||||
} |
||||
*t = tt |
||||
return nil |
||||
} |
||||
|
||||
// timestampFromProto converts ts to a time.Time in UTC, returning the
// validation result of ts as the error.
func timestampFromProto(ts *types.Timestamp) (time.Time, error) {
	// Don't return the zero value on error, because it corresponds to a
	// valid timestamp. Instead return whatever time.Unix gives us.
	var t time.Time
	if ts == nil {
		t = time.Unix(0, 0).UTC() // treat nil like the empty Timestamp
	} else {
		t = time.Unix(ts.Seconds, int64(ts.Nanos)).UTC()
	}
	return t, validateTimestamp(ts)
}
||||
|
||||
func timestampProto(t time.Time) (types.Timestamp, error) { |
||||
ts := types.Timestamp{ |
||||
Seconds: t.Unix(), |
||||
Nanos: int32(t.Nanosecond()), |
||||
} |
||||
return ts, validateTimestamp(&ts) |
||||
} |
||||
@ -0,0 +1,467 @@ |
||||
package logproto |
||||
|
||||
import ( |
||||
"fmt" |
||||
"io" |
||||
"time" |
||||
) |
||||
|
||||
// Stream contains a unique labels set as a string and a set of entries for it.
// We are not using the proto generated version but this custom one so that we
// can improve serialization see benchmark.
//
// The protobuf struct tags keep the wire format identical to the
// generated StreamAdapter type (see logproto.proto).
type Stream struct {
	Labels  string  `protobuf:"bytes,1,opt,name=labels,proto3" json:"labels"`
	Entries []Entry `protobuf:"bytes,2,rep,name=entries,proto3,customtype=EntryAdapter" json:"entries"`
	// Hash is excluded from JSON output (json:"-").
	Hash uint64 `protobuf:"varint,3,opt,name=hash,proto3" json:"-"`
}

// Entry is a log entry with a timestamp.
type Entry struct {
	Timestamp time.Time `protobuf:"bytes,1,opt,name=timestamp,proto3,stdtime" json:"ts"`
	Line      string    `protobuf:"bytes,2,opt,name=line,proto3" json:"line"`
}
||||
|
||||
// Marshal encodes the stream into a freshly allocated, exactly-sized
// buffer and returns the encoded bytes.
func (m *Stream) Marshal() (dAtA []byte, err error) {
	size := m.Size()
	dAtA = make([]byte, size)
	n, err := m.MarshalToSizedBuffer(dAtA[:size])
	if err != nil {
		return nil, err
	}
	return dAtA[:n], nil
}

// MarshalTo encodes the stream into dAtA, which must be at least
// m.Size() bytes long, and returns the number of bytes written.
func (m *Stream) MarshalTo(dAtA []byte) (int, error) {
	size := m.Size()
	return m.MarshalToSizedBuffer(dAtA[:size])
}

// MarshalToSizedBuffer encodes the stream into the tail of dAtA,
// writing fields in reverse order from the end of the buffer
// (gogoproto convention), and returns the number of bytes written.
func (m *Stream) MarshalToSizedBuffer(dAtA []byte) (int, error) {
	i := len(dAtA)
	_ = i
	var l int
	_ = l
	if m.Hash != 0 {
		// Field 3, varint wire type: tag byte 0x18 == (3<<3)|0.
		i = encodeVarintLogproto(dAtA, i, m.Hash)
		i--
		dAtA[i] = 0x18
	}
	if len(m.Entries) > 0 {
		// Entries are written back-to-front so they decode in order.
		for iNdEx := len(m.Entries) - 1; iNdEx >= 0; iNdEx-- {
			{
				size, err := m.Entries[iNdEx].MarshalToSizedBuffer(dAtA[:i])
				if err != nil {
					return 0, err
				}
				i -= size
				i = encodeVarintLogproto(dAtA, i, uint64(size))
			}
			i--
			// Field 2, length-delimited wire type: 0x12 == (2<<3)|2.
			dAtA[i] = 0x12
		}
	}
	if len(m.Labels) > 0 {
		i -= len(m.Labels)
		copy(dAtA[i:], m.Labels)
		i = encodeVarintLogproto(dAtA, i, uint64(len(m.Labels)))
		i--
		// Field 1, length-delimited wire type: 0xa == (1<<3)|2.
		dAtA[i] = 0xa
	}
	return len(dAtA) - i, nil
}
||||
|
||||
// Marshal encodes the entry into a freshly allocated, exactly-sized
// buffer and returns the encoded bytes.
func (m *Entry) Marshal() (dAtA []byte, err error) {
	size := m.Size()
	dAtA = make([]byte, size)
	n, err := m.MarshalToSizedBuffer(dAtA[:size])
	if err != nil {
		return nil, err
	}
	return dAtA[:n], nil
}

// MarshalTo encodes the entry into dAtA, which must be at least
// m.Size() bytes long, and returns the number of bytes written.
func (m *Entry) MarshalTo(dAtA []byte) (int, error) {
	size := m.Size()
	return m.MarshalToSizedBuffer(dAtA[:size])
}

// MarshalToSizedBuffer encodes the entry into the tail of dAtA,
// writing fields in reverse order from the end of the buffer
// (gogoproto convention), and returns the number of bytes written.
func (m *Entry) MarshalToSizedBuffer(dAtA []byte) (int, error) {
	i := len(dAtA)
	_ = i
	var l int
	_ = l
	if len(m.Line) > 0 {
		i -= len(m.Line)
		copy(dAtA[i:], m.Line)
		i = encodeVarintLogproto(dAtA, i, uint64(len(m.Line)))
		i--
		// Field 2, length-delimited wire type: 0x12 == (2<<3)|2.
		dAtA[i] = 0x12
	}
	// The timestamp is always written, even when zero (non-nullable
	// stdtime field in the proto definition).
	n7, err7 := StdTimeMarshalTo(m.Timestamp, dAtA[i-SizeOfStdTime(m.Timestamp):])
	if err7 != nil {
		return 0, err7
	}
	i -= n7
	i = encodeVarintLogproto(dAtA, i, uint64(n7))
	i--
	// Field 1, length-delimited wire type: 0xa == (1<<3)|2.
	dAtA[i] = 0xa
	return len(dAtA) - i, nil
}
||||
|
||||
// Unmarshal decodes a protobuf-encoded stream (wire-compatible with
// StreamAdapter) from dAtA into m. The decoder is hand-rolled in the
// gogoproto style to avoid reflection; unknown fields are skipped.
//
//nolint: gocyclo
func (m *Stream) Unmarshal(dAtA []byte) error {
	l := len(dAtA)
	iNdEx := 0
	for iNdEx < l {
		preIndex := iNdEx
		// Decode the field tag varint: (field number << 3) | wire type.
		var wire uint64
		for shift := uint(0); ; shift += 7 {
			if shift >= 64 {
				return ErrIntOverflowLogproto
			}
			if iNdEx >= l {
				return io.ErrUnexpectedEOF
			}
			b := dAtA[iNdEx]
			iNdEx++
			wire |= uint64(b&0x7F) << shift
			if b < 0x80 {
				break
			}
		}
		fieldNum := int32(wire >> 3)
		wireType := int(wire & 0x7)
		if wireType == 4 {
			// Wire type 4 is "end group"; invalid outside a group.
			return fmt.Errorf("proto: StreamAdapter: wiretype end group for non-group")
		}
		if fieldNum <= 0 {
			return fmt.Errorf("proto: StreamAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
		}
		switch fieldNum {
		case 1:
			// Labels: length-delimited string.
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field Labels", wireType)
			}
			var stringLen uint64
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return ErrIntOverflowLogproto
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				stringLen |= uint64(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			intStringLen := int(stringLen)
			if intStringLen < 0 {
				return ErrInvalidLengthLogproto
			}
			postIndex := iNdEx + intStringLen
			if postIndex < 0 {
				// Overflow of iNdEx + length.
				return ErrInvalidLengthLogproto
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			m.Labels = string(dAtA[iNdEx:postIndex])
			iNdEx = postIndex
		case 2:
			// Entries: repeated length-delimited EntryAdapter messages.
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field Entries", wireType)
			}
			var msglen int
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return ErrIntOverflowLogproto
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				msglen |= int(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			if msglen < 0 {
				return ErrInvalidLengthLogproto
			}
			postIndex := iNdEx + msglen
			if postIndex < 0 {
				return ErrInvalidLengthLogproto
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			// Append an empty Entry and decode into it in place.
			m.Entries = append(m.Entries, Entry{})
			if err := m.Entries[len(m.Entries)-1].Unmarshal(dAtA[iNdEx:postIndex]); err != nil {
				return err
			}
			iNdEx = postIndex
		case 3:
			// Hash: varint.
			if wireType != 0 {
				return fmt.Errorf("proto: wrong wireType = %d for field Hash", wireType)
			}
			m.Hash = 0
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return ErrIntOverflowLogproto
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				m.Hash |= uint64(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
		default:
			// Unknown field: rewind to the tag and skip it entirely.
			iNdEx = preIndex
			skippy, err := skipLogproto(dAtA[iNdEx:])
			if err != nil {
				return err
			}
			if skippy < 0 {
				return ErrInvalidLengthLogproto
			}
			if (iNdEx + skippy) < 0 {
				return ErrInvalidLengthLogproto
			}
			if (iNdEx + skippy) > l {
				return io.ErrUnexpectedEOF
			}
			iNdEx += skippy
		}
	}

	if iNdEx > l {
		return io.ErrUnexpectedEOF
	}
	return nil
}
||||
|
||||
// Unmarshal decodes a protobuf-encoded entry (wire-compatible with
// EntryAdapter) from dAtA into m. Hand-rolled in the gogoproto style
// to avoid reflection; unknown fields are skipped.
//
//nolint: gocyclo
func (m *Entry) Unmarshal(dAtA []byte) error {
	l := len(dAtA)
	iNdEx := 0
	for iNdEx < l {
		preIndex := iNdEx
		// Decode the field tag varint: (field number << 3) | wire type.
		var wire uint64
		for shift := uint(0); ; shift += 7 {
			if shift >= 64 {
				return ErrIntOverflowLogproto
			}
			if iNdEx >= l {
				return io.ErrUnexpectedEOF
			}
			b := dAtA[iNdEx]
			iNdEx++
			wire |= uint64(b&0x7F) << shift
			if b < 0x80 {
				break
			}
		}
		fieldNum := int32(wire >> 3)
		wireType := int(wire & 0x7)
		if wireType == 4 {
			// Wire type 4 is "end group"; invalid outside a group.
			return fmt.Errorf("proto: EntryAdapter: wiretype end group for non-group")
		}
		if fieldNum <= 0 {
			return fmt.Errorf("proto: EntryAdapter: illegal tag %d (wire type %d)", fieldNum, wire)
		}
		switch fieldNum {
		case 1:
			// Timestamp: length-delimited google.protobuf.Timestamp.
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field Timestamp", wireType)
			}
			var msglen int
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return ErrIntOverflowLogproto
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				msglen |= int(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			if msglen < 0 {
				return ErrInvalidLengthLogproto
			}
			postIndex := iNdEx + msglen
			if postIndex < 0 {
				return ErrInvalidLengthLogproto
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			if err := StdTimeUnmarshal(&m.Timestamp, dAtA[iNdEx:postIndex]); err != nil {
				return err
			}
			iNdEx = postIndex
		case 2:
			// Line: length-delimited string.
			if wireType != 2 {
				return fmt.Errorf("proto: wrong wireType = %d for field Line", wireType)
			}
			var stringLen uint64
			for shift := uint(0); ; shift += 7 {
				if shift >= 64 {
					return ErrIntOverflowLogproto
				}
				if iNdEx >= l {
					return io.ErrUnexpectedEOF
				}
				b := dAtA[iNdEx]
				iNdEx++
				stringLen |= uint64(b&0x7F) << shift
				if b < 0x80 {
					break
				}
			}
			intStringLen := int(stringLen)
			if intStringLen < 0 {
				return ErrInvalidLengthLogproto
			}
			postIndex := iNdEx + intStringLen
			if postIndex < 0 {
				return ErrInvalidLengthLogproto
			}
			if postIndex > l {
				return io.ErrUnexpectedEOF
			}
			m.Line = string(dAtA[iNdEx:postIndex])
			iNdEx = postIndex
		default:
			// Unknown field: rewind to the tag and skip it entirely.
			iNdEx = preIndex
			skippy, err := skipLogproto(dAtA[iNdEx:])
			if err != nil {
				return err
			}
			if skippy < 0 {
				return ErrInvalidLengthLogproto
			}
			if (iNdEx + skippy) < 0 {
				return ErrInvalidLengthLogproto
			}
			if (iNdEx + skippy) > l {
				return io.ErrUnexpectedEOF
			}
			iNdEx += skippy
		}
	}

	if iNdEx > l {
		return io.ErrUnexpectedEOF
	}
	return nil
}
||||
|
||||
func (m *Stream) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = len(m.Labels) |
||||
if l > 0 { |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
if len(m.Entries) > 0 { |
||||
for _, e := range m.Entries { |
||||
l = e.Size() |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
} |
||||
if m.Hash != 0 { |
||||
n += 1 + sovLogproto(m.Hash) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Entry) Size() (n int) { |
||||
if m == nil { |
||||
return 0 |
||||
} |
||||
var l int |
||||
_ = l |
||||
l = SizeOfStdTime(m.Timestamp) |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
l = len(m.Line) |
||||
if l > 0 { |
||||
n += 1 + l + sovLogproto(uint64(l)) |
||||
} |
||||
return n |
||||
} |
||||
|
||||
func (m *Stream) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Stream) |
||||
if !ok { |
||||
that2, ok := that.(Stream) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if m.Labels != that1.Labels { |
||||
return false |
||||
} |
||||
if len(m.Entries) != len(that1.Entries) { |
||||
return false |
||||
} |
||||
for i := range m.Entries { |
||||
if !m.Entries[i].Equal(that1.Entries[i]) { |
||||
return false |
||||
} |
||||
} |
||||
return m.Hash == that1.Hash |
||||
} |
||||
|
||||
func (m *Entry) Equal(that interface{}) bool { |
||||
if that == nil { |
||||
return m == nil |
||||
} |
||||
|
||||
that1, ok := that.(*Entry) |
||||
if !ok { |
||||
that2, ok := that.(Entry) |
||||
if ok { |
||||
that1 = &that2 |
||||
} else { |
||||
return false |
||||
} |
||||
} |
||||
if that1 == nil { |
||||
return m == nil |
||||
} else if m == nil { |
||||
return false |
||||
} |
||||
if !m.Timestamp.Equal(that1.Timestamp) { |
||||
return false |
||||
} |
||||
if m.Line != that1.Line { |
||||
return false |
||||
} |
||||
return true |
||||
} |
||||
@ -0,0 +1,111 @@ |
||||
package logproto |
||||
|
||||
import ( |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
// Shared fixtures: stream (hand-written codec) and streamAdapter
// (generated codec) describe the same data, so the tests below can
// check that the two encodings are wire-compatible.
//
// NOTE(review): `1234*10 ^ 9` parses as (1234*10) XOR 9 — `^` is XOR
// in Go, not exponentiation. Both fixtures use the same expression, so
// the tests are unaffected, but the value is likely not the intended
// 1234e9; confirm against upstream Loki before changing.
var (
	now  = time.Now().UTC()
	line = `level=info ts=2019-12-12T15:00:08.325Z caller=compact.go:441 component=tsdb msg="compact blocks" count=3 mint=1576130400000 maxt=1576152000000 ulid=01DVX9ZHNM71GRCJS7M34Q0EV7 sources="[01DVWNC6NWY1A60AZV3Z6DGS65 01DVWW7XXX75GHA6ZDTD170CSZ 01DVX33N5W86CWJJVRPAVXJRWJ]" duration=2.897213221s`
	stream = Stream{
		Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
		Hash:   1234*10 ^ 9,
		Entries: []Entry{
			{Timestamp: now, Line: line},
			{Timestamp: now.Add(1 * time.Second), Line: line},
			{Timestamp: now.Add(2 * time.Second), Line: line},
			{Timestamp: now.Add(3 * time.Second), Line: line},
		},
	}
	streamAdapter = StreamAdapter{
		Labels: `{job="foobar", cluster="foo-central1", namespace="bar", container_name="buzz"}`,
		Hash:   1234*10 ^ 9,
		Entries: []EntryAdapter{
			{Timestamp: now, Line: line},
			{Timestamp: now.Add(1 * time.Second), Line: line},
			{Timestamp: now.Add(2 * time.Second), Line: line},
			{Timestamp: now.Add(3 * time.Second), Line: line},
		},
	}
)
||||
|
||||
func TestStream(t *testing.T) { |
||||
avg := testing.AllocsPerRun(200, func() { |
||||
b, err := stream.Marshal() |
||||
require.NoError(t, err) |
||||
|
||||
var new Stream |
||||
err = new.Unmarshal(b) |
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, stream, new) |
||||
}) |
||||
t.Log("avg allocs per run:", avg) |
||||
} |
||||
|
||||
func TestStreamAdapter(t *testing.T) { |
||||
avg := testing.AllocsPerRun(200, func() { |
||||
b, err := streamAdapter.Marshal() |
||||
require.NoError(t, err) |
||||
|
||||
var new StreamAdapter |
||||
err = new.Unmarshal(b) |
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, streamAdapter, new) |
||||
}) |
||||
t.Log("avg allocs per run:", avg) |
||||
} |
||||
|
||||
func TestCompatibility(t *testing.T) { |
||||
b, err := stream.Marshal() |
||||
require.NoError(t, err) |
||||
|
||||
var adapter StreamAdapter |
||||
err = adapter.Unmarshal(b) |
||||
require.NoError(t, err) |
||||
require.Equal(t, streamAdapter, adapter) |
||||
|
||||
ba, err := adapter.Marshal() |
||||
require.NoError(t, err) |
||||
require.Equal(t, b, ba) |
||||
|
||||
var new Stream |
||||
err = new.Unmarshal(ba) |
||||
require.NoError(t, err) |
||||
|
||||
require.Equal(t, stream, new) |
||||
} |
||||
|
||||
func BenchmarkStream(b *testing.B) { |
||||
b.ReportAllocs() |
||||
for n := 0; n < b.N; n++ { |
||||
by, err := stream.Marshal() |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
var new Stream |
||||
err = new.Unmarshal(by) |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
} |
||||
} |
||||
|
||||
func BenchmarkStreamAdapter(b *testing.B) { |
||||
b.ReportAllocs() |
||||
for n := 0; n < b.N; n++ { |
||||
by, err := streamAdapter.Marshal() |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
var new StreamAdapter |
||||
err = new.Unmarshal(by) |
||||
if err != nil { |
||||
b.Fatal(err) |
||||
} |
||||
} |
||||
} |
||||
@ -0,0 +1,110 @@ |
||||
package lokihttp |
||||
|
||||
import ( |
||||
"fmt" |
||||
"sort" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/gogo/protobuf/proto" |
||||
"github.com/golang/snappy" |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/grafana/pkg/components/loki/logproto" |
||||
) |
||||
|
||||
// batch holds pending log streams waiting to be sent to Loki, and it's used
// to reduce the number of push requests to Loki aggregating multiple log streams
// and entries in a single batch request. In case of multi-tenant Promtail, log
// streams for each tenant are stored in a dedicated batch.
type batch struct {
	// streams maps a serialized label set (as produced by labelsMapToString)
	// to the stream accumulating entries for those labels.
	streams map[string]*logproto.Stream
	// bytes is the running total of log-line bytes added via add(); it does
	// not include label or protobuf framing overhead.
	bytes int
	// createdAt is when the batch was created; used by age() to decide when
	// the batch has waited long enough and must be flushed.
	createdAt time.Time
}
||||
|
||||
func newBatch(entries ...Entry) *batch { |
||||
b := &batch{ |
||||
streams: map[string]*logproto.Stream{}, |
||||
bytes: 0, |
||||
createdAt: time.Now(), |
||||
} |
||||
|
||||
// Add entries to the batch
|
||||
for _, entry := range entries { |
||||
b.add(entry) |
||||
} |
||||
|
||||
return b |
||||
} |
||||
|
||||
// add an entry to the batch
|
||||
func (b *batch) add(entry Entry) { |
||||
b.bytes += len(entry.Line) |
||||
|
||||
// Append the entry to an already existing stream (if any)
|
||||
labels := labelsMapToString(entry.Labels, ReservedLabelTenantID) |
||||
if stream, ok := b.streams[labels]; ok { |
||||
stream.Entries = append(stream.Entries, entry.Entry) |
||||
return |
||||
} |
||||
|
||||
// Add the entry as a new stream
|
||||
b.streams[labels] = &logproto.Stream{ |
||||
Labels: labels, |
||||
Entries: []logproto.Entry{entry.Entry}, |
||||
} |
||||
} |
||||
|
||||
func labelsMapToString(ls model.LabelSet, without ...model.LabelName) string { |
||||
lstrs := make([]string, 0, len(ls)) |
||||
Outer: |
||||
for l, v := range ls { |
||||
for _, w := range without { |
||||
if l == w { |
||||
continue Outer |
||||
} |
||||
} |
||||
lstrs = append(lstrs, fmt.Sprintf("%s=%q", l, v)) |
||||
} |
||||
|
||||
sort.Strings(lstrs) |
||||
return fmt.Sprintf("{%s}", strings.Join(lstrs, ", ")) |
||||
} |
||||
|
||||
// sizeBytesAfter returns the size of the batch after the input entry
|
||||
// will be added to the batch itself
|
||||
func (b *batch) sizeBytesAfter(entry Entry) int { |
||||
return b.bytes + len(entry.Line) |
||||
} |
||||
|
||||
// age of the batch since its creation
|
||||
func (b *batch) age() time.Duration { |
||||
return time.Since(b.createdAt) |
||||
} |
||||
|
||||
// encode the batch as snappy-compressed push request, and returns
|
||||
// the encoded bytes and the number of encoded entries
|
||||
func (b *batch) encode() ([]byte, int, error) { |
||||
req, entriesCount := b.createPushRequest() |
||||
buf, err := proto.Marshal(req) |
||||
if err != nil { |
||||
return nil, 0, err |
||||
} |
||||
buf = snappy.Encode(nil, buf) |
||||
return buf, entriesCount, nil |
||||
} |
||||
|
||||
// creates push request and returns it, together with number of entries
|
||||
func (b *batch) createPushRequest() (*logproto.PushRequest, int) { |
||||
req := logproto.PushRequest{ |
||||
Streams: make([]logproto.Stream, 0, len(b.streams)), |
||||
} |
||||
|
||||
entriesCount := 0 |
||||
for _, stream := range b.streams { |
||||
req.Streams = append(req.Streams, *stream) |
||||
entriesCount += len(stream.Entries) |
||||
} |
||||
return &req, entriesCount |
||||
} |
||||
@ -0,0 +1,363 @@ |
||||
package lokihttp |
||||
|
||||
import ( |
||||
"bufio" |
||||
"bytes" |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"net/http" |
||||
"strconv" |
||||
"sync" |
||||
"time" |
||||
|
||||
"github.com/grafana/dskit/backoff" |
||||
"github.com/prometheus/client_golang/prometheus" |
||||
"github.com/prometheus/common/config" |
||||
|
||||
"github.com/grafana/grafana/pkg/infra/log" |
||||
"github.com/grafana/grafana/pkg/setting" |
||||
) |
||||
|
||||
const (
	// contentType is the Content-Type header sent with push requests
	// (snappy-compressed protobuf).
	contentType = "application/x-protobuf"
	// maxErrMsgLen caps how many bytes of an error response body are read
	// back for inclusion in the returned error.
	maxErrMsgLen = 1024

	// Label reserved to override the tenant ID while processing
	// pipeline stages
	ReservedLabelTenantID = "__tenant_id__"

	// LatencyLabel is a label name; not referenced in this file — presumably
	// used by callers for latency metrics keyed by filename. TODO confirm.
	LatencyLabel = "filename"
	// HostLabel is the metric label carrying the Loki host the client
	// pushes to.
	HostLabel = "host"
)
||||
|
||||
// UserAgent is sent on every push request and identifies this Grafana build.
var UserAgent = fmt.Sprintf("grafana/%s", setting.BuildVersion)
||||
|
||||
// metrics bundles all Prometheus instrumentation for the Loki push client.
type metrics struct {
	encodedBytes    *prometheus.CounterVec   // bytes encoded and ready to send
	sentBytes       *prometheus.CounterVec   // bytes successfully sent
	droppedBytes    *prometheus.CounterVec   // bytes dropped after exhausting retries
	sentEntries     *prometheus.CounterVec   // log entries successfully sent
	droppedEntries  *prometheus.CounterVec   // log entries dropped after exhausting retries
	requestDuration *prometheus.HistogramVec // send request duration by status code and host
	batchRetries    *prometheus.CounterVec   // number of batch retry attempts
	// countersWithHost lists the counters labelled only by host, so newClient
	// can pre-initialize their series to 0.
	countersWithHost []*prometheus.CounterVec
}
||||
|
||||
func newMetrics(reg prometheus.Registerer) *metrics { |
||||
var m metrics |
||||
|
||||
m.encodedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "encoded_bytes_total", |
||||
Help: "Number of bytes encoded and ready to send.", |
||||
}, []string{HostLabel}) |
||||
m.sentBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "sent_bytes_total", |
||||
Help: "Number of bytes sent.", |
||||
}, []string{HostLabel}) |
||||
m.droppedBytes = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "dropped_bytes_total", |
||||
Help: "Number of bytes dropped because failed to be sent to the ingester after all retries.", |
||||
}, []string{HostLabel}) |
||||
m.sentEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "sent_entries_total", |
||||
Help: "Number of log entries sent to the ingester.", |
||||
}, []string{HostLabel}) |
||||
m.droppedEntries = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "dropped_entries_total", |
||||
Help: "Number of log entries dropped because failed to be sent to the ingester after all retries.", |
||||
}, []string{HostLabel}) |
||||
m.requestDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{ |
||||
Namespace: "promtail", |
||||
Name: "request_duration_seconds", |
||||
Help: "Duration of send requests.", |
||||
}, []string{"status_code", HostLabel}) |
||||
m.batchRetries = prometheus.NewCounterVec(prometheus.CounterOpts{ |
||||
Namespace: "promtail", |
||||
Name: "batch_retries_total", |
||||
Help: "Number of times batches has had to be retried.", |
||||
}, []string{HostLabel}) |
||||
|
||||
m.countersWithHost = []*prometheus.CounterVec{ |
||||
m.encodedBytes, m.sentBytes, m.droppedBytes, m.sentEntries, m.droppedEntries, |
||||
} |
||||
|
||||
if reg != nil { |
||||
m.encodedBytes = mustRegisterOrGet(reg, m.encodedBytes).(*prometheus.CounterVec) |
||||
m.sentBytes = mustRegisterOrGet(reg, m.sentBytes).(*prometheus.CounterVec) |
||||
m.droppedBytes = mustRegisterOrGet(reg, m.droppedBytes).(*prometheus.CounterVec) |
||||
m.sentEntries = mustRegisterOrGet(reg, m.sentEntries).(*prometheus.CounterVec) |
||||
m.droppedEntries = mustRegisterOrGet(reg, m.droppedEntries).(*prometheus.CounterVec) |
||||
m.requestDuration = mustRegisterOrGet(reg, m.requestDuration).(*prometheus.HistogramVec) |
||||
m.batchRetries = mustRegisterOrGet(reg, m.batchRetries).(*prometheus.CounterVec) |
||||
} |
||||
|
||||
return &m |
||||
} |
||||
|
||||
func mustRegisterOrGet(reg prometheus.Registerer, c prometheus.Collector) prometheus.Collector { |
||||
if err := reg.Register(c); err != nil { |
||||
promError := prometheus.AlreadyRegisteredError{} |
||||
if errors.As(err, &promError) { |
||||
return promError.ExistingCollector |
||||
} |
||||
panic(err) |
||||
} |
||||
return c |
||||
} |
||||
|
||||
// Client pushes entries to Loki and can be stopped
type Client interface {
	// Chan returns the send-only channel entries are submitted on.
	Chan() chan<- Entry

	// Stop shuts the client down gracefully, flushing pending batches.
	Stop()
	// StopNow shuts the client down without waiting for in-flight retries.
	StopNow()
}
||||
|
||||
// Client for pushing logs in snappy-compressed protos over HTTP.
type client struct {
	metrics *metrics
	logger  log.Logger
	cfg     Config
	client  *http.Client
	// entries receives incoming log entries; closed by Stop() to shut down.
	entries chan Entry

	// once guards the close of entries so Stop() is safe to call repeatedly.
	once sync.Once
	// wg tracks the background run() goroutine.
	wg sync.WaitGroup

	// ctx is used in any upstream calls from the `client`.
	ctx    context.Context
	cancel context.CancelFunc
}
||||
|
||||
// Tripperware can wrap a roundtripper.
type Tripperware func(http.RoundTripper) http.RoundTripper
||||
|
||||
// New makes a new Client.
// It delegates to newClient; see NewWithTripperware to wrap the transport.
func New(reg prometheus.Registerer, cfg Config, logger log.Logger) (Client, error) {
	return newClient(reg, cfg, logger)
}
||||
|
||||
// newClient validates the config, builds the HTTP client, pre-initializes
// per-host metric series, and starts the background run() goroutine.
// Note the ordering below matters: the HTTP client config is validated
// before it is used, and the timeout is applied after construction.
func newClient(reg prometheus.Registerer, cfg Config, logger log.Logger) (*client, error) {
	if cfg.URL.URL == nil {
		return nil, errors.New("client needs target URL")
	}

	// ctx/cancel let StopNow abort in-flight retry loops.
	ctx, cancel := context.WithCancel(context.Background())

	c := &client{
		logger:  logger.New("host", cfg.URL.Host),
		cfg:     cfg,
		entries: make(chan Entry),
		metrics: newMetrics(reg),

		ctx:    ctx,
		cancel: cancel,
	}

	err := cfg.Client.Validate()
	if err != nil {
		return nil, err
	}

	c.client, err = config.NewClientFromConfig(cfg.Client, "promtail", config.WithHTTP2Disabled())
	if err != nil {
		return nil, err
	}

	c.client.Timeout = cfg.Timeout

	// Initialize counters to 0 so the metrics are exported before the first
	// occurrence of incrementing to avoid missing metrics.
	for _, counter := range c.metrics.countersWithHost {
		counter.WithLabelValues(c.cfg.URL.Host).Add(0)
	}

	// The run() goroutine is matched by wg.Done() inside run's deferred
	// cleanup; Stop() waits on this WaitGroup.
	c.wg.Add(1)
	go c.run()
	return c, nil
}
||||
|
||||
// NewWithTripperware creates a new Loki client with a custom tripperware.
|
||||
func NewWithTripperware(reg prometheus.Registerer, cfg Config, logger log.Logger, tp Tripperware) (Client, error) { |
||||
c, err := newClient(reg, cfg, logger) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
if tp != nil { |
||||
c.client.Transport = tp(c.client.Transport) |
||||
} |
||||
|
||||
return c, nil |
||||
} |
||||
|
||||
// run is the client's single background goroutine. It owns the per-tenant
// batch map: incoming entries are accumulated into batches, a batch is
// flushed when adding an entry would push it past BatchSize, and a ticker
// periodically flushes batches older than BatchWait. On exit (entries
// channel closed) all pending batches are sent before wg.Done().
func (c *client) run() {
	batches := map[string]*batch{}

	// Given the client handles multiple batches (1 per tenant) and each batch
	// can be created at a different point in time, we look for batches whose
	// max wait time has been reached every 10 times per BatchWait, so that the
	// maximum delay we have sending batches is 10% of the max waiting time.
	// We apply a cap of 10ms to the ticker, to avoid too frequent checks in
	// case the BatchWait is very low.
	minWaitCheckFrequency := 10 * time.Millisecond
	maxWaitCheckFrequency := c.cfg.BatchWait / 10
	if maxWaitCheckFrequency < minWaitCheckFrequency {
		maxWaitCheckFrequency = minWaitCheckFrequency
	}

	maxWaitCheck := time.NewTicker(maxWaitCheckFrequency)

	defer func() {
		maxWaitCheck.Stop()
		// Send all pending batches
		for tenantID, batch := range batches {
			c.sendBatch(tenantID, batch)
		}

		c.wg.Done()
	}()

	for {
		select {
		case e, ok := <-c.entries:
			if !ok {
				// Channel closed by Stop(); pending batches flush in the defer.
				return
			}
			// NOTE(review): the tenant ID is always empty here, so every entry
			// lands in one shared batch — confirm multi-tenant wiring is
			// intentionally removed in this Grafana-side copy.
			tenantID := ""
			batch, ok := batches[tenantID]

			// If the batch doesn't exist yet, we create a new one with the entry
			if !ok {
				batches[tenantID] = newBatch(e)
				break
			}

			// If adding the entry to the batch will increase the size over the max
			// size allowed, we do send the current batch and then create a new one
			if batch.sizeBytesAfter(e) > c.cfg.BatchSize {
				c.sendBatch(tenantID, batch)

				batches[tenantID] = newBatch(e)
				break
			}

			// The max size of the batch isn't reached, so we can add the entry
			batch.add(e)

		case <-maxWaitCheck.C:
			// Send all batches whose max wait time has been reached
			for tenantID, batch := range batches {
				if batch.age() < c.cfg.BatchWait {
					continue
				}

				c.sendBatch(tenantID, batch)
				delete(batches, tenantID)
			}
		}
	}
}
||||
|
||||
// Chan returns the send-only channel callers push entries on; the run()
// goroutine consumes it.
func (c *client) Chan() chan<- Entry {
	return c.entries
}
||||
|
||||
func (c *client) sendBatch(tenantID string, batch *batch) { |
||||
buf, entriesCount, err := batch.encode() |
||||
if err != nil { |
||||
c.logger.Error("error encoding batch", "error", err) |
||||
return |
||||
} |
||||
bufBytes := float64(len(buf)) |
||||
c.metrics.encodedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) |
||||
|
||||
backoff := backoff.New(c.ctx, c.cfg.BackoffConfig) |
||||
var status int |
||||
for { |
||||
start := time.Now() |
||||
// send uses `timeout` internally, so `context.Background` is good enough.
|
||||
status, err = c.send(context.Background(), tenantID, buf) |
||||
|
||||
c.metrics.requestDuration.WithLabelValues(strconv.Itoa(status), c.cfg.URL.Host).Observe(time.Since(start).Seconds()) |
||||
|
||||
// Only retry 429s, 500s and connection-level errors.
|
||||
if status > 0 && status != 429 && status/100 != 5 { |
||||
break |
||||
} |
||||
|
||||
c.logger.Warn("error sending batch, will retry", "status", status, "error", err) |
||||
c.metrics.batchRetries.WithLabelValues(c.cfg.URL.Host).Inc() |
||||
backoff.Wait() |
||||
|
||||
// Make sure it sends at least once before checking for retry.
|
||||
if !backoff.Ongoing() { |
||||
break |
||||
} |
||||
} |
||||
|
||||
if err != nil { |
||||
c.logger.Error("final error sendnig batch", "status", status, "error") |
||||
c.metrics.droppedBytes.WithLabelValues(c.cfg.URL.Host).Add(bufBytes) |
||||
c.metrics.droppedEntries.WithLabelValues(c.cfg.URL.Host).Add(float64(entriesCount)) |
||||
} |
||||
} |
||||
|
||||
func (c *client) send(ctx context.Context, tenantID string, buf []byte) (int, error) { |
||||
ctx, cancel := context.WithTimeout(ctx, c.cfg.Timeout) |
||||
defer cancel() |
||||
req, err := http.NewRequest("POST", c.cfg.URL.String(), bytes.NewReader(buf)) |
||||
if err != nil { |
||||
return -1, err |
||||
} |
||||
req = req.WithContext(ctx) |
||||
req.Header.Set("Content-Type", contentType) |
||||
req.Header.Set("User-Agent", UserAgent) |
||||
|
||||
// If the tenant ID is not empty promtail is running in multi-tenant mode, so
|
||||
// we should send it to Loki
|
||||
if tenantID != "" { |
||||
req.Header.Set("X-Scope-OrgID", tenantID) |
||||
} |
||||
|
||||
resp, err := c.client.Do(req) |
||||
if err != nil { |
||||
return -1, err |
||||
} |
||||
defer func() { |
||||
err := resp.Body.Close() |
||||
if err != nil { |
||||
c.logger.Error("closing response body", "error", err) |
||||
} |
||||
}() |
||||
|
||||
if resp.StatusCode/100 != 2 { |
||||
scanner := bufio.NewScanner(io.LimitReader(resp.Body, maxErrMsgLen)) |
||||
line := "" |
||||
if scanner.Scan() { |
||||
line = scanner.Text() |
||||
} |
||||
err = fmt.Errorf("server returned HTTP status %s (%d): %s", resp.Status, resp.StatusCode, line) |
||||
} |
||||
return resp.StatusCode, err |
||||
} |
||||
|
||||
// Stop the client.
// Closing entries makes run() exit and flush pending batches; sync.Once
// makes repeated calls safe, and wg.Wait blocks until run() has finished.
func (c *client) Stop() {
	c.once.Do(func() { close(c.entries) })
	c.wg.Wait()
}
||||
|
||||
// StopNow stops the client without retries
func (c *client) StopNow() {
	// cancel will stop retrying http requests.
	c.cancel()
	c.Stop()
}
||||
@ -0,0 +1,21 @@ |
||||
package lokihttp |
||||
|
||||
import ( |
||||
"time" |
||||
|
||||
"github.com/grafana/dskit/backoff" |
||||
"github.com/grafana/dskit/flagext" |
||||
"github.com/prometheus/common/config" |
||||
) |
||||
|
||||
// Config describes configuration for a HTTP pusher client.
type Config struct {
	// URL is the Loki push endpoint; required (newClient rejects a nil URL).
	URL flagext.URLValue
	// BatchWait is the maximum time a batch may wait before being sent.
	BatchWait time.Duration
	// BatchSize is the maximum batch size in log-line bytes before a flush.
	BatchSize int

	// Client holds TLS/auth/proxy settings for the underlying HTTP client.
	Client config.HTTPClientConfig

	// BackoffConfig controls retries of failed push requests.
	BackoffConfig backoff.Config
	// Timeout applies both to the HTTP client and each send's context.
	Timeout time.Duration
}
||||
@ -0,0 +1,39 @@ |
||||
package lokihttp |
||||
|
||||
import ( |
||||
"sync" |
||||
|
||||
"github.com/prometheus/common/model" |
||||
) |
||||
|
||||
// Fake is a test double for Client that captures the first entry it receives.
type Fake struct {
	// Labels holds the label set of the first received entry.
	Labels model.LabelSet
	// Entry holds the log line of the first received entry.
	Entry string

	entries chan Entry
	// wg is released once a single entry has been captured; Stop waits on it.
	wg *sync.WaitGroup
}
||||
|
||||
func NewFake() *Fake { |
||||
c := &Fake{ |
||||
entries: make(chan Entry, 1), |
||||
wg: &sync.WaitGroup{}, |
||||
} |
||||
|
||||
c.wg.Add(1) |
||||
|
||||
go func() { |
||||
entry := <-c.entries |
||||
c.Labels = entry.Labels |
||||
c.Entry = entry.Line |
||||
c.wg.Done() |
||||
}() |
||||
|
||||
return c |
||||
} |
||||
|
||||
// Stop blocks until one entry has been received by the capture goroutine.
func (c *Fake) Stop() { c.wg.Wait() }
||||
|
||||
// Chan returns the channel test code pushes entries into.
func (c *Fake) Chan() chan<- Entry { return c.entries }
||||
|
||||
// StopNow is a no-op; the fake has nothing to abort.
func (c *Fake) StopNow() {}
||||
@ -0,0 +1,13 @@ |
||||
package lokihttp |
||||
|
||||
import ( |
||||
"github.com/prometheus/common/model" |
||||
|
||||
"github.com/grafana/grafana/pkg/components/loki/logproto" |
||||
) |
||||
|
||||
// Entry is a log entry with labels.
type Entry struct {
	// Labels identify the stream this entry belongs to.
	Labels model.LabelSet
	// Embedded logproto.Entry carries the timestamp and log line.
	logproto.Entry
}
||||
Loading…
Reference in new issue