add stats, volume, and volume_range commands to logcli (#9966)

Add `stats`, `volume`, and `volume_range` commands to `logcli`.

Does not implement the file client for now. I think it would be cool if
in the future the file client could read from a downloaded TSDB index
file for these commands.
pull/10045/head
Trevor Whitney 2 years ago committed by GitHub
parent 9934132b0f
commit 8aaf5c10a8
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 2
      .gitignore
  2. 157
      cmd/logcli/main.go
  3. 45
      pkg/logcli/client/client.go
  4. 15
      pkg/logcli/client/file.go
  5. 51
      pkg/logcli/index/stats.go
  6. 57
      pkg/logcli/index/volume.go
  7. 259
      pkg/logcli/print/print.go
  8. 150
      pkg/logcli/print/print_test.go
  9. 199
      pkg/logcli/query/query.go
  10. 152
      pkg/logcli/query/query_test.go
  11. 5
      pkg/logcli/query/tail.go
  12. 64
      pkg/logcli/query/utils.go
  13. 20
      pkg/logcli/util/util.go

2
.gitignore vendored

@ -50,4 +50,4 @@ pkg/loki/wal
.vscode
# nix
result
nix/result

@ -16,6 +16,7 @@ import (
"gopkg.in/alecthomas/kingpin.v2"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/index"
"github.com/grafana/loki/pkg/logcli/labelquery"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/query"
@ -181,6 +182,76 @@ This is helpful to find high cardinality labels.
seriesQuery = newSeriesQuery(seriesCmd)
fmtCmd = app.Command("fmt", "Formats a LogQL query.")
statsCmd = app.Command("stats", `Run a stats query.
The "stats" command will take the provided query and return statistics
from the index on how much data is contained in the matching stream(s).
This only works against Loki instances using the TSDB index format.
By default we look over the last hour of data; use --since to modify
or provide specific start and end times with --from and --to respectively.
Notice that when using --from and --to then ensure to use RFC3339Nano
time format, but without timezone at the end. The local timezone will be added
automatically or if using --timezone flag.
Example:
logcli stats
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
'my-query'
`)
statsQuery = newStatsQuery(statsCmd)
volumeCmd = app.Command("volume", `Run a volume query.
The "volume" command will take the provided label selector(s) and return aggregate
volumes for series matching those volumes. This only works
against Loki instances using the TSDB index format.
By default we look over the last hour of data; use --since to modify
or provide specific start and end times with --from and --to respectively.
Notice that when using --from and --to then ensure to use RFC3339Nano
time format, but without timezone at the end. The local timezone will be added
automatically or if using --timezone flag.
Example:
logcli volume
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
'my-query'
`)
volumeQuery = newVolumeQuery(false, volumeCmd)
volumeRangeCmd = app.Command("volume_range", `Run a volume query and return timeseries data.
The "volume_range" command will take the provided label selector(s) and return aggregate
volumes for series matching those volumes, aggregated into buckets according to the step value.
This only works against Loki instances using the TSDB index format.
By default we look over the last hour of data; use --since to modify
or provide specific start and end times with --from and --to respectively.
Notice that when using --from and --to then ensure to use RFC3339Nano
time format, but without timezone at the end. The local timezone will be added
automatically or if using --timezone flag.
Example:
logcli volume_range
--timezone=UTC
--from="2021-01-19T10:00:00Z"
--to="2021-01-19T20:00:00Z"
--step=1h
'my-query'
`)
volumeRangeQuery = newVolumeQuery(true, volumeRangeCmd)
)
func main() {
@ -292,6 +363,30 @@ func main() {
if err := formatLogQL(os.Stdin, os.Stdout); err != nil {
log.Fatalf("unable to format logql: %s", err)
}
case statsCmd.FullCommand():
statsQuery.DoStats(queryClient)
case volumeCmd.FullCommand(), volumeRangeCmd.FullCommand():
location, err := time.LoadLocation(*timezone)
if err != nil {
log.Fatalf("Unable to load timezone '%s': %s", *timezone, err)
}
outputOptions := &output.LogOutputOptions{
Timezone: location,
NoLabels: rangeQuery.NoLabels,
ColoredOutput: rangeQuery.ColoredOutput,
}
out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions)
if err != nil {
log.Fatalf("Unable to create log output: %s", err)
}
if cmd == volumeRangeCmd.FullCommand() {
volumeRangeQuery.DoVolumeRange(queryClient, out, *statistics)
} else {
volumeQuery.DoVolume(queryClient, out, *statistics)
}
}
}
@ -491,3 +586,65 @@ func defaultQueryRangeStep(start, end time.Time) time.Duration {
step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1))
return time.Duration(step) * time.Second
}
// newStatsQuery wires up the arguments and flags for the "stats" command
// and returns the query object, which is populated with the resolved time
// range once kingpin has parsed the command line.
func newStatsQuery(cmd *kingpin.CmdClause) *index.StatsQuery {
	// Raw CLI values used to derive the query range.
	var (
		from, to string
		since    time.Duration
	)

	q := &index.StatsQuery{}

	// Runs after all command flags are parsed: resolve --from/--to against
	// the --since lookback window.
	cmd.Action(func(_ *kingpin.ParseContext) error {
		end := time.Now()
		start := end.Add(-since)

		q.Start = mustParse(from, start)
		q.End = mustParse(to, end)
		q.Quiet = *quiet

		return nil
	})

	cmd.Arg("query", "eg '{foo=\"bar\",baz=~\".*blip\"} |~ \".*error.*\"'").Required().StringVar(&q.QueryString)
	cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since)
	cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from)
	cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)

	return q
}
// newVolumeQuery wires up the arguments and flags shared by the "volume"
// and "volume_range" commands and returns the query object, populated with
// the resolved time range once kingpin has parsed the command line. When
// rangeQuery is true a --step flag is added so volumes are bucketed into a
// timeseries.
func newVolumeQuery(rangeQuery bool, cmd *kingpin.CmdClause) *index.VolumeQuery {
	// calculate query range from cli params
	var from, to string
	var since time.Duration

	q := &index.VolumeQuery{}

	// executed after all command flags are parsed
	cmd.Action(func(_ *kingpin.ParseContext) error {
		defaultEnd := time.Now()
		defaultStart := defaultEnd.Add(-since)

		q.Start = mustParse(from, defaultStart)
		q.End = mustParse(to, defaultEnd)
		q.Quiet = *quiet

		return nil
	})

	// Fix: the example in this help text was missing its closing quote.
	cmd.Arg("query", "eg '{foo=\"bar\",baz=~\".*blip\"}'").Required().StringVar(&q.QueryString)

	cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since)
	cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from)
	cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to)
	cmd.Flag("limit", "Limit on number of series to return volumes for.").Default("30").IntVar(&q.Limit)

	if rangeQuery {
		cmd.Flag("step", "Query resolution step width, roll up volumes into buckets cover step time each.").Default("1h").DurationVar(&q.Step)
	}

	return q
}

