diff --git a/pkg/infra/db/dbrepl.go b/pkg/infra/db/dbrepl.go index d5c1e0a51ed..a8d8e23d676 100644 --- a/pkg/infra/db/dbrepl.go +++ b/pkg/infra/db/dbrepl.go @@ -9,6 +9,5 @@ type ReplDB interface { DB() *sqlstore.SQLStore // ReadReplica is the read-only database connection. If no read replica is configured, the implementation must return the primary DB. - // TODO: ReadReplica will take a list of replicas and load-balance across them in a future milestone. ReadReplica() *sqlstore.SQLStore } diff --git a/pkg/services/sqlstore/database_config.go b/pkg/services/sqlstore/database_config.go index 8ffc69fcf66..9105859ca03 100644 --- a/pkg/services/sqlstore/database_config.go +++ b/pkg/services/sqlstore/database_config.go @@ -10,6 +10,7 @@ import ( "strings" "github.com/go-sql-driver/mysql" + "gopkg.in/ini.v1" "github.com/grafana/grafana/pkg/services/featuremgmt" "github.com/grafana/grafana/pkg/services/sqlstore/migrator" @@ -70,6 +71,10 @@ func NewDatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (* // section to the configuration file while using the same cfg struct. func (dbCfg *DatabaseConfig) readConfigSection(cfg *setting.Cfg, section string) error { sec := cfg.Raw.Section(section) + return dbCfg.parseConfigIni(sec) +} + +func (dbCfg *DatabaseConfig) parseConfigIni(sec *ini.Section) error { cfgURL := sec.Key("url").String() if len(cfgURL) != 0 { dbURL, err := url.Parse(cfgURL) diff --git a/pkg/services/sqlstore/replstore.go b/pkg/services/sqlstore/replstore.go index 5cc1bda9ccf..5d6d0b686b4 100644 --- a/pkg/services/sqlstore/replstore.go +++ b/pkg/services/sqlstore/replstore.go @@ -2,6 +2,7 @@ package sqlstore import ( "errors" + "sync/atomic" "time" "github.com/dlmiddlecote/sqlstats" @@ -24,23 +25,37 @@ import ( // SQLStore. type ReplStore struct { *SQLStore - repl *SQLStore + repls []*SQLStore + + // next is the index of the next read-only SQLStore in the chain. + next uint64 } // DB returns the main SQLStore. -func (rs ReplStore) DB() *SQLStore { +func (rs *ReplStore) DB() *SQLStore { return rs.SQLStore } // ReadReplica returns the read-only SQLStore. If no read replica is configured, // it returns the main SQLStore. -func (rs ReplStore) ReadReplica() *SQLStore { - if rs.repl == nil { +func (rs *ReplStore) ReadReplica() *SQLStore { + if rs.repls == nil || len(rs.repls) == 0 { rs.log.Debug("ReadReplica not configured, using main SQLStore") return rs.SQLStore } rs.log.Debug("Using ReadReplica") - return rs.repl + return rs.nextRepl() +} + +// nextRepl() returns the next read-only SQLStore in the chain. If no read replica is configured, the Primary is returned. +func (rs *ReplStore) nextRepl() *SQLStore { + // start by grabbing the replica at the current index + selected := rs.repls[(int(rs.next))%len(rs.repls)] + + // then increment the index for the next call + atomic.AddUint64(&rs.next, 1) + + return selected } // ProvideServiceWithReadReplica creates a new *SQLStore connection intended for @@ -50,7 +65,7 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator, bus bus.Bus, tracer tracing.Tracer) (*ReplStore, error) { // start with the initialized SQLStore - replStore := &ReplStore{primary, nil} + replStore := &ReplStore{primary, nil, 0} // FeatureToggle fallback: If the FlagDatabaseReadReplica feature flag is not enabled, return a single SQLStore. if !features.IsEnabledGlobally(featuremgmt.FlagDatabaseReadReplica) { @@ -62,22 +77,32 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg, // by that mimic the functionality of how it was functioning before // xorm's changes above. xorm.DefaultPostgresSchema = "" - s, err := newReadOnlySQLStore(cfg, features, bus, tracer) + + // Parsing the configuration to get the number of repls + replCfgs, err := NewRODatabaseConfigs(cfg, features) if err != nil { return nil, err } - s.features = features - s.tracer = tracer - - // initialize and register metrics wrapper around the *sql.DB - db := s.engine.DB().DB - // register the go_sql_stats_connections_* metrics - if err := prometheus.Register(sqlstats.NewStatsCollector("grafana_repl", db)); err != nil { - s.log.Warn("Failed to register sqlstore stats collector", "error", err) + if len(replCfgs) > 0 { + replStore.repls = make([]*SQLStore, len(replCfgs)) } - replStore.repl = s + for i, replCfg := range replCfgs { + s, err := newReadOnlySQLStore(cfg, replCfg, features, bus, tracer) + if err != nil { + return nil, err + } + + // initialize and register metrics wrapper around the *sql.DB + db := s.engine.DB().DB + + // register the go_sql_stats_connections_* metrics + if err := prometheus.Register(sqlstats.NewStatsCollector("grafana_repl", db)); err != nil { + s.log.Warn("Failed to register sqlstore stats collector", "error", err) + } + replStore.repls[i] = s + } return replStore, nil } @@ -85,17 +110,16 @@ func ProvideServiceWithReadReplica(primary *SQLStore, cfg *setting.Cfg, // fully-populated read replica of the main Grafana Database. It provides no // write capabilities and does not run migrations, but other tracing and logging // features are enabled. -func newReadOnlySQLStore(cfg *setting.Cfg, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) { +func newReadOnlySQLStore(cfg *setting.Cfg, dbCfg *DatabaseConfig, features featuremgmt.FeatureToggles, bus bus.Bus, tracer tracing.Tracer) (*SQLStore, error) { s := &SQLStore{ - cfg: cfg, - log: log.New("replstore"), - bus: bus, - tracer: tracer, + log: log.New("replstore"), + bus: bus, + tracer: tracer, + features: features, + dbCfg: dbCfg, + cfg: cfg, } - s.features = features - s.tracer = tracer - err := s.initReadOnlyEngine(s.engine) if err != nil { return nil, err @@ -111,12 +135,6 @@ func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error { return nil } - dbCfg, err := NewRODatabaseConfig(ss.cfg, ss.features) - if err != nil { - return err - } - ss.dbCfg = dbCfg - if ss.cfg.DatabaseInstrumentQueries { ss.dbCfg.Type = WrapDatabaseReplDriverWithHooks(ss.dbCfg.Type, ss.tracer) } @@ -158,27 +176,44 @@ func (ss *SQLStore) initReadOnlyEngine(engine *xorm.Engine) error { } // NewRODatabaseConfig creates a new read-only database configuration. -func NewRODatabaseConfig(cfg *setting.Cfg, features featuremgmt.FeatureToggles) (*DatabaseConfig, error) { +func NewRODatabaseConfigs(cfg *setting.Cfg, features featuremgmt.FeatureToggles) ([]*DatabaseConfig, error) { if cfg == nil { return nil, errors.New("cfg cannot be nil") } - dbCfg := &DatabaseConfig{} - if err := dbCfg.readConfigSection(cfg, "database_replica"); err != nil { + // if only one replica is configured in the database_replicas section, use it as the default + defaultReplCfg := &DatabaseConfig{} + if err := defaultReplCfg.readConfigSection(cfg, "database_replicas"); err != nil { return nil, err } - - if err := dbCfg.buildConnectionString(cfg, features); err != nil { + err := defaultReplCfg.buildConnectionString(cfg, features) + if err != nil { return nil, err } + ret := []*DatabaseConfig{defaultReplCfg} - return dbCfg, nil + // Check for additional replicas as children of the database_replicas section (e.g. database_replicas.one, database_replicas.cheetara) + repls := cfg.Raw.Section("database_replicas") + if len(repls.ChildSections()) > 0 { + for _, sec := range repls.ChildSections() { + replCfg := &DatabaseConfig{} + if err := replCfg.parseConfigIni(sec); err != nil { + return nil, err + } + if err := replCfg.buildConnectionString(cfg, features); err != nil { + return nil, err + } + ret = append(ret, replCfg) + } + } + + return ret, nil } // ProvideServiceWithReadReplicaForTests wraps the SQLStore in a ReplStore, with the main sqlstore as both the primary and read replica. // TODO: eventually this should be replaced with a more robust test setup which in func ProvideServiceWithReadReplicaForTests(testDB *SQLStore, t sqlutil.ITestDB, cfg *setting.Cfg, features featuremgmt.FeatureToggles, migrations registry.DatabaseMigrator) (*ReplStore, error) { - return &ReplStore{testDB, testDB}, nil + return newReplStore(testDB, testDB), nil } // InitTestReplDB initializes a test DB and returns it wrapped in a ReplStore with the main SQLStore as both the primary and read replica. @@ -190,7 +225,7 @@ func InitTestReplDB(t sqlutil.ITestDB, opts ...InitTestDBOpt) (*ReplStore, *sett if err != nil { t.Fatalf("failed to initialize sql repl store: %s", err) } - return &ReplStore{ss, ss}, cfg + return newReplStore(ss, ss), cfg } // InitTestReplDBWithMigration initializes the test DB given custom migrations. @@ -202,5 +237,16 @@ func InitTestReplDBWithMigration(t sqlutil.ITestDB, migration registry.DatabaseM if err != nil { t.Fatalf("failed to initialize sql store: %s", err) } - return &ReplStore{ss, ss} + return newReplStore(ss, ss) +} + +// newReplStore is a wrapper function that returns a ReplStore with the given primary and read replicas. +func newReplStore(primary *SQLStore, readReplicas ...*SQLStore) *ReplStore { + ret := &ReplStore{ + SQLStore: primary, + repls: make([]*SQLStore, len(readReplicas)), + next: 0, + } + ret.repls = readReplicas + return ret } diff --git a/pkg/services/sqlstore/replstore_test.go b/pkg/services/sqlstore/replstore_test.go new file mode 100644 index 00000000000..b9084804341 --- /dev/null +++ b/pkg/services/sqlstore/replstore_test.go @@ -0,0 +1,64 @@ +package sqlstore + +import ( + "fmt" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/require" + "gopkg.in/ini.v1" + + "github.com/grafana/grafana/pkg/setting" +) + +func TestReplStore_ReadReplica(t *testing.T) { + // Using the connection strings to differentiate between the replicas + replStore, _ := InitTestReplDB(t) + replStore.repls[0].dbCfg.ConnectionString = "repl0" + + repl1 := &SQLStore{dbCfg: &DatabaseConfig{ConnectionString: "repl1"}} + repl2 := &SQLStore{dbCfg: &DatabaseConfig{ConnectionString: "repl2"}} + replStore.repls = append(replStore.repls, repl1, repl2) + + got := make([]string, 5) + for i := 0; i < 5; i++ { + got[i] = replStore.ReadReplica().dbCfg.ConnectionString + } + + want := []string{"repl0", "repl1", "repl2", "repl0", "repl1"} + if cmp.Equal(got, want) == false { + t.Fatal("wrong result. Got:", got, "Want:", want) + } +} + +func TestNewRODatabaseConfig(t *testing.T) { + inicfg, err := ini.Load([]byte(replCfg)) + require.NoError(t, err) + cfg, err := setting.NewCfgFromINIFile(inicfg) + require.NoError(t, err) + + dbCfgs, err := NewRODatabaseConfigs(cfg, nil) + require.NoError(t, err) + + var connStr = func(port int) string { + return fmt.Sprintf("grafana:password@tcp(127.0.0.1:%d)/grafana?collation=utf8mb4_unicode_ci&allowNativePasswords=true&clientFoundRows=true", port) + } + + for i, c := range dbCfgs { + if !cmp.Equal(c.ConnectionString, connStr(i+3306)) { + t.Errorf("wrong result for connection string %d.\nGot: %s,\nWant: %s", i, c.ConnectionString, connStr(i+3306)) + } + } +} + +var replCfg = ` +[database_replicas] +type = mysql +name = grafana +user = grafana +password = password +host = 127.0.0.1:3306 +[database_replicas.one] = +host = 127.0.0.1:3307 +[database_replicas.two] = +host = 127.0.0.1:3308`