Like Prometheus, but for logs.
You cannot select more than 25 topics. Topics must start with a letter or number, can include dashes ('-'), and can be up to 35 characters long.
 
 
 
 
 
 
loki/pkg/logcli/client/file.go

294 lines
6.8 KiB

package client
import (
"context"
"errors"
"fmt"
"io"
"sort"
"strings"
"time"
"github.com/gorilla/websocket"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
logqllog "github.com/grafana/loki/pkg/logql/log"
"github.com/grafana/loki/pkg/util/log"
"github.com/grafana/loki/pkg/util/marshal"
"github.com/grafana/loki/pkg/util/validation"
"github.com/prometheus/prometheus/model/labels"
"github.com/weaveworks/common/user"
)
const (
	// defaultLabelKey/defaultLabelValue form the single synthetic label
	// pair ({source="logcli"}) attached to every line read from the file,
	// since a plain file carries no stream labels of its own.
	defaultLabelKey   = "source"
	defaultLabelValue = "logcli"
	// defaultOrgID is the fake tenant ID injected into the query context;
	// the local engine requires one even though no server is involved.
	defaultOrgID = "logcli"
	// defaultMetricSeriesLimit caps the number of series a metric query may
	// produce (passed to the engine via the limiter below).
	defaultMetricSeriesLimit = 1024
	// defaultMaxFileSize bounds how much of the input file is read.
	defaultMaxFileSize = 20 * (1 << 20) // 20MB
)

// ErrNotSupported is returned (wrapped) by operations that make no sense
// against a static file, such as live tailing or metric sample selection.
var ErrNotSupported = errors.New("not supported")
// FileClient is a LogCLI client that runs LogQL directly on log lines read
// from the given file, instead of fetching log lines from Loki servers.
type FileClient struct {
	r           io.ReadCloser // source of raw, newline-separated log lines
	labels      []string      // label names exposed by this client (sorted; see ListLabelValues)
	labelValues []string      // label values, parallel to labels by index
	orgID       string        // fake tenant ID injected into query contexts
	engine      *logql.Engine // local LogQL engine backed by the file querier
}
// NewFileClient returns a new instance of FileClient for the given `io.ReadCloser`.
// The client evaluates queries locally with a logql.Engine that reads every
// log line from r and tags it with the single {source="logcli"} label.
func NewFileClient(r io.ReadCloser) *FileClient {
	streamLabels := []labels.Label{
		{Name: defaultLabelKey, Value: defaultLabelValue},
	}

	return &FileClient{
		r:     r,
		orgID: defaultOrgID,
		engine: logql.NewEngine(
			logql.EngineOpts{},
			&querier{r: r, labels: streamLabels},
			&limiter{n: defaultMetricSeriesLimit},
			log.Logger,
		),
		labels:      []string{defaultLabelKey},
		labelValues: []string{defaultLabelValue},
	}
}
// Query runs an instant LogQL query at time t against the file's log lines
// and returns the result in the standard loghttp response shape.
func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
	ctx := user.InjectOrgID(context.Background(), f.orgID)

	// An instant query is expressed as a literal-params range with
	// start == end == t and zero step/interval.
	compiled := f.engine.Query(logql.NewLiteralParams(
		q,
		t, t,
		0,
		0,
		direction,
		uint32(limit),
		nil,
	))

	result, err := compiled.Exec(ctx)
	if err != nil {
		return nil, fmt.Errorf("failed to exec query: %w", err)
	}

	value, err := marshal.NewResultValue(result.Data)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal result data: %w", err)
	}

	resp := loghttp.QueryResponse{
		Status: "success",
		Data: loghttp.QueryResponseData{
			ResultType: value.Type(),
			Result:     value,
			Statistics: result.Statistics,
		},
	}
	return &resp, nil
}
// QueryRange runs a range LogQL query over [start, end] against the file's
// log lines and returns the result in the standard loghttp response shape.
// step and interval are forwarded to the engine unchanged.
func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
	ctx := context.Background()
	ctx = user.InjectOrgID(ctx, f.orgID)
	params := logql.NewLiteralParams(
		queryStr,
		start,
		end,
		step,
		interval,
		direction,
		uint32(limit),
		nil,
	)
	query := f.engine.Query(params)
	result, err := query.Exec(ctx)
	if err != nil {
		// Wrap with context, consistent with Query above.
		return nil, fmt.Errorf("failed to exec query: %w", err)
	}
	value, err := marshal.NewResultValue(result.Data)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal result data: %w", err)
	}
	return &loghttp.QueryResponse{
		Status: "success",
		Data: loghttp.QueryResponseData{
			ResultType: value.Type(),
			Result:     value,
			Statistics: result.Statistics,
		},
	}, nil
}
// ListLabelNames returns the fixed set of label names this client exposes;
// the time range is ignored because a file has no time-scoped label index.
func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) {
	resp := loghttp.LabelResponse{
		Status: loghttp.QueryStatusSuccess,
		Data:   f.labels,
	}
	return &resp, nil
}
// ListLabelValues returns the value recorded for the given label name, or an
// empty response if the name is unknown. f.labels must be sorted (it is a
// single element in practice) for the binary search to be valid.
func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) {
	// sort.SearchStrings returns the insertion index and is never negative,
	// so the old `i < 0` check could never fire; an unknown name could index
	// f.labelValues out of range or return the wrong value. The name is
	// present only when the index is in range and the element matches.
	i := sort.SearchStrings(f.labels, name)
	if i >= len(f.labels) || f.labels[i] != name {
		return &loghttp.LabelResponse{}, nil
	}
	return &loghttp.LabelResponse{
		Status: loghttp.QueryStatusSuccess,
		Data:   []string{f.labelValues[i]},
	}, nil
}
// Series returns the single label set this client serves; matchers and the
// time range are ignored since every line belongs to the same stream.
func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
	// Pair names with values only up to the shorter of the two slices.
	n := len(f.labels)
	if len(f.labelValues) < n {
		n = len(f.labelValues)
	}

	set := make(loghttp.LabelSet, n)
	for i, name := range f.labels[:n] {
		set[name] = f.labelValues[i]
	}

	return &loghttp.SeriesResponse{
		Status: loghttp.QueryStatusSuccess,
		Data:   []loghttp.LabelSet{set},
	}, nil
}
// LiveTailQueryConn implements the client interface; live tailing is
// meaningless for a static file, so it always fails with a wrapped
// ErrNotSupported (detectable via errors.Is).
func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) {
	return nil, fmt.Errorf("LiveTailQuery: %w", ErrNotSupported)
}
// GetOrgID returns the fake tenant ID used for local query contexts
// (always defaultOrgID for a FileClient).
func (f *FileClient) GetOrgID() string {
	return f.orgID
}
// limiter satisfies the engine's per-tenant limits dependency with fixed,
// local-only values — there is no tenant configuration for file queries.
type limiter struct {
	n int // maximum number of series a metric query may return
}

