feat: update logcli so it tries to load the latest version of the schemaconfig (#11852)

Signed-off-by: Michel Hollands <michel.hollands@gmail.com>
Signed-off-by: Michel Hollands <michel.hollands@grafana.com>
Co-authored-by: Dylan Guedes <djmgguedes@gmail.com>
pull/11646/head^2
Michel Hollands 1 year ago committed by GitHub
parent 0d1a3694f5
commit 4ce5fa8954
No known key found for this signature in database
GPG Key ID: B5690EEEBB952194
  1. 2
      CHANGELOG.md
  2. 101
      pkg/logcli/query/query.go
  3. 132
      pkg/logcli/query/query_test.go

@ -110,6 +110,8 @@
#### LogCLI
* [11852](https://github.com/grafana/loki/pull/11852) **MichelHollands**: feat: update logcli so it tries to load the latest version of the schemaconfig
#### Mixins
* [11087](https://github.com/grafana/loki/pull/11087) **JoaoBraveCoding**: Adds structured metadata panels for ingested data

@ -2,6 +2,7 @@ package query
import (
"context"
stdErrors "errors"
"flag"
"fmt"
"io"
@ -10,7 +11,6 @@ import (
"sync"
"time"
"github.com/grafana/dskit/multierror"
"github.com/grafana/dskit/user"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
@ -395,6 +395,41 @@ func maxTime(t1, t2 time.Time) time.Time {
return t2
}
func getLatestConfig(client chunk.ObjectClient, orgID string) (*config.SchemaConfig, error) {
// Get the latest
iteration := 0
searchFor := fmt.Sprintf("%s-%s.yaml", orgID, schemaConfigFilename) // schemaconfig-tenant.yaml
var loadedSchema *config.SchemaConfig
for {
if iteration != 0 {
searchFor = fmt.Sprintf("%s-%s-%d.yaml", orgID, schemaConfigFilename, iteration) // tenant-schemaconfig-1.yaml
}
tempSchema, err := LoadSchemaUsingObjectClient(client, searchFor)
if err == errNotExists {
break
}
if err != nil {
return nil, err
}
loadedSchema = tempSchema
iteration++
}
if loadedSchema != nil {
return loadedSchema, nil
}
searchFor = fmt.Sprintf("%s.yaml", schemaConfigFilename) // schemaconfig.yaml for backwards compatibility
loadedSchema, err := LoadSchemaUsingObjectClient(client, searchFor)
if err == nil {
return loadedSchema, nil
}
if err != errNotExists {
return nil, err
}
return nil, errNotExists
}
// DoLocalQuery executes the query against the local store using a Loki configuration file.
func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string, useRemoteSchema bool) error {
var conf loki.Config
@ -417,15 +452,10 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
}
objects := []string{
fmt.Sprintf("%s-%s.yaml", orgID, schemaConfigFilename), // schemaconfig-tenant.yaml
fmt.Sprintf("%s.yaml", schemaConfigFilename), // schemaconfig.yaml for backwards compatibility
}
loadedSchema, err := LoadSchemaUsingObjectClient(client, objects...)
loadedSchema, err := getLatestConfig(client, orgID)
if err != nil {
return err
}
conf.SchemaConfig = *loadedSchema
}
@ -484,10 +514,6 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
query = eng.Query(params)
}
if err != nil {
return err
}
// execute the query
ctx := user.InjectOrgID(context.Background(), orgID)
result, err := query.Exec(ctx)
@ -521,41 +547,40 @@ func GetObjectClient(store string, conf loki.Config, cm storage.ClientMetrics) (
return oc, nil
}
var errNotExists = stdErrors.New("doesn't exist")
type schemaConfigSection struct {
config.SchemaConfig `yaml:"schema_config"`
}
// LoadSchemaUsingObjectClient returns the loaded schema from the first found object
func LoadSchemaUsingObjectClient(oc chunk.ObjectClient, names ...string) (*config.SchemaConfig, error) {
errs := multierror.New()
for _, name := range names {
schema, err := func(name string) (*config.SchemaConfig, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
defer cancel()
rdr, _, err := oc.GetObject(ctx, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to load schema object '%s'", name)
}
defer rdr.Close()
// LoadSchemaUsingObjectClient returns the loaded schema from the object with the given name
func LoadSchemaUsingObjectClient(oc chunk.ObjectClient, name string) (*config.SchemaConfig, error) {
ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(1*time.Minute))
defer cancel()
decoder := yaml.NewDecoder(rdr)
decoder.SetStrict(true)
section := schemaConfigSection{}
err = decoder.Decode(&section)
if err != nil {
return nil, err
}
ok, err := oc.ObjectExists(ctx, name)
if !ok {
return nil, errNotExists
}
if err != nil {
return nil, err
}
return &section.SchemaConfig, nil
}(name)
rdr, _, err := oc.GetObject(ctx, name)
if err != nil {
return nil, errors.Wrapf(err, "failed to load schema object '%s'", name)
}
defer rdr.Close()
if err != nil {
errs = append(errs, err)
continue
}
return schema, nil
decoder := yaml.NewDecoder(rdr)
decoder.SetStrict(true)
section := schemaConfigSection{}
err = decoder.Decode(&section)
if err != nil {
return nil, err
}
return nil, errs.Err()
return &section.SchemaConfig, nil
}
// SetInstant makes the Query an instant type

@ -3,6 +3,7 @@ package query
import (
"bytes"
"context"
"errors"
"fmt"
"os"
"path/filepath"
@ -23,6 +24,7 @@ import (
"github.com/grafana/loki/pkg/logql"
"github.com/grafana/loki/pkg/loki"
"github.com/grafana/loki/pkg/storage"
"github.com/grafana/loki/pkg/storage/chunk/client"
"github.com/grafana/loki/pkg/storage/chunk/client/local"
"github.com/grafana/loki/pkg/storage/config"
"github.com/grafana/loki/pkg/util/marshal"
@ -406,7 +408,6 @@ func Test_batch(t *testing.T) {
type testQueryClient struct {
engine *logql.Engine
queryRangeCalls int
orgID string
}
func newTestQueryClient(testStreams ...logproto.Stream) *testQueryClient {
@ -484,6 +485,17 @@ func (t *testQueryClient) GetVolumeRange(_ *volume.Query) (*loghttp.QueryRespons
panic("not implemented")
}
var legacySchemaConfigContents = `schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: gcs
schema: v10
index:
prefix: index_
period: 168h
`
var schemaConfigContents = `schema_config:
configs:
- from: 2020-05-15
@ -501,10 +513,35 @@ var schemaConfigContents = `schema_config:
prefix: index_
period: 24h
`
var schemaConfigContents2 = `schema_config:
configs:
- from: 2020-05-15
store: boltdb-shipper
object_store: gcs
schema: v10
index:
prefix: index_
period: 168h
- from: 2020-07-31
store: boltdb-shipper
object_store: gcs
schema: v11
index:
prefix: index_
period: 24h
- from: 2020-09-30
store: boltdb-shipper
object_store: gcs
schema: v12
index:
prefix: index_
period: 24h
`
var cm = storage.NewClientMetrics()
func TestLoadFromURL(t *testing.T) {
func setupTestEnv(t *testing.T) (string, client.ObjectClient) {
t.Helper()
tmpDir := t.TempDir()
conf := loki.Config{
StorageConfig: storage.Config{
FSConfig: local.FSConfig{
@ -513,11 +550,19 @@ func TestLoadFromURL(t *testing.T) {
},
}
cm := storage.NewClientMetrics()
client, err := GetObjectClient(config.StorageTypeFileSystem, conf, cm)
require.NoError(t, err)
require.NotNil(t, client)
_, err = getLatestConfig(client, "456")
require.Error(t, err)
require.True(t, errors.Is(err, errNotExists))
return tmpDir, client
}
func TestLoadFromURL(t *testing.T) {
tmpDir, client := setupTestEnv(t)
filename := "schemaconfig.yaml"
// Missing schemaconfig.yaml file should error
@ -537,12 +582,85 @@ func TestLoadFromURL(t *testing.T) {
require.NoError(t, err)
require.NotNil(t, schemaConfig)
}
// Load multiple schemaconfig files
schemaConfig, err = LoadSchemaUsingObjectClient(client, "foo.yaml", filename, "bar.yaml")
func TestMultipleConfigs(t *testing.T) {
tmpDir, client := setupTestEnv(t)
err := os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig.yaml"),
[]byte(schemaConfigContents),
0666,
)
require.NoError(t, err)
require.NotNil(t, schemaConfig)
config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 2)
err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig-1.yaml"),
[]byte(schemaConfigContents2),
0666,
)
require.NoError(t, err)
config, err = getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 3)
}
func TestMultipleConfigsIncludingLegacy(t *testing.T) {
tmpDir, client := setupTestEnv(t)
err := os.WriteFile(
filepath.Join(tmpDir, "schemaconfig.yaml"),
[]byte(legacySchemaConfigContents),
0666,
)
require.NoError(t, err)
err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig.yaml"),
[]byte(schemaConfigContents),
0666,
)
require.NoError(t, err)
config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 2)
err = os.WriteFile(
filepath.Join(tmpDir, "456-schemaconfig-1.yaml"),
[]byte(schemaConfigContents2),
0666,
)
require.NoError(t, err)
config, err = getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 3)
}
func TestLegacyConfigOnly(t *testing.T) {
tmpDir, client := setupTestEnv(t)
err := os.WriteFile(
filepath.Join(tmpDir, "schemaconfig.yaml"),
[]byte(legacySchemaConfigContents),
0666,
)
require.NoError(t, err)
config, err := getLatestConfig(client, "456")
require.NoError(t, err)
require.NotNil(t, config)
require.Len(t, config.Configs, 1)
}
func TestDurationCeilDiv(t *testing.T) {

Loading…
Cancel
Save