loki/pkg/logcli/query/query.go

package query

import (
	"context"
	"errors"
	"flag"
	"fmt"
	"log"
	"os"
	"sort"
	"strings"
	"text/tabwriter"
	"time"

	util_log "github.com/cortexproject/cortex/pkg/util/log"
	"github.com/fatih/color"
	json "github.com/json-iterator/go"
	"github.com/prometheus/client_golang/prometheus"
	"github.com/weaveworks/common/user"

	"github.com/grafana/loki/pkg/logcli/client"
	"github.com/grafana/loki/pkg/logcli/output"
	"github.com/grafana/loki/pkg/loghttp"
	"github.com/grafana/loki/pkg/logproto"
	"github.com/grafana/loki/pkg/logql"
	"github.com/grafana/loki/pkg/logqlmodel"
	"github.com/grafana/loki/pkg/logqlmodel/stats"
	"github.com/grafana/loki/pkg/loki"
	"github.com/grafana/loki/pkg/storage"
	chunk_storage "github.com/grafana/loki/pkg/storage/chunk/storage"
	"github.com/grafana/loki/pkg/storage/stores/shipper"
	"github.com/grafana/loki/pkg/util/cfg"
	"github.com/grafana/loki/pkg/util/marshal"
	"github.com/grafana/loki/pkg/validation"
)
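
// streamEntryPair pairs a single log entry with the labels of the stream it
// came from, so entries from different streams can be merged, sorted, and
// printed together.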
type streamEntryPair struct {
	entry  loghttp.Entry
	labels loghttp.LabelSet
}

// Query contains all necessary fields to execute instant and range queries and print the results.
type Query struct {
	QueryString     string
	Start           time.Time
	End             time.Time
	Limit           int
	BatchSize       int
	Forward         bool
	Step            time.Duration
	Interval        time.Duration
	Quiet           bool
	NoLabels        bool
	IgnoreLabelsKey []string
	ShowLabelsKey   []string
	FixedLabelsLen  int
	ColoredOutput   bool
	LocalConfig     string
}

// DoQuery executes the query and prints out the results.
func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool) {
	if q.LocalConfig != "" {
		if err := q.DoLocalQuery(out, statistics, c.GetOrgID()); err != nil {
			log.Fatalf("Query failed: %+v", err)
		}
		return
	}

	d := q.resultsDirection()

	var resp *loghttp.QueryResponse
	var err error

	if q.isInstant() {
		resp, err = c.Query(q.QueryString, q.Limit, q.Start, d, q.Quiet)
		if err != nil {
			log.Fatalf("Query failed: %+v", err)
		}
		if statistics {
			q.printStats(resp.Data.Statistics)
		}
		_, _ = q.printResult(resp.Data.Result, out, nil)
	} else {
		if q.Limit < q.BatchSize {
			q.BatchSize = q.Limit
		}
		resultLength := 0
		total := 0
		start := q.Start
		end := q.End
		var lastEntry []*loghttp.Entry
		for total < q.Limit {
			bs := q.BatchSize
			// We want to truncate the batch size if the remaining number
			// of items needed to reach the limit is less than the batch size.
			if q.Limit-total < q.BatchSize {
				// The truncated batch size is q.Limit - total; however, we add to this
				// the length of the overlap from the last query to make sure we get the
				// correct number of new logs, knowing there will be some overlapping logs returned.
				bs = q.Limit - total + len(lastEntry)
			}
			resp, err = c.QueryRange(q.QueryString, bs, start, end, d, q.Step, q.Interval, q.Quiet)
			if err != nil {
				log.Fatalf("Query failed: %+v", err)
			}

			if statistics {
				q.printStats(resp.Data.Statistics)
			}

			resultLength, lastEntry = q.printResult(resp.Data.Result, out, lastEntry)
			// Was not a log stream query, or no results; no more batching.
			if resultLength <= 0 {
				break
			}
			// Also no result; we wouldn't expect to hit this.
			if len(lastEntry) == 0 {
				break
			}
			// Can only happen if all the results were returned in one request.
			if resultLength == q.Limit {
				break
			}
			if len(lastEntry) >= q.BatchSize {
				log.Fatalf("Invalid batch size %v, the next query will have %v overlapping entries "+
					"(there will always be 1 overlapping entry, but Loki allows multiple entries to have "+
					"the same timestamp, so when a batch ends in this scenario the next query will include "+
					"all the overlapping entries again). Please increase your batch size to at least %v to account "+
					"for overlapping entries\n", q.BatchSize, len(lastEntry), len(lastEntry)+1)
			}

			// Batching works by taking the timestamp of the last entry from one query and using it as the
			// boundary of the next. Because Loki supports multiple entries with the same timestamp, a batch
			// may end in the middle of a list of entries sharing one timestamp, so to make sure we get all
			// entries we start the next query at the same time as the last entry from the last batch, keep
			// that entry, and skip the duplicates when printing the results. The count returned for each
			// batch already excludes those duplicates, so the running total matches the desired limit.
			total += resultLength

			// Based on the query direction we either set the start or end for the next query.
			// If there are multiple entries in lastEntry they all share the same timestamp, so we can pick just the first.
			if q.Forward {
				start = lastEntry[0].Timestamp
			} else {
				// The end timestamp is exclusive on a backward query, so to make sure we get back an overlapping
				// result, nudge the timestamp forward in time to include the last entry from this batch in the next query.
				end = lastEntry[0].Timestamp.Add(1 * time.Nanosecond)
			}
		}
	}
}
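
// printResult prints a query result according to its type: log streams go
// through printStream, which returns the number of entries printed and the
// entries sharing the final timestamp (used for batch overlap); scalar,
// matrix, and vector results are printed as JSON.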
func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
	length := -1
	var entry []*loghttp.Entry
	switch value.Type() {
	case logqlmodel.ValueTypeStreams:
		length, entry = q.printStream(value.(loghttp.Streams), out, lastEntry)
	case loghttp.ResultTypeScalar:
		q.printScalar(value.(loghttp.Scalar))
	case loghttp.ResultTypeMatrix:
		q.printMatrix(value.(loghttp.Matrix))
	case loghttp.ResultTypeVector:
		q.printVector(value.(loghttp.Vector))
	default:
		log.Fatalf("Unable to print unsupported type: %v", value.Type())
	}
	return length, entry
}

// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error {
	var conf loki.Config
	conf.RegisterFlags(flag.CommandLine)
	if q.LocalConfig == "" {
		return errors.New("no supplied config file")
	}
	if err := cfg.YAML(q.LocalConfig, false)(&conf); err != nil {
		return err
	}

	if err := conf.Validate(); err != nil {
		return err
	}

	limits, err := validation.NewOverrides(conf.LimitsConfig, nil)
	if err != nil {
		return err
	}
	storage.RegisterCustomIndexClients(&conf.StorageConfig, prometheus.DefaultRegisterer)
	conf.StorageConfig.BoltDBShipperConfig.Mode = shipper.ModeReadOnly
	chunkStore, err := chunk_storage.NewStore(conf.StorageConfig.Config, conf.ChunkStoreConfig.StoreConfig, conf.SchemaConfig.SchemaConfig, limits, prometheus.DefaultRegisterer, nil, util_log.Logger)
	if err != nil {
		return err
	}

	querier, err := storage.NewStore(conf.StorageConfig, conf.SchemaConfig, chunkStore, prometheus.DefaultRegisterer)
	if err != nil {
		return err
	}

	eng := logql.NewEngine(conf.Querier.Engine, querier, limits)
	var query logql.Query

	if q.isInstant() {
		query = eng.Query(logql.NewLiteralParams(
			q.QueryString,
			q.Start,
			q.Start,
			0,
			0,
			q.resultsDirection(),
			uint32(q.Limit),
			nil,
		))
	} else {
		query = eng.Query(logql.NewLiteralParams(
			q.QueryString,
			q.Start,
			q.End,
			q.Step,
			q.Interval,
			q.resultsDirection(),
			uint32(q.Limit),
			nil,
		))
	}

	// Execute the query.
	ctx := user.InjectOrgID(context.Background(), orgID)
	result, err := query.Exec(ctx)
	if err != nil {
		return err
	}

	if statistics {
		q.printStats(result.Statistics)
	}

	value, err := marshal.NewResultValue(result.Data)
	if err != nil {
		return err
	}

	q.printResult(value, out, nil)

	return nil
}

// SetInstant makes the Query an instant type.
func (q *Query) SetInstant(time time.Time) {
	q.Start = time
	q.End = time
}
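
// isInstant reports whether the query is an instant query: start and end are
// the same instant and no step is set.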
func (q *Query) isInstant() bool {
	return q.Start == q.End && q.Step == 0
}
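
// printStream prints all stream entries in timestamp order with their
// non-common labels, skipping entries already printed by the previous batch,
// and returns the number of entries printed together with every entry that
// shares the final timestamp.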
func (q *Query) printStream(streams loghttp.Streams, out output.LogOutput, lastEntry []*loghttp.Entry) (int, []*loghttp.Entry) {
	common := commonLabels(streams)

	// Remove the labels we want to show from common.
	if len(q.ShowLabelsKey) > 0 {
		common = matchLabels(false, common, q.ShowLabelsKey)
	}

	if len(common) > 0 && !q.Quiet {
		log.Println("Common labels:", color.RedString(common.String()))
	}

	if len(q.IgnoreLabelsKey) > 0 && !q.Quiet {
		log.Println("Ignoring labels key:", color.RedString(strings.Join(q.IgnoreLabelsKey, ",")))
	}

	if len(q.ShowLabelsKey) > 0 && !q.Quiet {
		log.Println("Print only labels key:", color.RedString(strings.Join(q.ShowLabelsKey, ",")))
	}

	// Remove ignored and common labels from the cached labels and
	// calculate the max labels length.
	maxLabelsLen := q.FixedLabelsLen
	for i, s := range streams {
		// Remove common labels.
		ls := subtract(s.Labels, common)

		if len(q.ShowLabelsKey) > 0 {
			ls = matchLabels(true, ls, q.ShowLabelsKey)
		}

		// Remove ignored labels.
		if len(q.IgnoreLabelsKey) > 0 {
			ls = matchLabels(false, ls, q.IgnoreLabelsKey)
		}

		// Overwrite the existing labels.
		streams[i].Labels = ls

		// Update the max labels length.
		labelsLen := len(ls.String())
		if maxLabelsLen < labelsLen {
			maxLabelsLen = labelsLen
		}
	}

	// Sort and display entries.
	allEntries := make([]streamEntryPair, 0)

	for _, s := range streams {
		for _, e := range s.Entries {
			allEntries = append(allEntries, streamEntryPair{
				entry:  e,
				labels: s.Labels,
			})
		}
	}

	if len(allEntries) == 0 {
		return 0, nil
	}

	if q.Forward {
		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.Before(allEntries[j].entry.Timestamp) })
	} else {
		sort.Slice(allEntries, func(i, j int) bool { return allEntries[i].entry.Timestamp.After(allEntries[j].entry.Timestamp) })
	}

	printed := 0
	for _, e := range allEntries {
		// Skip an entry if it overlaps with the previous batch; this happens because
		// batching includes the last entry from the previous batch.
		if len(lastEntry) > 0 && e.entry.Timestamp == lastEntry[0].Timestamp {
			skip := false
			// Many logs can share a timestamp, so in the unlucky event that a batch ends on a
			// timestamp shared by multiple entries, we have to check every stored entry to see
			// if we've already printed it.
			for _, le := range lastEntry {
				if e.entry.Line == le.Line {
					skip = true
				}
			}
			if skip {
				continue
			}
		}
		out.FormatAndPrintln(e.entry.Timestamp, e.labels, maxLabelsLen, e.entry.Line)
		printed++
	}

	// Loki allows multiple entries at the same timestamp. This gets messy when a batch ends on a
	// timestamp shared by multiple entries, so we keep a list of all of them: the next query is
	// going to contain them too, and we don't want to duplicate anything already printed.
	lel := []*loghttp.Entry{}
	// Start with the timestamp of the last entry.
	le := allEntries[len(allEntries)-1].entry
	for i, e := range allEntries {
		// Save any entry which has this timestamp (most of the time this will only be the single last entry).
		if e.entry.Timestamp.Equal(le.Timestamp) {
			lel = append(lel, &allEntries[i].entry)
		}
	}

	return printed, lel
}
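
// printMatrix prints a matrix result as indented JSON on stdout.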
func (q *Query) printMatrix(matrix loghttp.Matrix) {
	// Yes, we are effectively unmarshalling and then immediately marshalling this object back to JSON.
	// We do this because it gives us more flexibility with regard to output types in the future: initially
	// we support just formatted JSON, but eventually we might add output options such as rendering to an
	// image file on disk.
	bytes, err := json.MarshalIndent(matrix, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling matrix: %v", err)
	}

	fmt.Print(string(bytes))
}
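
// printVector prints a vector result as indented JSON on stdout.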
func (q *Query) printVector(vector loghttp.Vector) {
	bytes, err := json.MarshalIndent(vector, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling vector: %v", err)
	}

	fmt.Print(string(bytes))
}
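
// printScalar prints a scalar result as indented JSON on stdout.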
func (q *Query) printScalar(scalar loghttp.Scalar) {
	bytes, err := json.MarshalIndent(scalar, "", " ")
	if err != nil {
		log.Fatalf("Error marshalling scalar: %v", err)
	}

	fmt.Print(string(bytes))
}
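
// kvLogger writes alternating key/value pairs as tab-separated, color-highlighted
// rows through a tabwriter; Log expects an even number of keyvals.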
type kvLogger struct {
	*tabwriter.Writer
}

func (k kvLogger) Log(keyvals ...interface{}) error {
	for i := 0; i < len(keyvals); i += 2 {
		fmt.Fprintln(k.Writer, color.BlueString("%s", keyvals[i]), "\t", fmt.Sprintf("%v", keyvals[i+1]))
	}
	k.Flush()
	return nil
}
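
// printStats writes the query statistics to stderr as aligned key/value rows.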
func (q *Query) printStats(stats stats.Result) {
	writer := tabwriter.NewWriter(os.Stderr, 0, 8, 0, '\t', 0)
	stats.Log(kvLogger{Writer: writer})
}
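
// resultsDirection translates the Forward flag into the logproto direction
// expected by the query API.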
func (q *Query) resultsDirection() logproto.Direction {
	if q.Forward {
		return logproto.FORWARD
	}
	return logproto.BACKWARD
}