@ -32,6 +32,9 @@ const (
labelValuesPath = "/loki/api/v1/label/%s/values"
seriesPath = "/loki/api/v1/series"
tailPath = "/loki/api/v1/tail"
statsPath = "/loki/api/v1/index/stats"
volumePath = "/loki/api/v1/index/series_volume"
volumeRangePath = "/loki/api/v1/index/series_volume_range"
defaultAuthHeader = "Authorization"
)
@ -46,6 +49,9 @@ type Client interface {
Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error)
LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error)
GetOrgID() string
GetStats(queryStr string, start, end time.Time, quiet bool) (*logproto.IndexStatsResponse, error)
GetVolume(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error)
GetVolumeRange(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error)
}
// Tripperware can wrap a roundtripper.
@ -165,6 +171,45 @@ func (c *DefaultClient) GetOrgID() string {
return c.OrgID
}
// GetStats queries the index-stats endpoint for the given query and time
// range and returns the decoded response.
func (c *DefaultClient) GetStats(queryStr string, start, end time.Time, quiet bool) (*logproto.IndexStatsResponse, error) {
	qsb := util.NewQueryStringBuilder()
	qsb.SetInt("start", start.UnixNano())
	qsb.SetInt("end", end.UnixNano())
	qsb.SetString("query", queryStr)

	resp := logproto.IndexStatsResponse{}
	if err := c.doRequest(statsPath, qsb.Encode(), quiet, &resp); err != nil {
		return nil, err
	}

	return &resp, nil
}
// GetVolume returns aggregate series volumes for the given query over
// [start, end] via the series_volume endpoint.
func (c *DefaultClient) GetVolume(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) {
	return c.getVolume(volumePath, queryStr, start, end, step, limit, quiet)
}
// GetVolumeRange returns series volumes bucketed by step over [start, end]
// via the series_volume_range endpoint.
func (c *DefaultClient) GetVolumeRange(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) {
	return c.getVolume(volumeRangePath, queryStr, start, end, step, limit, quiet)
}
// getVolume issues a request against the given volume endpoint, encoding
// the parameters into the query string. A zero step is omitted so the
// server applies its default.
func (c *DefaultClient) getVolume(path string, queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) {
	params := util.NewQueryStringBuilder()
	params.SetInt("start", start.UnixNano())
	params.SetInt("end", end.UnixNano())
	params.SetString("query", queryStr)
	// Use SetInt for numeric parameters, consistent with start/end above,
	// instead of formatting through fmt.Sprintf.
	params.SetInt("limit", int64(limit))

	if step != 0 {
		// The step is sent in whole seconds; fractions are truncated,
		// matching the previous int(step.Seconds()) conversion.
		params.SetInt("step", int64(step.Seconds()))
	}

	var resp loghttp.QueryResponse
	if err := c.doRequest(path, params.Encode(), quiet, &resp); err != nil {
		return nil, err
	}

	return &resp, nil
}
func (c *DefaultClient) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) {
var err error
var r loghttp.QueryResponse

@ -182,6 +182,21 @@ func (f *FileClient) GetOrgID() string {
return f.orgID
}
// GetStats is not supported by the file client: index stats come from the
// TSDB index, which this client does not read.
func (f *FileClient) GetStats(_ string, _, _ time.Time, _ bool) (*logproto.IndexStatsResponse, error) {
	// TODO(trevorwhitney): could we teach logcli to read from an actual index file?
	return nil, ErrNotSupported
}
// GetVolume is not supported by the file client: volumes come from the
// TSDB index, which this client does not read.
func (f *FileClient) GetVolume(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) {
	// TODO(trevorwhitney): could we teach logcli to read from an actual index file?
	return nil, ErrNotSupported
}
// GetVolumeRange is not supported by the file client: volumes come from
// the TSDB index, which this client does not read.
func (f *FileClient) GetVolumeRange(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) {
	// TODO(trevorwhitney): could we teach logcli to read from an actual index file?
	return nil, ErrNotSupported
}
// limiter holds a single count n; presumably a cap on the number of
// entries the file client returns — TODO confirm against its methods,
// which are outside this view.
type limiter struct {
	n int
}

@ -0,0 +1,51 @@
package index
import (
"fmt"
"log"
"time"
"github.com/fatih/color"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logproto"
)
// StatsQuery describes a single "stats" command invocation: an
// index-stats request for QueryString over the [Start, End] range.
type StatsQuery struct {
	QueryString string    // LogQL query/selector to get index stats for
	Start       time.Time // start of the queried range
	End         time.Time // end of the queried range
	Quiet       bool      // suppress informational output when true
}
// DoStats runs the stats query against the client and renders the returned
// key/value pairs as a brace-delimited list on stdout, keys colored blue.
func (q *StatsQuery) DoStats(c client.Client) {
	kvs := q.Stats(c).LoggingKeyValues()

	fmt.Print("{\n")
	for i := 0; i+1 < len(kvs); i += 2 {
		key := kvs[i].(string)
		value := kvs[i+1]

		// "bytes" is printed verbatim with %s (its value appears to be a
		// pre-formatted string); everything else is printed as an integer.
		if key == "bytes" {
			fmt.Printf(" %s: %s\n", color.BlueString(key), value)
		} else {
			fmt.Printf(" %s: %d\n", color.BlueString(key), value)
		}
	}
	fmt.Print("}\n")
}
// Stats fetches index statistics for the configured query and time range,
// terminating the process if the request fails.
func (q *StatsQuery) Stats(c client.Client) *logproto.IndexStatsResponse {
	resp, err := c.GetStats(q.QueryString, q.Start, q.End, q.Quiet)
	if err != nil {
		log.Fatalf("Error doing request: %+v", err)
	}

	return resp
}

@ -0,0 +1,57 @@
package index
import (
"log"
"time"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/print"
"github.com/grafana/loki/pkg/loghttp"
)
// VolumeQuery describes a single "volume" or "volume_range" command
// invocation: a series-volume request for QueryString over [Start, End].
type VolumeQuery struct {
	QueryString string        // LogQL selector to aggregate volumes for
	Start       time.Time     // start of the queried range
	End         time.Time     // end of the queried range
	Step        time.Duration // bucket width for range queries; zero for instant queries
	Quiet       bool          // suppress informational output when true
	Limit       int           // maximum number of series to return volumes for
}
// DoVolume executes an instant volume query and prints the results;
// statistics additionally prints query stats.
func (q *VolumeQuery) DoVolume(c client.Client, out output.LogOutput, statistics bool) {
	q.do(false, c, out, statistics)
}
// DoVolumeRange executes a range volume query, bucketing results by the
// configured Step, and prints the results; statistics additionally prints
// query stats.
func (q *VolumeQuery) DoVolumeRange(c client.Client, out output.LogOutput, statistics bool) {
	q.do(true, c, out, statistics)
}
// do fetches the (instant or range) volume response and renders it,
// optionally preceded by the query statistics.
func (q *VolumeQuery) do(rangeQuery bool, c client.Client, out output.LogOutput, statistics bool) {
	resp := q.volume(rangeQuery, c)

	printer := print.NewQueryResultPrinter(nil, nil, q.Quiet, 0, false)
	if statistics {
		printer.PrintStats(resp.Data.Statistics)
	}

	_, _ = printer.PrintResult(resp.Data.Result, out, nil)
}
// volume issues either an instant or a range volume request, terminating
// the process if the request fails.
func (q *VolumeQuery) volume(rangeQuery bool, c client.Client) *loghttp.QueryResponse {
	get := c.GetVolume
	if rangeQuery {
		get = c.GetVolumeRange
	}

	resp, err := get(q.QueryString, q.Start, q.End, q.Step, q.Limit, q.Quiet)
	if err != nil {
		log.Fatalf("Error doing request: %+v", err)
	}

	return resp
}

@ -0,0 +1,259 @@
package print
import (
"encoding/json"
"fmt"
"log"
"os"
"sort"
"strings"
"text/tabwriter"
"github.com/fatih/color"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/util"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
)
// QueryResultPrinter renders query results (streams, matrices, vectors,
// scalars) to a LogOutput, applying label filtering, padding, and
// ordering options.
type QueryResultPrinter struct {
	ShowLabelsKey   []string // if non-empty, only these label keys are displayed per stream
	IgnoreLabelsKey []string // label keys removed from displayed streams
	Quiet           bool     // suppress informational lines (common labels, filter notices)
	FixedLabelsLen  int      // minimum width used when padding the labels column
	Forward         bool     // print entries oldest-first instead of newest-first
}
// NewQueryResultPrinter builds a printer with the given label filtering,
// verbosity, label-padding, and sort-direction settings.
func NewQueryResultPrinter(showLabelsKey []string, ignoreLabelsKey []string, quiet bool, fixedLabelsLen int, forward bool) *QueryResultPrinter {
	p := QueryResultPrinter{
		ShowLabelsKey:   showLabelsKey,
		IgnoreLabelsKey: ignoreLabelsKey,
		Quiet:           quiet,
		FixedLabelsLen:  fixedLabelsLen,
		Forward:         forward,
	}
	return &p
}
// streamEntryPair ties one log entry to the labels of the stream it came
// from, so entries from different streams can be sorted and printed
// together.
type streamEntryPair struct {
	entry  loghttp.Entry
	labels loghttp.LabelSet
}
// PrintResult dispatches on the result type and renders it. For stream
// results it returns the number of entries printed and the entries sharing
// the final timestamp (used to de-duplicate across batches); for every
// other type it returns (-1, nil). Unsupported types are fatal.
func (r *QueryResultPrinter) PrintResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
	switch value.Type() {
	case logqlmodel.ValueTypeStreams:
		return r.printStream(value.(loghttp.Streams), out, lastEntry)
	case loghttp.ResultTypeScalar:
		printScalar(value.(loghttp.Scalar))
	case loghttp.ResultTypeMatrix:
		printMatrix(value.(loghttp.Matrix))
	case loghttp.ResultTypeVector:
		printVector(value.(loghttp.Vector))
	default:
		log.Fatalf("Unable to print unsupported type: %v", value.Type())
	}
	return -1, nil
}
// printStream renders log streams: it factors out labels common to all
// streams, applies the show/ignore label filters, sorts all entries by
// timestamp (direction per r.Forward), and prints them while skipping
// entries already printed by the previous batch. It returns the number of
// entries printed and the set of entries sharing the final timestamp, to
// be passed back as lastEntry on the next batch.
func (r *QueryResultPrinter) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
	common := commonLabels(streams)

	// Remove the labels we want to show from common
	if len(r.ShowLabelsKey) > 0 {
		common = matchLabels(false, common, r.ShowLabelsKey)
	}

	if len(common) > 0 && !r.Quiet {
		log.Println("Common labels:", color.RedString(common.String()))
	}

	if len(r.IgnoreLabelsKey) > 0 && !r.Quiet {
		log.Println("Ignoring labels key:", color.RedString(strings.Join(r.IgnoreLabelsKey, ",")))
	}

	if len(r.ShowLabelsKey) > 0 && !r.Quiet {
		log.Println("Print only labels key:", color.RedString(strings.Join(r.ShowLabelsKey, ",")))
	}

	// Remove ignored and common labels from the cached labels and
	// calculate the max labels length
	maxLabelsLen := r.FixedLabelsLen
	for i, s := range streams {
		// Remove common labels
		ls := subtract(s.Labels, common)

		if len(r.ShowLabelsKey) > 0 {
			ls = matchLabels(true, ls, r.ShowLabelsKey)
		}

		// Remove ignored labels
		if len(r.IgnoreLabelsKey) > 0 {
			ls = matchLabels(false, ls, r.IgnoreLabelsKey)
		}

		// Overwrite existing Labels
		streams[i].Labels = ls

		// Update max labels length (used to pad the labels column)
		length := len(ls.String())
		if maxLabelsLen < length {
			maxLabelsLen = length
		}
	}

	// sort and display entries: flatten all streams into (entry, labels)
	// pairs so they can be ordered globally by timestamp
	allEntries := make([]streamEntryPair, 0)

	for _, s := range streams {
		for _, e := range s.Entries {
			allEntries = append(allEntries, streamEntryPair{
				entry:  e,
				labels: s.Labels,
			})
		}
	}

	if len(allEntries) == 0 {
		return 0, nil
	}

	if r.Forward {
		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) })
	} else {
		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) })
	}

	printed := 0
	for _, e := range allEntries {
		// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
		if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
			skip := false
			// Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp
			// shared by multiple entries we have to check all that were stored to see if we've already
			// printed them.
			for _, le := range lastEntry {
				if e.entry.Line == le.Line {
					skip = true
				}
			}
			if skip {
				continue
			}
		}
		out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
		printed++
	}

	// Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends
	// with an entry that shared multiple timestamps, so we need to keep a list of all these entries
	// because the next query is going to contain them too and we want to not duplicate anything already
	// printed.
	lel := []*loghttp.Entry{}
	// Start with the timestamp of the last entry
	le := allEntries[len(allEntries)-1].entry
	for i, e := range allEntries {
		// Save any entry which has this timestamp (most of the time this will only be the single last entry)
		if e.entry.Timestamp.Equal(le.Timestamp) {
			lel = append(lel, &allEntries[i].entry)
		}
	}

	return printed, lel
}
// printMatrix renders a matrix result as indented JSON on stdout.
//
// Yes, we effectively unmarshal and then immediately re-marshal the object
// back to JSON; this keeps the door open for other output types later
// (e.g. rendering to an image file) without changing callers.
func printMatrix(matrix loghttp.Matrix) {
	encoded, err := json.MarshalIndent(matrix, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling matrix: %v", err)
	}

	fmt.Print(string(encoded))
}
// printVector renders a vector result as indented JSON on stdout.
func printVector(vector loghttp.Vector) {
	encoded, err := json.MarshalIndent(vector, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling vector: %v", err)
	}

	fmt.Print(string(encoded))
}
// printScalar renders a scalar result as indented JSON on stdout.
func printScalar(scalar loghttp.Scalar) {
	encoded, err := json.MarshalIndent(scalar, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling scalar: %v", err)
	}

	fmt.Print(string(encoded))
}
// kvLogger adapts a tabwriter to a Log(keyvals ...interface{}) error
// interface (as required by stats.Result.Log) so statistics render as
// tab-aligned key/value rows.
type kvLogger struct {
	*tabwriter.Writer
}
// Log writes each key/value pair on its own line — key colored blue,
// tab-separated from the value — then flushes the underlying tabwriter.
// It always returns nil.
func (k kvLogger) Log(keyvals ...interface{}) error {
	// Guard with i+1 < len so an odd-length keyvals slice cannot panic
	// with an index-out-of-range on keyvals[i+1]; a dangling trailing key
	// is silently ignored.
	for i := 0; i+1 < len(keyvals); i += 2 {
		fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1]))
	}
	k.Flush()
	return nil
}
// PrintStats writes the query statistics to stderr as tab-aligned
// key/value pairs.
func (r *QueryResultPrinter) PrintStats(stats stats.Result) {
	writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0)
	stats.Log(kvLogger{Writer: writer})
}
// matchLabels filters l by names: with on=true only the named keys are
// kept, with on=false the named keys are removed. Delegates to
// util.MatchLabels.
func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet {
	return util.MatchLabels(on, l, names)
}
// commonLabels returns the label pairs (same key and same value) shared by
// every stream in the set, or nil when the set is empty.
func commonLabels(streams loghttp.Streams) loghttp.LabelSet {
	if len(streams) == 0 {
		return nil
	}

	common := streams[0].Labels
	for _, s := range streams[1:] {
		common = intersect(common, s.Labels)
	}

	return common
}
// intersect returns the label pairs present with identical values in both
// a and b.
func intersect(a, b loghttp.LabelSet) loghttp.LabelSet {
	out := loghttp.LabelSet{}
	for k, v := range a {
		if other, ok := b[k]; ok && other == v {
			out[k] = v
		}
	}
	return out
}
// subtract returns the label pairs of a that are either absent from b or
// mapped to a different value there.
func subtract(a, b loghttp.LabelSet) loghttp.LabelSet {
	out := loghttp.LabelSet{}
	for k, v := range a {
		if other, ok := b[k]; !ok || other != v {
			out[k] = v
		}
	}
	return out
}

