aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortsmax2004 <tsmax2004@yandex-team.com>2023-11-14 21:41:34 +0300
committertsmax2004 <tsmax2004@yandex-team.com>2023-11-14 22:14:06 +0300
commit8ef1d62f9e385f0b29a23d93bfd8b3d05555c370 (patch)
treeba0a0ba0952d08449ae526e7ceefb518c0792274
parentc3520f09d6b014bae5840791412a796d49d166dd (diff)
downloadydb-8ef1d62f9e385f0b29a23d93bfd8b3d05555c370.tar.gz
YQ Connector:fix mapping in binary string
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go8
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go12
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go8
-rw-r--r--ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go12
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/clickhouse.py2
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/postgresql.py2
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive.py95
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py4
-rw-r--r--ydb/library/yql/providers/generic/connector/tests/utils/schema.py21
9 files changed, 98 insertions, 66 deletions
diff --git a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
index 75079c75fa..560f8efd9f 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/clickhouse/type_mapper.go
@@ -113,7 +113,7 @@ func makeYdbDateTimeType(ydbTypeID Ydb.Type_PrimitiveTypeId, format api_service_
// type marked as nullable because ClickHouse's type value range is much more wide than YDB's type value range
return &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: ydbTypeID}}, true, nil
case api_service_protos.EDateTimeFormat_STRING_FORMAT:
- return &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}, false, nil
+ return &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}, false, nil
default:
return nil, false, fmt.Errorf("unexpected datetime format '%s': %w", format, utils.ErrDataTypeNotSupported)
}
@@ -270,10 +270,10 @@ func (typeMapper) appendValueToBuilder(
case Ydb.Type_DOUBLE:
err = appendValueToArrowBuilder[float64, float64, *array.Float64Builder, utils.Float64Converter](acceptor, builder)
case Ydb.Type_STRING:
- // depends on date/time representation format
+ err = appendValueToArrowBuilder[string, []byte, *array.BinaryBuilder, utils.StringToBytesConverter](acceptor, builder)
+ case Ydb.Type_UTF8:
+ // date/time in string representation format
switch acceptor.(type) {
- case **string:
- err = appendValueToArrowBuilder[string, string, *array.StringBuilder, utils.StringConverter](acceptor, builder)
case **utils.Date:
err = appendValueToArrowBuilder[utils.Date, string, *array.StringBuilder, utils.DateToStringConverter](acceptor, builder)
case **utils.Datetime:
diff --git a/ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go b/ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go
index f478a57734..d59fd76339 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/postgresql/type_mapper.go
@@ -39,7 +39,7 @@ func (tm typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_
case "date":
switch rules.GetDateTimeFormat() {
case api_service_protos.EDateTimeFormat_STRING_FORMAT:
- ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}
+ ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}
case api_service_protos.EDateTimeFormat_YQL_FORMAT:
ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_DATE}}
default:
@@ -52,7 +52,7 @@ func (tm typeMapper) SQLTypeToYDBColumn(columnName, typeName string, rules *api_
case "timestamp without time zone":
switch rules.GetDateTimeFormat() {
case api_service_protos.EDateTimeFormat_STRING_FORMAT:
- ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_STRING}}
+ ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_UTF8}}
case api_service_protos.EDateTimeFormat_YQL_FORMAT:
ydbType = &Ydb.Type{Type: &Ydb.Type_TypeId{TypeId: Ydb.Type_TIMESTAMP}}
default:
@@ -156,7 +156,7 @@ func (tm typeMapper) AddRowToArrowIPCStreaming(
}
case *pgtype.Date:
switch ydbTypeID {
- case Ydb.Type_STRING:
+ case Ydb.Type_UTF8:
err = appendValueToArrowBuilder[utils.Date, string, *array.StringBuilder, utils.DateToStringConverter](utils.Date(t.Time), builders[i], t.Valid)
case Ydb.Type_DATE:
err = appendValueToArrowBuilder[utils.Date, uint16, *array.Uint16Builder, utils.DateConverter](utils.Date(t.Time), builders[i], t.Valid)
@@ -165,7 +165,7 @@ func (tm typeMapper) AddRowToArrowIPCStreaming(
}
case *pgtype.Timestamp:
switch ydbTypeID {
- case Ydb.Type_STRING:
+ case Ydb.Type_UTF8:
err = appendValueToArrowBuilder[utils.Timestamp, string, *array.StringBuilder, utils.TimestampToStringConverter](utils.Timestamp(t.Time), builders[i], t.Valid)
case Ydb.Type_TIMESTAMP:
err = appendValueToArrowBuilder[utils.Timestamp, uint64, *array.Uint64Builder, utils.TimestampConverter](utils.Timestamp(t.Time), builders[i], t.Valid)
@@ -200,8 +200,10 @@ func acceptorFromOID(oid uint32) (any, error) {
return new(pgtype.Float4), nil
case pgtype.Float8OID:
return new(pgtype.Float8), nil
- case pgtype.TextOID, pgtype.BPCharOID, pgtype.VarcharOID, pgtype.ByteaOID:
+ case pgtype.TextOID, pgtype.BPCharOID, pgtype.VarcharOID:
return new(pgtype.Text), nil
+ case pgtype.ByteaOID:
+ return new(*[]byte), nil
case pgtype.DateOID:
return new(pgtype.Date), nil
case pgtype.TimestampOID:
diff --git a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
index 51152171bd..197abfc6f1 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/streaming/streamer_test.go
@@ -73,12 +73,11 @@ func (m *streamMock) makeSendMatcher(
}
}
- // FIXME: YQ-2590: String -> Binary
- col1 := record.Column(1).(*array.String)
- require.Equal(t, &arrow.StringType{}, col1.DataType())
+ col1 := record.Column(1).(*array.Binary)
+ require.Equal(t, &arrow.BinaryType{}, col1.DataType())
for i := 0; i < len(expectedColumnarBlock[1]); i++ {
- if expectedColumnarBlock[1][i].(string) != col1.Value(i) {
+ if !bytes.Equal([]byte(expectedColumnarBlock[1][i].(string)), col1.Value(i)) {
return false
}
}
@@ -146,7 +145,6 @@ func (tc testCaseStreaming) execute(t *testing.T) {
}
connection.On("Query", "SELECT col0, col1 FROM example_1").Return(rows, nil).Once()
- // FIXME: YQ-2590: string -> []byte
col0Acceptor := new(*int32)
*col0Acceptor = new(int32)
col1Acceptor := new(*string)
diff --git a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
index ddda5f60c1..380dda9a93 100644
--- a/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
+++ b/ydb/library/yql/providers/generic/connector/app/server/utils/arrow_helpers.go
@@ -115,11 +115,7 @@ func ydbTypeToArrowBuilder(typeID Ydb.Type_PrimitiveTypeId, arrowAllocator memor
case Ydb.Type_DOUBLE:
builder = array.NewFloat64Builder(arrowAllocator)
case Ydb.Type_STRING:
- // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary
- // TODO: what about LargeBinary?
- // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE
- // builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary)
- builder = array.NewStringBuilder(arrowAllocator)
+ builder = array.NewBinaryBuilder(arrowAllocator, arrow.BinaryTypes.Binary)
case Ydb.Type_UTF8:
// TODO: what about LargeString?
// https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_STRINGE
@@ -166,11 +162,7 @@ func ydbTypeToArrowField(typeID Ydb.Type_PrimitiveTypeId, column *Ydb.Column) (a
case Ydb.Type_DOUBLE:
field = arrow.Field{Name: column.Name, Type: arrow.PrimitiveTypes.Float64}
case Ydb.Type_STRING:
- // FIXME: YQ-2590: map it to arrow.BinaryTypes.Binary
- // TODO: what about LargeBinary?
- // https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_BINARYE
- // field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.Binary}
- field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.String}
+ field = arrow.Field{Name: column.Name, Type: arrow.BinaryTypes.Binary}
case Ydb.Type_UTF8:
// TODO: what about LargeString?
// https://arrow.apache.org/docs/cpp/api/datatype.html#_CPPv4N5arrow4Type4type12LARGE_STRINGE
diff --git a/ydb/library/yql/providers/generic/connector/tests/clickhouse.py b/ydb/library/yql/providers/generic/connector/tests/clickhouse.py
index 095546c85b..1c8c3dc01b 100644
--- a/ydb/library/yql/providers/generic/connector/tests/clickhouse.py
+++ b/ydb/library/yql/providers/generic/connector/tests/clickhouse.py
@@ -97,6 +97,8 @@ def select_positive(
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
assert test_case.data_out == result.data_out_with_types, (test_case.data_out, result.data_out_with_types)
+ if test_case.check_output_schema:
+ assert test_case.schema == result.schema, (test_case.schema, result.schema)
def select_missing_database(
diff --git a/ydb/library/yql/providers/generic/connector/tests/postgresql.py b/ydb/library/yql/providers/generic/connector/tests/postgresql.py
index aa0d1c5e7e..57c1cdfe2c 100644
--- a/ydb/library/yql/providers/generic/connector/tests/postgresql.py
+++ b/ydb/library/yql/providers/generic/connector/tests/postgresql.py
@@ -103,6 +103,8 @@ def select_positive(
result = dqrun_runner.run(test_dir=tmp_path, script=yql_script, generic_settings=test_case.generic_settings)
assert test_case.data_out == result.data_out_with_types, (test_case.data_out, result.data_out_with_types)
+ if test_case.check_output_schema:
+ assert test_case.schema == result.schema, (test_case.schema, result.schema)
def select_missing_database(
diff --git a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive.py b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive.py
index fd5717ccdd..3f14adab7a 100644
--- a/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive.py
+++ b/ydb/library/yql/providers/generic/connector/tests/test_cases/select_positive.py
@@ -16,6 +16,9 @@ from ydb.library.yql.providers.generic.connector.tests.utils.schema import (
DataSourceType,
SelectWhat,
SelectWhere,
+ makeYdbTypeFromTypeID,
+ makeOptionalYdbTypeFromTypeID,
+ makeOptionalYdbTypeFromYdbType,
)
from ydb.library.yql.providers.generic.connector.tests.test_cases.base import BaseTestCase
@@ -32,6 +35,7 @@ class TestCase(BaseTestCase):
select_where: Optional[SelectWhere]
data_out_: Optional[Sequence]
protocol: EProtocol = EProtocol.NATIVE
+ check_output_schema: bool = False
@property
def data_out(self) -> Sequence:
@@ -55,123 +59,123 @@ class Factory:
columns=ColumnList(
Column(
name='col_01_bool',
- ydb_type=Type.BOOL,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.BOOL),
data_source_type=DataSourceType(pg=postgresql.Bool()),
),
Column(
name='col_02_smallint',
- ydb_type=Type.INT16,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT16),
data_source_type=DataSourceType(pg=postgresql.SmallInt()),
),
Column(
name='col_03_int2',
- ydb_type=Type.INT16,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT16),
data_source_type=DataSourceType(pg=postgresql.Int2()),
),
Column(
name='col_04_smallserial',
- ydb_type=Type.INT16,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT16),
data_source_type=DataSourceType(pg=postgresql.SmallSerial()),
),
Column(
name='col_05_serial2',
- ydb_type=Type.INT16,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT16),
data_source_type=DataSourceType(pg=postgresql.Serial2()),
),
Column(
name='col_06_integer',
- ydb_type=Type.INT32,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(pg=postgresql.Integer()),
),
Column(
name='col_07_int',
- ydb_type=Type.INT32,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(pg=postgresql.Int()),
),
Column(
name='col_08_int4',
- ydb_type=Type.INT32,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(pg=postgresql.Int4()),
),
Column(
name='col_09_serial',
- ydb_type=Type.INT32,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(pg=postgresql.Serial()),
),
Column(
name='col_10_serial4',
- ydb_type=Type.INT32,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(pg=postgresql.Serial4()),
),
Column(
name='col_11_bigint',
- ydb_type=Type.INT64,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT64),
data_source_type=DataSourceType(pg=postgresql.BigInt()),
),
Column(
name='col_12_int8',
- ydb_type=Type.INT64,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT64),
data_source_type=DataSourceType(pg=postgresql.Int8()),
),
Column(
name='col_13_bigserial',
- ydb_type=Type.INT64,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT64),
data_source_type=DataSourceType(pg=postgresql.BigSerial()),
),
Column(
name='col_14_serial8',
- ydb_type=Type.INT64,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.INT64),
data_source_type=DataSourceType(pg=postgresql.Serial8()),
),
Column(
name='col_15_real',
- ydb_type=Type.FLOAT,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.FLOAT),
data_source_type=DataSourceType(pg=postgresql.Real()),
),
Column(
name='col_16_float4',
- ydb_type=Type.FLOAT,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.FLOAT),
data_source_type=DataSourceType(pg=postgresql.Float4()),
),
Column(
name='col_17_double_precision',
- ydb_type=Type.DOUBLE,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.DOUBLE),
data_source_type=DataSourceType(pg=postgresql.DoublePrecision()),
),
Column(
name='col_18_float8',
- ydb_type=Type.DOUBLE,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.DOUBLE),
data_source_type=DataSourceType(pg=postgresql.Float8()),
),
# TODO: check unicode strings
Column(
name='col_19_bytea',
- ydb_type=Type.STRING,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.STRING),
data_source_type=DataSourceType(pg=postgresql.Bytea()),
),
Column(
name='col_20_character',
- ydb_type=Type.UTF8,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.UTF8),
data_source_type=DataSourceType(pg=postgresql.Character()),
),
Column(
name='col_21_character_varying',
- ydb_type=Type.UTF8,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.UTF8),
data_source_type=DataSourceType(pg=postgresql.CharacterVarying()),
),
Column(
name='col_22_text',
- ydb_type=Type.UTF8,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.UTF8),
data_source_type=DataSourceType(pg=postgresql.Text()),
),
Column(
name='col_23_date',
- ydb_type=Type.DATE,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.DATE),
data_source_type=DataSourceType(pg=postgresql.Date()),
),
Column(
name='col_24_timestamp',
- ydb_type=Type.TIMESTAMP,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.TIMESTAMP),
data_source_type=DataSourceType(pg=postgresql.TimestampWithoutTimeZone()),
),
# TODO: YQ-2297
@@ -274,6 +278,7 @@ class Factory:
data_source_kind=EDataSourceKind.POSTGRESQL,
database=Database.make_for_data_source_kind(EDataSourceKind.POSTGRESQL),
pragmas=dict(),
+ check_output_schema=True,
)
return [tc]
@@ -283,88 +288,88 @@ class Factory:
columns=ColumnList(
Column(
name='col_01_boolean',
- ydb_type=Type.BOOL,
+ ydb_type=makeYdbTypeFromTypeID(Type.BOOL),
data_source_type=DataSourceType(ch=clickhouse.Boolean()),
),
Column(
name='col_02_int8',
- ydb_type=Type.INT8,
+ ydb_type=makeYdbTypeFromTypeID(Type.INT8),
data_source_type=DataSourceType(ch=clickhouse.Int8()),
),
Column(
name='col_03_uint8',
- ydb_type=Type.UINT8,
+ ydb_type=makeYdbTypeFromTypeID(Type.UINT8),
data_source_type=DataSourceType(ch=clickhouse.UInt8()),
),
Column(
name='col_04_int16',
- ydb_type=Type.INT16,
+ ydb_type=makeYdbTypeFromTypeID(Type.INT16),
data_source_type=DataSourceType(ch=clickhouse.Int16()),
),
Column(
name='col_05_uint16',
- ydb_type=Type.UINT16,
+ ydb_type=makeYdbTypeFromTypeID(Type.UINT16),
data_source_type=DataSourceType(ch=clickhouse.UInt16()),
),
Column(
name='col_06_int32',
- ydb_type=Type.INT32,
+ ydb_type=makeYdbTypeFromTypeID(Type.INT32),
data_source_type=DataSourceType(ch=clickhouse.Int32()),
),
Column(
name='col_07_uint32',
- ydb_type=Type.UINT32,
+ ydb_type=makeYdbTypeFromTypeID(Type.UINT32),
data_source_type=DataSourceType(ch=clickhouse.UInt32()),
),
Column(
name='col_08_int64',
- ydb_type=Type.INT64,
+ ydb_type=makeYdbTypeFromTypeID(Type.INT64),
data_source_type=DataSourceType(ch=clickhouse.Int64()),
),
Column(
name='col_09_uint64',
- ydb_type=Type.UINT64,
+ ydb_type=makeYdbTypeFromTypeID(Type.UINT64),
data_source_type=DataSourceType(ch=clickhouse.UInt64()),
),
Column(
name='col_10_float32',
- ydb_type=Type.FLOAT,
+ ydb_type=makeYdbTypeFromTypeID(Type.FLOAT),
data_source_type=DataSourceType(ch=clickhouse.Float32()),
),
Column(
name='col_11_float64',
- ydb_type=Type.FLOAT,
+ ydb_type=makeYdbTypeFromTypeID(Type.DOUBLE),
data_source_type=DataSourceType(ch=clickhouse.Float64()),
),
Column(
name='col_12_string',
- ydb_type=Type.STRING,
+ ydb_type=makeYdbTypeFromTypeID(Type.STRING),
data_source_type=DataSourceType(ch=clickhouse.String()),
),
Column(
name='col_13_fixed_string',
- ydb_type=Type.STRING,
+ ydb_type=makeYdbTypeFromTypeID(Type.STRING),
data_source_type=DataSourceType(ch=clickhouse.FixedString()),
),
Column(
name='col_14_date',
- ydb_type=Type.DATE,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.DATE),
data_source_type=DataSourceType(ch=clickhouse.Date()),
),
# FIXME: https://st.yandex-team.ru/YQ-2295
# Column(
# name='col_15_date32',
- # ydb_type=Type.DATE,
+ # ydb_type=getYdbType(Type.DATE,
# data_source_type=DataSourceType(ch=clickhouse.Date32),
# ),
Column(
name='col_16_datetime',
- ydb_type=Type.DATETIME,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.DATETIME),
data_source_type=DataSourceType(ch=clickhouse.DateTime()),
),
Column(
name='col_17_datetime64',
- ydb_type=Type.DATETIME,
+ ydb_type=makeOptionalYdbTypeFromTypeID(Type.TIMESTAMP),
data_source_type=DataSourceType(ch=clickhouse.DateTime64()),
),
),
@@ -454,6 +459,7 @@ class Factory:
data_source_kind=EDataSourceKind.CLICKHOUSE,
database=Database.make_for_data_source_kind(EDataSourceKind.CLICKHOUSE),
pragmas=dict(),
+ check_output_schema=True,
)
# ClickHouse returns different data if columns are Nullable
@@ -474,7 +480,11 @@ class Factory:
# copy type and example value to new TestCase
schema_nullable.columns.append(
- Column(name=col.name, ydb_type=col.ydb_type, data_source_type=DataSourceType(ch=ch_type.to_nullable()))
+ Column(
+ name=col.name,
+ ydb_type=makeOptionalYdbTypeFromYdbType(col.ydb_type),
+ data_source_type=DataSourceType(ch=ch_type.to_nullable()),
+ )
)
data_in_nullable[0].append(tc.data_in[0][i])
data_out_nullable[0].append(tc.data_out_[0][i])
@@ -496,6 +506,7 @@ class Factory:
data_in=data_in_nullable,
data_out_=data_out_nullable,
pragmas=dict(),
+ check_output_schema=True,
)
return [
diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py b/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py
index 33a5e60b9e..5f88098172 100644
--- a/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py
+++ b/ydb/library/yql/providers/generic/connector/tests/utils/dqrun.py
@@ -181,6 +181,7 @@ Dq {
class Result:
data_out: Optional[YsonList]
data_out_with_types: Optional[List]
+ schema: Optional[Schema]
stdout: str
stderr: str
@@ -219,6 +220,7 @@ class Runner:
data_out = None
data_out_with_types = None
+ schema = None
if out.returncode == 0:
# Parse output
@@ -231,6 +233,7 @@ class Runner:
schema = Schema.from_yson(result[0]['Write'][0]['Type'][1][1])
data_out_with_types = [schema.cast_row(row) for row in data_out]
+ LOGGER.debug('Schema: %s', schema)
LOGGER.debug('Data out: %s', data_out)
LOGGER.debug('Data out with types: %s', data_out_with_types)
else:
@@ -253,6 +256,7 @@ class Runner:
return Result(
data_out=data_out,
data_out_with_types=data_out_with_types,
+ schema=schema,
stdout=out.stdout.decode('utf-8'),
stderr=out.stderr.decode('utf-8'),
)
diff --git a/ydb/library/yql/providers/generic/connector/tests/utils/schema.py b/ydb/library/yql/providers/generic/connector/tests/utils/schema.py
index 252a055638..555728c6e1 100644
--- a/ydb/library/yql/providers/generic/connector/tests/utils/schema.py
+++ b/ydb/library/yql/providers/generic/connector/tests/utils/schema.py
@@ -6,6 +6,7 @@ from yt import yson
from yt.yson.yson_types import YsonEntity
import ydb.public.api.protos.ydb_value_pb2 as ydb_value
from ydb.library.yql.providers.generic.connector.api.common.data_source_pb2 import EDataSourceKind
+from ydb.public.api.protos.ydb_value_pb2 import Type, OptionalType
import ydb.library.yql.providers.generic.connector.tests.utils.clickhouse as clickhouse
import ydb.library.yql.providers.generic.connector.tests.utils.postgresql as postgresql
@@ -37,6 +38,12 @@ class Column:
ydb_type: ydb_value.Type
data_source_type: DataSourceType
+ def __eq__(self, __value: object) -> bool:
+ if isinstance(__value, self.__class__):
+ return self.name == __value.name and self.ydb_type == __value.ydb_type
+ else:
+ raise Exception(f"can't compare 'Column' with '{__value.__class__}'")
+
@classmethod
def from_yson(cls, src: YsonList):
name = src[0]
@@ -319,3 +326,17 @@ class Schema:
'''
items = [SelectWhat.Item(name=col.name) for col in self.columns]
return SelectWhat(*items)
+
+
+def makeYdbTypeFromTypeID(type_id: Type.PrimitiveTypeId) -> Type:
+ return Type(type_id=type_id)
+
+
+def makeOptionalYdbTypeFromTypeID(type_id: Type.PrimitiveTypeId) -> Type:
+ return Type(optional_type=OptionalType(item=Type(type_id=type_id)))
+
+
+def makeOptionalYdbTypeFromYdbType(ydb_type: Type) -> Type:
+ if ydb_type.HasField('optional_type'):
+ return ydb_type
+ return Type(optional_type=OptionalType(item=Type(type_id=ydb_type.type_id)))