mirror of https://github.com/grafana/loki
LogQL `stdin` supports only log queries (#4606)
* Hack stdin client into LogCLI Add flag to choose what kind of client to make request to. Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Label filter working Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Demo checkpoint1 Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Metrics query working. Issue was with default value of `step` (1 nanosecond). StepEvaluator tries to go through every nanosecond to apply aggregate on sample data during the metric queries. Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Basic tests for fileClient * Tests for logqueries direction * Use HeapIterator for Entries. Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Remove some debug statements Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Remove support for metric queries. Stick with only log queries Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Small rough usage doc * Remove filesampleiterator * Fix some typos and tests * Fix breaking test cases Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * Make linter happy Signed-off-by: Kaviraj <kavirajkanagaraj@gmail.com> * PR remarks. 1. Add `--stdin` in examples of the usage 2. Add `--stdin` in all the command help output * PR remarks - Use parsed labels correctly - Fix indentation with --stdin flag * Fix issue with direction * Fix linter * MaxInt64 -> MaxInt (to support even arm32 images) * Add note on calculating `step` value on the client side pull/4784/head
parent
35ebe967ea
commit
94113ee080
@ -0,0 +1,284 @@ |
||||
package client |
||||
|
||||
import ( |
||||
"context" |
||||
"errors" |
||||
"fmt" |
||||
"io" |
||||
"io/ioutil" |
||||
"sort" |
||||
"strings" |
||||
"time" |
||||
|
||||
"github.com/gorilla/websocket" |
||||
|
||||
"github.com/grafana/loki/pkg/iter" |
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
"github.com/grafana/loki/pkg/logql" |
||||
logqllog "github.com/grafana/loki/pkg/logql/log" |
||||
"github.com/grafana/loki/pkg/util/marshal" |
||||
|
||||
"github.com/prometheus/prometheus/pkg/labels" |
||||
"github.com/weaveworks/common/user" |
||||
) |
||||
|
||||
const (
	// defaultLabelKey and defaultLabelValue form the single synthetic label
	// ({source="logcli"}) attached to every line read from the input, since
	// raw file/stdin data carries no labels of its own.
	defaultLabelKey   = "source"
	defaultLabelValue = "logcli"

	// defaultOrgID is the tenant ID injected into every query context.
	defaultOrgID = "logcli"

	// defaultMetricSeriesLimit is passed to the engine as the per-user
	// series limit (see the limiter type below).
	defaultMetricSeriesLimit = 1024

	// defaultMaxFileSize bounds how much of the input is read into memory.
	defaultMaxFileSize = 20 * (1 << 20) // 20MB
)
||||
|
||||
var (
	// ErrNotSupported is returned (wrapped) by operations FileClient cannot
	// serve, e.g. live tailing and metric queries.
	ErrNotSupported = errors.New("not supported")
)
||||
|
||||
// FileClient is a type of LogCLI client that runs LogQL on log lines from
// the given file directly, instead of getting log lines from Loki servers.
type FileClient struct {
	r           io.ReadCloser // source of the raw log lines
	labels      []string      // label names served by ListLabelNames (kept sorted)
	labelValues []string      // value for the label at the same index in labels
	orgID       string        // tenant ID injected into query contexts
	engine      *logql.Engine // LogQL engine backed by the file querier
}
||||
|
||||
// NewFileClient returns the new instance of FileClient for the given `io.ReadCloser`
|
||||
func NewFileClient(r io.ReadCloser) *FileClient { |
||||
lbs := []labels.Label{ |
||||
{ |
||||
Name: defaultLabelKey, |
||||
Value: defaultLabelValue, |
||||
}, |
||||
} |
||||
|
||||
eng := logql.NewEngine(logql.EngineOpts{}, &querier{r: r, labels: lbs}, &limiter{n: defaultMetricSeriesLimit}) |
||||
return &FileClient{ |
||||
r: r, |
||||
orgID: defaultOrgID, |
||||
engine: eng, |
||||
labels: []string{defaultLabelKey}, |
||||
labelValues: []string{defaultLabelValue}, |
||||
} |
||||
|
||||
} |
||||
|
||||
func (f *FileClient) Query(q string, limit int, t time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) { |
||||
ctx := context.Background() |
||||
|
||||
ctx = user.InjectOrgID(ctx, f.orgID) |
||||
|
||||
params := logql.NewLiteralParams( |
||||
q, |
||||
t, t, |
||||
0, |
||||
0, |
||||
direction, |
||||
uint32(limit), |
||||
nil, |
||||
) |
||||
|
||||
query := f.engine.Query(params) |
||||
|
||||
result, err := query.Exec(ctx) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to exec query: %w", err) |
||||
} |
||||
|
||||
value, err := marshal.NewResultValue(result.Data) |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to marshal result data: %w", err) |
||||
} |
||||
|
||||
return &loghttp.QueryResponse{ |
||||
Status: "success", |
||||
Data: loghttp.QueryResponseData{ |
||||
ResultType: value.Type(), |
||||
Result: value, |
||||
Statistics: result.Statistics, |
||||
}, |
||||
}, nil |
||||
} |
||||
|
||||
func (f *FileClient) QueryRange(queryStr string, limit int, start, end time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) { |
||||
ctx := context.Background() |
||||
|
||||
ctx = user.InjectOrgID(ctx, f.orgID) |
||||
|
||||
params := logql.NewLiteralParams( |
||||
queryStr, |
||||
start, |
||||
end, |
||||
step, |
||||
interval, |
||||
direction, |
||||
uint32(limit), |
||||
nil, |
||||
) |
||||
|
||||
query := f.engine.Query(params) |
||||
|
||||
result, err := query.Exec(ctx) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
value, err := marshal.NewResultValue(result.Data) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
|
||||
return &loghttp.QueryResponse{ |
||||
Status: "success", |
||||
Data: loghttp.QueryResponseData{ |
||||
ResultType: value.Type(), |
||||
Result: value, |
||||
Statistics: result.Statistics, |
||||
}, |
||||
}, nil |
||||
} |
||||
|
||||
func (f *FileClient) ListLabelNames(quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { |
||||
return &loghttp.LabelResponse{ |
||||
Status: loghttp.QueryStatusSuccess, |
||||
Data: f.labels, |
||||
}, nil |
||||
} |
||||
|
||||
func (f *FileClient) ListLabelValues(name string, quiet bool, start, end time.Time) (*loghttp.LabelResponse, error) { |
||||
i := sort.SearchStrings(f.labels, name) |
||||
if i < 0 { |
||||
return &loghttp.LabelResponse{}, nil |
||||
} |
||||
|
||||
return &loghttp.LabelResponse{ |
||||
Status: loghttp.QueryStatusSuccess, |
||||
Data: []string{f.labelValues[i]}, |
||||
}, nil |
||||
} |
||||
|
||||
func (f *FileClient) Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) { |
||||
m := len(f.labels) |
||||
if m > len(f.labelValues) { |
||||
m = len(f.labelValues) |
||||
} |
||||
|
||||
lbs := make(loghttp.LabelSet) |
||||
for i := 0; i < m; i++ { |
||||
lbs[f.labels[i]] = f.labelValues[i] |
||||
} |
||||
|
||||
return &loghttp.SeriesResponse{ |
||||
Status: loghttp.QueryStatusSuccess, |
||||
Data: []loghttp.LabelSet{lbs}, |
||||
}, nil |
||||
} |
||||
|
||||
// LiveTailQueryConn is not supported by FileClient: the input is a finite
// file/stdin stream, so there is nothing to tail. Always returns a wrapped
// ErrNotSupported.
func (f *FileClient) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) {
	return nil, fmt.Errorf("LiveTailQuery: %w", ErrNotSupported)
}
||||
|
||||
// GetOrgID returns the fixed org ID used for all FileClient requests.
func (f *FileClient) GetOrgID() string {
	return f.orgID
}
||||
|
||||
// limiter supplies the engine's per-user series limit with a fixed value.
type limiter struct {
	n int
}

