Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
loki/pkg/logproto/compat.go

268 lines
7.7 KiB

package logproto
import (
stdjson "encoding/json"
"fmt"
"math"
"sort"
"strconv"
"strings"
"time"
"unsafe"
jsoniter "github.com/json-iterator/go"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/grafana/loki/pkg/util"
)
// ToWriteRequest builds a WriteRequest proto from parallel slices: lbls[i] is
// paired with samples[i] to form one single-sample time series, and metadata
// and source are attached to the request unchanged.
//
// lbls must be at least as long as samples; a shorter lbls makes the lbls[i]
// lookup panic. The time series are taken from the pool, so ReuseSlice()
// should be called when done.
func ToWriteRequest(lbls []labels.Labels, samples []LegacySample, metadata []*MetricMetadata, source WriteRequest_SourceEnum) *WriteRequest {
	series := PreallocTimeseriesSliceFromPool()

	for idx := range samples {
		entry := TimeseriesFromPool()
		// Append into the pooled entry's slices rather than replacing them.
		entry.Labels = append(entry.Labels, FromLabelsToLabelAdapters(lbls[idx])...)
		entry.Samples = append(entry.Samples, samples[idx])
		series = append(series, PreallocTimeseries{TimeSeries: entry})
	}

	return &WriteRequest{
		Timeseries: series,
		Metadata:   metadata,
		Source:     source,
	}
}
// FromLabelAdaptersToLabels reinterprets a []LabelAdapter as labels.Labels
// without copying: the slice header is viewed through an unsafe pointer, so
// the result refers to the same elements as ls.
// This relies on LabelAdapter == labels.Label, which is what makes the cast
// safe and allows labels.Labels to be used directly in protos.
//
// Note: labels.Labels is supposed to be sorted, but nothing here enforces
// that. If the input is not sorted, the output will be wrong.
func FromLabelAdaptersToLabels(ls []LabelAdapter) labels.Labels {
	reinterpreted := (*labels.Labels)(unsafe.Pointer(&ls))
	return *reinterpreted
}
// FromLabelAdaptersToLabelsWithCopy converts []LabelAdapter to labels.Labels
// and returns an independent copy made by CopyLabels.
// Use this instead of the plain unsafe conversion when the input's data
// structure may be reused by its owner after this call.
func FromLabelAdaptersToLabelsWithCopy(input []LabelAdapter) labels.Labels {
	// The aliased view is only a temporary; CopyLabels detaches the result.
	aliased := FromLabelAdaptersToLabels(input)
	return CopyLabels(aliased)
}
// CopyLabels returns a copy of input whose names and values no longer refer
// to the input's string data. To be used in cases where the input slice can
// be reused, but a long-term copy is needed.
//
// All label text is copied into one byte buffer sized to the combined length
// of every name and value; each resulting string is cut from that buffer by
// copyStringToBuffer.
func CopyLabels(input []labels.Label) labels.Labels {
	// First pass: measure how many bytes the shared buffer must hold.
	total := 0
	for i := range input {
		total += len(input[i].Name) + len(input[i].Value)
	}

	backing := make([]byte, total)
	out := make(labels.Labels, len(input))

	// Second pass: each call consumes the front of backing and hands back the
	// remainder, so names and values are laid out consecutively.
	for i := range input {
		out[i].Name, backing = copyStringToBuffer(input[i].Name, backing)
		out[i].Value, backing = copyStringToBuffer(input[i].Value, backing)
	}
	return out
}
// copyStringToBuffer copies in to the start of buf and returns the copy as a
// string (converted from the buffer slice via yoloString) together with the
// unused remainder of buf.
// buf must be at least len(in) bytes long; otherwise the function panics.
func copyStringToBuffer(in string, buf []byte) (string, []byte) {
	n := len(in)
	// copy stops at the shorter operand, so a short count means buf was too small.
	if copy(buf, in) != n {
		panic("not copied full string")
	}
	return yoloString(buf[:n]), buf[n:]
}
// FromLabelsToLabelAdapters reinterprets labels.Labels as a []LabelAdapter
// without copying: the slice header is viewed through an unsafe pointer, so
// the result refers to the same elements as ls.
// This relies on LabelAdapter == labels.Label, which is what makes the cast
// safe and allows labels.Labels to be used directly in protos.
func FromLabelsToLabelAdapters(ls labels.Labels) []LabelAdapter {
	reinterpreted := (*[]LabelAdapter)(unsafe.Pointer(&ls))
	return *reinterpreted
}
// FromLabelAdaptersToMetric converts []LabelAdapter to a model.Metric by
// viewing the adapters as labels.Labels and passing them to
// util.LabelsToMetric.
// Don't do this on any performance sensitive paths.
func FromLabelAdaptersToMetric(ls []LabelAdapter) model.Metric {
	asLabels := FromLabelAdaptersToLabels(ls)
	return util.LabelsToMetric(asLabels)
}
// FromMetricsToLabelAdapters converts a model.Metric to a []LabelAdapter with
// one entry per label, sorted by label name.
// Don't do this on any performance sensitive paths.
func FromMetricsToLabelAdapters(metric model.Metric) []LabelAdapter {
	adapters := make([]LabelAdapter, len(metric))

	next := 0
	for name, value := range metric {
		adapters[next] = LabelAdapter{Name: string(name), Value: string(value)}
		next++
	}

	// Map iteration order is unspecified, and labels should be sorted upon
	// initialisation.
	sort.Sort(byLabel(adapters))
	return adapters
}
// byLabel implements sort.Interface for []LabelAdapter, ordering entries by
// label name.
type byLabel []LabelAdapter

