logcli: add flag to read remote schema configs (#6539)

* Add flag to read remote schema

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Fix lint imports

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>

* Trigger CI

* Trigger CI

* Addresss review comment

Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
pull/6570/head
Michel Hollands 4 years ago committed by GitHub
parent 360843485d
commit 527b658cc1
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
  1. 1
      cmd/logcli/main.go
  2. 94
      pkg/logcli/query/query.go
  3. 71
      pkg/logcli/query/query_test.go

@ -356,6 +356,7 @@ func newQuery(instant bool, cmd *kingpin.CmdClause) *query.Query {
cmd.Flag("include-label", "Include labels given the provided key during output.").StringsVar(&q.ShowLabelsKey)
cmd.Flag("labels-length", "Set a fixed padding to labels").Default("0").IntVar(&q.FixedLabelsLen)
cmd.Flag("store-config", "Execute the current query using a configured storage from a given Loki configuration file.").Default("").StringVar(&q.LocalConfig)
cmd.Flag("remote-schema", "Execute the current query using a remote schema retrieved using the configured storage in the given Loki configuration file.").Default("false").BoolVar(&q.FetchSchemaFromStorage)
cmd.Flag("colored-output", "Show output with colored labels").Default("false").BoolVar(&q.ColoredOutput)
return q

@ -16,6 +16,7 @@ import (
json "github.com/json-iterator/go"
"github.com/prometheus/client_golang/prometheus"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
@ -26,6 +27,8 @@ import (
"github.com/grafana/loki/pkg/logqlmodel/stats"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
chunk "github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/util/cfg"
util_log "github.com/grafana/loki/pkg/util/log"
@ -33,6 +36,8 @@ import (
"github.com/grafana/loki/pkg/validation"
)
const SchemaConfigFilename = "schemaconfig.yaml"
type streamEntryPair struct {
entry loghttp.Entry
labels loghttp.LabelSet
@ -40,21 +45,22 @@ type streamEntryPair struct {
// Query contains all necessary fields to execute instant and range queries and print the results.
type Query struct {
QueryString string
Start time.Time
End time.Time
Limit int
BatchSize int
Forward bool
Step time.Duration
Interval time.Duration
Quiet bool
NoLabels bool
IgnoreLabelsKey []string
ShowLabelsKey []string
FixedLabelsLen int
ColoredOutput bool
LocalConfig string
QueryString string
Start time.Time
End time.Time
Limit int
BatchSize int
Forward bool
Step time.Duration
Interval time.Duration
Quiet bool
NoLabels bool
IgnoreLabelsKey []string
ShowLabelsKey []string
FixedLabelsLen int
ColoredOutput bool
LocalConfig string
FetchSchemaFromStorage bool
}
// DoQuery executes the query and prints out the results
@ -64,7 +70,7 @@ func (q *Query) DoQuery(c client.Client, out output.LogOutput, statistics bool)
if orgID == "" {
orgID = "fake"
}
if err := q.DoLocalQuery(out, statistics, orgID); err != nil {
if err := q.DoLocalQuery(out, statistics, orgID, q.FetchSchemaFromStorage); err != nil {
log.Fatalf("Query failed: %+v", err)
}
return
@ -174,7 +180,7 @@ func (q *Query) printResult(value loghttp.ResultValue, out output.LogOutput, las
}
// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string) error {
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string, useRemoteSchema bool) error {
var conf loki.Config
conf.RegisterFlags(flag.CommandLine)
if q.LocalConfig == "" {
@ -196,7 +202,23 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
conf.StorageConfig.BoltDBShipperConfig.Mode = indexshipper.ModeReadOnly
conf.StorageConfig.BoltDBShipperConfig.IndexGatewayClientConfig.Disabled = true
querier, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, conf.SchemaConfig, limits, cm, prometheus.DefaultRegisterer, util_log.Logger)
schema := conf.SchemaConfig
if useRemoteSchema {
cm := storage.NewClientMetrics()
client, err := GetObjectClient(conf, cm)
if err != nil {
return err
}
loadedSchema, err := LoadSchemaUsingObjectClient(client, SchemaConfigFilename)
if err != nil {
return err
}
schema = *loadedSchema
}
querier, err := storage.NewStore(conf.StorageConfig, conf.ChunkStoreConfig, schema, limits, cm, prometheus.DefaultRegisterer, util_log.Logger)
if err != nil {
return err
}
@ -248,6 +270,42 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return nil
}
func GetObjectClient(conf loki.Config, cm storage.ClientMetrics) (chunk.ObjectClient, error) {
oc, err := storage.NewObjectClient(
conf.StorageConfig.BoltDBShipperConfig.SharedStoreType,
conf.StorageConfig,
cm,
)
if err != nil {
return nil, err
}
return oc, nil
}
type schemaConfigSection struct {
config.SchemaConfig `yaml:"schema_config"`
}
func LoadSchemaUsingObjectClient(oc chunk.ObjectClient, name string) (*config.SchemaConfig, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(5*time.Second))
defer cancel()
rdr, _, err := oc.GetObject(ctx, name)
if err != nil {
return nil, err
}
defer rdr.Close()
decoder := yaml.NewDecoder(rdr)
decoder.SetStrict(true)
section := schemaConfigSection{}
err = decoder.Decode(&section)
if err != nil {
return nil, err
}
return &section.SchemaConfig, nil
}
// SetInstant makes the Query an instant type
func (q *Query) SetInstant(time time.Time) {
q.Start = time

@ -3,6 +3,8 @@ package query
import (
"bytes"
"context"
"os"
"path/filepath"
"reflect"
"strings"
"testing"
@ -17,6 +19,12 @@ import (
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/logproto"
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/storage/stores/indexshipper"
"github.com/grafana/loki/pkg/storage/stores/shipper"
"github.com/grafana/loki/pkg/util/marshal"
)
@ -557,3 +565,66 @@ func (t *testQueryClient) LiveTailQueryConn(queryStr string, delayFor time.Durat
func (t *testQueryClient) GetOrgID() string {
panic("implement me")
}
var schemaConfigContents = `schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: gcs
schema: v10
index:
prefix: index_
period: 168h
- from: 2020-07-31
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h
`
func TestLoadFromURL(t *testing.T) {
tmpDir := t.TempDir()
conf := loki.Config{
StorageConfig: storage.Config{
FSConfig: local.FSConfig{
Directory: tmpDir,
},
},
}
// Missing SharedStoreType should error
cm := storage.NewClientMetrics()
client, err := GetObjectClient(conf, cm)
require.Error(t, err)
require.Nil(t, client)
conf.StorageConfig.BoltDBShipperConfig = shipper.Config{
Config: indexshipper.Config{
SharedStoreType: config.StorageTypeFileSystem,
},
}
client, err = GetObjectClient(conf, cm)
require.NoError(t, err)
require.NotNil(t, client)
// Missing schema.config file should error
schemaConfig, err := LoadSchemaUsingObjectClient(client, SchemaConfigFilename)
require.Error(t, err)
require.Nil(t, schemaConfig)
err = os.WriteFile(
filepath.Join(tmpDir, SchemaConfigFilename),
[]byte(schemaConfigContents),
0666,
)
require.NoError(t, err)
schemaConfig, err = LoadSchemaUsingObjectClient(client, SchemaConfigFilename)
require.NoError(t, err)
require.NotNil(t, schemaConfig)
}

Loading…
Cancel
Save