// MaxQuerySeries returns the fixed series cap, ignoring the tenant.
func (l *limiter) MaxQuerySeries(ctx context.Context, userID string) int {
	return l.n
}

// QueryTimeout returns a fixed 5-minute timeout for every query.
func (l *limiter) QueryTimeout(ctx context.Context, userID string) time.Duration {
	return time.Minute * 5
}

// BlockedQueries returns an empty list: no query is ever blocked locally.
func (l *limiter) BlockedQueries(ctx context.Context, userID string) []*validation.BlockedQuery {
	return []*validation.BlockedQuery{}
}

// RequiredLabels returns nil: no labels are mandatory in local queries.
func (l *limiter) RequiredLabels(ctx context.Context, userID string) []string {
	return nil
}
// querier adapts an io.Reader of raw log lines to the engine's querier
// interface, attaching the fixed label set to every stream it produces.
type querier struct {
	r      io.Reader     // raw, newline-separated log lines
	labels labels.Labels // labels attached to every line before the pipeline runs
}
// SelectLogs builds the pipeline for the query's log selector and returns an
// entry iterator over the file's lines, filtered through that pipeline.
func (q *querier) SelectLogs(_ context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) {
	selector, err := params.LogSelector()
	if err != nil {
		return nil, fmt.Errorf("failed to extract selector for logs: %w", err)
	}

	p, err := selector.Pipeline()
	if err != nil {
		return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err)
	}

	return newFileIterator(q.r, params, p.ForStream(q.labels))
}
// SelectSamples would back metric queries; sample extraction is not
// implemented for files, so it always fails with a wrapped ErrNotSupported.
func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
	return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported)
}
// newFileIterator reads the whole input (up to defaultMaxFileSize bytes),
// runs every line through the stream pipeline, groups the surviving entries
// into streams keyed by their post-pipeline label set, and returns an entry
// iterator honoring params.Direction.
//
// NOTE(review): each entry is stamped with time.Now() at processing time, so
// ordering inside the returned iterator derives from these synthetic,
// monotonically non-decreasing timestamps rather than anything in the file.
// Processing lines in reverse for BACKWARD keeps iteration consistent with
// the file order — be careful when restructuring this loop.
func newFileIterator(
	r io.Reader,
	params logql.SelectLogParams,
	pipeline logqllog.StreamPipeline,
) (iter.EntryIterator, error) {
	// Bound memory usage: anything past defaultMaxFileSize is ignored.
	lr := io.LimitReader(r, defaultMaxFileSize)
	b, err := io.ReadAll(lr)
	if err != nil {
		return nil, err
	}
	// FieldsFunc (unlike strings.Split) drops empty lines, including the
	// trailing empty string after a final newline.
	lines := strings.FieldsFunc(string(b), func(r rune) bool {
		return r == '\n'
	})
	if len(lines) == 0 {
		return iter.NoopIterator, nil
	}
	// Streams keyed by the hash of the line's post-pipeline label set: a
	// parser stage may split lines into differently-labeled streams.
	streams := map[uint64]*logproto.Stream{}
	processLine := func(line string) {
		// Synthetic timestamp — see NOTE(review) above.
		ts := time.Now()
		parsedLine, parsedLabels, matches := pipeline.ProcessString(ts.UnixNano(), line)
		if !matches {
			// Line filtered out by the pipeline (e.g. a line filter stage).
			return
		}
		var stream *logproto.Stream
		lhash := parsedLabels.Hash()
		var ok bool
		if stream, ok = streams[lhash]; !ok {
			stream = &logproto.Stream{
				Labels: parsedLabels.String(),
			}
			streams[lhash] = stream
		}
		stream.Entries = append(stream.Entries, logproto.Entry{
			Timestamp: ts,
			Line:      parsedLine,
		})
	}
	// Assign timestamps in the order the caller wants entries back: forward
	// queries stamp the file's first line earliest; backward queries stamp
	// the last line earliest.
	if params.Direction == logproto.FORWARD {
		for _, line := range lines {
			processLine(line)
		}
	} else {
		for i := len(lines) - 1; i >= 0; i-- {
			processLine(lines[i])
		}
	}
	if len(streams) == 0 {
		return iter.NoopIterator, nil
	}
	streamResult := make([]logproto.Stream, 0, len(streams))
	for _, stream := range streams {
		streamResult = append(streamResult, *stream)
	}
	return iter.NewStreamsIterator(
		streamResult,
		params.Direction,
	), nil
}