diff --git a/.gitignore b/.gitignore index a0cc2550cb..ef9a59d5e5 100644 --- a/.gitignore +++ b/.gitignore @@ -50,4 +50,4 @@ pkg/loki/wal .vscode # nix -result +nix/result diff --git a/cmd/logcli/main.go b/cmd/logcli/main.go index 9007a3342e..5bdd247bf6 100644 --- a/cmd/logcli/main.go +++ b/cmd/logcli/main.go @@ -16,6 +16,7 @@ import ( "gopkg.in/alecthomas/kingpin.v2" "github.com/grafana/loki/pkg/logcli/client" + "github.com/grafana/loki/pkg/logcli/index" "github.com/grafana/loki/pkg/logcli/labelquery" "github.com/grafana/loki/pkg/logcli/output" "github.com/grafana/loki/pkg/logcli/query" @@ -181,6 +182,76 @@ This is helpful to find high cardinality labels. seriesQuery = newSeriesQuery(seriesCmd) fmtCmd = app.Command("fmt", "Formats a LogQL query.") + + statsCmd = app.Command("stats", `Run a stats query. + +The "stats" command will take the provided query and return statistics +from the index on how much data is contained in the matching stream(s). +This only works against Loki instances using the TSDB index format. + +By default we look over the last hour of data; use --since to modify +or provide specific start and end times with --from and --to respectively. + +Notice that when using --from and --to then ensure to use RFC3339Nano +time format, but without timezone at the end. The local timezone will be added +automatically or if using --timezone flag. + +Example: + + logcli stats + --timezone=UTC + --from="2021-01-19T10:00:00Z" + --to="2021-01-19T20:00:00Z" + 'my-query' + `) + statsQuery = newStatsQuery(statsCmd) + + volumeCmd = app.Command("volume", `Run a volume query. + +The "volume" command will take the provided label selector(s) and return aggregate +volumes for series matching those volumes. This only works +against Loki instances using the TSDB index format. + +By default we look over the last hour of data; use --since to modify +or provide specific start and end times with --from and --to respectively. + +Notice that when using --from and --to then ensure to use RFC3339Nano +time format, but without timezone at the end. The local timezone will be added +automatically or if using --timezone flag. + +Example: + + logcli volume + --timezone=UTC + --from="2021-01-19T10:00:00Z" + --to="2021-01-19T20:00:00Z" + 'my-query' + `) + volumeQuery = newVolumeQuery(false, volumeCmd) + + volumeRangeCmd = app.Command("volume_range", `Run a volume query and return timeseries data. + +The "volume_range" command will take the provided label selector(s) and return aggregate +volumes for series matching those volumes, aggregated into buckets according to the step value. +This only works against Loki instances using the TSDB index format. + +By default we look over the last hour of data; use --since to modify +or provide specific start and end times with --from and --to respectively. + +Notice that when using --from and --to then ensure to use RFC3339Nano +time format, but without timezone at the end. The local timezone will be added +automatically or if using --timezone flag. + +Example: + + logcli volume_range + --timezone=UTC + --from="2021-01-19T10:00:00Z" + --to="2021-01-19T20:00:00Z" + --step=1h + 'my-query' + `) + volumeRangeQuery = newVolumeQuery(true, volumeRangeCmd) ) func main() { @@ -292,6 +363,30 @@ func main() { if err := formatLogQL(os.Stdin, os.Stdout); err != nil { log.Fatalf("unable to format logql: %s", err) } + case statsCmd.FullCommand(): + statsQuery.DoStats(queryClient) + case volumeCmd.FullCommand(), volumeRangeCmd.FullCommand(): + location, err := time.LoadLocation(*timezone) + if err != nil { + log.Fatalf("Unable to load timezone '%s': %s", *timezone, err) + } + + outputOptions := &output.LogOutputOptions{ + Timezone: location, + NoLabels: rangeQuery.NoLabels, + ColoredOutput: rangeQuery.ColoredOutput, + } + + out, err := output.NewLogOutput(os.Stdout, *outputMode, outputOptions) + if err != nil { + log.Fatalf("Unable to create log output: %s", err) + } + + if cmd == volumeRangeCmd.FullCommand() { + volumeRangeQuery.DoVolumeRange(queryClient, out, *statistics) + } else { + volumeQuery.DoVolume(queryClient, out, *statistics) + } } } @@ -491,3 +586,65 @@ func defaultQueryRangeStep(start, end time.Time) time.Duration { step := int(math.Max(math.Floor(end.Sub(start).Seconds()/250), 1)) return time.Duration(step) * time.Second } + +func newStatsQuery(cmd *kingpin.CmdClause) *index.StatsQuery { + // calculate query range from cli params + var from, to string + var since time.Duration + + q := &index.StatsQuery{} + + // executed after all command flags are parsed + cmd.Action(func(_ *kingpin.ParseContext) error { + defaultEnd := time.Now() + defaultStart := defaultEnd.Add(-since) + + q.Start = mustParse(from, defaultStart) + q.End = mustParse(to, defaultEnd) + + q.Quiet = *quiet + + return nil + }) + + cmd.Arg("query", "eg '{foo=\"bar\",baz=~\".*blip\"} |~ \".*error.*\"'").Required().StringVar(&q.QueryString) + cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since) + cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from) + cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to) + + return q +} + +func newVolumeQuery(rangeQuery bool, cmd *kingpin.CmdClause) *index.VolumeQuery { + // calculate query range from cli params + var from, to string + var since time.Duration + + q := &index.VolumeQuery{} + + // executed after all command flags are parsed + cmd.Action(func(_ *kingpin.ParseContext) error { + defaultEnd := time.Now() + defaultStart := defaultEnd.Add(-since) + + q.Start = mustParse(from, defaultStart) + q.End = mustParse(to, defaultEnd) + + q.Quiet = *quiet + + return nil + }) + + cmd.Arg("query", "eg '{foo=\"bar\",baz=~\".*blip\"}").Required().StringVar(&q.QueryString) + cmd.Flag("since", "Lookback window.").Default("1h").DurationVar(&since) + cmd.Flag("from", "Start looking for logs at this absolute time (inclusive)").StringVar(&from) + cmd.Flag("to", "Stop looking for logs at this absolute time (exclusive)").StringVar(&to) + + cmd.Flag("limit", "Limit on number of series to return volumes for.").Default("30").IntVar(&q.Limit) + + if rangeQuery { + cmd.Flag("step", "Query resolution step width, roll up volumes into buckets cover step time each.").Default("1h").DurationVar(&q.Step) + } + + return q +} diff --git a/pkg/logcli/client/client.go b/pkg/logcli/client/client.go index 9ca496d5de..096f204e83 100644 --- a/pkg/logcli/client/client.go +++ b/pkg/logcli/client/client.go @@ -32,6 +32,9 @@ const ( labelValuesPath = "/loki/api/v1/label/%s/values" seriesPath = "/loki/api/v1/series" tailPath = "/loki/api/v1/tail" + statsPath = "/loki/api/v1/index/stats" + volumePath = "/loki/api/v1/index/series_volume" + volumeRangePath = "/loki/api/v1/index/series_volume_range" defaultAuthHeader = "Authorization" ) @@ -46,6 +49,9 @@ type Client interface { Series(matchers []string, start, end time.Time, quiet bool) (*loghttp.SeriesResponse, error) LiveTailQueryConn(queryStr string, delayFor time.Duration, limit int, start time.Time, quiet bool) (*websocket.Conn, error) GetOrgID() string + GetStats(queryStr string, start, end time.Time, quiet bool) (*logproto.IndexStatsResponse, error) + GetVolume(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) + GetVolumeRange(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) } // Tripperware can wrap a roundtripper. @@ -165,6 +171,45 @@ func (c *DefaultClient) GetOrgID() string { return c.OrgID } +func (c *DefaultClient) GetStats(queryStr string, start, end time.Time, quiet bool) (*logproto.IndexStatsResponse, error) { + params := util.NewQueryStringBuilder() + params.SetInt("start", start.UnixNano()) + params.SetInt("end", end.UnixNano()) + params.SetString("query", queryStr) + + var statsResponse logproto.IndexStatsResponse + if err := c.doRequest(statsPath, params.Encode(), quiet, &statsResponse); err != nil { + return nil, err + } + return &statsResponse, nil +} + +func (c *DefaultClient) GetVolume(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) { + return c.getVolume(volumePath, queryStr, start, end, step, limit, quiet) +} + +func (c *DefaultClient) GetVolumeRange(queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) { + return c.getVolume(volumeRangePath, queryStr, start, end, step, limit, quiet) +} + +func (c *DefaultClient) getVolume(path string, queryStr string, start, end time.Time, step time.Duration, limit int, quiet bool) (*loghttp.QueryResponse, error) { + params := util.NewQueryStringBuilder() + params.SetInt("start", start.UnixNano()) + params.SetInt("end", end.UnixNano()) + params.SetString("query", queryStr) + params.SetString("limit", fmt.Sprintf("%d", limit)) + + if step != 0 { + params.SetString("step", fmt.Sprintf("%d", int(step.Seconds()))) + } + + var resp loghttp.QueryResponse + if err := c.doRequest(path, params.Encode(), quiet, &resp); err != nil { + return nil, err + } + return &resp, nil +} + func (c *DefaultClient) doQuery(path string, query string, quiet bool) (*loghttp.QueryResponse, error) { var err error var r loghttp.QueryResponse diff --git a/pkg/logcli/client/file.go b/pkg/logcli/client/file.go index a656310429..c2b06f5774 100644 --- a/pkg/logcli/client/file.go +++ b/pkg/logcli/client/file.go @@ -182,6 +182,21 @@ func (f *FileClient) GetOrgID() string { return f.orgID } +func (f *FileClient) GetStats(_ string, _, _ time.Time, _ bool) (*logproto.IndexStatsResponse, error) { + // TODO(trevorwhitney): could we teach logcli to read from an actual index file? + return nil, ErrNotSupported +} + +func (f *FileClient) GetVolume(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) { + // TODO(trevorwhitney): could we teach logcli to read from an actual index file? + return nil, ErrNotSupported +} + +func (f *FileClient) GetVolumeRange(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) { + // TODO(trevorwhitney): could we teach logcli to read from an actual index file? + return nil, ErrNotSupported +} + type limiter struct { n int } diff --git a/pkg/logcli/index/stats.go b/pkg/logcli/index/stats.go new file mode 100644 index 0000000000..f67c6283ba --- /dev/null +++ b/pkg/logcli/index/stats.go @@ -0,0 +1,51 @@ +package index + +import ( + "fmt" + "log" + "time" + + "github.com/fatih/color" + + "github.com/grafana/loki/pkg/logcli/client" + "github.com/grafana/loki/pkg/logproto" +) + +type StatsQuery struct { + QueryString string + Start time.Time + End time.Time + Quiet bool +} + +// DoStats executes the stats query and prints the results +func (q *StatsQuery) DoStats(c client.Client) { + stats := q.Stats(c) + kvs := stats.LoggingKeyValues() + + fmt.Print("{\n") + for i := 0; i < len(kvs)-1; i = i + 2 { + k := kvs[i].(string) + v := kvs[i+1] + if k == "bytes" { + fmt.Printf(" %s: %s\n", color.BlueString(k), v) + continue + } + + fmt.Printf(" %s: %d\n", color.BlueString(k), v) + } + fmt.Print("}\n") +} + +// Stats returns an index stats response +func (q *StatsQuery) Stats(c client.Client) *logproto.IndexStatsResponse { + var statsResponse *logproto.IndexStatsResponse + var err error + + statsResponse, err = c.GetStats(q.QueryString, q.Start, q.End, q.Quiet) + + if err != nil { + log.Fatalf("Error doing request: %+v", err) + } + return statsResponse +} diff --git a/pkg/logcli/index/volume.go b/pkg/logcli/index/volume.go new file mode 100644 index 0000000000..a1f6cb65ca --- /dev/null +++ b/pkg/logcli/index/volume.go @@ -0,0 +1,57 @@ +package index + +import ( + "log" + "time" + + "github.com/grafana/loki/pkg/logcli/client" + "github.com/grafana/loki/pkg/logcli/output" + "github.com/grafana/loki/pkg/logcli/print" + "github.com/grafana/loki/pkg/loghttp" +) + +type VolumeQuery struct { + QueryString string + Start time.Time + End time.Time + Step time.Duration + Quiet bool + Limit int +} + +// DoVolume executes a volume query and prints the results +func (q *VolumeQuery) DoVolume(c client.Client, out output.LogOutput, statistics bool) { + q.do(false, c, out, statistics) +} +func (q *VolumeQuery) DoVolumeRange(c client.Client, out output.LogOutput, statistics bool) { + q.do(true, c, out, statistics) +} + +func (q *VolumeQuery) do(rangeQuery bool, c client.Client, out output.LogOutput, statistics bool) { + resp := q.volume(rangeQuery, c) + + resultsPrinter := print.NewQueryResultPrinter(nil, nil, q.Quiet, 0, false) + + if statistics { + resultsPrinter.PrintStats(resp.Data.Statistics) + } + + _, _ = resultsPrinter.PrintResult(resp.Data.Result, out, nil) +} + +// volume returns a volume result +func (q *VolumeQuery) volume(rangeQuery bool, c client.Client) *loghttp.QueryResponse { + var resp *loghttp.QueryResponse + var err error + + if rangeQuery { + resp, err = c.GetVolumeRange(q.QueryString, q.Start, q.End, q.Step, q.Limit, q.Quiet) + } else { + resp, err = c.GetVolume(q.QueryString, q.Start, q.End, q.Step, q.Limit, q.Quiet) + } + if err != nil { + log.Fatalf("Error doing request: %+v", err) + } + + return resp +} diff --git a/pkg/logcli/print/print.go b/pkg/logcli/print/print.go new file mode 100644 index 0000000000..6528b2c7ec --- /dev/null +++ b/pkg/logcli/print/print.go @@ -0,0 +1,259 @@ +package print + +import ( + "encoding/json" + "fmt" + "log" + "os" + "sort" + "strings" + "text/tabwriter" + + "github.com/fatih/color" + + "github.com/grafana/loki/pkg/logcli/output" + "github.com/grafana/loki/pkg/logcli/util" + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/logqlmodel" + "github.com/grafana/loki/pkg/logqlmodel/stats" +) + +type QueryResultPrinter struct { + ShowLabelsKey []string + IgnoreLabelsKey []string + Quiet bool + FixedLabelsLen int + Forward bool +} + +func NewQueryResultPrinter(showLabelsKey []string, ignoreLabelsKey []string, quiet bool, fixedLabelsLen int, forward bool) *QueryResultPrinter { + return &QueryResultPrinter{ + ShowLabelsKey: showLabelsKey, + IgnoreLabelsKey: ignoreLabelsKey, + Quiet: quiet, + FixedLabelsLen: fixedLabelsLen, + Forward: forward, + } +} + +type streamEntryPair struct { + entry loghttp.Entry + labels loghttp.LabelSet +} + +func (r *QueryResultPrinter) PrintResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { + length := -1 + var entry []*loghttp.Entry + switch value.Type() { + case logqlmodel.ValueTypeStreams: + length, entry = r.printStream(value.(loghttp.Streams), out, lastEntry) + case loghttp.ResultTypeScalar: + printScalar(value.(loghttp.Scalar)) + case loghttp.ResultTypeMatrix: + printMatrix(value.(loghttp.Matrix)) + case loghttp.ResultTypeVector: + printVector(value.(loghttp.Vector)) + default: + log.Fatalf("Unable to print unsupported type: %v", value.Type()) + } + return length, entry +} + +func (r *QueryResultPrinter) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { + common := commonLabels(streams) + + // Remove the labels we want to show from common + if len(r.ShowLabelsKey) > 0 { + common = matchLabels(false, common, r.ShowLabelsKey) + } + + if len(common) > 0 && !r.Quiet { + log.Println("Common labels:", color.RedString(common.String())) + } + + if len(r.IgnoreLabelsKey) > 0 && !r.Quiet { + log.Println("Ignoring labels key:", color.RedString(strings.Join(r.IgnoreLabelsKey, ","))) + } + + if len(r.ShowLabelsKey) > 0 && !r.Quiet { + log.Println("Print only labels key:", color.RedString(strings.Join(r.ShowLabelsKey, ","))) + } + + // Remove ignored and common labels from the cached labels and + // calculate the max labels length + maxLabelsLen := r.FixedLabelsLen + for i, s := range streams { + // Remove common labels + ls := subtract(s.Labels, common) + + if len(r.ShowLabelsKey) > 0 { + ls = matchLabels(true, ls, r.ShowLabelsKey) + } + + // Remove ignored labels + if len(r.IgnoreLabelsKey) > 0 { + ls = matchLabels(false, ls, r.IgnoreLabelsKey) + } + + // Overwrite existing Labels + streams[i].Labels = ls + + // Update max labels length + length := len(ls.String()) + if maxLabelsLen < length { + maxLabelsLen = length + } + } + + // sort and display entries + allEntries := make([]streamEntryPair, 0) + + for _, s := range streams { + for _, e := range s.Entries { + allEntries = append(allEntries, streamEntryPair{ + entry: e, + labels: s.Labels, + }) + } + } + + if len(allEntries) == 0 { + return 0, nil + } + + if r.Forward { + sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) }) + } else { + sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) }) + } + + printed := 0 + for _, e := range allEntries { + // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch + if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp { + skip := false + // Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp + // shared by multiple entries we have to check all that were stored to see if we've already + // printed them. + for _, le := range lastEntry { + if e.entry.Line == le.Line { + skip = true + } + } + if skip { + continue + } + } + out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line) + printed++ + } + + // Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends + // with an entry that shared multiple timestamps, so we need to keep a list of all these entries + // because the next query is going to contain them too and we want to not duplicate anything already + // printed. + lel := []*loghttp.Entry{} + // Start with the timestamp of the last entry + le := allEntries[len(allEntries)-1].entry + for i, e := range allEntries { + // Save any entry which has this timestamp (most of the time this will only be the single last entry) + if e.entry.Timestamp.Equal(le.Timestamp) { + lel = append(lel, &allEntries[i].entry) + } + } + + return printed, lel +} + +func printMatrix(matrix loghttp.Matrix) { + // yes we are effectively unmarshalling and then immediately marshalling this object back to json. we are doing this b/c + // it gives us more flexibility with regard to output types in the future. initially we are supporting just formatted json but eventually + // we might add output options such as render to an image file on disk + bytes, err := json.MarshalIndent(matrix, "", " ") + if err != nil { + log.Fatalf("Error marshalling matrix: %v", err) + } + + fmt.Print(string(bytes)) +} + +func printVector(vector loghttp.Vector) { + bytes, err := json.MarshalIndent(vector, "", " ") + if err != nil { + log.Fatalf("Error marshalling vector: %v", err) + } + + fmt.Print(string(bytes)) +} + +func printScalar(scalar loghttp.Scalar) { + bytes, err := json.MarshalIndent(scalar, "", " ") + if err != nil { + log.Fatalf("Error marshalling scalar: %v", err) + } + + fmt.Print(string(bytes)) +} + +type kvLogger struct { + *tabwriter.Writer +} + +func (k kvLogger) Log(keyvals ...interface{}) error { + for i := 0; i < len(keyvals); i += 2 { + fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1])) + } + k.Flush() + return nil +} + +func (r *QueryResultPrinter) PrintStats(stats stats.Result) { + writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0) + stats.Log(kvLogger{Writer: writer}) +} + +func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet { + return util.MatchLabels(on, l, names) +} + +// return commonLabels labels between given labels set +func commonLabels(streams loghttp.Streams) loghttp.LabelSet { + if len(streams) == 0 { + return nil + } + + result := streams[0].Labels + for i := 1; i < len(streams); i++ { + result = intersect(result, streams[i].Labels) + } + return result +} + +// intersect two labels set +func intersect(a, b loghttp.LabelSet) loghttp.LabelSet { + set := loghttp.LabelSet{} + + for ka, va := range a { + if vb, ok := b[ka]; ok { + if vb == va { + set[ka] = va + } + } + } + return set +} + +// subtract labels set b from labels set a +func subtract(a, b loghttp.LabelSet) loghttp.LabelSet { + set := loghttp.LabelSet{} + + for ka, va := range a { + if vb, ok := b[ka]; ok { + if vb == va { + continue + } + } + set[ka] = va + } + return set +} diff --git a/pkg/logcli/print/print_test.go b/pkg/logcli/print/print_test.go new file mode 100644 index 0000000000..91ada1c6e5 --- /dev/null +++ b/pkg/logcli/print/print_test.go @@ -0,0 +1,150 @@ +package print + +import ( + "reflect" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/grafana/loki/pkg/loghttp" + "github.com/grafana/loki/pkg/util/marshal" +) + +func Test_commonLabels(t *testing.T) { + type args struct { + lss []loghttp.LabelSet + } + tests := []struct { + name string + args args + want loghttp.LabelSet + }{ + { + "Extract common labels source > target", + args{ + []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`)}, + }, + mustParseLabels(t, `{bar="foo"}`), + }, + { + "Extract common labels source > target", + args{ + []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="bar", baz="baz"}`)}, + }, + mustParseLabels(t, `{foo="bar", bar="foo"}`), + }, + { + "Extract common labels source < target", + args{ + []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo"}`)}, + }, + mustParseLabels(t, `{bar="foo"}`), + }, + { + "Extract common labels source < target no common", + args{ + []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{fo="bar"}`)}, + }, + loghttp.LabelSet{}, + }, + { + "Extract common labels source = target no common", + args{ + []loghttp.LabelSet{mustParseLabels(t, `{foo="bar"}`), mustParseLabels(t, `{fooo="bar"}`)}, + }, + loghttp.LabelSet{}, + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + var streams []loghttp.Stream + + for _, lss := range tt.args.lss { + streams = append(streams, loghttp.Stream{ + Entries: nil, + Labels: lss, + }) + } + + if got := commonLabels(streams); !reflect.DeepEqual(got, tt.want) { + t.Errorf("commonLabels() = %v, want %v", got, tt.want) + } + }) + } +} + +func Test_subtract(t *testing.T) { + type args struct { + a loghttp.LabelSet + b loghttp.LabelSet + } + tests := []struct { + name string + args args + want loghttp.LabelSet + }{ + { + "Subtract labels source > target", + args{ + mustParseLabels(t, `{foo="bar", bar="foo"}`), + mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`), + }, + mustParseLabels(t, `{foo="bar"}`), + }, + { + "Subtract labels source < target", + args{ + mustParseLabels(t, `{foo="bar", bar="foo"}`), + mustParseLabels(t, `{bar="foo"}`), + }, + mustParseLabels(t, `{foo="bar"}`), + }, + { + "Subtract labels source < target no sub", + args{ + mustParseLabels(t, `{foo="bar", bar="foo"}`), + mustParseLabels(t, `{fo="bar"}`), + }, + mustParseLabels(t, `{bar="foo", foo="bar"}`), + }, + { + "Subtract labels source = target no sub", + args{ + mustParseLabels(t, `{foo="bar"}`), + mustParseLabels(t, `{fiz="buz"}`), + }, + mustParseLabels(t, `{foo="bar"}`), + }, + { + "Subtract labels source > target no sub", + args{ + mustParseLabels(t, `{foo="bar"}`), + mustParseLabels(t, `{fiz="buz", foo="baz"}`), + }, + mustParseLabels(t, `{foo="bar"}`), + }, + { + "Subtract labels source > target no sub", + args{ + mustParseLabels(t, `{a="b", foo="bar", baz="baz", fizz="fizz"}`), + mustParseLabels(t, `{foo="bar", baz="baz", buzz="buzz", fizz="fizz"}`), + }, + mustParseLabels(t, `{a="b"}`), + }, + } + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + if got := subtract(tt.args.a, tt.args.b); !reflect.DeepEqual(got, tt.want) { + t.Errorf("subtract() = %v, want %v", got, tt.want) + } + }) + } +} + +func mustParseLabels(t *testing.T, s string) loghttp.LabelSet { + t.Helper() + l, err := marshal.NewLabelSet(s) + require.NoErrorf(t, err, "Failed to parse %q", s) + + return l +} diff --git a/pkg/logcli/query/query.go b/pkg/logcli/query/query.go index d73a5fc5ee..eb5229fab7 100644 --- a/pkg/logcli/query/query.go +++ b/pkg/logcli/query/query.go @@ -7,15 +7,10 @@ import ( "io" "log" "os" - "sort" - "strings" "sync" - "text/tabwriter" "time" - "github.com/fatih/color" "github.com/grafana/dskit/multierror" - json "github.com/json-iterator/go" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/weaveworks/common/user" @@ -23,11 +18,10 @@ import ( "github.com/grafana/loki/pkg/logcli/client" "github.com/grafana/loki/pkg/logcli/output" + "github.com/grafana/loki/pkg/logcli/print" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/logproto" "github.com/grafana/loki/pkg/logql" - "github.com/grafana/loki/pkg/logqlmodel" - "github.com/grafana/loki/pkg/logqlmodel/stats" "github.com/grafana/loki/pkg/loki" "github.com/grafana/loki/pkg/storage" chunk "github.com/grafana/loki/pkg/storage/chunk/client" @@ -41,11 +35,6 @@ import ( const schemaConfigFilename = "schemaconfig" -type streamEntryPair struct { - entry loghttp.Entry - labels loghttp.LabelSet -} - // Query contains all necessary fields to execute instant and range queries and print the results. type Query struct { QueryString string @@ -127,15 +116,17 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) out = out.WithWriter(partFile) } + result := print.NewQueryResultPrinter(q.ShowLabelsKey, q.IgnoreLabelsKey, q.Quiet, q.FixedLabelsLen, q.Forward) + if q.isInstant() { resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet) if err != nil { log.Fatalf("Query failed: %+v", err) } if statistics { - q.printStats(resp.Data.Statistics) + result.PrintStats(resp.Data.Statistics) } - _, _ = q.printResult(resp.Data.Result, out, nil) + _, _ = result.PrintResult(resp.Data.Result, out, nil) } else { unlimited := q.Limit == 0 @@ -164,10 +155,10 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) } if statistics { - q.printStats(resp.Data.Statistics) + result.PrintStats(resp.Data.Statistics) } - resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry) + resultLength, lastEntry = result.PrintResult(resp.Data.Result, out, lastEntry) // Was not a log stream query, or no results, no more batching if resultLength <= 0 { break @@ -402,24 +393,6 @@ func maxTime(t1, t2 time.Time) time.Time { return t2 } -func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { - length := -1 - var entry []*loghttp.Entry - switch value.Type() { - case logqlmodel.ValueTypeStreams: - length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry) - case loghttp.ResultTypeScalar: - q.printScalar(value.(loghttp.Scalar)) - case loghttp.ResultTypeMatrix: - q.printMatrix(value.(loghttp.Matrix)) - case loghttp.ResultTypeVector: - q.printVector(value.(loghttp.Vector)) - default: - log.Fatalf("Unable to print unsupported type: %v", value.Type()) - } - return length, entry -} - // DoLocalQuery executes the query against the local store using a Loki configuration file. func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string, useRemoteSchema bool) error { var conf loki.Config @@ -502,8 +475,9 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string return err } + resPrinter := print.NewQueryResultPrinter(q.ShowLabelsKey, q.IgnoreLabelsKey, q.Quiet, q.FixedLabelsLen, q.Forward) if statistics { - q.printStats(result.Statistics) + resPrinter.PrintStats(result.Statistics) } value, err := marshal.NewResultValue(result.Data) @@ -511,7 +485,7 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string return err } - q.printResult(value, out, nil) + resPrinter.PrintResult(value, out, nil) return nil } @@ -574,159 +548,6 @@ func (q *Query) isInstant() bool { return q.Start == q.End && q.Step == 0 } -func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) { - common := commonLabels(streams) - - // Remove the labels we want to show from common - if len(q.ShowLabelsKey) > 0 { - common = matchLabels(false, common, q.ShowLabelsKey) - } - - if len(common) > 0 && !q.Quiet { - log.Println("Common labels:", color.RedString(common.String())) - } - - if len(q.IgnoreLabelsKey) > 0 && !q.Quiet { - log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ","))) - } - - if len(q.ShowLabelsKey) > 0 && !q.Quiet { - log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ","))) - } - - // Remove ignored and common labels from the cached labels and - // calculate the max labels length - maxLabelsLen := q.FixedLabelsLen - for i, s := range streams { - // Remove common labels - ls := subtract(s.Labels, common) - - if len(q.ShowLabelsKey) > 0 { - ls = matchLabels(true, ls, q.ShowLabelsKey) - } - - // Remove ignored labels - if len(q.IgnoreLabelsKey) > 0 { - ls = matchLabels(false, ls, q.IgnoreLabelsKey) - } - - // Overwrite existing Labels - streams[i].Labels = ls - - // Update max labels length - len := len(ls.String()) - if maxLabelsLen < len { - maxLabelsLen = len - } - } - - // sort and display entries - allEntries := make([]streamEntryPair, 0) - - for _, s := range streams { - for _, e := range s.Entries { - allEntries = append(allEntries, streamEntryPair{ - entry: e, - labels: s.Labels, - }) - } - } - - if len(allEntries) == 0 { - return 0, nil - } - - if q.Forward { - sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) }) - } else { - sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) }) - } - - printed := 0 - for _, e := range allEntries { - // Skip the last entry if it overlaps, this happens because batching includes the last entry from the last batch - if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp { - skip := false - // Because many logs can share a timestamp in the unlucky event a batch ends with a timestamp - // shared by multiple entries we have to check all that were stored to see if we've already - // printed them. - for _, le := range lastEntry { - if e.entry.Line == le.Line { - skip = true - } - } - if skip { - continue - } - } - out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line) - printed++ - } - - // Loki allows multiple entries at the same timestamp, this is a bit of a mess if a batch ends - // with an entry that shared multiple timestamps, so we need to keep a list of all these entries - // because the next query is going to contain them too and we want to not duplicate anything already - // printed. - lel := []*loghttp.Entry{} - // Start with the timestamp of the last entry - le := allEntries[len(allEntries)-1].entry - for i, e := range allEntries { - // Save any entry which has this timestamp (most of the time this will only be the single last entry) - if e.entry.Timestamp.Equal(le.Timestamp) { - lel = append(lel, &allEntries[i].entry) - } - } - - return printed, lel -} - -func (q *Query) printMatrix(matrix loghttp.Matrix) { - // yes we are effectively unmarshalling and then immediately marshalling this object back to json. we are doing this b/c - // it gives us more flexibility with regard to output types in the future. initially we are supporting just formatted json but eventually - // we might add output options such as render to an image file on disk - bytes, err := json.MarshalIndent(matrix, "", " ") - if err != nil { - log.Fatalf("Error marshalling matrix: %v", err) - } - - fmt.Print(string(bytes)) -} - -func (q *Query) printVector(vector loghttp.Vector) { - bytes, err := json.MarshalIndent(vector, "", " ") - if err != nil { - log.Fatalf("Error marshalling vector: %v", err) - } - - fmt.Print(string(bytes)) -} - -func (q *Query) printScalar(scalar loghttp.Scalar) { - bytes, err := json.MarshalIndent(scalar, "", " ") - if err != nil { - log.Fatalf("Error marshalling scalar: %v", err) - } - - fmt.Print(string(bytes)) -} - -type kvLogger struct { - *tabwriter.Writer -} - -func (k kvLogger) Log(keyvals ...interface{}) error { - for i := 0; i < len(keyvals); i += 2 { - fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1])) - } - k.Flush() - return nil -} - -func (q *Query) printStats(stats stats.Result) { - writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0) - stats.Log(kvLogger{Writer: writer}) -} - func (q *Query) resultsDirection() logproto.Direction { if q.Forward { return logproto.FORWARD diff --git a/pkg/logcli/query/query_test.go b/pkg/logcli/query/query_test.go index 5d7c7cb4b7..316cc80914 100644 --- a/pkg/logcli/query/query_test.go +++ b/pkg/logcli/query/query_test.go @@ -6,7 +6,6 @@ import ( "fmt" "os" "path/filepath" - "reflect" "strings" "testing" "time" @@ -30,137 +29,6 @@ import ( "github.com/grafana/loki/pkg/util/marshal" ) -func Test_commonLabels(t *testing.T) { - type args struct { - lss []loghttp.LabelSet - } - tests := []struct { - name string - args args - want loghttp.LabelSet - }{ - { - "Extract common labels source > target", - args{ - []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`)}, - }, - mustParseLabels(t, `{bar="foo"}`), - }, - { - "Extract common labels source > target", - args{ - []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo", foo="bar", baz="baz"}`)}, - }, - mustParseLabels(t, `{foo="bar", bar="foo"}`), - }, - { - "Extract common labels source < target", - args{ - []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{bar="foo"}`)}, - }, - mustParseLabels(t, `{bar="foo"}`), - }, - { - "Extract common labels source < target no common", - args{ - []loghttp.LabelSet{mustParseLabels(t, `{foo="bar", bar="foo"}`), mustParseLabels(t, `{fo="bar"}`)}, - }, - loghttp.LabelSet{}, - }, - { - "Extract common labels source = target no common", - args{ - []loghttp.LabelSet{mustParseLabels(t, `{foo="bar"}`), mustParseLabels(t, `{fooo="bar"}`)}, - }, - loghttp.LabelSet{}, - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - var streams []loghttp.Stream - - for _, lss := range tt.args.lss { - streams = append(streams, loghttp.Stream{ - Entries: nil, - Labels: lss, - }) - } - - if got := commonLabels(streams); !reflect.DeepEqual(got, tt.want) { - t.Errorf("commonLabels() = %v, want %v", got, tt.want) - } - }) - } -} - -func Test_subtract(t *testing.T) { - type args struct { - a loghttp.LabelSet - b loghttp.LabelSet - } - tests := []struct { - name string - args args - want loghttp.LabelSet - }{ - { - "Subtract labels source > target", - args{ - mustParseLabels(t, `{foo="bar", bar="foo"}`), - mustParseLabels(t, `{bar="foo", foo="foo", baz="baz"}`), - }, - mustParseLabels(t, `{foo="bar"}`), - }, - { - "Subtract labels source < target", - args{ - mustParseLabels(t, `{foo="bar", bar="foo"}`), - mustParseLabels(t, `{bar="foo"}`), - }, - mustParseLabels(t, `{foo="bar"}`), - }, - { - "Subtract labels source < target no sub", - args{ - mustParseLabels(t, `{foo="bar", bar="foo"}`), - mustParseLabels(t, `{fo="bar"}`), - }, - mustParseLabels(t, `{bar="foo", foo="bar"}`), - }, - { - "Subtract labels source = target no sub", - args{ - mustParseLabels(t, `{foo="bar"}`), - mustParseLabels(t, `{fiz="buz"}`), - }, - mustParseLabels(t, `{foo="bar"}`), - }, - { - "Subtract labels source > target no sub", - args{ - mustParseLabels(t, `{foo="bar"}`), - mustParseLabels(t, `{fiz="buz", foo="baz"}`), - }, - mustParseLabels(t, `{foo="bar"}`), - }, - { - "Subtract labels source > target no sub", - args{ - mustParseLabels(t, `{a="b", foo="bar", baz="baz", fizz="fizz"}`), - mustParseLabels(t, `{foo="bar", baz="baz", buzz="buzz", fizz="fizz"}`), - }, - mustParseLabels(t, `{a="b"}`), - }, - } - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - if got := subtract(tt.args.a, tt.args.b); !reflect.DeepEqual(got, tt.want) { - t.Errorf("subtract() = %v, want %v", got, tt.want) - } - }) - } -} - func Test_batch(t *testing.T) { tests := []struct { name string @@ -536,14 +404,6 @@ func Test_batch(t *testing.T) { } } -func mustParseLabels(t *testing.T, s string) loghttp.LabelSet { - t.Helper() - l, err := marshal.NewLabelSet(s) - require.NoErrorf(t, err, "Failed to parse %q", s) - - return l -} - type testQueryClient struct { engine *logql.Engine queryRangeCalls int @@ -610,6 +470,18 @@ func (t *testQueryClient) GetOrgID() string { panic("implement me") } +func (t *testQueryClient) GetStats(_ string, _, _ time.Time, _ bool) (*logproto.IndexStatsResponse, error) { + panic("not implemented") +} + +func (t *testQueryClient) GetVolume(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) { + panic("not implemented") +} + +func (t *testQueryClient) GetVolumeRange(_ string, _, _ time.Time, _ time.Duration, _ int, _ bool) (*loghttp.QueryResponse, error) { + panic("not implemented") +} + var schemaConfigContents = `schema_config: configs: - from: 2020-05-15 diff --git a/pkg/logcli/query/tail.go b/pkg/logcli/query/tail.go index 6108407ccc..c1c092a9e1 100644 --- a/pkg/logcli/query/tail.go +++ b/pkg/logcli/query/tail.go @@ -15,6 +15,7 @@ import ( "github.com/grafana/loki/pkg/logcli/client" "github.com/grafana/loki/pkg/logcli/output" + "github.com/grafana/loki/pkg/logcli/util" "github.com/grafana/loki/pkg/loghttp" "github.com/grafana/loki/pkg/util/unmarshal" ) @@ -127,3 +128,7 @@ func (q *Query) TailQuery(delayFor time.Duration, c client.Client, out output.Lo } } } + +func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet { + return util.MatchLabels(on, l, names) +} diff --git a/pkg/logcli/query/utils.go b/pkg/logcli/query/utils.go deleted file mode 100644 index c6c6cc0a33..0000000000 --- a/pkg/logcli/query/utils.go +++ /dev/null @@ -1,64 +0,0 @@ -package query - -import ( - "github.com/grafana/loki/pkg/loghttp" -) - -// return commonLabels labels between given labels set -func commonLabels(streams loghttp.Streams) loghttp.LabelSet { - if len(streams) == 0 { - return nil - } - - result := streams[0].Labels - for i := 1; i < len(streams); i++ { - result = intersect(result, streams[i].Labels) - } - return result -} - -// intersect two labels set -func intersect(a, b loghttp.LabelSet) loghttp.LabelSet { - set := loghttp.LabelSet{} - - for ka, va := range a { - if vb, ok := b[ka]; ok { - if vb == va { - set[ka] = va - } - } - } - return set -} - -// subtract labels set b from labels set a -func subtract(a, b loghttp.LabelSet) loghttp.LabelSet { - set := loghttp.LabelSet{} - - for ka, va := range a { - if vb, ok := b[ka]; ok { - if vb == va { - continue - } - } - set[ka] = va - } - return set -} - -func matchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet { - ret := loghttp.LabelSet{} - - nameSet := map[string]struct{}{} - for _, n := range names { - nameSet[n] = struct{}{} - } - - for k, v := range l { - if _, ok := nameSet[k]; on == ok { - ret[k] = v - } - } - - return ret -} diff --git a/pkg/logcli/util/util.go b/pkg/logcli/util/util.go new file mode 100644 index 0000000000..cb3a70f924 --- /dev/null +++ b/pkg/logcli/util/util.go @@ -0,0 +1,20 @@ +package util + +import "github.com/grafana/loki/pkg/loghttp" + +func MatchLabels(on bool, l loghttp.LabelSet, names []string) loghttp.LabelSet { + ret := loghttp.LabelSet{} + + nameSet := map[string]struct{}{} + for _, n := range names { + nameSet[n] = struct{}{} + } + + for k, v := range l { + if _, ok := nameSet[k]; on == ok { + ret[k] = v + } + } + + return ret +}