@ -0,0 +1,150 @@
package print
import (
"reflect"
"testing"
"github.com/stretchr/testify/require"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/util/marshal"
)
// Test_commonLabels checks that commonLabels extracts exactly the label
// pairs shared (same key and value) by every stream.
func Test_commonLabels(t *testing.T) {
	for _, tc := range []struct {
		name string
		sets []loghttp.LabelSet
		want loghttp.LabelSet
	}{
		{
			name: "Extract common labels source > target",
			sets: []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`)},
			want: mustParseLabels(t, `{bar="foo"}`),
		},
		{
			name: "Extract common labels source > target",
			sets: []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="bar", baz="baz"}`)},
			want: mustParseLabels(t, `{foo="bar", bar="foo"}`),
		},
		{
			name: "Extract common labels source < target",
			sets: []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo"}`)},
			want: mustParseLabels(t, `{bar="foo"}`),
		},
		{
			name: "Extract common labels source < target no common",
			sets: []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{fo="bar"}`)},
			want: loghttp.LabelSet{},
		},
		{
			name: "Extract common labels source = target no common",
			sets: []loghttp.LabelSet{mustParseLabels(t, `{foo="bar"}`), mustParseLabels(t, `{fooo="bar"}`)},
			want: loghttp.LabelSet{},
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			var streams loghttp.Streams
			for _, ls := range tc.sets {
				streams = append(streams, loghttp.Stream{
					Entries: nil,
					Labels:  ls,
				})
			}

			if got := commonLabels(streams); !reflect.DeepEqual(got, tc.want) {
				t.Errorf("commonLabels() = %v, want %v", got, tc.want)
			}
		})
	}
}
// Test_subtract verifies that subtract removes exactly the label pairs of
// a that appear with identical values in b.
func Test_subtract(t *testing.T) {
	for _, tc := range []struct {
		name string
		a, b loghttp.LabelSet
		want loghttp.LabelSet
	}{
		{
			name: "Subtract labels source > target",
			a:    mustParseLabels(t, `{foo="bar", bar="foo"}`),
			b:    mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`),
			want: mustParseLabels(t, `{foo="bar"}`),
		},
		{
			name: "Subtract labels source < target",
			a:    mustParseLabels(t, `{foo="bar", bar="foo"}`),
			b:    mustParseLabels(t, `{bar="foo"}`),
			want: mustParseLabels(t, `{foo="bar"}`),
		},
		{
			name: "Subtract labels source < target no sub",
			a:    mustParseLabels(t, `{foo="bar", bar="foo"}`),
			b:    mustParseLabels(t, `{fo="bar"}`),
			want: mustParseLabels(t, `{bar="foo", foo="bar"}`),
		},
		{
			name: "Subtract labels source = target no sub",
			a:    mustParseLabels(t, `{foo="bar"}`),
			b:    mustParseLabels(t, `{fiz="buz"}`),
			want: mustParseLabels(t, `{foo="bar"}`),
		},
		{
			name: "Subtract labels source > target no sub",
			a:    mustParseLabels(t, `{foo="bar"}`),
			b:    mustParseLabels(t, `{fiz="buz", foo="baz"}`),
			want: mustParseLabels(t, `{foo="bar"}`),
		},
		{
			name: "Subtract labels source > target no sub",
			a:    mustParseLabels(t, `{a="b", foo="bar", baz="baz", fizz="fizz"}`),
			b:    mustParseLabels(t, `{foo="bar", baz="baz", buzz="buzz", fizz="fizz"}`),
			want: mustParseLabels(t, `{a="b"}`),
		},
	} {
		t.Run(tc.name, func(t *testing.T) {
			if got := subtract(tc.a, tc.b); !reflect.DeepEqual(got, tc.want) {
				t.Errorf("subtract() = %v, want %v", got, tc.want)
			}
		})
	}
}
// mustParseLabels parses s into a LabelSet, failing the test immediately
// if the selector is malformed.
func mustParseLabels(t *testing.T, s string) loghttp.LabelSet {
	t.Helper()
	l, err := marshal.NewLabelSet(s)
	require.NoErrorf(t, err, "Failed to parse %q", s)
	return l
}