// MaxQuerySeries returns the maximum number of series a query may return;
// userID is ignored since FileClient is effectively single-tenant.
func (l *limiter) MaxQuerySeries(userID string) int {
	return l.n
}
||||
|
||||
// querier serves LogQL selects by reading log lines from r; every line is
// attributed to the fixed label set in labels.
type querier struct {
	r      io.Reader
	labels labels.Labels // static labels attached to every stream
}
||||
|
||||
func (q *querier) SelectLogs(ctx context.Context, params logql.SelectLogParams) (iter.EntryIterator, error) { |
||||
expr, err := params.LogSelector() |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to extract selector for logs: %w", err) |
||||
} |
||||
pipeline, err := expr.Pipeline() |
||||
if err != nil { |
||||
return nil, fmt.Errorf("failed to extract pipeline for logs: %w", err) |
||||
} |
||||
return newFileIterator(ctx, q.r, params, pipeline.ForStream(q.labels)) |
||||
} |
||||
|
||||
// SelectSamples is not supported: FileClient handles log queries only.
// Always returns a wrapped ErrNotSupported.
func (q *querier) SelectSamples(ctx context.Context, params logql.SelectSampleParams) (iter.SampleIterator, error) {
	return nil, fmt.Errorf("Metrics Query: %w", ErrNotSupported)
}
||||
|
||||
func newFileIterator( |
||||
ctx context.Context, |
||||
r io.Reader, |
||||
params logql.SelectLogParams, |
||||
pipeline logqllog.StreamPipeline, |
||||
) (iter.EntryIterator, error) { |
||||
|
||||
lr := io.LimitReader(r, defaultMaxFileSize) |
||||
b, err := ioutil.ReadAll(lr) |
||||
if err != nil { |
||||
return nil, err |
||||
} |
||||
lines := strings.FieldsFunc(string(b), func(r rune) bool { |
||||
return r == '\n' |
||||
}) |
||||
|
||||
if len(lines) == 0 { |
||||
return iter.NoopIterator, nil |
||||
} |
||||
|
||||
streams := map[uint64]*logproto.Stream{} |
||||
|
||||
processLine := func(line string) { |
||||
parsedLine, parsedLabels, ok := pipeline.ProcessString(line) |
||||
if !ok { |
||||
return |
||||
} |
||||
|
||||
var stream *logproto.Stream |
||||
lhash := parsedLabels.Hash() |
||||
if stream, ok = streams[lhash]; !ok { |
||||
stream = &logproto.Stream{ |
||||
Labels: parsedLabels.String(), |
||||
} |
||||
streams[lhash] = stream |
||||
} |
||||
|
||||
stream.Entries = append(stream.Entries, logproto.Entry{ |
||||
Timestamp: time.Now(), |
||||
Line: parsedLine, |
||||
}) |
||||
} |
||||
|
||||
if params.Direction == logproto.FORWARD { |
||||
for _, line := range lines { |
||||
processLine(line) |
||||
} |
||||
} else { |
||||
for i := len(lines) - 1; i >= 0; i-- { |
||||
processLine(lines[i]) |
||||
} |
||||
} |
||||
|
||||
if len(streams) == 0 { |
||||
return iter.NoopIterator, nil |
||||
} |
||||
|
||||
streamResult := make([]logproto.Stream, 0, len(streams)) |
||||
|
||||
for _, stream := range streams { |
||||
streamResult = append(streamResult, *stream) |
||||
} |
||||
|
||||
return iter.NewStreamsIterator( |
||||
ctx, |
||||
streamResult, |
||||
params.Direction, |
||||
), nil |
||||
} |
||||
@ -0,0 +1,223 @@ |
||||
package client |
||||
|
||||
import ( |
||||
"bytes" |
||||
"errors" |
||||
"io" |
||||
"sort" |
||||
"strings" |
||||
"testing" |
||||
"time" |
||||
|
||||
"github.com/grafana/loki/pkg/loghttp" |
||||
"github.com/grafana/loki/pkg/logproto" |
||||
|
||||
"github.com/stretchr/testify/assert" |
||||
"github.com/stretchr/testify/require" |
||||
) |
||||
|
||||
func TestFileClient_QueryRangeLogQueries(t *testing.T) { |
||||
input := []string{ |
||||
`level=info event="loki started" caller=main.go ts=1625995076`, |
||||
`level=info event="runtime loader started" caller=main.go ts=1625995077`, |
||||
`level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, |
||||
`level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, |
||||
`level=info event="loki ready" caller=main.go ts=1625996095`, |
||||
} |
||||
|
||||
reversed := make([]string, len(input)) |
||||
copy(reversed, input) |
||||
sort.Slice(reversed, func(i, j int) bool { |
||||
return i > j |
||||
}) |
||||
|
||||
now := time.Now() |
||||
|
||||
cases := []struct { |
||||
name string |
||||
limit int |
||||
start, end time.Time |
||||
direction logproto.Direction |
||||
step, interval time.Duration |
||||
expectedStatus loghttp.QueryStatus |
||||
expected []string |
||||
}{ |
||||
{ |
||||
name: "return-all-logs-backward", |
||||
limit: 10, // more than input
|
||||
start: now.Add(-1 * time.Hour), |
||||
end: now, |
||||
direction: logproto.BACKWARD, |
||||
step: 0, // let client decide based on start and end
|
||||
interval: 0, |
||||
expectedStatus: loghttp.QueryStatusSuccess, |
||||
expected: reversed, |
||||
}, |
||||
{ |
||||
name: "return-all-logs-forward", |
||||
limit: 10, // more than input
|
||||
start: now.Add(-1 * time.Hour), |
||||
end: now, |
||||
direction: logproto.FORWARD, |
||||
step: 0, // let the client decide based on start and end
|
||||
interval: 0, |
||||
expectedStatus: loghttp.QueryStatusSuccess, |
||||
expected: input, |
||||
}, |
||||
} |
||||
|
||||
for _, c := range cases { |
||||
t.Run(c.name, func(t *testing.T) { |
||||
client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) |
||||
resp, err := client.QueryRange( |
||||
`{foo="bar"}`, // label matcher doesn't matter.
|
||||
c.limit, |
||||
c.start, |
||||
c.end, |
||||
c.direction, |
||||
c.step, |
||||
c.interval, |
||||
true, |
||||
) |
||||
|
||||
require.NoError(t, err) |
||||
require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) |
||||
assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) |
||||
assertStreams(t, resp.Data.Result, c.expected) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestFileClient_Query(t *testing.T) { |
||||
input := []string{ |
||||
`level=info event="loki started" caller=main.go ts=1625995076`, |
||||
`level=info event="runtime loader started" caller=main.go ts=1625995077`, |
||||
`level=error event="unable to read rules directory" file="/tmp/rules" caller=rules.go ts=1625995090`, |
||||
`level=error event="failed to apply wal" error="/tmp/wal/ corrupted" caller=wal.go ts=1625996090`, |
||||
`level=info event="loki ready" caller=main.go ts=1625996095`, |
||||
} |
||||
|
||||
reversed := make([]string, len(input)) |
||||
copy(reversed, input) |
||||
sort.Slice(reversed, func(i, j int) bool { |
||||
return i > j |
||||
}) |
||||
|
||||
now := time.Now() |
||||
|
||||
cases := []struct { |
||||
name string |
||||
limit int |
||||
ts time.Time |
||||
direction logproto.Direction |
||||
expectedStatus loghttp.QueryStatus |
||||
expected []string |
||||
}{ |
||||
{ |
||||
name: "return-all-logs-backward", |
||||
limit: 10, // more than input
|
||||
ts: now.Add(-1 * time.Hour), |
||||
direction: logproto.BACKWARD, |
||||
expectedStatus: loghttp.QueryStatusSuccess, |
||||
expected: reversed, |
||||
}, |
||||
{ |
||||
name: "return-all-logs-forward", |
||||
limit: 10, // more than input
|
||||
ts: now.Add(-1 * time.Hour), |
||||
direction: logproto.FORWARD, |
||||
expectedStatus: loghttp.QueryStatusSuccess, |
||||
expected: input, |
||||
}, |
||||
} |
||||
|
||||
for _, c := range cases { |
||||
t.Run(c.name, func(t *testing.T) { |
||||
client := NewFileClient(io.NopCloser(strings.NewReader(strings.Join(input, "\n")))) |
||||
resp, err := client.Query( |
||||
`{foo="bar"}`, // label matcher doesn't matter.
|
||||
c.limit, |
||||
c.ts, |
||||
c.direction, |
||||
true, |
||||
) |
||||
|
||||
require.NoError(t, err) |
||||
require.Equal(t, loghttp.QueryStatusSuccess, resp.Status) |
||||
assert.Equal(t, string(resp.Data.ResultType), loghttp.ResultTypeStream) |
||||
assertStreams(t, resp.Data.Result, c.expected) |
||||
}) |
||||
} |
||||
} |
||||
|
||||
func TestFileClient_ListLabelNames(t *testing.T) { |
||||
c := newEmptyClient(t) |
||||
values, err := c.ListLabelNames(true, time.Now(), time.Now()) |
||||
require.NoError(t, err) |
||||
assert.Equal(t, &loghttp.LabelResponse{ |
||||
Data: []string{defaultLabelKey}, |
||||
Status: loghttp.QueryStatusSuccess, |
||||
}, values) |
||||
} |
||||
|
||||
func TestFileClient_ListLabelValues(t *testing.T) { |
||||
c := newEmptyClient(t) |
||||
values, err := c.ListLabelValues(defaultLabelKey, true, time.Now(), time.Now()) |
||||
require.NoError(t, err) |
||||
assert.Equal(t, &loghttp.LabelResponse{ |
||||
Data: []string{defaultLabelValue}, |
||||
Status: loghttp.QueryStatusSuccess, |
||||
}, values) |
||||
|
||||
} |
||||
|
||||
func TestFileClient_Series(t *testing.T) { |
||||
c := newEmptyClient(t) |
||||
got, err := c.Series(nil, time.Now(), time.Now(), true) |
||||
require.NoError(t, err) |
||||
|
||||
exp := &loghttp.SeriesResponse{ |
||||
Data: []loghttp.LabelSet{ |
||||
{defaultLabelKey: defaultLabelValue}, |
||||
}, |
||||
Status: loghttp.QueryStatusSuccess, |
||||
} |
||||
|
||||
assert.Equal(t, exp, got) |
||||
} |
||||
|
||||
func TestFileClient_LiveTail(t *testing.T) { |
||||
c := newEmptyClient(t) |
||||
x, err := c.LiveTailQueryConn("", time.Second, 0, time.Now(), true) |
||||
require.Error(t, err) |
||||
require.Nil(t, x) |
||||
assert.True(t, errors.Is(err, ErrNotSupported)) |
||||
} |
||||
|
||||
// TestFileClient_GetOrgID checks the client reports the fixed default org ID.
func TestFileClient_GetOrgID(t *testing.T) {
	c := newEmptyClient(t)
	assert.Equal(t, defaultOrgID, c.GetOrgID())
}
||||
|
||||
func newEmptyClient(t *testing.T) *FileClient { |
||||
t.Helper() |
||||
return NewFileClient(io.NopCloser(&bytes.Buffer{})) |
||||
} |
||||
|
||||
func assertStreams(t *testing.T, result loghttp.ResultValue, logLines []string) { |
||||
t.Helper() |
||||
|
||||
streams, ok := result.(loghttp.Streams) |
||||
require.True(t, ok, "response type should be `loghttp.Streams`") |
||||
|
||||
require.Len(t, streams, 1, "there should be only one stream for FileClient") |
||||
|
||||
got := streams[0] |
||||
sort.Slice(got.Entries, func(i, j int) bool { |
||||
return got.Entries[i].Timestamp.UnixNano() < got.Entries[j].Timestamp.UnixNano() |
||||
}) |
||||
require.Equal(t, len(got.Entries), len(logLines)) |
||||
for i, entry := range got.Entries { |
||||
assert.Equal(t, entry.Line, logLines[i]) |
||||
} |
||||
} |
||||
Loading…
Reference in new issue