@ -18,6 +18,7 @@ import (
"github.com/weaveworks/common/user"
"github.com/weaveworks/common/user"
"gopkg.in/yaml.v2"
"gopkg.in/yaml.v2"
"github.com/grafana/dskit/multierror"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/client"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/logcli/output"
"github.com/grafana/loki/pkg/loghttp"
"github.com/grafana/loki/pkg/loghttp"
@ -36,7 +37,7 @@ import (
"github.com/grafana/loki/pkg/validation"
"github.com/grafana/loki/pkg/validation"
)
)
const S chemaConfigFilename = "schemaconfig.yaml "
const s chemaConfigFilename = "schemaconfig"
type streamEntryPair struct {
type streamEntryPair struct {
entry loghttp . Entry
entry loghttp . Entry
@ -197,7 +198,11 @@ func (q *Query) DoLocalQuery(out output.LogOutput, statistics bool, orgID string
return err
return err
}
}
loadedSchema , err := LoadSchemaUsingObjectClient ( client , SchemaConfigFilename )
objects := [ ] string {
fmt . Sprintf ( "%s-%s.yaml" , orgID , schemaConfigFilename ) , // schemaconfig-tenant.yaml
fmt . Sprintf ( "%s.yaml" , schemaConfigFilename ) , // schemaconfig.yaml for backwards compatibility
}
loadedSchema , err := LoadSchemaUsingObjectClient ( client , objects ... )
if err != nil {
if err != nil {
return err
return err
}
}
@ -284,24 +289,37 @@ type schemaConfigSection struct {
config . SchemaConfig ` yaml:"schema_config" `
config . SchemaConfig ` yaml:"schema_config" `
}
}
func LoadSchemaUsingObjectClient ( oc chunk . ObjectClient , name string ) ( * config . SchemaConfig , error ) {
// LoadSchemaUsingObjectClient returns the loaded schema from the first found object
ctx , cancel := context . WithDeadline ( context . Background ( ) , time . Now ( ) . Add ( 5 * time . Second ) )
func LoadSchemaUsingObjectClient ( oc chunk . ObjectClient , names ... string ) ( * config . SchemaConfig , error ) {
defer cancel ( )
errors := multierror . New ( )
rdr , _ , err := oc . GetObject ( ctx , name )
for _ , name := range names {
if err != nil {
schema , err := func ( name string ) ( * config . SchemaConfig , error ) {
return nil , err
ctx , cancel := context . WithDeadline ( context . Background ( ) , time . Now ( ) . Add ( 5 * time . Second ) )
}
defer cancel ( )
defer rdr . Close ( )
rdr , _ , err := oc . GetObject ( ctx , name )
if err != nil {
return nil , err
}
defer rdr . Close ( )
decoder := yaml . NewDecoder ( rdr )
decoder := yaml . NewDecoder ( rdr )
decoder . SetStrict ( true )
decoder . SetStrict ( true )
section := schemaConfigSection { }
section := schemaConfigSection { }
err = decoder . Decode ( & section )
err = decoder . Decode ( & section )
if err != nil {
if err != nil {
return nil , err
return nil , err
}
}
return & section . SchemaConfig , nil
} ( name )
return & section . SchemaConfig , nil
if err != nil {
errors = append ( errors , err )
continue
}
return schema , nil
}
return nil , errors . Err ( )
}
}
// SetInstant makes the Query an instant type
// SetInstant makes the Query an instant type