@ -7,15 +7,10 @@ import (
"io"
"log"
"os"
"sort"
"strings"
"sync"
"text/tabwriter"
"time"
"github.com/fatih/color"
"github.com/grafana/dskit/multierror"
json "github.com/json-iterator/go"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/user"
@ -23,11 +18,10 @@ import (
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/print"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logqlmodel"
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
chunk "github.com/grafana/loki/pkg/storage/chunk/client"
@ -41,11 +35,6 @@ import (
const schemaConfigFilename = "schemaconfig"
type streamEntryPair struct {
entry loghttp.Entry
labels loghttp.LabelSet
}
// Query contains all necessary fields to execute instant and range queries and print the results.
type Query struct {
QueryString string
@ -127,15 +116,17 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
out = out.WithWriter(partFile)
}
result := print.NewQueryResultPrinter(q.ShowLabelsKey, q.IgnoreLabelsKey, q.Quiet, q.FixedLabelsLen, q.Forward)
if q.isInstant() {
resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
if err != nil {
log.Fatalf("Query failed: %+v", err)
}
if statistics {
q.printStats(resp.Data.Statistics)
result.PrintStats(resp.Data.Statistics)
}
_, _ = q.printResult(resp.Data.Result, out, nil)
_, _ = result.PrintResult(resp.Data.Result, out, nil)
} else {
unlimited := q.Limit == 0
@ -164,10 +155,10 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
}
if statistics {
q.printStats(resp.Data.Statistics)
result.PrintStats(resp.Data.Statistics)
}
resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry)
resultLength, lastEntry = result.PrintResult(resp.Data.Result, out, lastEntry)
// Was not a log stream query, or no results, no more batching
if resultLength <= 0 {
break
@ -402,24 +393,6 @@ func maxTime(t1, t2 time.Time) time.Time {
return t2
}
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
length := -1
var entry []*loghttp.Entry
switch value.Type() {
case logqlmodel.ValueTypeStreams:
length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
case loghttp.ResultTypeScalar:
q.printScalar(value.(loghttp.Scalar))
case loghttp.ResultTypeMatrix:
q.printMatrix(value.(loghttp.Matrix))
case loghttp.ResultTypeVector:
q.printVector(value.(loghttp.Vector))
default:
log.Fatalf("Unable to print unsupported type: %v", value.Type())
}
return length, entry
}
// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string, useRemoteSchema bool) error {
var conf loki.Config
@ -502,8 +475,9 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}
resPrinter := print.NewQueryResultPrinter(q.ShowLabelsKey, q.IgnoreLabelsKey, q.Quiet, q.FixedLabelsLen, q.Forward)
if statistics {
q.printStats(result.Statistics)
resPrinter.PrintStats(result.Statistics)
}
value, err := marshal.NewResultValue(result.Data)
@ -511,7 +485,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}
q.printResult(value, out, nil)
resPrinter.PrintResult(value, out, nil)
return nil
}
@ -574,159 +548,6 @@ func (q *Query) isInstant() bool {
return q.Start == q.End && q.Step == 0
}
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
common := commonLabels(streams)
// Remove the labels we want to show from common
if len(q.ShowLabelsKey) > 0 {
common = matchLabels(false, common, q.ShowLabelsKey)
}
if len(common) > 0 && !q.Quiet {
log.Println("Common labels:", color.RedString(common.String()))
}
if len(q.IgnoreLabelsKey) > 0 && !q.Quiet {
log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ",")))
}
if len(q.ShowLabelsKey) > 0 && !q.Quiet {
log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ",")))
}
// Remove ignored and common labels from the cached labels and
// calculate the max labels length
maxLabelsLen := q.FixedLabelsLen
for i, s := range streams {
// Remove common labels
ls := subtract(s.Labels, common)
if len(q.ShowLabelsKey) > 0 {
ls = matchLabels(true, ls, q.ShowLabelsKey)
}
// Remove ignored labels
if len(q.IgnoreLabelsKey) > 0 {
ls = matchLabels(false, ls, q.IgnoreLabelsKey)
}
// Overwrite existing Labels
streams[i].Labels = ls
// Update max labels length
len := len(ls.String())
if maxLabelsLen < len {
maxLabelsLen = len
}
}
// sort and display entries
allEntries := make([]streamEntryPair, 0)
for _, s := range streams {
for _, e := range s.Entries {
allEntries = append(allEntries, streamEntryPair{
entry: e,
labels: s.Labels,
})
}
}
if len(allEntries) == 0 {
return 0, nil
}
if q.Forward {
sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) })
} else {
sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) })
}
printed := 0
for _, e := range allEntries {
// Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch
if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
skip := false
// Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp
// shared by multiple entries we have to check all that were stored to see if we've already
// printed them.
for _, le := range lastEntry {
if e.entry.Line == le.Line {
skip = true
}
}
if skip {
continue
}
}
out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
printed++
}
// Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends
// with an entry that shared multiple timestamps, so we need to keep a list of all these entries
// because the next query is going to contain them too and we want to not duplicate anything already
// printed.
lel := []*loghttp.Entry{}
// Start with the timestamp of the last entry
le := allEntries[len(allEntries)-1].entry
for i, e := range allEntries {
// Save any entry which has this timestamp (most of the time this will only be the single last entry)
if e.entry.Timestamp.Equal(le.Timestamp) {
lel = append(lel, &allEntries[i].entry)
}
}
return printed, lel
}
func (q *Query) printMatrix(matrix loghttp.Matrix) {
// yes we are effectively unmarshalling and then immediately marshalling this object back to json. we are doing this b/c
// it gives us more flexibility with regard to output types in the future. initially we are supporting just formatted json but eventually
// we might add output options such as render to an image file on disk
bytes, err := json.MarshalIndent(matrix, "", " ")
if err != nil {
log.Fatalf("Error marshalling matrix: %v", err)
}
fmt.Print(string(bytes))
}
func (q *Query) printVector(vector loghttp.Vector) {
bytes, err := json.MarshalIndent(vector, "", " ")
if err != nil {
log.Fatalf("Error marshalling vector: %v", err)
}
fmt.Print(string(bytes))
}
func (q *Query) printScalar(scalar loghttp.Scalar) {
bytes, err := json.MarshalIndent(scalar, "", " ")
if err != nil {
log.Fatalf("Error marshalling scalar: %v", err)
}
fmt.Print(string(bytes))
}
type kvLogger struct {
*tabwriter.Writer
}
func (k kvLogger) Log(keyvals ...interface{}) error {
for i := 0; i < len(keyvals); i += 2 {
fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1]))
}
k.Flush()
return nil
}
func (q *Query) printStats(stats stats.Result) {
writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0)
stats.Log(kvLogger{Writer: writer})
}
func (q *Query) resultsDirection() logproto.Direction {
if q.Forward {
return logproto.FORWARD

@ -6,7 +6,6 @@ import (
"fmt"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
"time"
@ -30,137 +29,6 @@ import (
"github.com/grafana/loki/pkg/util/marshal"
)
// Test_commonLabels verifies that commonLabels returns only the labels
// shared — same name and same value — by every stream in the input.
func Test_commonLabels(t *testing.T) {
	type args struct {
		lss []loghttp.LabelSet
	}
	tests := []struct {
		name string
		args args
		want loghttp.LabelSet
	}{
		{
			"Extract common labels source > target",
			args{
				[]loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`)},
			},
			mustParseLabels(t, `{bar="foo"}`),
		},
		{
			// Renamed from a duplicate of the case above so subtest
			// failures are unambiguous.
			"Extract common labels source > target all source labels common",
			args{
				[]loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="bar", baz="baz"}`)},
			},
			mustParseLabels(t, `{foo="bar", bar="foo"}`),
		},
		{
			"Extract common labels source < target",
			args{
				[]loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo"}`)},
			},
			mustParseLabels(t, `{bar="foo"}`),
		},
		{
			"Extract common labels source < target no common",
			args{
				[]loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{fo="bar"}`)},
			},
			loghttp.LabelSet{},
		},
		{
			"Extract common labels source = target no common",
			args{
				[]loghttp.LabelSet{mustParseLabels(t, `{foo="bar"}`), mustParseLabels(t, `{fooo="bar"}`)},
			},
			loghttp.LabelSet{},
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			// Wrap each label set in a stream; entries are irrelevant here.
			var streams []loghttp.Stream
			for _, lss := range tt.args.lss {
				streams = append(streams, loghttp.Stream{
					Entries: nil,
					Labels:  lss,
				})
			}
			if got := commonLabels(streams); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("commonLabels() = %v, want %v", got, tt.want)
			}
		})
	}
}
// Test_subtract verifies that subtract(a, b) keeps the entries of a that
// are absent from b or present in b with a different value.
func Test_subtract(t *testing.T) {
	type args struct {
		a loghttp.LabelSet
		b loghttp.LabelSet
	}
	tests := []struct {
		name string
		args args
		want loghttp.LabelSet
	}{
		{
			"Subtract labels source > target",
			args{
				mustParseLabels(t, `{foo="bar", bar="foo"}`),
				mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`),
			},
			mustParseLabels(t, `{foo="bar"}`),
		},
		{
			"Subtract labels source < target",
			args{
				mustParseLabels(t, `{foo="bar", bar="foo"}`),
				mustParseLabels(t, `{bar="foo"}`),
			},
			mustParseLabels(t, `{foo="bar"}`),
		},
		{
			"Subtract labels source < target no sub",
			args{
				mustParseLabels(t, `{foo="bar", bar="foo"}`),
				mustParseLabels(t, `{fo="bar"}`),
			},
			mustParseLabels(t, `{bar="foo", foo="bar"}`),
		},
		{
			"Subtract labels source = target no sub",
			args{
				mustParseLabels(t, `{foo="bar"}`),
				mustParseLabels(t, `{fiz="buz"}`),
			},
			mustParseLabels(t, `{foo="bar"}`),
		},
		{
			// Renamed: was a duplicate of the case below; the shared value
			// differs here, so nothing is subtracted.
			"Subtract labels conflicting value no sub",
			args{
				mustParseLabels(t, `{foo="bar"}`),
				mustParseLabels(t, `{fiz="buz", foo="baz"}`),
			},
			mustParseLabels(t, `{foo="bar"}`),
		},
		{
			// Renamed: the old name said "no sub" but baz and fizz are
			// subtracted here.
			"Subtract labels source = target partial sub",
			args{
				mustParseLabels(t, `{a="b", foo="bar", baz="baz", fizz="fizz"}`),
				mustParseLabels(t, `{foo="bar", baz="baz", buzz="buzz", fizz="fizz"}`),
			},
			mustParseLabels(t, `{a="b"}`),
		},
	}
	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			if got := subtract(tt.args.a, tt.args.b); !reflect.DeepEqual(got, tt.want) {
				t.Errorf("subtract() = %v, want %v", got, tt.want)
			}
		})
	}
}
func Test_batch(t *testing.T) {
tests := []struct {
name string
@ -536,14 +404,6 @@ func Test_batch(t *testing.T) {
}
}
// mustParseLabels parses the LogQL-style label string s into a LabelSet,
// failing the test immediately if it does not parse.
func mustParseLabels(t *testing.T, s string) loghttp.LabelSet {
	t.Helper()
	labelSet, err := marshal.NewLabelSet(s)
	require.NoErrorf(t, err, "Failed to parse %q", s)
	return labelSet
}
type testQueryClient struct {
engine *logql.Engine
queryRangeCalls int
@ -610,6 +470,18 @@ func (t *testQueryClient) GetOrgID() string {
panic("implement me")
}
// GetStats is a stub that satisfies the client interface; it panics if
// called, as these tests never exercise the stats path.
func (t *testQueryClient) GetStats(_ string, _, _ time.Time, _ bool) (*logproto.IndexStatsResponse, error) {
	panic("not implemented")
}
// GetVolume is a stub that satisfies the client interface; it panics if
// called, as these tests never exercise the volume path.
func (t *testQueryClient) GetVolume(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) {
	panic("not implemented")
}
// GetVolumeRange is a stub that satisfies the client interface; it panics
// if called, as these tests never exercise the volume-range path.
func (t *testQueryClient) GetVolumeRange(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) {
	panic("not implemented")
}
var schemaConfigContents = `schema_config:
configs:
- from: 2020-05-15

@ -15,6 +15,7 @@ import (
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/util"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/util/unmarshal"
)
@ -127,3 +128,7 @@ func (q *Query) TailQuery(delayFor time.Duration, c client.Client, out output.Lo
}
}
}
// matchLabels keeps (on=true) or drops (on=false) the labels listed in
// names. Thin wrapper kept so this package's callers need not import the
// shared util helper directly.
func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet {
	return util.MatchLabels(on, l, names)
}

@ -1,64 +0,0 @@
package query
import (
"github.com/grafana/loki/pkg/loghttp"
)
// commonLabels returns the labels shared (same name and value) by every
// stream, or nil when there are no streams.
func commonLabels(streams loghttp.Streams) loghttp.LabelSet {
	if len(streams) == 0 {
		return nil
	}
	common := streams[0].Labels
	for _, stream := range streams[1:] {
		common = intersect(common, stream.Labels)
	}
	return common
}
// intersect returns the entries present in both a and b with equal values.
func intersect(a, b loghttp.LabelSet) loghttp.LabelSet {
	out := loghttp.LabelSet{}
	for name, aVal := range a {
		if bVal, ok := b[name]; ok && bVal == aVal {
			out[name] = aVal
		}
	}
	return out
}
// subtract returns the entries of a that are missing from b or carry a
// different value there.
func subtract(a, b loghttp.LabelSet) loghttp.LabelSet {
	out := loghttp.LabelSet{}
	for name, aVal := range a {
		if bVal, ok := b[name]; ok && bVal == aVal {
			continue // identical entry in b: drop it
		}
		out[name] = aVal
	}
	return out
}
// matchLabels filters l by names: when on is true only the named labels
// are kept, otherwise the named labels are removed.
func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet {
	wanted := make(map[string]struct{}, len(names))
	for _, name := range names {
		wanted[name] = struct{}{}
	}
	out := loghttp.LabelSet{}
	for name, value := range l {
		if _, found := wanted[name]; found == on {
			out[name] = value
		}
	}
	return out
}

@ -0,0 +1,20 @@
package util
import "github.com/grafana/loki/pkg/loghttp"
// MatchLabels filters l by names: when on is true only the named labels
// are kept, otherwise the named labels are removed.
func MatchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet {
	wanted := make(map[string]struct{}, len(names))
	for _, name := range names {
		wanted[name] = struct{}{}
	}
	out := loghttp.LabelSet{}
	for name, value := range l {
		if _, found := wanted[name]; found == on {
			out[name] = value
		}
	}
	return out
}
Loading…
Cancel
Save