Logcli: automatically batch requests (#2482)

* first stab at batching logcli requests

* fix other calls to printResult

* working batching code

* refactor the logcli Client into an interface so that it can be mocked for testing.

* fixing some bugs

* refactoring the output interface to make testing easier

* starting to add tests

* adding a bunch of tests
fixing up how the MockQuerier works.

* updating docs

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

* Update docs/sources/getting-started/logcli.md

Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>

Co-authored-by: Owen Diehl <ow.diehl@gmail.com>
Co-authored-by: Diana Payton <52059945+oddlittlebird@users.noreply.github.com>
pull/2491/head
Ed Welch 6 years ago committed by GitHub
parent 7418d42141
commit 80693cae6f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 10
      cmd/logcli/main.go
  2. 16
      docs/sources/getting-started/logcli.md
  3. 55
      pkg/logcli/client/client.go
  4. 4
      pkg/logcli/labelquery/labels.go
  5. 13
      pkg/logcli/output/default.go
  6. 29
      pkg/logcli/output/default_test.go
  7. 7
      pkg/logcli/output/jsonl.go
  8. 16
      pkg/logcli/output/jsonl_test.go
  9. 8
      pkg/logcli/output/output.go
  10. 14
      pkg/logcli/output/output_test.go
  11. 14
      pkg/logcli/output/raw.go
  12. 16
      pkg/logcli/output/raw_test.go
  13. 138
      pkg/logcli/query/query.go
  14. 409
      pkg/logcli/query/query_test.go
  15. 5
      pkg/logcli/query/tail.go
  16. 4
      pkg/logcli/seriesquery/series.go
  17. 22
      pkg/logql/test_utils.go

@ -123,7 +123,7 @@ func main() {
ColoredOutput: rangeQuery.ColoredOutput,
}
out, err := output.NewLogOutput(*outputMode, outputOptions)
out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions)
if err != nil {
log.Fatalf("Unable to create log output: %s", err)
}
@ -145,7 +145,7 @@ func main() {
ColoredOutput: instantQuery.ColoredOutput,
}
out, err := output.NewLogOutput(*outputMode, outputOptions)
out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions)
if err != nil {
log.Fatalf("Unable to create log output: %s", err)
}
@ -158,8 +158,9 @@ func main() {
}
}
func newQueryClient(app *kingpin.Application) *client.Client {
client := &client.Client{
func newQueryClient(app *kingpin.Application) client.Client {
client := &client.DefaultClient{
TLSConfig: config.TLSConfig{},
}
@ -273,6 +274,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)
cmd.Flag("step", "Query resolution step width, for metric queries. Evaluate the query at the specified step over the time range.").DurationVar(&q.Step)
cmd.Flag("interval", "Query interval, for log queries. Return entries at the specified interval, ignoring those between. **This parameter is experimental, please see Issue 1779**").DurationVar(&q.Interval)
cmd.Flag("batch", "Query batch size to use until 'limit' is reached").Default("1000").IntVar(&q.BatchSize)
}
cmd.Flag("forward", "Scan forwards through logs.").Default("false").BoolVar(&q.Forward)

@ -62,6 +62,21 @@ $ logcli series -q --match='{namespace="loki",container_name="loki"}'
{app="loki", container_name="loki", controller_revision_hash="loki-57c9df47f4", filename="/var/log/pods/loki_loki-0_8ed03ded-bacb-4b13-a6fe-53a445a15887/loki/0.log", instance="loki-0", job="loki/loki", name="loki", namespace="loki", release="loki", statefulset_kubernetes_io_pod_name="loki-0", stream="stderr"}
```
#### Batched Queries
Starting with Loki 1.6.0, `logcli` batches log queries to Loki.
The default value for `--limit` is 30. If you set `--limit` to a large number, say `--limit=10000`, then logcli automatically
sends this request to Loki in batches.
The default batch size is `1000`.
Loki has a server-side limit for the maximum lines returned in a query (default is 5000).
Batching allows you to make larger requests than the server-side limit as long as the `--batch` size is less than the server limit.
Please note that the query metadata is printed for each batch on `stderr`. Set the `--quiet` flag to stop this behavior.
### Configuration
Configuration values are considered in the following order (lowest to highest):
@ -188,6 +203,7 @@ Flags:
range.
--interval=INTERVAL Query interval, for log queries. Return entries at the specified interval, ignoring those between.
**This parameter is experimental, please see Issue 1779**.
--batch=1000 Query batch size to use until 'limit' is reached.
--forward Scan forwards through logs.
--no-labels Do not print any labels.
--exclude-label=EXCLUDE-LABEL ...

@ -34,8 +34,19 @@ var (
userAgent = fmt.Sprintf("loki-logcli/%s", build.Version)
)
// Client contains all the methods to query a Loki instance, it's an interface to allow multiple implementations.
type Client interface {
Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error)
QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error)
ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error)
ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error)
Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error)
LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error)
GetOrgID() string
}
// Client contains fields necessary to query a Loki instance
type Client struct {
type DefaultClient struct {
TLSConfig config.TLSConfig
Username string
Password string
@ -46,7 +57,7 @@ type Client struct {
// Query uses the /api/v1/query endpoint to execute an instant query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
func (c *DefaultClient) Query(queryStr string, limit int, time time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
qsb := util.NewQueryStringBuilder()
qsb.SetString("query", queryStr)
qsb.SetInt("limit", int64(limit))
@ -59,7 +70,7 @@ func (c *Client) Query(queryStr string, limit int, time time.Time, direction log
// QueryRange uses the /api/v1/query_range endpoint to execute a range query
// excluding interfacer b/c it suggests taking the interface promql.Node instead of logproto.Direction b/c it happens to have a String() method
// nolint:interfacer
func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
func (c *DefaultClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
params := util.NewQueryStringBuilder()
params.SetString("query", queryStr)
params.SetInt32("limit", limit)
@ -81,7 +92,7 @@ func (c *Client) QueryRange(queryStr string, limit int, from, through time.Time,
}
// ListLabelNames uses the /api/v1/label endpoint to list label names
func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
func (c *DefaultClient) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
var labelResponse loghttp.LabelResponse
params := util.NewQueryStringBuilder()
params.SetInt("start", from.UnixNano())
@ -94,7 +105,7 @@ func (c *Client) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.L
}
// ListLabelValues uses the /api/v1/label endpoint to list label values
func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
func (c *DefaultClient) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
path := fmt.Sprintf(labelValuesPath, url.PathEscape(name))
var labelResponse loghttp.LabelResponse
params := util.NewQueryStringBuilder()
@ -106,7 +117,7 @@ func (c *Client) ListLabelValues(name string, quiet bool, from, through time.Tim
return &labelResponse, nil
}
func (c *Client) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
func (c *DefaultClient) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
params := util.NewQueryStringBuilder()
params.SetInt("start", from.UnixNano())
params.SetInt("end", through.UnixNano())
@ -119,7 +130,22 @@ func (c *Client) Series(matchers []string, from, through time.Time, quiet bool)
return &seriesResponse, nil
}
func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) {
// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it
func (c *DefaultClient) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) {
	// Build the tail request query string; parameter order mirrors the builder calls.
	params := util.NewQueryStringBuilder()
	params.SetString("query", queryStr)
	params.SetInt("delay_for", int64(delayFor))
	params.SetInt("limit", int64(limit))
	params.SetInt("from", from)
	return c.wsConnect(tailPath, params.Encode(), quiet)
}
// GetOrgID returns the OrgID configured on this client, satisfying the
// Client interface so callers that only hold the interface can read it.
func (c *DefaultClient) GetOrgID() string {
return c.OrgID
}
func (c *DefaultClient) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) {
var err error
var r loghttp.QueryResponse
@ -130,7 +156,7 @@ func (c *Client) doQuery(path string, query string, quiet bool) (*loghttp.QueryR
return &r, nil
}
func (c *Client) doRequest(path, query string, quiet bool, out interface{}) error {
func (c *DefaultClient) doRequest(path, query string, quiet bool, out interface{}) error {
us, err := buildURL(c.Address, path, query)
if err != nil {
@ -180,18 +206,7 @@ func (c *Client) doRequest(path, query string, quiet bool, out interface{}) erro
return json.NewDecoder(resp.Body).Decode(out)
}
// LiveTailQueryConn uses /api/prom/tail to set up a websocket connection and returns it
func (c *Client) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) {
qsb := util.NewQueryStringBuilder()
qsb.SetString("query", queryStr)
qsb.SetInt("delay_for", int64(delayFor))
qsb.SetInt("limit", int64(limit))
qsb.SetInt("from", from)
return c.wsConnect(tailPath, qsb.Encode(), quiet)
}
func (c *Client) wsConnect(path, query string, quiet bool) (*websocket.Conn, error) {
func (c *DefaultClient) wsConnect(path, query string, quiet bool) (*websocket.Conn, error) {
us, err := buildURL(c.Address, path, query)
if err != nil {
return nil, err

@ -18,7 +18,7 @@ type LabelQuery struct {
}
// DoLabels prints out label results
func (q *LabelQuery) DoLabels(c *client.Client) {
func (q *LabelQuery) DoLabels(c client.Client) {
values := q.ListLabels(c)
for _, value := range values {
@ -27,7 +27,7 @@ func (q *LabelQuery) DoLabels(c *client.Client) {
}
// ListLabels returns an array of label strings
func (q *LabelQuery) ListLabels(c *client.Client) []string {
func (q *LabelQuery) ListLabels(c client.Client) []string {
var labelResponse *loghttp.LabelResponse
var err error
if len(q.LabelName) > 0 {

@ -2,6 +2,7 @@ package output
import (
"fmt"
"io"
"strings"
"time"
@ -12,24 +13,26 @@ import (
// DefaultOutput provides logs and metadata in human readable format
type DefaultOutput struct {
w io.Writer
options *LogOutputOptions
}
// Format a log entry in a human readable format
func (o *DefaultOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string {
func (o *DefaultOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) {
timestamp := ts.In(o.options.Timezone).Format(time.RFC3339)
line = strings.TrimSpace(line)
if o.options.NoLabels {
return fmt.Sprintf("%s %s", color.BlueString(timestamp), line)
fmt.Fprintf(o.w, "%s %s\n", color.BlueString(timestamp), line)
return
}
if o.options.ColoredOutput {
labelsColor := getColor(lbls.String()).SprintFunc()
return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line)
fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), labelsColor(padLabel(lbls, maxLabelsLen)), line)
} else {
fmt.Fprintf(o.w, "%s %s %s\n", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line)
}
return fmt.Sprintf("%s %s %s", color.BlueString(timestamp), color.RedString(padLabel(lbls, maxLabelsLen)), line)
}
// add some padding after labels

@ -1,6 +1,7 @@
package output
import (
"bytes"
"strings"
"testing"
"time"
@ -33,7 +34,7 @@ func TestDefaultOutput_Format(t *testing.T) {
emptyLabels,
0,
"",
"2006-01-02T08:04:05Z {} ",
"2006-01-02T08:04:05Z {} \n",
},
"empty line with labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -41,7 +42,7 @@ func TestDefaultOutput_Format(t *testing.T) {
someLabels,
len(someLabels.String()),
"",
"2006-01-02T08:04:05Z {type=\"test\"} ",
"2006-01-02T08:04:05Z {type=\"test\"} \n",
},
"max labels length shorter than input labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -49,7 +50,7 @@ func TestDefaultOutput_Format(t *testing.T) {
someLabels,
0,
"Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello\n",
},
"max labels length longer than input labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -57,7 +58,7 @@ func TestDefaultOutput_Format(t *testing.T) {
someLabels,
20,
"Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello",
"2006-01-02T08:04:05Z {type=\"test\"} Hello\n",
},
"timezone option set to a Local one": {
&LogOutputOptions{Timezone: time.FixedZone("test", 2*60*60), NoLabels: false},
@ -65,7 +66,7 @@ func TestDefaultOutput_Format(t *testing.T) {
someLabels,
0,
"Hello",
"2006-01-02T10:04:05+02:00 {type=\"test\"} Hello",
"2006-01-02T10:04:05+02:00 {type=\"test\"} Hello\n",
},
"labels output disabled": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: true},
@ -73,7 +74,7 @@ func TestDefaultOutput_Format(t *testing.T) {
someLabels,
0,
"Hello",
"2006-01-02T08:04:05Z Hello",
"2006-01-02T08:04:05Z Hello\n",
},
}
@ -83,10 +84,11 @@ func TestDefaultOutput_Format(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
out := &DefaultOutput{testData.options}
actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
writer := &bytes.Buffer{}
out := &DefaultOutput{writer,testData.options}
out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
assert.Equal(t, testData.expected, actual)
assert.Equal(t, testData.expected, writer.String())
})
}
}
@ -111,16 +113,19 @@ func TestDefaultOutput_FormatLabelsPadding(t *testing.T) {
timestamp, _ := time.Parse(time.RFC3339, "2006-01-02T15:04:05+07:00")
maxLabelsLen := findMaxLabelsLength(labelsList)
options := &LogOutputOptions{Timezone: time.UTC, NoLabels: false}
out := &DefaultOutput{options}
writer := &bytes.Buffer{}
out := &DefaultOutput{writer,options}
// Format the same log line with different labels
formattedEntries := make([]string, 0, len(labelsList))
for _, lbls := range labelsList {
formattedEntries = append(formattedEntries, out.Format(timestamp, lbls, maxLabelsLen, "XXX"))
out.FormatAndPrintln(timestamp, lbls, maxLabelsLen, "XXX")
formattedEntries = append(formattedEntries, writer.String())
writer.Reset()
}
// Ensure the log line starts at the same position in each formatted output
assert.Equal(t, len(formattedEntries), len(labelsList))
assert.Equal(t, len(labelsList), len(formattedEntries))
expectedIndex := strings.Index(formattedEntries[0], "XXX")
if expectedIndex <= 0 {

@ -2,6 +2,8 @@ package output
import (
"encoding/json"
"fmt"
"io"
"log"
"time"
@ -10,11 +12,12 @@ import (
// JSONLOutput prints logs and metadata as JSON Lines, suitable for scripts
type JSONLOutput struct {
w io.Writer
options *LogOutputOptions
}
// Format a log entry as json line
func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string {
func (o *JSONLOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) {
entry := map[string]interface{}{
"timestamp": ts.In(o.options.Timezone),
"line": line,
@ -30,5 +33,5 @@ func (o *JSONLOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen i
log.Fatalf("error marshalling entry: %s", err)
}
return string(out)
fmt.Fprintln(o.w, string(out))
}

@ -1,6 +1,7 @@
package output
import (
"bytes"
"encoding/json"
"testing"
"time"
@ -33,7 +34,7 @@ func TestJSONLOutput_Format(t *testing.T) {
emptyLabels,
0,
"",
`{"labels":{},"line":"","timestamp":"2006-01-02T08:04:05Z"}`,
`{"labels":{},"line":"","timestamp":"2006-01-02T08:04:05Z"}` + "\n",
},
"empty line with labels": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -41,7 +42,7 @@ func TestJSONLOutput_Format(t *testing.T) {
someLabels,
len(someLabels.String()),
"",
`{"labels":{"type":"test"},"line":"","timestamp":"2006-01-02T08:04:05Z"}`,
`{"labels":{"type":"test"},"line":"","timestamp":"2006-01-02T08:04:05Z"}` + "\n",
},
"timezone option set to a Local one": {
&LogOutputOptions{Timezone: time.FixedZone("test", 2*60*60), NoLabels: false},
@ -49,7 +50,7 @@ func TestJSONLOutput_Format(t *testing.T) {
someLabels,
0,
"Hello",
`{"labels":{"type":"test"},"line":"Hello","timestamp":"2006-01-02T10:04:05+02:00"}`,
`{"labels":{"type":"test"},"line":"Hello","timestamp":"2006-01-02T10:04:05+02:00"}` + "\n",
},
"labels output disabled": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: true},
@ -57,7 +58,7 @@ func TestJSONLOutput_Format(t *testing.T) {
someLabels,
0,
"Hello",
`{"line":"Hello","timestamp":"2006-01-02T08:04:05Z"}`,
`{"line":"Hello","timestamp":"2006-01-02T08:04:05Z"}` + "\n",
},
}
@ -66,10 +67,11 @@ func TestJSONLOutput_Format(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
writer := &bytes.Buffer{}
out := &JSONLOutput{writer,testData.options}
out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
out := &JSONLOutput{testData.options}
actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
actual := writer.String()
assert.Equal(t, testData.expected, actual)
assert.NoError(t, isValidJSON(actual))
})

@ -3,6 +3,7 @@ package output
import (
"fmt"
"hash/fnv"
"io"
"time"
"github.com/fatih/color"
@ -27,7 +28,7 @@ var colorList = []*color.Color{
// LogOutput is the interface any output mode must implement
type LogOutput interface {
Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string
FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string)
}
// LogOutputOptions defines options supported by LogOutput
@ -38,7 +39,7 @@ type LogOutputOptions struct {
}
// NewLogOutput creates a log output based on the input mode and options
func NewLogOutput(mode string, options *LogOutputOptions) (LogOutput, error) {
func NewLogOutput(w io.Writer, mode string, options *LogOutputOptions) (LogOutput, error) {
if options.Timezone == nil {
options.Timezone = time.Local
}
@ -46,14 +47,17 @@ func NewLogOutput(mode string, options *LogOutputOptions) (LogOutput, error) {
switch mode {
case "default":
return &DefaultOutput{
w: w,
options: options,
}, nil
case "jsonl":
return &JSONLOutput{
w: w,
options: options,
}, nil
case "raw":
return &RawOutput{
w: w,
options: options,
}, nil
default:

@ -10,19 +10,19 @@ import (
func TestNewLogOutput(t *testing.T) {
options := &LogOutputOptions{time.UTC, false, false}
out, err := NewLogOutput("default", options)
out, err := NewLogOutput(nil,"default", options)
assert.NoError(t, err)
assert.IsType(t, &DefaultOutput{options}, out)
assert.IsType(t, &DefaultOutput{nil,options}, out)
out, err = NewLogOutput("jsonl", options)
out, err = NewLogOutput(nil,"jsonl", options)
assert.NoError(t, err)
assert.IsType(t, &JSONLOutput{options}, out)
assert.IsType(t, &JSONLOutput{nil,options}, out)
out, err = NewLogOutput("raw", options)
out, err = NewLogOutput(nil,"raw", options)
assert.NoError(t, err)
assert.IsType(t, &RawOutput{options}, out)
assert.IsType(t, &RawOutput{nil,options}, out)
out, err = NewLogOutput("unknown", options)
out, err = NewLogOutput(nil,"unknown", options)
assert.Error(t, err)
assert.Nil(t, out)
}

@ -1,6 +1,8 @@
package output
import (
"fmt"
"io"
"time"
"github.com/grafana/loki/pkg/loghttp"
@ -8,13 +10,21 @@ import (
// RawOutput prints logs in their original form, without any metadata
type RawOutput struct {
w io.Writer
options *LogOutputOptions
}
// NewRaw creates a LogOutput that prints each log line verbatim to writer,
// without timestamps or labels.
func NewRaw(writer io.Writer, options *LogOutputOptions) LogOutput {
	return &RawOutput{
		w:       writer,
		options: options,
	}
}
// Format a log entry as is
func (o *RawOutput) Format(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) string {
func (o *RawOutput) FormatAndPrintln(ts time.Time, lbls loghttp.LabelSet, maxLabelsLen int, line string) {
if len(line) > 0 && line[len(line)-1] == '\n' {
line = line[:len(line)-1]
}
return line
fmt.Fprintln(o.w, line)
}

@ -1,6 +1,7 @@
package output
import (
"bytes"
"testing"
"time"
@ -31,7 +32,7 @@ func TestRawOutput_Format(t *testing.T) {
someLabels,
0,
"",
"",
"\n",
},
"non empty line": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -39,7 +40,7 @@ func TestRawOutput_Format(t *testing.T) {
someLabels,
0,
"Hello world",
"Hello world",
"Hello world\n",
},
"line with single newline at the end": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -47,7 +48,7 @@ func TestRawOutput_Format(t *testing.T) {
someLabels,
0,
"Hello world\n",
"Hello world",
"Hello world\n",
},
"line with multiple newlines at the end": {
&LogOutputOptions{Timezone: time.UTC, NoLabels: false},
@ -55,7 +56,7 @@ func TestRawOutput_Format(t *testing.T) {
someLabels,
0,
"Hello world\n\n\n",
"Hello world\n\n",
"Hello world\n\n\n",
},
}
@ -65,10 +66,11 @@ func TestRawOutput_Format(t *testing.T) {
t.Run(testName, func(t *testing.T) {
t.Parallel()
out := &RawOutput{testData.options}
actual := out.Format(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
writer := &bytes.Buffer{}
out := &RawOutput{writer,testData.options}
out.FormatAndPrintln(testData.timestamp, testData.lbls, testData.maxLabelsLen, testData.line)
assert.Equal(t, testData.expected, actual)
assert.Equal(t, testData.expected, writer.String())
})
}
}

@ -43,6 +43,7 @@ type Query struct {
Start time.Time
End time.Time
Limit int
BatchSize int
Forward bool
Step time.Duration
Interval time.Duration
@ -56,10 +57,10 @@ type Query struct {
}
// DoQuery executes the query and prints out the results
func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool) {
func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) {
if q.LocalConfig != "" {
if err := q.DoLocalQuery(out, statistics, c.OrgID); err != nil {
if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil {
log.Fatalf("Query failed: %+v", err)
}
return
@ -72,26 +73,91 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool)
if q.isInstant() {
resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
if statistics {
q.printStats(resp.Data.Statistics)
}
_, _ = q.printResult(resp.Data.Result, out, nil)
} else {
resp, err = c.QueryRange(q.QueryString, q.Limit, q.Start, q.End, d, q.Step, q.Interval, q.Quiet)
}
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
if q.Limit < q.BatchSize {
q.BatchSize = q.Limit
}
resultLength := q.BatchSize
total := 0
start := q.Start
end := q.End
var lastEntry []*loghttp.Entry
for total < q.Limit {
bs := q.BatchSize
// We want to truncate the batch size if the remaining number
// of items needed to reach the limit is less than the batch size
if q.Limit-total < q.BatchSize {
// Truncated batchsize is q.Limit - total, however we add to this
// the length of the overlap from the last query to make sure we get the
// correct amount of new logs knowing there will be some overlapping logs returned.
bs = q.Limit - total + len(lastEntry)
}
resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet)
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
if statistics {
q.printStats(resp.Data.Statistics)
}
resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry)
// Was not a log stream query, or no results, no more batching
if resultLength <= 0 {
break
}
// Also no result, wouldn't expect to hit this.
if lastEntry == nil || len(lastEntry) == 0 {
break
}
// Can only happen if all the results return in one request
if resultLength == q.Limit {
break
}
if len(lastEntry) >= q.BatchSize {
log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+
"(there will always be 1 overlapping entry but Loki allows multiple entries to have "+
"the same timestamp, so when a batch ends in this scenario the next query will include "+
"all the overlapping entries again). Please increase your batch size to at least %v to account "+
"for overlapping entries\n", q.BatchSize, len(lastEntry), len(lastEntry)+1)
}
// Batching works by taking the timestamp of the last query and using it in the next query,
// because Loki supports multiple entries with the same timestamp it's possible for a batch to have
// fallen in the middle of a list of entries for the same time, so to make sure we get all entries
// we start the query on the same time as the last entry from the last batch, and then we keep this last
// entry and remove the duplicate when printing the results.
// Because of this duplicate entry, we have to subtract it here from the total for each batch
// to get the desired limit.
total += resultLength
// Based on the query direction we either set the start or end for the next query.
// If there are multiple entries in `lastEntry` they have to have the same timestamp so we can pick just the first
if q.Forward {
start = lastEntry[0].Timestamp
} else {
// The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping result
// fudge the timestamp forward in time to make sure to get the last entry from this batch in the next query
end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
}
if statistics {
q.printStats(resp.Data.Statistics)
}
}
q.printResult(resp.Data.Result, out)
}
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput) {
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
length := -1
var entry []*loghttp.Entry
switch value.Type() {
case logql.ValueTypeStreams:
q.printStream(value.(loghttp.Streams), out)
length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
case parser.ValueTypeScalar:
q.printScalar(value.(loghttp.Scalar))
case parser.ValueTypeMatrix:
@ -101,6 +167,7 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput) {
default:
log.Fatalf("Unable to print unsupported type: %v", value.Type())
}
return length, entry
}
// DoLocalQuery executes the query against the local store using a Loki configuration file.
@ -172,7 +239,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}
q.printResult(value, out)
q.printResult(value, out, nil)
return nil
}
@ -186,7 +253,7 @@ func (q *Query) isInstant() bool {
return q.Start == q.End
}
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput) {
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
common := commonLabels(streams)
// Remove the labels we want to show from common
@ -236,15 +303,52 @@ func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput) {
}
}
if len(allEntries) == 0 {
return 0, nil
}
if q.Forward {
sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) })
} else {
sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) })
}
printed := 0
for _, e := range allEntries {
fmt.Println(out.Format(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line))
// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
if lastEntry != nil && len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
skip := false
// Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp
// shared by multiple entries we have to check all that were stored to see if we've already
// printed them.
for _, le := range lastEntry {
if e.entry.Line == le.Line {
skip = true
}
}
if skip {
continue
}
}
out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
printed++
}
// Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends
// with an entry that shared multiple timestamps, so we need to keep a list of all these entries
// because the next query is going to contain them too and we want to not duplicate anything already
// printed.
lel := []*loghttp.Entry{}
// Start with the timestamp of the last entry
le := allEntries[len(allEntries)-1].entry
for i, e := range allEntries {
// Save any entry which has this timestamp (most of the time this will only be the single last entry)
if e.entry.Timestamp.Equal(le.Timestamp) {
lel = append(lel, &allEntries[i].entry)
}
}
return printed, lel
}
func (q *Query) printMatrix(matrix loghttp.Matrix) {

@ -1,11 +1,21 @@
package query
import (
"bytes"
"context"
"log"
"reflect"
"strings"
"testing"
"time"
"github.com/gorilla/websocket"
"github.com/stretchr/testify/assert"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
)
@ -140,6 +150,341 @@ func Test_subtract(t *testing.T) {
}
}
// Test_batch exercises Query.DoQuery's automatic request batching: table-driven
// cases run against an in-memory testQueryClient and assert both the printed
// log lines (including de-duplication of the overlapping entry shared between
// consecutive batches) and the number of QueryRange calls issued.
func Test_batch(t *testing.T) {
	tests := []struct {
		name          string
		streams       []logproto.Stream
		start, end    time.Time
		limit, batch  int
		labelMatcher  string
		forward       bool
		expectedCalls int
		expected      []string
	}{
		{
			name: "super simple forward",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestamp is exclusive
					},
				},
			},
			start:         time.Unix(1, 0),
			end:           time.Unix(3, 0),
			limit:         10,
			batch:         10,
			labelMatcher:  "{test=\"simple\"}",
			forward:       true,
			expectedCalls: 2, // Client doesn't know if the server hit a limit or there were no results so we have to query until there is no results, in this case 2 calls
			expected: []string{
				"line1",
				"line2",
			},
		},
		{
			name: "super simple backward",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"}, // End timestamp is exclusive
					},
				},
			},
			start:         time.Unix(1, 0),
			end:           time.Unix(3, 0),
			limit:         10,
			batch:         10,
			labelMatcher:  "{test=\"simple\"}",
			forward:       false,
			expectedCalls: 2,
			expected: []string{
				"line2",
				"line1",
			},
		},
		{
			name: "single stream forward batch",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        9,
			batch:        2,
			labelMatcher: "{test=\"simple\"}",
			forward:      true,
			// Our batch size is 2 but each query will also return the overlapping last element from the
			// previous batch, as such we only get one new item per call so we make a lot of calls
			// Call one:   line1 line2
			// Call two:   line2 line3
			// Call three: line3 line4
			// Call four:  line4 line5
			// Call five:  line5 line6
			// Call six:   line6 line7
			// Call seven: line7 line8
			// Call eight: line8 line9
			expectedCalls: 8,
			expected: []string{
				"line1", "line2", "line3", "line4", "line5", "line6", "line7", "line8", "line9",
			},
		},
		{
			name: "single stream backward batch",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        9,
			batch:        2,
			labelMatcher: "{test=\"simple\"}",
			forward:      false,
			// Mirror image of "single stream forward batch": one new entry per
			// call because of the overlapping entry, starting from line10.
			expectedCalls: 8,
			expected: []string{
				"line10", "line9", "line8", "line7", "line6", "line5", "line4", "line3", "line2",
			},
		},
		{
			name: "two streams forward batch",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"one\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
				logproto.Stream{
					Labels: "{test=\"two\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"},
						logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"},
						logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"},
						logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"},
						logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"},
						logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"},
						logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"},
						logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"},
						logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"},
						logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        12,
			batch:        3,
			labelMatcher: "{test=~\"one|two\"}",
			forward:      true,
			// Six calls
			// 1 line1, s2line1, line2
			// 2 line2, s2line2, line3
			// 3 line3, s2line3, line4
			// 4 line4, s2line4, line5
			// 5 line5, s2line5, line6
			// 6 line6, s2line6
			expectedCalls: 6,
			expected: []string{
				"line1", "s2line1", "line2", "s2line2", "line3", "s2line3", "line4", "s2line4", "line5", "s2line5", "line6", "s2line6",
			},
		},
		{
			name: "two streams backward batch",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"one\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
				logproto.Stream{
					Labels: "{test=\"two\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 1000), Line: "s2line1"},
						logproto.Entry{Timestamp: time.Unix(2, 1000), Line: "s2line2"},
						logproto.Entry{Timestamp: time.Unix(3, 1000), Line: "s2line3"},
						logproto.Entry{Timestamp: time.Unix(4, 1000), Line: "s2line4"},
						logproto.Entry{Timestamp: time.Unix(5, 1000), Line: "s2line5"},
						logproto.Entry{Timestamp: time.Unix(6, 1000), Line: "s2line6"},
						logproto.Entry{Timestamp: time.Unix(7, 1000), Line: "s2line7"},
						logproto.Entry{Timestamp: time.Unix(8, 1000), Line: "s2line8"},
						logproto.Entry{Timestamp: time.Unix(9, 1000), Line: "s2line9"},
						logproto.Entry{Timestamp: time.Unix(10, 1000), Line: "s2line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        12,
			batch:        3,
			labelMatcher: "{test=~\"one|two\"}",
			forward:      false,
			// Mirror image of "two streams forward batch", interleaving the two
			// streams newest-first (s2lineN sorts after lineN at each second).
			expectedCalls: 6,
			expected: []string{
				"s2line10", "line10", "s2line9", "line9", "s2line8", "line8", "s2line7", "line7", "s2line6", "line6", "s2line5", "line5",
			},
		},
		{
			name: "single stream forward batch identical timestamps",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        9,
			batch:        4,
			labelMatcher: "{test=\"simple\"}",
			forward:      true,
			// Our batch size is 4 and each query also returns the overlapping last
			// entry (or entries, when several share the final timestamp) from the
			// previous batch:
			// Call one:   line1 line2 line3 line4
			// Call two:   line4 line5 line6 line6a
			// Call three: line6 line6a line7 line8 <- notice line6 and line6a share the same timestamp so they get returned as overlap in the next query.
			expectedCalls: 3,
			expected: []string{
				"line1", "line2", "line3", "line4", "line5", "line6", "line6a", "line7", "line8",
			},
		},
		{
			name: "single stream backward batch identical timestamps",
			streams: []logproto.Stream{
				logproto.Stream{
					Labels: "{test=\"simple\"}",
					Entries: []logproto.Entry{
						logproto.Entry{Timestamp: time.Unix(1, 0), Line: "line1"},
						logproto.Entry{Timestamp: time.Unix(2, 0), Line: "line2"},
						logproto.Entry{Timestamp: time.Unix(3, 0), Line: "line3"},
						logproto.Entry{Timestamp: time.Unix(4, 0), Line: "line4"},
						logproto.Entry{Timestamp: time.Unix(5, 0), Line: "line5"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6a"},
						logproto.Entry{Timestamp: time.Unix(6, 0), Line: "line6b"},
						logproto.Entry{Timestamp: time.Unix(7, 0), Line: "line7"},
						logproto.Entry{Timestamp: time.Unix(8, 0), Line: "line8"},
						logproto.Entry{Timestamp: time.Unix(9, 0), Line: "line9"},
						logproto.Entry{Timestamp: time.Unix(10, 0), Line: "line10"},
					},
				},
			},
			start:        time.Unix(1, 0),
			end:          time.Unix(11, 0),
			limit:        11,
			batch:        4,
			labelMatcher: "{test=\"simple\"}",
			forward:      false,
			// Our batch size is 4 and each query also returns the overlapping last
			// entry (or entries, when several share the final timestamp) from the
			// previous batch:
			// Call one:   line10 line9 line8 line7
			// Call two:   line7 line6b line6a line6
			// Call three: line6b line6a line6 line5
			// Call four:  line5 line4 line3 line2
			expectedCalls: 4,
			expected: []string{
				"line10", "line9", "line8", "line7", "line6b", "line6a", "line6", "line5", "line4", "line3", "line2",
			},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			tc := newTestQueryClient(tt.streams...)
			writer := &bytes.Buffer{}
			// Raw output prints only the log line itself, which keeps the
			// expected values above simple to state and compare.
			out := output.NewRaw(writer, nil)
			q := Query{
				QueryString:     tt.labelMatcher,
				Start:           tt.start,
				End:             tt.end,
				Limit:           tt.limit,
				BatchSize:       tt.batch,
				Forward:         tt.forward,
				Step:            0,
				Interval:        0,
				Quiet:           false,
				NoLabels:        false,
				IgnoreLabelsKey: nil,
				ShowLabelsKey:   nil,
				FixedLabelsLen:  0,
				LocalConfig:     "",
			}
			q.DoQuery(tc, out, false)
			split := strings.Split(writer.String(), "\n")
			// Remove the last entry because there is always a newline after the last line which
			// leaves an empty element in the list of lines.
			if len(split) > 0 {
				split = split[:len(split)-1]
			}
			assert.Equal(t, tt.expected, split)
			assert.Equal(t, tt.expectedCalls, tc.queryRangeCalls)
		})
	}
}
func mustParseLabels(s string) loghttp.LabelSet {
l, err := marshal.NewLabelSet(s)
@ -149,3 +494,67 @@ func mustParseLabels(s string) loghttp.LabelSet {
return l
}
// testQueryClient is a test implementation of the logcli client interface,
// backed by an in-memory logql engine instead of a remote Loki server.
// Only QueryRange is implemented; the remaining interface methods panic.
type testQueryClient struct {
	engine *logql.Engine
	// queryRangeCalls counts how many times QueryRange has been invoked,
	// letting tests assert the number of batched requests issued.
	queryRangeCalls int
}
// newTestQueryClient builds a testQueryClient whose logql engine serves the
// supplied streams entirely from memory.
func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
	querier := logql.NewMockQuerier(0, testStreams)
	return &testQueryClient{
		engine:          logql.NewEngine(logql.EngineOpts{}, querier),
		queryRangeCalls: 0,
	}
}
// Query satisfies the client interface but is not exercised by these tests.
// The timestamp parameter is named ts rather than time so it does not shadow
// the time package.
func (t *testQueryClient) Query(queryStr string, limit int, ts time.Time, direction logproto.Direction, quiet bool) (*loghttp.QueryResponse, error) {
	panic("implement me")
}
// QueryRange executes the query against the in-memory logql engine and
// converts the result into the wire (loghttp) representation the CLI
// consumes, mimicking a remote Loki server. Every invocation is recorded in
// queryRangeCalls so tests can assert how many batched requests were made.
func (t *testQueryClient) QueryRange(queryStr string, limit int, from, through time.Time, direction logproto.Direction, step, interval time.Duration, quiet bool) (*loghttp.QueryResponse, error) {
	// Count the invocation up front so calls that fail are also recorded.
	t.queryRangeCalls++

	params := logql.NewLiteralParams(queryStr, from, through, step, interval, direction, uint32(limit), nil)

	v, err := t.engine.Query(params).Exec(context.Background())
	if err != nil {
		return nil, err
	}

	value, err := marshal.NewResultValue(v.Data)
	if err != nil {
		return nil, err
	}

	q := &loghttp.QueryResponse{
		Status: "success",
		Data: loghttp.QueryResponseData{
			ResultType: value.Type(),
			Result:     value,
			Statistics: v.Statistics,
		},
	}
	return q, nil
}
// ListLabelNames satisfies the client interface but is not exercised by these tests.
func (t *testQueryClient) ListLabelNames(quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
	panic("implement me")
}
// ListLabelValues satisfies the client interface but is not exercised by these tests.
func (t *testQueryClient) ListLabelValues(name string, quiet bool, from, through time.Time) (*loghttp.LabelResponse, error) {
	panic("implement me")
}
// Series satisfies the client interface but is not exercised by these tests.
func (t *testQueryClient) Series(matchers []string, from, through time.Time, quiet bool) (*loghttp.SeriesResponse, error) {
	panic("implement me")
}
// LiveTailQueryConn satisfies the client interface but is not exercised by these tests.
func (t *testQueryClient) LiveTailQueryConn(queryStr string, delayFor int, limit int, from int64, quiet bool) (*websocket.Conn, error) {
	panic("implement me")
}
// GetOrgID satisfies the client interface but is not exercised by these tests.
func (t *testQueryClient) GetOrgID() string {
	panic("implement me")
}

@ -1,7 +1,6 @@
package query
import (
"fmt"
"log"
"os"
"os/signal"
@ -17,7 +16,7 @@ import (
)
// TailQuery connects to the Loki websocket endpoint and tails logs
func (q *Query) TailQuery(delayFor int, c *client.Client, out output.LogOutput) {
func (q *Query) TailQuery(delayFor int, c client.Client, out output.LogOutput) {
conn, err := c.LiveTailQueryConn(q.QueryString, delayFor, q.Limit, q.Start.UnixNano(), q.Quiet)
if err != nil {
log.Fatalf("Tailing logs failed: %+v", err)
@ -75,7 +74,7 @@ func (q *Query) TailQuery(delayFor int, c *client.Client, out output.LogOutput)
}
for _, entry := range stream.Entries {
fmt.Println(out.Format(entry.Timestamp, labels, 0, entry.Line))
out.FormatAndPrintln(entry.Timestamp, labels, 0, entry.Line)
}
}

@ -18,7 +18,7 @@ type SeriesQuery struct {
}
// DoSeries prints out series results
func (q *SeriesQuery) DoSeries(c *client.Client) {
func (q *SeriesQuery) DoSeries(c client.Client) {
values := q.GetSeries(c)
for _, value := range values {
@ -27,7 +27,7 @@ func (q *SeriesQuery) DoSeries(c *client.Client) {
}
// GetSeries returns an array of label sets
func (q *SeriesQuery) GetSeries(c *client.Client) []loghttp.LabelSet {
func (q *SeriesQuery) GetSeries(c client.Client) []loghttp.LabelSet {
seriesResponse, err := c.Series(q.Matchers, q.Start, q.End, q.Quiet)
if err != nil {
log.Fatalf("Error doing request: %+v", err)

@ -91,11 +91,23 @@ outer:
}
return iter.NewTimeRangedIterator(
iter.NewStreamsIterator(ctx, filtered, req.Direction),
req.Start,
req.End,
), nil
streamIters := make([]iter.EntryIterator, 0, len(filtered))
for i := range filtered {
// This is the same as how LazyChunk or MemChunk build their iterators,
// they return a TimeRangedIterator which is wrapped in a EntryReversedIter if the direction is BACKWARD
iterForward := iter.NewTimeRangedIterator(iter.NewStreamIterator(filtered[i]), req.Start, req.End)
if req.Direction == logproto.FORWARD {
streamIters = append(streamIters, iterForward)
} else {
reversed, err := iter.NewEntryReversedIter(iterForward)
if err != nil {
return nil, err
}
streamIters = append(streamIters, reversed)
}
}
return iter.NewHeapIterator(ctx, streamIters, req.Direction), nil
}
func (q MockQuerier) SelectSamples(ctx context.Context, req SelectSampleParams) (iter.SampleIterator, error) {

Loading…
Cancel
Save