@ -23,7 +23,6 @@ func convertToDataFrame(ctx *mysql.Context, iter mysql.RowIter, schema mysql.Sch
if err != nil {
return nil , err
}
field := data . NewFieldFromFieldType ( fT , 0 )
field . Name = col . Name
f . Fields = append ( f . Fields , field )
@ -40,11 +39,22 @@ func convertToDataFrame(ctx *mysql.Context, iter mysql.RowIter, schema mysql.Sch
}
for i , val := range row {
v , err := fieldValFromRowVal ( f . Fields [ i ] . Type ( ) , val )
// Run val through mysql.Type.Convert to normalize underlying value
// of the interface
nV , _ , err := schema [ i ] . Type . Convert ( val )
if err != nil {
return nil , err
}
// Run the normalized value through fieldValFromRowVal to normalize
// the interface type to the dataframe value type, and make nullable
// values pointers as dataframe expects.
fV , err := fieldValFromRowVal ( f . Fields [ i ] . Type ( ) , nV )
if err != nil {
return nil , fmt . Errorf ( "unexpected type for column %s: %w" , schema [ i ] . Name , err )
}
f . Fields [ i ] . Append ( v )
f . Fields [ i ] . Append ( fV )
}
}
@ -72,11 +82,10 @@ func MySQLColToFieldType(col *mysql.Column) (data.FieldType, error) {
fT = data . FieldTypeInt64
case types . Uint64 :
fT = data . FieldTypeUint64
case types . Float32 :
fT = data . FieldTypeFloat32
case types . Float64 :
fT = data . FieldTypeFloat64
// StringType represents all string types, including VARCHAR and BLOB.
case types . Text , types . LongText :
fT = data . FieldTypeString
case types . Timestamp :
fT = data . FieldTypeTime
case types . Datetime :
@ -84,9 +93,12 @@ func MySQLColToFieldType(col *mysql.Column) (data.FieldType, error) {
case types . Boolean :
fT = data . FieldTypeBool
default :
if types . IsDecimal ( col . Type ) {
switch {
case types . IsDecimal ( col . Type ) :
fT = data . FieldTypeFloat64
} else {
case types . IsText ( col . Type ) :
fT = data . FieldTypeString
default :
return fT , fmt . Errorf ( "unsupported type for column %s of type %v" , col . Name , col . Type )
}
}
@ -98,315 +110,96 @@ func MySQLColToFieldType(col *mysql.Column) (data.FieldType, error) {
return fT , nil
}
// Helper function to convert data.FieldType to types.Type
func convertDataType ( fieldType data . FieldType ) mysql . Type {
switch fieldType {
case data . FieldTypeInt8 , data . FieldTypeNullableInt8 :
return types . Int8
case data . FieldTypeUint8 , data . FieldTypeNullableUint8 :
return types . Uint8
case data . FieldTypeInt16 , data . FieldTypeNullableInt16 :
return types . Int16
case data . FieldTypeUint16 , data . FieldTypeNullableUint16 :
return types . Uint16
case data . FieldTypeInt32 , data . FieldTypeNullableInt32 :
return types . Int32
case data . FieldTypeUint32 , data . FieldTypeNullableUint32 :
return types . Uint32
case data . FieldTypeInt64 , data . FieldTypeNullableInt64 :
return types . Int64
case data . FieldTypeUint64 , data . FieldTypeNullableUint64 :
return types . Uint64
case data . FieldTypeFloat32 , data . FieldTypeNullableFloat32 :
return types . Float32
case data . FieldTypeFloat64 , data . FieldTypeNullableFloat64 :
return types . Float64
case data . FieldTypeString , data . FieldTypeNullableString :
return types . Text
case data . FieldTypeBool , data . FieldTypeNullableBool :
return types . Boolean
case data . FieldTypeTime , data . FieldTypeNullableTime :
return types . Timestamp
default :
fmt . Printf ( "------- Unsupported field type: %v" , fieldType )
return types . JSON
}
}
// fieldValFromRowVal converts a go-mysql-server row value to a data.field value
//
//nolint:gocyclo
func fieldValFromRowVal ( fieldType data . FieldType , val interface { } ) ( interface { } , error ) {
// the input val may be nil, it also may not be a pointer even if the fieldtype is a nullable pointer type
// if the input interface is nil, we can return an untyped nil
if val == nil {
return nil , nil
}
nullable := fieldType . Nullable ( )
switch fieldType {
// ----------------------------
// Int8 / Nullable Int8
// ----------------------------
case data . FieldTypeInt8 :
v , ok := val . ( int8 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int8" , val , val )
}
return v , nil
case data . FieldTypeInt8 , data . FieldTypeNullableInt8 :
return parseVal [ int8 ] ( val , "int8" , nullable )
case data . FieldTypeNullableInt8 :
vP , ok := val . ( * int8 )
if ok {
return vP , nil
}
v , ok := val . ( int8 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int8 or *int8" , val , val )
// ----------------------------
// Uint8 / Nullable Uint8
// ----------------------------
case data . FieldTypeUint8 :
v , ok := val . ( uint8 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint8" , val , val )
}
return v , nil
case data . FieldTypeUint8 , data . FieldTypeNullableUint8 :
return parseVal [ uint8 ] ( val , "uint8" , nullable )
case data . FieldTypeNullableUint8 :
vP , ok := val . ( * uint8 )
if ok {
return vP , nil
}
v , ok := val . ( uint8 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint8 or *uint8" , val , val )
// ----------------------------
// Int16 / Nullable Int16
// ----------------------------
case data . FieldTypeInt16 :
v , ok := val . ( int16 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int16" , val , val )
}
return v , nil
case data . FieldTypeInt16 , data . FieldTypeNullableInt16 :
return parseVal [ int16 ] ( val , "int16" , nullable )
case data . FieldTypeNullableInt16 :
vP , ok := val . ( * int16 )
if ok {
return vP , nil
}
v , ok := val . ( int16 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int16 or *int16" , val , val )
// ----------------------------
// Uint16 / Nullable Uint16
// ----------------------------
case data . FieldTypeUint16 :
v , ok := val . ( uint16 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint16" , val , val )
}
return v , nil
case data . FieldTypeUint16 , data . FieldTypeNullableUint16 :
return parseVal [ uint16 ] ( val , "uint16" , nullable )
case data . FieldTypeNullableUint16 :
vP , ok := val . ( * uint16 )
if ok {
return vP , nil
}
v , ok := val . ( uint16 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint16 or *uint16" , val , val )
// ----------------------------
// Int32 / Nullable Int32
// ----------------------------
case data . FieldTypeInt32 :
v , ok := val . ( int32 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int32" , val , val )
}
return v , nil
case data . FieldTypeInt32 , data . FieldTypeNullableInt32 :
return parseVal [ int32 ] ( val , "int32" , nullable )
case data . FieldTypeNullableInt32 :
vP , ok := val . ( * int32 )
if ok {
return vP , nil
}
v , ok := val . ( int32 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int32 or *int32" , val , val )
// ----------------------------
// Uint32 / Nullable Uint32
// ----------------------------
case data . FieldTypeUint32 :
v , ok := val . ( uint32 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint32" , val , val )
}
return v , nil
case data . FieldTypeUint32 , data . FieldTypeNullableUint32 :
return parseVal [ uint32 ] ( val , "uint32" , nullable )
case data . FieldTypeNullableUint32 :
vP , ok := val . ( * uint32 )
if ok {
return vP , nil
}
v , ok := val . ( uint32 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint32 or *uint32" , val , val )
// ----------------------------
// Int64 / Nullable Int64
// ----------------------------
case data . FieldTypeInt64 :
v , ok := val . ( int64 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int64" , val , val )
}
return v , nil
case data . FieldTypeInt64 , data . FieldTypeNullableInt64 :
return parseVal [ int64 ] ( val , "int64" , nullable )
case data . FieldTypeNullableInt64 :
vP , ok := val . ( * int64 )
if ok {
return vP , nil
}
v , ok := val . ( int64 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected int64 or *int64" , val , val )
// ----------------------------
// Uint64 / Nullable Uint64
// ----------------------------
case data . FieldTypeUint64 :
v , ok := val . ( uint64 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint64" , val , val )
}
return v , nil
case data . FieldTypeUint64 , data . FieldTypeNullableUint64 :
return parseVal [ uint64 ] ( val , "uint64" , nullable )
case data . FieldTypeNullableUint64 :
vP , ok := val . ( * uint64 )
if ok {
return vP , nil
}
v , ok := val . ( uint64 )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected uint64 or *uint64" , val , val )
// ----------------------------
// Float64 / Nullable Float64
// ----------------------------
case data . FieldTypeFloat64 :
// Accept float64 or decimal.Decimal, convert decimal.Decimal -> float64
if v , ok := val . ( float64 ) ; ok {
return v , nil
}
if d , ok := val . ( decimal . Decimal ) ; ok {
return d . InexactFloat64 ( ) , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected float64 or decimal.Decimal" , val , val )
case data . FieldTypeFloat32 , data . FieldTypeNullableFloat32 :
return parseVal [ float32 ] ( val , "float32" , nullable )
case data . FieldTypeNullableFloat64 :
// Possibly already *float64
if vP , ok := val . ( * float64 ) ; ok {
return vP , nil
}
// Possibly float64
if v , ok := val . ( float64 ) ; ok {
return & v , nil
}
// Possibly decimal.Decimal
if d , ok := val . ( decimal . Decimal ) ; ok {
f := d . InexactFloat64 ( )
return & f , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected float64, *float64, or decimal.Decimal" , val , val )
// ----------------------------
// Time / Nullable Time
// ----------------------------
case data . FieldTypeTime :
v , ok := val . ( time . Time )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected time.Time" , val , val )
}
return v , nil
case data . FieldTypeFloat64 , data . FieldTypeNullableFloat64 :
return parseFloat64OrDecimal ( val , nullable )
case data . FieldTypeNullableTime :
vP , ok := val . ( * time . Time )
if ok {
return vP , nil
}
v , ok := val . ( time . Time )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected time.Time or *time.Time" , val , val )
// ----------------------------
// String / Nullable String
// ----------------------------
case data . FieldTypeString :
v , ok := val . ( string )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected string" , val , val )
}
return v , nil
case data . FieldTypeTime , data . FieldTypeNullableTime :
return parseVal [ time . Time ] ( val , "time.Time" , nullable )
case data . FieldTypeNullableString :
vP , ok := val . ( * string )
if ok {
return vP , nil
}
v , ok := val . ( string )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected string or *string" , val , val )
// ----------------------------
// Bool / Nullable Bool
// ----------------------------
case data . FieldTypeBool :
v , ok := val . ( bool )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected bool" , val , val )
}
return v , nil
case data . FieldTypeString , data . FieldTypeNullableString :
return parseVal [ string ] ( val , "string" , nullable )
case data . FieldTypeNullableBool :
vP , ok := val . ( * bool )
if ok {
return vP , nil
}
v , ok := val . ( bool )
if ok {
return & v , nil
}
return nil , fmt . Errorf ( "unexpected value type for interface %v of type %T, expected bool or *bool" , val , val )
case data . FieldTypeBool , data . FieldTypeNullableBool :
return parseBoolFromInt8 ( val , nullable )
// ----------------------------
// Fallback / Unsupported
// ----------------------------
default :
return nil , fmt . Errorf ( "unsupported field type %s for val %v" , fieldType , val )
}
}
// parseVal attempts to assert `val` as type T. If successful, it returns either
// the value or a pointer, depending on `isNullable`. If not, returns an error.
func parseVal [ T any ] ( val interface { } , typeName string , isNullable bool ) ( interface { } , error ) {
v , ok := val . ( T )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type %v of type %T, expected %s" , val , val , typeName )
}
return ptrIfNull ( v , isNullable ) , nil
}
// parseFloat64OrDecimal handles the special case where val can be float64 or decimal.Decimal.
func parseFloat64OrDecimal ( val interface { } , isNullable bool ) ( interface { } , error ) {
if fv , ok := val . ( float64 ) ; ok {
return ptrIfNull ( fv , isNullable ) , nil
}
if d , ok := val . ( decimal . Decimal ) ; ok {
return ptrIfNull ( d . InexactFloat64 ( ) , isNullable ) , nil
}
return nil , fmt . Errorf ( "unexpected value type %v of type %T, expected float64 or decimal.Decimal" , val , val )
}
// parseBoolFromInt8 asserts val as an int8, converts non-zero to true.
// Returns pointer if isNullable, otherwise the bool value.
func parseBoolFromInt8 ( val interface { } , isNullable bool ) ( interface { } , error ) {
v , ok := val . ( int8 )
if ! ok {
return nil , fmt . Errorf ( "unexpected value type %v of type %T, expected int8 (for bool)" , val , val )
}
b := ( v != 0 )
return ptrIfNull ( b , isNullable ) , nil
}
// ptrIfNull returns a pointer to val if isNullable is true; otherwise, returns val.
func ptrIfNull [ T any ] ( val T , isNullable bool ) interface { } {
if isNullable {
return & val
}
return val
}