@ -1,6 +1,7 @@
package query
import (
"context"
"fmt"
"log"
"os"
@ -12,13 +13,20 @@ import (
"github.com/fatih/color"
json "github.com/json-iterator/go"
"github.com/prometheus/prometheus/promql"
"github.com/weaveworks/common/user"
"github.com/grafana/loki/pkg/cfg"
"github.com/grafana/loki/pkg/iter"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/logql/marshal"
"github.com/grafana/loki/pkg/logql/stats"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/util/validation"
)
type streamEntryPair struct {
@ -39,10 +47,20 @@ type Query struct {
IgnoreLabelsKey [ ] string
ShowLabelsKey [ ] string
FixedLabelsLen int
LocalConfig string
}
// DoQuery executes the query and prints out the results
func ( q * Query ) DoQuery ( c * client . Client , out output . LogOutput , statistics bool ) {
if q . LocalConfig != "" {
if err := q . DoLocalQuery ( out , statistics , c . OrgID ) ; err != nil {
log . Fatalf ( "Query failed: %+v" , err )
}
return
}
d := q . resultsDirection ( )
var resp * loghttp . QueryResponse
@ -62,21 +80,81 @@ func (q *Query) DoQuery(c *client.Client, out output.LogOutput, statistics bool)
q . printStats ( resp . Data . Statistics )
}
switch resp . Data . ResultType {
q . printResult ( resp . Data . Result , out )
}
func ( q * Query ) printResult ( value loghttp . ResultValue , out output . LogOutput ) {
switch value . Type ( ) {
case logql . ValueTypeStreams :
streams := resp . Data . Result . ( loghttp . Streams )
q . printStream ( streams , out )
q . printStream ( value . ( loghttp . Streams ) , out )
case promql . ValueTypeScalar :
q . printScalar ( resp . Data . Result . ( loghttp . Scalar ) )
q . printScalar ( value . ( loghttp . Scalar ) )
case promql . ValueTypeMatrix :
matrix := resp . Data . Result . ( loghttp . Matrix )
q . printMatrix ( matrix )
q . printMatrix ( value . ( loghttp . Matrix ) )
case promql . ValueTypeVector :
vector := resp . Data . Result . ( loghttp . Vector )
q . printVector ( vector )
q . printVector ( value . ( loghttp . Vector ) )
default :
log . Fatalf ( "Unable to print unsupported type: %v" , resp . Data . ResultType )
log . Fatalf ( "Unable to print unsupported type: %v" , value . Type ( ) )
}
}
// DoLocalQuery executes the query against the local store using a Loki configuration file.
func ( q * Query ) DoLocalQuery ( out output . LogOutput , statistics bool , orgID string ) error {
var conf loki . Config
if err := cfg . Defaults ( ) ( & conf ) ; err != nil {
return err
}
if err := cfg . YAML ( & q . LocalConfig ) ( & conf ) ; err != nil {
return err
}
querier , err := localStore ( conf )
if err != nil {
return err
}
eng := logql . NewEngine ( conf . Querier . Engine , querier )
var query logql . Query
if q . isInstant ( ) {
query = eng . NewInstantQuery ( q . QueryString , q . Start , q . resultsDirection ( ) , uint32 ( q . Limit ) )
} else {
query = eng . NewRangeQuery ( q . QueryString , q . Start , q . End , q . Step , q . resultsDirection ( ) , uint32 ( q . Limit ) )
}
// execute the query
ctx := user . InjectOrgID ( context . Background ( ) , orgID )
result , err := query . Exec ( ctx )
if err != nil {
return err
}
if statistics {
q . printStats ( result . Statistics )
}
value , err := marshal . NewResultValue ( result . Data )
if err != nil {
return err
}
q . printResult ( value , out )
return nil
}
func localStore ( conf loki . Config ) ( logql . Querier , error ) {
limits , err := validation . NewOverrides ( conf . LimitsConfig , nil )
if err != nil {
return nil , err
}
s , err := storage . NewStore ( conf . StorageConfig , conf . ChunkStoreConfig , conf . SchemaConfig , limits )
if err != nil {
return nil , err
}
return logql . QuerierFunc ( func ( ctx context . Context , params logql . SelectParams ) ( iter . EntryIterator , error ) {
return s . LazyQuery ( ctx , params )
} ) , nil
}
// SetInstant makes the Query an instant type