The open and composable observability and data visualization platform. Visualize metrics, logs, and traces from multiple sources like Prometheus, Loki, Elasticsearch, InfluxDB, Postgres and many more.
You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
 
 
 
 
 
 
grafana/pkg/tsdb/postgres/postgres.go

218 lines
6.2 KiB

package postgres
import (
"fmt"
"reflect"
"strconv"
"strings"
"github.com/grafana/grafana-plugin-sdk-go/data"
"github.com/grafana/grafana-plugin-sdk-go/data/sqlutil"
"github.com/grafana/grafana/pkg/setting"
"github.com/grafana/grafana/pkg/util/errutil"
"github.com/grafana/grafana/pkg/infra/log"
"github.com/grafana/grafana/pkg/models"
"github.com/grafana/grafana/pkg/plugins"
"github.com/grafana/grafana/pkg/tsdb/sqleng"
)
func ProvideService(cfg *setting.Cfg) *PostgresService {
logger := log.New("tsdb.postgres")
return &PostgresService{
Cfg: cfg,
logger: logger,
tlsManager: newTLSManager(logger, cfg.DataPath),
}
}
type PostgresService struct {
Cfg *setting.Cfg
logger log.Logger
tlsManager tlsSettingsProvider
}
//nolint: staticcheck // plugins.DataPlugin deprecated
func (s *PostgresService) NewExecutor(datasource *models.DataSource) (plugins.DataPlugin, error) {
s.logger.Debug("Creating Postgres query endpoint")
cnnstr, err := s.generateConnectionString(datasource)
if err != nil {
return nil, err
}
if s.Cfg.Env == setting.Dev {
s.logger.Debug("getEngine", "connection", cnnstr)
}
config := sqleng.DataPluginConfiguration{
DriverName: "postgres",
ConnectionString: cnnstr,
Datasource: datasource,
MetricColumnTypes: []string{"UNKNOWN", "TEXT", "VARCHAR", "CHAR"},
}
queryResultTransformer := postgresQueryResultTransformer{
log: s.logger,
}
timescaledb := datasource.JsonData.Get("timescaledb").MustBool(false)
plugin, err := sqleng.NewDataPlugin(config, &queryResultTransformer, newPostgresMacroEngine(timescaledb),
s.logger)
if err != nil {
s.logger.Error("Failed connecting to Postgres", "err", err)
return nil, err
}
s.logger.Debug("Successfully connected to Postgres")
return plugin, nil
}
// escape single quotes and backslashes in Postgres connection string parameters.
func escape(input string) string {
return strings.ReplaceAll(strings.ReplaceAll(input, `\`, `\\`), "'", `\'`)
}
func (s *PostgresService) generateConnectionString(datasource *models.DataSource) (string, error) {
var host string
var port int
if strings.HasPrefix(datasource.Url, "/") {
host = datasource.Url
s.logger.Debug("Generating connection string with Unix socket specifier", "socket", host)
} else {
sp := strings.SplitN(datasource.Url, ":", 2)
host = sp[0]
if len(sp) > 1 {
var err error
port, err = strconv.Atoi(sp[1])
if err != nil {
return "", errutil.Wrapf(err, "invalid port in host specifier %q", sp[1])
}
s.logger.Debug("Generating connection string with network host/port pair", "host", host, "port", port)
} else {
s.logger.Debug("Generating connection string with network host", "host", host)
}
}
connStr := fmt.Sprintf("user='%s' password='%s' host='%s' dbname='%s'",
escape(datasource.User), escape(datasource.DecryptedPassword()), escape(host), escape(datasource.Database))
if port > 0 {
connStr += fmt.Sprintf(" port=%d", port)
}
tlsSettings, err := s.tlsManager.getTLSSettings(datasource)
if err != nil {
return "", err
}
connStr += fmt.Sprintf(" sslmode='%s'", escape(tlsSettings.Mode))
// Attach root certificate if provided
if tlsSettings.RootCertFile != "" {
s.logger.Debug("Setting server root certificate", "tlsRootCert", tlsSettings.RootCertFile)
connStr += fmt.Sprintf(" sslrootcert='%s'", escape(tlsSettings.RootCertFile))
}
// Attach client certificate and key if both are provided
if tlsSettings.CertFile != "" && tlsSettings.CertKeyFile != "" {
s.logger.Debug("Setting TLS/SSL client auth", "tlsCert", tlsSettings.CertFile, "tlsKey", tlsSettings.CertKeyFile)
connStr += fmt.Sprintf(" sslcert='%s' sslkey='%s'", escape(tlsSettings.CertFile), escape(tlsSettings.CertKeyFile))
} else if tlsSettings.CertFile != "" || tlsSettings.CertKeyFile != "" {
return "", fmt.Errorf("TLS/SSL client certificate and key must both be specified")
}
s.logger.Debug("Generated Postgres connection string successfully")
return connStr, nil
}
type postgresQueryResultTransformer struct {
log log.Logger
}
func (t *postgresQueryResultTransformer) TransformQueryError(err error) error {
return err
}
func (t *postgresQueryResultTransformer) GetConverterList() []sqlutil.StringConverter {
return []sqlutil.StringConverter{
{
Name: "handle FLOAT4",
InputScanKind: reflect.Interface,
InputTypeName: "FLOAT4",
ConversionFunc: func(in *string) (*string, error) { return in, nil },
Replacer: &sqlutil.StringFieldReplacer{
OutputFieldType: data.FieldTypeNullableFloat64,
ReplaceFunc: func(in *string) (interface{}, error) {
if in == nil {
return nil, nil
}
v, err := strconv.ParseFloat(*in, 64)
if err != nil {
return nil, err
}
return &v, nil
},
},
},
{
Name: "handle FLOAT8",
InputScanKind: reflect.Interface,
InputTypeName: "FLOAT8",
ConversionFunc: func(in *string) (*string, error) { return in, nil },
Replacer: &sqlutil.StringFieldReplacer{
OutputFieldType: data.FieldTypeNullableFloat64,
ReplaceFunc: func(in *string) (interface{}, error) {
if in == nil {
return nil, nil
}
v, err := strconv.ParseFloat(*in, 64)
if err != nil {
return nil, err
}
return &v, nil
},
},
},
{
Name: "handle NUMERIC",
InputScanKind: reflect.Interface,
InputTypeName: "NUMERIC",
ConversionFunc: func(in *string) (*string, error) { return in, nil },
Replacer: &sqlutil.StringFieldReplacer{
OutputFieldType: data.FieldTypeNullableFloat64,
ReplaceFunc: func(in *string) (interface{}, error) {
if in == nil {
return nil, nil
}
v, err := strconv.ParseFloat(*in, 64)
if err != nil {
return nil, err
}
return &v, nil
},
},
},
{
Name: "handle INT2",
InputScanKind: reflect.Interface,
InputTypeName: "INT2",
ConversionFunc: func(in *string) (*string, error) { return in, nil },
Replacer: &sqlutil.StringFieldReplacer{
OutputFieldType: data.FieldTypeNullableInt16,
ReplaceFunc: func(in *string) (interface{}, error) {
if in == nil {
return nil, nil
}
i64, err := strconv.ParseInt(*in, 10, 16)
if err != nil {
return nil, err
}
v := int16(i64)
return &v, nil
},
},
},
}
}