|
|
|
|
@ -2,6 +2,7 @@ package mysql |
|
|
|
|
|
|
|
|
|
import ( |
|
|
|
|
"context" |
|
|
|
|
"database/sql" |
|
|
|
|
"fmt" |
|
|
|
|
"sync" |
|
|
|
|
|
|
|
|
|
@ -12,9 +13,9 @@ import ( |
|
|
|
|
) |
|
|
|
|
|
|
|
|
|
type MysqlExecutor struct { |
|
|
|
|
*models.DataSource |
|
|
|
|
engine *xorm.Engine |
|
|
|
|
log log.Logger |
|
|
|
|
datasource *models.DataSource |
|
|
|
|
engine *xorm.Engine |
|
|
|
|
log log.Logger |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type engineCacheType struct { |
|
|
|
|
@ -28,116 +29,174 @@ var engineCache = engineCacheType{ |
|
|
|
|
versions: make(map[int64]int), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
tsdb.RegisterExecutor("mysql", NewMysqlExecutor) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewMysqlExecutor(datasource *models.DataSource) (tsdb.Executor, error) { |
|
|
|
|
engine, err := getEngineFor(datasource) |
|
|
|
|
executor := &MysqlExecutor{ |
|
|
|
|
datasource: datasource, |
|
|
|
|
log: log.New("tsdb.mysql"), |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
err := executor.initEngine() |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return &MysqlExecutor{ |
|
|
|
|
log: log.New("tsdb.mysql"), |
|
|
|
|
engine: engine, |
|
|
|
|
}, nil |
|
|
|
|
return executor, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func getEngineFor(ds *models.DataSource) (*xorm.Engine, error) { |
|
|
|
|
func (e *MysqlExecutor) initEngine() error { |
|
|
|
|
engineCache.Lock() |
|
|
|
|
defer engineCache.Unlock() |
|
|
|
|
|
|
|
|
|
if engine, present := engineCache.cache[ds.Id]; present { |
|
|
|
|
if version, _ := engineCache.versions[ds.Id]; version == ds.Version { |
|
|
|
|
return engine, nil |
|
|
|
|
if engine, present := engineCache.cache[e.datasource.Id]; present { |
|
|
|
|
if version, _ := engineCache.versions[e.datasource.Id]; version == e.datasource.Version { |
|
|
|
|
e.engine = engine |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8mb4", ds.User, ds.Password, "tcp", ds.Url, ds.Database) |
|
|
|
|
cnnstr := fmt.Sprintf("%s:%s@%s(%s)/%s?charset=utf8mb4", e.datasource.User, e.datasource.Password, "tcp", e.datasource.Url, e.datasource.Database) |
|
|
|
|
e.log.Debug("getEngine", "connection", cnnstr) |
|
|
|
|
|
|
|
|
|
engine, err := xorm.NewEngine("mysql", cnnstr) |
|
|
|
|
engine.SetMaxConns(10) |
|
|
|
|
engine.SetMaxIdleConns(10) |
|
|
|
|
if err != nil { |
|
|
|
|
return nil, err |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
engineCache.cache[ds.Id] = engine |
|
|
|
|
return engine, nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func init() { |
|
|
|
|
tsdb.RegisterExecutor("graphite", NewMysqlExecutor) |
|
|
|
|
engineCache.cache[e.datasource.Id] = engine |
|
|
|
|
e.engine = engine |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (e *MysqlExecutor) Execute(ctx context.Context, queries tsdb.QuerySlice, context *tsdb.QueryContext) *tsdb.BatchResult { |
|
|
|
|
result := &tsdb.BatchResult{} |
|
|
|
|
|
|
|
|
|
session := engine.NewSession() |
|
|
|
|
session := e.engine.NewSession() |
|
|
|
|
defer session.Close() |
|
|
|
|
|
|
|
|
|
db := session.DB() |
|
|
|
|
result, err := getData(db, &req) |
|
|
|
|
|
|
|
|
|
if err != nil { |
|
|
|
|
return |
|
|
|
|
// queries := strings.Split(req.Query, ";")
|
|
|
|
|
//
|
|
|
|
|
// data := dataStruct{}
|
|
|
|
|
// data.Results = make([]resultsStruct, 1)
|
|
|
|
|
// data.Results[0].Series = make([]seriesStruct, 0)
|
|
|
|
|
|
|
|
|
|
for _, query := range queries { |
|
|
|
|
rawSql := query.Model.Get("rawSql").MustString() |
|
|
|
|
if rawSql == "" { |
|
|
|
|
continue |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rows, err := db.Query(rawSql) |
|
|
|
|
if err != nil { |
|
|
|
|
result.Error = err |
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
defer rows.Close() |
|
|
|
|
|
|
|
|
|
columnNames, err := rows.Columns() |
|
|
|
|
if err != nil { |
|
|
|
|
result.Error = err |
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rc := NewStringStringScan(columnNames) |
|
|
|
|
for rows.Next() { |
|
|
|
|
err := rc.Update(rows.Rows) |
|
|
|
|
if err != nil { |
|
|
|
|
e.log.Error("Mysql response parsing", "error", err) |
|
|
|
|
result.Error = err |
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
rowValues := rc.Get() |
|
|
|
|
e.log.Info("Rows", "row", rowValues) |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// for rows.Next() {
|
|
|
|
|
// columnValues := make([]interface{}, len(columnNames))
|
|
|
|
|
//
|
|
|
|
|
// err = rows.ScanSlice(&columnValues)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// result.Error = err
|
|
|
|
|
// return result
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // bytes -> string
|
|
|
|
|
// for i := range columnValues {
|
|
|
|
|
// rowType := reflect.TypeOf(columnValues[i])
|
|
|
|
|
// e.log.Info("row", "type", rowType)
|
|
|
|
|
//
|
|
|
|
|
// rawValue := reflect.Indirect(reflect.ValueOf(columnValues[i]))
|
|
|
|
|
//
|
|
|
|
|
// // if rawValue is null then ignore
|
|
|
|
|
// if rawValue.Interface() == nil {
|
|
|
|
|
// continue
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// rawValueType := reflect.TypeOf(rawValue.Interface())
|
|
|
|
|
// vv := reflect.ValueOf(rawValue.Interface())
|
|
|
|
|
// e.log.Info("column type", "name", columnNames[i], "type", rawValueType, "vv", vv)
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
return result |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
type stringStringScan struct { |
|
|
|
|
// cp are the column pointers
|
|
|
|
|
cp []interface{} |
|
|
|
|
// row contains the final result
|
|
|
|
|
row []string |
|
|
|
|
colCount int |
|
|
|
|
colNames []string |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func NewStringStringScan(columnNames []string) *stringStringScan { |
|
|
|
|
lenCN := len(columnNames) |
|
|
|
|
s := &stringStringScan{ |
|
|
|
|
cp: make([]interface{}, lenCN), |
|
|
|
|
row: make([]string, lenCN*2), |
|
|
|
|
colCount: lenCN, |
|
|
|
|
colNames: columnNames, |
|
|
|
|
} |
|
|
|
|
j := 0 |
|
|
|
|
for i := 0; i < lenCN; i++ { |
|
|
|
|
s.cp[i] = new(sql.RawBytes) |
|
|
|
|
s.row[j] = s.colNames[i] |
|
|
|
|
j = j + 2 |
|
|
|
|
} |
|
|
|
|
return s |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *stringStringScan) Update(rows *sql.Rows) error { |
|
|
|
|
if err := rows.Scan(s.cp...); err != nil { |
|
|
|
|
return err |
|
|
|
|
} |
|
|
|
|
j := 0 |
|
|
|
|
for i := 0; i < s.colCount; i++ { |
|
|
|
|
if rb, ok := s.cp[i].(*sql.RawBytes); ok { |
|
|
|
|
s.row[j+1] = string(*rb) |
|
|
|
|
*rb = nil // reset pointer to discard current value to avoid a bug
|
|
|
|
|
} else { |
|
|
|
|
return fmt.Errorf("Cannot convert index %d column %s to type *sql.RawBytes", i, s.colNames[i]) |
|
|
|
|
} |
|
|
|
|
j = j + 2 |
|
|
|
|
} |
|
|
|
|
return nil |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
func (s *stringStringScan) Get() []string { |
|
|
|
|
return s.row |
|
|
|
|
} |
|
|
|
|
|
|
|
|
|
// func getData(db *core.DB, req *sqlDataRequest) (interface{}, error) {
|
|
|
|
|
// queries := strings.Split(req.Query, ";")
|
|
|
|
|
//
|
|
|
|
|
// data := dataStruct{}
|
|
|
|
|
// data.Results = make([]resultsStruct, 1)
|
|
|
|
|
// data.Results[0].Series = make([]seriesStruct, 0)
|
|
|
|
|
//
|
|
|
|
|
// for i := range queries {
|
|
|
|
|
// if queries[i] == "" {
|
|
|
|
|
// continue
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// rows, err := db.Query(queries[i])
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return nil, err
|
|
|
|
|
// }
|
|
|
|
|
// defer rows.Close()
|
|
|
|
|
//
|
|
|
|
|
// name := fmt.Sprintf("table_%d", i+1)
|
|
|
|
|
// series, err := arrangeResult(rows, name)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return nil, err
|
|
|
|
|
// }
|
|
|
|
|
// data.Results[0].Series = append(data.Results[0].Series, series.(seriesStruct))
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// return data, nil
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// func arrangeResult(rows *core.Rows, name string) (interface{}, error) {
|
|
|
|
|
// columnNames, err := rows.Columns()
|
|
|
|
|
//
|
|
|
|
|
// series := seriesStruct{}
|
|
|
|
|
// series.Columns = columnNames
|
|
|
|
|
// series.Name = name
|
|
|
|
|
//
|
|
|
|
|
// for rows.Next() {
|
|
|
|
|
// columnValues := make([]interface{}, len(columnNames))
|
|
|
|
|
//
|
|
|
|
|
// err = rows.ScanSlice(&columnValues)
|
|
|
|
|
// if err != nil {
|
|
|
|
|
// return nil, err
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// // bytes -> string
|
|
|
|
|
// for i := range columnValues {
|
|
|
|
|
// switch columnValues[i].(type) {
|
|
|
|
|
// case []byte:
|
|
|
|
|
// columnValues[i] = fmt.Sprintf("%s", columnValues[i])
|
|
|
|
|
// }
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// series.Values = append(series.Values, columnValues)
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// return series, err
|
|
|
|
|
// }
|
|
|
|
|
//
|
|
|
|
|
// type sqlDataRequest struct {
|
|
|
|
|
// Query string `json:"query"`
|
|
|
|
|
// Body []byte `json:"-"`
|
|
|
|
|
|