// Len reports the number of adapters.
func (b byLabel) Len() int {
	return len(b)
}

// Less reports whether the name at index i sorts before the name at index j.
func (b byLabel) Less(i, j int) bool {
	return strings.Compare(b[i].Name, b[j].Name) < 0
}

// Swap exchanges the adapters at indexes i and j.
func (b byLabel) Swap(i, j int) {
	b[i], b[j] = b[j], b[i]
}
// isTesting is only set from tests. When true, the LegacySample encoders and
// decoders fail on NaN values, which lets tests verify that the custom sample
// encode and decode paths are used, both with jsoniter and with the standard
// json package.
var isTesting bool
// MarshalJSON implements json.Marshaler.
//
// The sample is written as a two-element JSON array "[timestamp,value]",
// where the timestamp is marshalled as a model.Time and the value as a
// model.SampleValue. An error from either marshal call is returned unchanged.
func (s LegacySample) MarshalJSON() ([]byte, error) {
	// Test-only hook: see isTesting.
	if isTesting && math.IsNaN(s.Value) {
		return nil, fmt.Errorf("test sample")
	}

	codec := jsoniter.ConfigCompatibleWithStandardLibrary

	ts, err := codec.Marshal(model.Time(s.TimestampMs))
	if err != nil {
		return nil, err
	}
	val, err := codec.Marshal(model.SampleValue(s.Value))
	if err != nil {
		return nil, err
	}

	// Both parts plus the two brackets and the comma.
	out := make([]byte, 0, len(ts)+len(val)+3)
	out = append(out, '[')
	out = append(out, ts...)
	out = append(out, ',')
	out = append(out, val...)
	out = append(out, ']')
	return out, nil
}
// UnmarshalJSON implements json.Unmarshaler.
//
// The input is decoded as a two-element array whose first element is
// unmarshalled as a model.Time and whose second as a model.SampleValue; the
// results are stored in s.TimestampMs and s.Value. A decoding error is
// returned unchanged and leaves s untouched.
func (s *LegacySample) UnmarshalJSON(b []byte) error {
	var (
		ts  model.Time
		val model.SampleValue
	)

	// Decoding into an array of Unmarshalers routes each element to the
	// matching target's own UnmarshalJSON.
	pair := [...]stdjson.Unmarshaler{&ts, &val}
	if err := jsoniter.ConfigCompatibleWithStandardLibrary.Unmarshal(b, &pair); err != nil {
		return err
	}

	s.TimestampMs, s.Value = int64(ts), float64(val)

	// Test-only hook: see isTesting. Checked after s has been populated.
	if isTesting && math.IsNaN(s.Value) {
		return fmt.Errorf("test sample")
	}
	return nil
}
// SampleJsoniterEncode is the jsoniter encoder for LegacySample; ptr must
// point to a LegacySample.
//
// It writes a two-element array: the timestamp as a number of seconds
// (TimestampMs divided by 1000) followed by the value as a JSON string
// produced by model.SampleValue's String method.
func SampleJsoniterEncode(ptr unsafe.Pointer, stream *jsoniter.Stream) {
	const msPerSecond = float64(time.Second / time.Millisecond)

	sample := (*LegacySample)(ptr)

	// Test-only hook: see isTesting.
	if isTesting && math.IsNaN(sample.Value) {
		stream.Error = fmt.Errorf("test sample")
		return
	}

	seconds := float64(sample.TimestampMs) / msPerSecond
	value := model.SampleValue(sample.Value).String()

	stream.WriteArrayStart()
	stream.WriteFloat64(seconds)
	stream.WriteMore()
	stream.WriteString(value)
	stream.WriteArrayEnd()
}
// SampleJsoniterDecode is the jsoniter decoder for LegacySample; ptr must
// point to a LegacySample.
//
// It expects a two-element array: a numeric timestamp in seconds followed by
// the value as a string that strconv.ParseFloat can parse. On any error the
// problem is recorded on iter and the LegacySample behind ptr is left
// untouched; this includes the case where the array has extra elements.
func SampleJsoniterDecode(ptr unsafe.Pointer, iter *jsoniter.Iterator) {
	if !iter.ReadArray() {
		iter.ReportError("logproto.LegacySample", "expected [")
		return
	}

	// Seconds on the wire, milliseconds in memory.
	t := model.Time(iter.ReadFloat64() * float64(time.Second/time.Millisecond))

	if !iter.ReadArray() {
		iter.ReportError("logproto.LegacySample", "expected ,")
		return
	}

	// View the bytes as a string without copying; ss is only used for the
	// ParseFloat call below.
	bs := iter.ReadStringAsSlice()
	ss := *(*string)(unsafe.Pointer(&bs))
	v, err := strconv.ParseFloat(ss, 64)
	if err != nil {
		iter.ReportError("logproto.LegacySample", err.Error())
		return
	}

	// Test-only hook: see isTesting.
	if isTesting && math.IsNaN(v) {
		iter.Error = fmt.Errorf("test sample")
		return
	}

	// A further ReadArray returning true means there is a third element.
	// Return like every other error path so a rejected input does not
	// populate the destination.
	if iter.ReadArray() {
		iter.ReportError("logproto.LegacySample", "expected ]")
		return
	}

	*(*LegacySample)(ptr) = LegacySample{
		TimestampMs: int64(t),
		Value:       v,
	}
}
// init registers the custom jsoniter encoder and decoder for
// logproto.LegacySample.
func init() {
	const sampleType = "logproto.LegacySample"

	// Passed as the encoder's emptiness check; it always answers false.
	neverEmpty := func(unsafe.Pointer) bool { return false }

	jsoniter.RegisterTypeEncoderFunc(sampleType, SampleJsoniterEncode, neverEmpty)
	jsoniter.RegisterTypeDecoderFunc(sampleType, SampleJsoniterDecode)
}
// MergeLabelResponses combines the unique values from multiple LabelResponses
// into a single, sorted LabelResponse.
//
// With no responses an empty LabelResponse is returned. With exactly one
// response that response is returned as-is, without deduplication or sorting.
// The returned error is always nil.
func MergeLabelResponses(responses []*LabelResponse) (*LabelResponse, error) {
	switch len(responses) {
	case 0:
		return &LabelResponse{}, nil
	case 1:
		return responses[0], nil
	}

	// Set of every value seen; storing an already-present key is a no-op.
	seen := make(map[string]struct{})
	for _, resp := range responses {
		for _, val := range resp.Values {
			seen[val] = struct{}{}
		}
	}

	merged := make([]string, 0, len(seen))
	for val := range seen {
		merged = append(merged, val)
	}

	// Sort the unique values before returning because we can't rely on map
	// key ordering.
	sort.Strings(merged)

	return &LabelResponse{Values: merged}, nil
}
// MergeSeriesResponses concatenates the series of multiple SeriesResponses
// into a single SeriesResponse, in input order.
//
// Series are not deduplicated: an identifier present in several responses
// appears once per occurrence in the result. With no responses an empty
// SeriesResponse is returned; with exactly one response that response is
// returned as-is. The returned error is always nil.
func MergeSeriesResponses(responses []*SeriesResponse) (*SeriesResponse, error) {
	switch len(responses) {
	case 0:
		return &SeriesResponse{}, nil
	case 1:
		return responses[0], nil
	}

	// Size the result for every series up front so the appends below never
	// have to regrow the slice.
	total := 0
	for _, r := range responses {
		total += len(r.Series)
	}

	result := &SeriesResponse{
		Series: make([]SeriesIdentifier, 0, total),
	}
	for _, r := range responses {
		result.Series = append(result.Series, r.Series...)
	}

	return result, nil
}