diff options
author | qrort <qrort@yandex-team.com> | 2023-01-10 12:36:34 +0300 |
---|---|---|
committer | qrort <qrort@yandex-team.com> | 2023-01-10 12:36:34 +0300 |
commit | 4c8400526beadeb0d8ebe794a1869605de5c99f2 (patch) | |
tree | 36632e4b01cdc6d355c68cb8ae8dd55c392d64f4 | |
parent | d649bd6c35082346d29ac814532d1151a9d496d7 (diff) | |
download | ydb-4c8400526beadeb0d8ebe794a1869605de5c99f2.tar.gz |
pg select query support
31 files changed, 607 insertions, 296 deletions
diff --git a/ydb/core/engine/minikql/CMakeLists.darwin.txt b/ydb/core/engine/minikql/CMakeLists.darwin.txt index 4c32de6bd23..9b153d8472a 100644 --- a/ydb/core/engine/minikql/CMakeLists.darwin.txt +++ b/ydb/core/engine/minikql/CMakeLists.darwin.txt @@ -19,6 +19,7 @@ target_link_libraries(core-engine-minikql PUBLIC ydb-core-engine ydb-core-formats ydb-core-tablet_flat + parser-pg_wrapper-interface ) target_sources(core-engine-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/engine/minikql/flat_local_tx_factory.cpp diff --git a/ydb/core/engine/minikql/CMakeLists.linux-aarch64.txt b/ydb/core/engine/minikql/CMakeLists.linux-aarch64.txt index 006c11c5b79..b0379a4e3a8 100644 --- a/ydb/core/engine/minikql/CMakeLists.linux-aarch64.txt +++ b/ydb/core/engine/minikql/CMakeLists.linux-aarch64.txt @@ -20,6 +20,7 @@ target_link_libraries(core-engine-minikql PUBLIC ydb-core-engine ydb-core-formats ydb-core-tablet_flat + parser-pg_wrapper-interface ) target_sources(core-engine-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/engine/minikql/flat_local_tx_factory.cpp diff --git a/ydb/core/engine/minikql/CMakeLists.linux.txt b/ydb/core/engine/minikql/CMakeLists.linux.txt index 006c11c5b79..b0379a4e3a8 100644 --- a/ydb/core/engine/minikql/CMakeLists.linux.txt +++ b/ydb/core/engine/minikql/CMakeLists.linux.txt @@ -20,6 +20,7 @@ target_link_libraries(core-engine-minikql PUBLIC ydb-core-engine ydb-core-formats ydb-core-tablet_flat + parser-pg_wrapper-interface ) target_sources(core-engine-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/engine/minikql/flat_local_tx_factory.cpp diff --git a/ydb/core/engine/minikql/minikql_engine_host.cpp b/ydb/core/engine/minikql/minikql_engine_host.cpp index 2442fe7bfd7..b8326dc363c 100644 --- a/ydb/core/engine/minikql/minikql_engine_host.cpp +++ b/ydb/core/engine/minikql/minikql_engine_host.cpp @@ -5,6 +5,7 @@ #include <ydb/core/tablet_flat/flat_table_stats.h> #include <ydb/library/yql/minikql/computation/mkql_custom_list.h> #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h> #include <ydb/core/tx/datashard/sys_tables.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -1087,8 +1088,7 @@ NUdf::TUnboxedValue GetCellValue(const TCell& cell, NScheme::TTypeInfo type) { } if (type.GetTypeId() == NScheme::NTypeIds::Pg) { - // TODO: support pg types - Y_VERIFY(false, "pg types are not supported"); + return NYql::NCommon::PgValueFromNativeBinary(cell.AsBuf(), NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc())); } Y_VERIFY_DEBUG(false, "Unsupported type: %" PRIu16, type.GetTypeId()); diff --git a/ydb/core/grpc_services/rpc_kh_describe.cpp b/ydb/core/grpc_services/rpc_kh_describe.cpp index 2cd5434a926..1ceb70b9e1e 100644 --- a/ydb/core/grpc_services/rpc_kh_describe.cpp +++ b/ydb/core/grpc_services/rpc_kh_describe.cpp @@ -176,11 +176,12 @@ private: auto* colMeta = Result.add_columns(); colMeta->set_name(col.second.Name); auto& typeInfo = col.second.PType; - auto* item = colMeta->mutable_type()->mutable_optional_type()->mutable_item(); + auto* item = colMeta->mutable_type(); if (typeInfo.GetTypeId() == NScheme::NTypeIds::Pg) { item->mutable_pg_type()->set_oid(NPg::PgTypeIdFromTypeDesc(typeInfo.GetTypeDesc())); } else { - item->set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); + item->mutable_optional_type()->mutable_item() + ->set_type_id((Ydb::Type::PrimitiveTypeId)typeInfo.GetTypeId()); } if (col.second.KeyOrder == -1) continue; diff --git a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp index 8ff5c5502e1..dc5f72a4042 100644 --- a/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp +++ b/ydb/core/kqp/compute_actor/kqp_scan_compute_actor.cpp @@ -135,8 +135,15 @@ public: YQL_ENSURE(Meta.GetTable().GetTableKind() != (ui32)ETableKind::SysView); KeyColumnTypes.reserve(Meta.GetKeyColumnTypes().size()); - for (auto typeId : Meta.GetKeyColumnTypes()) { - KeyColumnTypes.push_back(NScheme::TTypeInfo((NScheme::TTypeId)typeId)); + for (size_t i = 0; i < Meta.KeyColumnTypesSize(); i++) { + auto typeId = Meta.GetKeyColumnTypes().at(i); + KeyColumnTypes.push_back(NScheme::TTypeInfo( + (NScheme::TTypeId)typeId, + (typeId == NScheme::NTypeIds::Pg) ? + NPg::TypeDescFromPgTypeId( + Meta.GetKeyColumnTypeInfos().at(i).GetPgTypeId() + ) : nullptr + )); } } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index eab53a449c3..ac5ae77a5c5 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -149,14 +149,21 @@ TTableMetadataResult GetLoadTableMetadataResult(const NSchemeCache::TSchemeCache std::map<ui32, TString, std::less<ui32>> columnOrder; for (auto& pair : entry.Columns) { const auto& columnDesc = pair.second; - TString typeName; - // TODO: support pg types - YQL_ENSURE(columnDesc.PType.GetTypeId() != NScheme::NTypeIds::Pg); - YQL_ENSURE(NScheme::TryGetTypeName(columnDesc.PType.GetTypeId(), typeName)); auto notNull = entry.NotNullColumns.contains(columnDesc.Name); - tableMeta->Columns.emplace(columnDesc.Name, NYql::TKikimrColumnMetadata( - columnDesc.Name, columnDesc.Id, typeName, notNull, columnDesc.PType)); + if (columnDesc.PType.GetTypeId() != NScheme::NTypeIds::Pg) { + YQL_ENSURE(NScheme::TryGetTypeName(columnDesc.PType.GetTypeId(), typeName)); + } else { + Y_VERIFY(columnDesc.PType.GetTypeDesc(), "no pg type descriptor"); + Y_VERIFY(!notNull, "pg not null types are not allowed"); + typeName = NPg::PgTypeNameFromTypeDesc(columnDesc.PType.GetTypeDesc()); + } + tableMeta->Columns.emplace( + columnDesc.Name, + NYql::TKikimrColumnMetadata( + columnDesc.Name, columnDesc.Id, typeName, notNull, columnDesc.PType + ) + ); if (columnDesc.KeyOrder >= 0) { keyColumns[columnDesc.KeyOrder] = columnDesc.Name; } diff --git a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp index cb6c8ac1a7f..d251aad27e9 100644 --- a/ydb/core/kqp/provider/yql_kikimr_gateway.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_gateway.cpp @@ -213,8 +213,8 @@ EYqlIssueCode YqlStatusFromYdbStatus(ui32 ydbStatus) { void SetColumnType(Ydb::Type& protoType, const TString& typeName, bool notNull) { auto* typeDesc = NKikimr::NPg::TypeDescFromPgTypeName(typeName); if (typeDesc) { - auto pg = notNull ? protoType.mutable_pg_type() : - protoType.mutable_optional_type()->mutable_item()->mutable_pg_type(); + Y_VERIFY(!notNull, "It is not allowed to create NOT NULL pg columns"); + auto pg = protoType.mutable_pg_type(); pg->set_oid(NKikimr::NPg::PgTypeIdFromTypeDesc(typeDesc)); return; } diff --git a/ydb/core/kqp/provider/yql_kikimr_provider.cpp b/ydb/core/kqp/provider/yql_kikimr_provider.cpp index 72cf95291ea..47f6e8a122d 100644 --- a/ydb/core/kqp/provider/yql_kikimr_provider.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_provider.cpp @@ -2,6 +2,7 @@ #include <ydb/library/yql/providers/common/proto/gateways_config.pb.h> #include <ydb/core/base/path.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h> #include <ydb/library/yql/providers/result/provider/yql_result_provider.h> #include <ydb/library/yql/providers/common/schema/expr/yql_expr_schema.h> @@ -163,15 +164,19 @@ bool TKikimrTableDescription::Load(TExprContext& ctx, bool withSystemColumns) { // is passed with no params. It's known to always be Decimal(22,9), // so we transform Decimal type here. const TTypeAnnotationNode *type; - if (to_lower(column.Type) == "decimal") + if (to_lower(column.Type) == "decimal") { type = ctx.MakeType<TDataExprParamsType>( NKikimr::NUdf::GetDataSlot(column.Type), ToString(NKikimr::NScheme::DECIMAL_PRECISION), ToString(NKikimr::NScheme::DECIMAL_SCALE)); - else - type = ctx.MakeType<TDataExprType>(NKikimr::NUdf::GetDataSlot(column.Type)); - - if (!column.NotNull) { + } else { + if (column.TypeInfo.GetTypeId() != NKikimr::NScheme::NTypeIds::Pg) { + type = ctx.MakeType<TDataExprType>(NKikimr::NUdf::GetDataSlot(column.Type)); + } else { + type = ctx.MakeType<TPgExprType>(NKikimr::NPg::PgTypeIdFromTypeDesc(column.TypeInfo.GetTypeDesc())); + } + } + if (!column.NotNull && column.TypeInfo.GetTypeId() != NKikimr::NScheme::NTypeIds::Pg) { type = ctx.MakeType<TOptionalExprType>(type); } diff --git a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp index 2786aca66d9..3f9224477d0 100644 --- a/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp +++ b/ydb/core/kqp/query_compiler/kqp_mkql_compiler.cpp @@ -23,12 +23,15 @@ TVector<TKqpTableColumn> GetKqpColumns(const TKikimrTableMetadata& table, const ui32 columnId = 0; ui32 columnType = 0; bool notNull = false; + void* columnTypeDesc = nullptr; auto columnData = table.Columns.FindPtr(name); if (columnData) { columnId = columnData->Id; - // TODO: support pg types columnType = columnData->TypeInfo.GetTypeId(); + if (columnType == NScheme::NTypeIds::Pg) { + columnTypeDesc = columnData->TypeInfo.GetTypeDesc(); + } notNull = columnData->NotNull; } else if (allowSystemColumns) { auto systemColumn = GetSystemColumns().find(name); @@ -38,7 +41,7 @@ TVector<TKqpTableColumn> GetKqpColumns(const TKikimrTableMetadata& table, const } YQL_ENSURE(columnId, "Unknown column: " << name); - pgmColumns.emplace_back(columnId, name, columnType, notNull); + pgmColumns.emplace_back(columnId, name, columnType, notNull, columnTypeDesc); } return pgmColumns; diff --git a/ydb/core/kqp/runtime/kqp_program_builder.cpp b/ydb/core/kqp/runtime/kqp_program_builder.cpp index b5169c19d82..758b0efb69d 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.cpp +++ b/ydb/core/kqp/runtime/kqp_program_builder.cpp @@ -15,13 +15,28 @@ TType* GetRowType(const TProgramBuilder& builder, const TArrayRef<TKqpTableColum TStructTypeBuilder rowTypeBuilder(builder.GetTypeEnvironment()); for (auto& column : columns) { TType* type = nullptr; - if (column.Type == NUdf::TDataType<NUdf::TDecimal>::Id) { - type = TDataDecimalType::Create(NScheme::DECIMAL_PRECISION, NScheme::DECIMAL_SCALE, builder.GetTypeEnvironment()); - } else { - type = TDataType::Create(column.Type, builder.GetTypeEnvironment()); + switch (column.Type) { + case NUdf::TDataType<NUdf::TDecimal>::Id: { + type = TDataDecimalType::Create( + NScheme::DECIMAL_PRECISION, + NScheme::DECIMAL_SCALE, + builder.GetTypeEnvironment() + ); + break; + } + case NKikimr::NScheme::NTypeIds::Pg: { + Y_VERIFY(column.TypeDesc, "No pg type description"); + Y_VERIFY(!column.NotNull, "pg not null types are not allowed"); + type = TPgType::Create(NPg::PgTypeIdFromTypeDesc(column.TypeDesc), builder.GetTypeEnvironment()); + break; + } + default: { + type = TDataType::Create(column.Type, builder.GetTypeEnvironment()); + break; + } } - if (!column.NotNull) { + if (!column.NotNull && column.Type != NKikimr::NScheme::NTypeIds::Pg) { type = TOptionalType::Create(type, builder.GetTypeEnvironment()); } diff --git a/ydb/core/kqp/runtime/kqp_program_builder.h b/ydb/core/kqp/runtime/kqp_program_builder.h index 348d25eeea3..997ce00ac34 100644 --- a/ydb/core/kqp/runtime/kqp_program_builder.h +++ b/ydb/core/kqp/runtime/kqp_program_builder.h @@ -13,12 +13,14 @@ struct TKqpTableColumn { TString Name; NUdf::TDataTypeId Type; bool NotNull; + void* TypeDesc; - TKqpTableColumn(ui32 id, const TStringBuf& name, NUdf::TDataTypeId type, bool notNull) + TKqpTableColumn(ui32 id, const TStringBuf& name, NUdf::TDataTypeId type, bool notNull, void* typeDesc) : Id(id) , Name(name) , Type(type) - , NotNull(notNull) {} + , NotNull(notNull) + , TypeDesc(typeDesc) {} }; using TKqpKeyTuple = TVector<TRuntimeNode>; diff --git a/ydb/core/kqp/runtime/kqp_scan_data.cpp b/ydb/core/kqp/runtime/kqp_scan_data.cpp index f0e04056d95..52800f87a11 100644 --- a/ydb/core/kqp/runtime/kqp_scan_data.cpp +++ b/ydb/core/kqp/runtime/kqp_scan_data.cpp @@ -5,6 +5,7 @@ #include <ydb/core/scheme/scheme_types_proto.h> #include <ydb/library/yql/minikql/mkql_string_util.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/pack.h> #include <ydb/library/yql/utils/yql_panic.h> namespace NKikimr { @@ -59,9 +60,15 @@ TBytesStatistics GetUnboxedValueSize(const NUdf::TUnboxedValue& value, NScheme:: } } - case NTypeIds::Pg: - // TODO: support pg types - YQL_ENSURE(false, "pg types are not supported"); + case NTypeIds::Pg: { + return { + sizeof(NUdf::TUnboxedValue), + NKikimr::NMiniKQL::PgValueSize( + NPg::PgTypeIdFromTypeDesc(type.GetTypeDesc()), //extra typeDesc resolve + value + ) + }; + } default: Y_VERIFY_DEBUG_S(false, "Unsupported type " << NScheme::TypeName(type.GetTypeId())); diff --git a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp index 67edc49d046..1fd50c06108 100644 --- a/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp +++ b/ydb/core/kqp/ut/pg/kqp_pg_ut.cpp @@ -1,12 +1,31 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> #include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h> extern "C" { #include "postgres.h" #include "catalog/pg_type_d.h" } +namespace { + struct TPgTypeTestSpec { + bool IsKey; + std::function<TString(size_t)> TextIn, TextOut; + std::function<TString(TString)> ArrayPrint; + TPgTypeTestSpec() = default; + TPgTypeTestSpec( + bool isKey, + std::function<TString(size_t)> in, + std::function<TString(size_t)> out, + std::function<TString(TString)> print = [] (auto s) { return Sprintf("{%s,%s}", s.c_str(), s.c_str()); }) + : IsKey(isKey) + , TextIn(in) + , TextOut(out) + , ArrayPrint(print) {} + }; +} + namespace NKikimr { namespace NKqp { @@ -17,6 +36,324 @@ Y_UNIT_TEST_SUITE(KqpPg) { auto makePgType = [] (ui32 oid, i32 typlen = -1) { return TPgType(oid, typlen, -1); }; + TMap< + ui32, + TPgTypeTestSpec + > typeSpecs ={ + { BOOLOID, { + true, + [] (auto i) { return TString(i ? "true" : "false"); }, + [] (auto i) { return TString(i ? "t" : "f"); } + } + }, + { CHAROID, { + true, + [] (auto i) { return Sprintf("%c", (char)(i + '0')); }, + [] (auto i) { return Sprintf("%c", (char)(i + '0')); } + } + }, + { INT2OID, { + true, + [] (auto i) { return Sprintf("%u", i); }, + [] (auto i) { return Sprintf("%u", i); } + } + }, + { INT4OID, { + true, + [] (auto i) { return Sprintf("%u", i); }, + [] (auto i) { return Sprintf("%u", i); } + } + }, + { INT8OID, { + true, + [] (auto i) { return Sprintf("%u", i); }, + [] (auto i) { return Sprintf("%u", i); } + } + }, + { FLOAT4OID, { + true, + [] (auto i) { return Sprintf("%g", i + 0.5f); }, + [] (auto i) { return Sprintf("%g", i + 0.5f); } + } + }, + { FLOAT8OID, { + true, + [] (auto i) { return Sprintf("%lg", i + 0.5); }, + [] (auto i) { return Sprintf("%lg", i + 0.5); } + } + }, + { TEXTOID, { + true, + [] (auto i) { return Sprintf("text %u", i); }, + [] (auto i) { return Sprintf("text %u", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { BPCHAROID, { + true, + [] (auto i) { return Sprintf("bpchar %u", i); }, + [] (auto i) { return Sprintf("bpchar %u", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { VARCHAROID, { + false, + [] (auto i) { return Sprintf("varchar %u", i); }, + [] (auto i) { return Sprintf("varchar %u", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { NAMEOID, { + true, + [] (auto i) { return Sprintf("name %u", i); }, + [] (auto i) { return Sprintf("name %u", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { NUMERICOID, { + true, + [] (auto i) { return Sprintf("%lg", i + 0.12345); }, + [] (auto i) { return Sprintf("%lg", i + 0.12345); } + } + }, + { MONEYOID, { + true, + [] (auto i) { return Sprintf("%lg", i + i / 100.); }, + [] (auto i) { return Sprintf("$%.2lf", i + i / 100.); } + } + }, + { DATEOID, { + true, + [] (auto i) { return Sprintf("1970-01-%02u", i + 1); }, + [] (auto i) { return Sprintf("1970-01-%02u", i + 1); } + } + }, + { TIMEOID, { + true, + [] (auto i) { return Sprintf("%02u:01:02.345", i); }, + [] (auto i) { return Sprintf("%02u:01:02.345", i); } + } + }, + { TIMESTAMPOID, { + true, + [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345", i); }, + [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { TIMETZOID, { + true, + [] (auto i) { return Sprintf("%02u:01:02.345-03", i); }, + [] (auto i) { return Sprintf("%02u:01:02.345-03", i); } + } + }, + { TIMESTAMPTZOID, { + true, + [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345 -3:00", i); }, + [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345+00", i + 3); }, // TODO: investigate + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { INTERVALOID, { + true, + [] (auto i) { return Sprintf("P01-02-03T04:05:%02u", i); }, + [] (auto i) { return Sprintf("1 year 2 mons 3 days 04:05:%02u", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { BITOID, { + true, + [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }, + [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); } + } + }, + { VARBITOID, { + true, + [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }, + [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); } + } + }, + { POINTOID, { + false, + [] (auto i) { return Sprintf("(10, %u)", i); }, + [] (auto i) { return Sprintf("(10,%u)", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { LINEOID, { + false, + [] (auto i) { return Sprintf("{1, 2, %u}", i); }, + [] (auto i) { return Sprintf("{1,2,%u}", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { LSEGOID, { + false, + [] (auto i) { return Sprintf("[(0, 0), (1, %u)]", i); }, + [] (auto i) { return Sprintf("[(0,0),(1,%u)]", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { BOXOID, { + false, + [] (auto i) { return Sprintf("(1, %u), (0, 0)", i + 1); }, + [] (auto i) { return Sprintf("(1,%u),(0,0)", i + 1); }, + [] (auto s) { return Sprintf("{%s;%s}", s.c_str(), s.c_str()); } + } + }, + { PATHOID, { + false, + [] (auto i) { return Sprintf("((0, 1), (2, 3), (4, %u))", i); }, + [] (auto i) { return Sprintf("((0,1),(2,3),(4,%u))", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { POLYGONOID, { + false, + [] (auto i) { return Sprintf("((0, 1), (2, 3), (4, %u))", i); }, + [] (auto i) { return Sprintf("((0,1),(2,3),(4,%u))", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { CIRCLEOID, { + false, + [] (auto i) { return Sprintf("<(0, 1), %u>", i); }, + [] (auto i) { return Sprintf("<(0,1),%u>", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { INETOID, { + false, + [] (auto i) { return Sprintf("128.%u.0.0/16", i); }, + [] (auto i) { return Sprintf("128.%u.0.0/16", i); } + } + }, + { CIDROID, { + false, + [] (auto i) { return Sprintf("128.%u.0.0/16", i); }, + [] (auto i) { return Sprintf("128.%u.0.0/16", i); } + } + }, + { MACADDROID, { + false, + [] (auto i) { return Sprintf("08:00:2b:01:02:%02u", i); }, + [] (auto i) { return Sprintf("08:00:2b:01:02:%02u", i); } + } + }, + { MACADDR8OID, { + false, + [] (auto i) { return Sprintf("08:00:2b:01:02:03:04:%02u", i); }, + [] (auto i) { return Sprintf("08:00:2b:01:02:03:04:%02u", i); } + } + }, + { UUIDOID, { + false, + [] (auto i) { return Sprintf("00000000-0000-0000-0000-0000000000%02u", i); }, + [] (auto i) { return Sprintf("00000000-0000-0000-0000-0000000000%02u", i); } + } + }, + { JSONOID, { + false, + [] (auto i) { return Sprintf("[%u]", i); }, + [] (auto i) { return Sprintf("[%u]", i); } + } + }, + { JSONBOID, { + false, + [] (auto i) { return Sprintf("[%u]", i); }, + [] (auto i) { return Sprintf("[%u]", i); } + } + }, + { JSONPATHOID, { + false, + [] (auto i) { return Sprintf("$[%u]", i); }, + [] (auto i) { return Sprintf("$[%u]", i); } + } + }, + { XMLOID, { + false, + [] (auto i) { return Sprintf("<a>%u</a>", i); }, + [] (auto i) { return Sprintf("<a>%u</a>", i); } + } + }, + { TSQUERYOID, { + false, + [] (auto i) { return Sprintf("a&b%u", i); }, + [] (auto i) { return Sprintf("'a' & 'b%u'", i); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { TSVECTOROID, { + false, + [] (auto i) { return Sprintf("a:1 b:%u", i + 2); }, + [] (auto i) { return Sprintf("'a':1 'b':%u", i + 2); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + }, + { INT2VECTOROID, { + false, + [] (auto i) { return Sprintf("%u %u %u", i, i + 1, i + 2); }, + [] (auto i) { return Sprintf("%u %u %u", i, i + 1, i + 2); }, + [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); } + } + } + }; + auto createTable = [] ( + NYdb::NTable::TTableClient& db, + NYdb::NTable::TSession& session, + ui32 id, + bool isKey, + std::function<TString(size_t)> textIn, + TString setTableName = "", + ui16 rowCount = 10 + ) { + TTableBuilder builder; + if (isKey) { + builder.AddNullableColumn("key", makePgType(id)); + } else { + builder.AddNullableColumn("key", makePgType(INT2OID)); + } + builder.AddNullableColumn("value", makePgType(id)); + builder.SetPrimaryKeyColumn("key"); + + auto tableName = (setTableName.empty()) ? Sprintf("/Root/Pg%u", id) : setTableName; + auto result = session.CreateTable(tableName, builder.Build()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + NYdb::TValueBuilder rows; + rows.BeginList(); + for (size_t i = 0; i < rowCount; ++i) { + auto str = NPg::PgNativeBinaryFromNativeText(textIn(i), id); + if (isKey) { + rows.AddListItem() + .BeginStruct() + .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) + .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) + .EndStruct(); + } else { + auto int2Val = (i16)i; + TString int2Str((const char*)&int2Val, sizeof(int2Val)); + rows.AddListItem() + .BeginStruct() + .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID))) + .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) + .EndStruct(); + } + } + rows.EndList(); + + result = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + auto readSettings = TReadTableSettings() + .AppendColumns("key") + .AppendColumns("value"); + + auto it = session.ReadTable(tableName, readSettings).GetValueSync(); + UNIT_ASSERT_C(it.IsSuccess(), result.GetIssues().ToString()); + return tableName; + }; + Y_UNIT_TEST(CreateTableBulkUpsertAndRead) { TKikimrRunner kikimr; @@ -26,69 +363,30 @@ Y_UNIT_TEST_SUITE(KqpPg) { { auto db = kikimr.GetTableClient(); auto session = db.CreateSession().GetValueSync().GetSession(); - - TTableBuilder builder; - if (isKey) { - builder.AddNullableColumn("key", makePgType(id)); - } else { - builder.AddNullableColumn("key", makePgType(INT2OID)); - } - builder.AddNullableColumn("value", makePgType(id)); - builder.SetPrimaryKeyColumn("key"); - - auto tableName = Sprintf("/Root/Pg%u", id); - auto result = session.CreateTable(tableName, builder.Build()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); - - NYdb::TValueBuilder rows; - rows.BeginList(); - for (size_t i = 0; i < 10; ++i) { - auto str = NPg::PgNativeBinaryFromNativeText(textIn(i), id); - if (isKey) { - rows.AddListItem() - .BeginStruct() - .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) - .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) - .EndStruct(); - } else { - auto int2Val = (i16)i; - TString int2Str((const char*)&int2Val, sizeof(int2Val)); - rows.AddListItem() - .BeginStruct() - .AddMember("key").Pg(TPgValue(TPgValue::VK_BINARY, int2Str, makePgType(INT2OID))) - .AddMember("value").Pg(TPgValue(TPgValue::VK_BINARY, str, makePgType(id))) - .EndStruct(); - } - } - rows.EndList(); - - result = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); - UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + auto tableName = createTable(db, session, id, isKey, textIn); auto readSettings = TReadTableSettings() .AppendColumns("key") .AppendColumns("value"); auto it = session.ReadTable(tableName, readSettings).GetValueSync(); - UNIT_ASSERT_C(it.IsSuccess(), result.GetIssues().ToString()); + Y_ENSURE(it.IsSuccess()); bool eos = false; while (!eos) { auto part = it.ReadNext().ExtractValueSync(); if (!part.IsSuccess()) { eos = true; - UNIT_ASSERT_C(part.EOS(), result.GetIssues().ToString()); + Y_ENSURE(part.EOS()); continue; } auto resultSet = part.ExtractPart(); TResultSetParser parser(resultSet); for (size_t i = 0; parser.TryNextRow(); ++i) { - auto check = [&parser, &id] (const TString& column, const TString& expected) { + auto check = [&parser, &id, &i] (const TString& column, const TString& expected) { auto& c = parser.ColumnParser(column); - c.OpenOptional(); UNIT_ASSERT_VALUES_EQUAL(expected, NPg::PgNativeTextFromNativeBinary(c.GetPg().Content_, id)); Cerr << expected << Endl; - c.CloseOptional(); }; auto expected = textOut(i); if (isKey) { @@ -101,23 +399,20 @@ Y_UNIT_TEST_SUITE(KqpPg) { session.Close().GetValueSync(); }; - auto testType = [&] (ui32 id, bool isKey, - std::function<TString(size_t)> textIn, - std::function<TString(size_t)> textOut, - std::function<TString(TString)> arrayPrint = [] (auto s) { return Sprintf("{%s,%s}", s.c_str(), s.c_str()); }) + auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec) { - testSingleType(id, isKey, textIn, textOut); + testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut); auto arrayId = NYql::NPg::LookupType(id).ArrayTypeId; - auto textInArray = [&] (auto i) { - auto str = textIn(i); - return arrayPrint(str); + auto textInArray = [&typeSpec] (auto i) { + auto str = typeSpec.TextIn(i); + return typeSpec.ArrayPrint(str); }; - auto textOutArray = [&] (auto i) { - auto str = textOut(i); - return arrayPrint(str); + auto textOutArray = [&typeSpec] (auto i) { + auto str = typeSpec.TextOut(i); + return typeSpec.ArrayPrint(str); }; testSingleType(arrayId, false, textInArray, textOutArray); @@ -133,185 +428,11 @@ Y_UNIT_TEST_SUITE(KqpPg) { [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); }); }; - testType(BOOLOID, true, - [] (auto i) { return TString(i ? "true" : "false"); }, - [] (auto i) { return TString(i ? "t" : "f"); }); - - testType(CHAROID, true, - [] (auto i) { return Sprintf("%c", (char)(i + '0')); }, - [] (auto i) { return Sprintf("%c", (char)(i + '0')); }); - - testType(INT2OID, true, - [] (auto i) { return Sprintf("%u", i); }, - [] (auto i) { return Sprintf("%u", i); }); - - testType(INT4OID, true, - [] (auto i) { return Sprintf("%u", i); }, - [] (auto i) { return Sprintf("%u", i); }); - - testType(INT8OID, true, - [] (auto i) { return Sprintf("%u", i); }, - [] (auto i) { return Sprintf("%u", i); }); - - testType(FLOAT4OID, true, - [] (auto i) { return Sprintf("%g", i + 0.5f); }, - [] (auto i) { return Sprintf("%g", i + 0.5f); }); - - testType(FLOAT8OID, true, - [] (auto i) { return Sprintf("%lg", i + 0.5); }, - [] (auto i) { return Sprintf("%lg", i + 0.5); }); - testByteaType(); - testType(TEXTOID, true, - [] (auto i) { return Sprintf("text %u", i); }, - [] (auto i) { return Sprintf("text %u", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(BPCHAROID, true, - [] (auto i) { return Sprintf("bpchar %u", i); }, - [] (auto i) { return Sprintf("bpchar %u", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(VARCHAROID, false, - [] (auto i) { return Sprintf("varchar %u", i); }, - [] (auto i) { return Sprintf("varchar %u", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(NAMEOID, true, - [] (auto i) { return Sprintf("name %u", i); }, - [] (auto i) { return Sprintf("name %u", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(NUMERICOID, true, - [] (auto i) { return Sprintf("%lg", i + 0.12345); }, - [] (auto i) { return Sprintf("%lg", i + 0.12345); }); - - testType(MONEYOID, true, - [] (auto i) { return Sprintf("%lg", i + i / 100.); }, - [] (auto i) { return Sprintf("$%.2lf", i + i / 100.); }); - - testType(DATEOID, true, - [] (auto i) { return Sprintf("1970-01-%02u", i + 1); }, - [] (auto i) { return Sprintf("1970-01-%02u", i + 1); }); - - testType(TIMEOID, true, - [] (auto i) { return Sprintf("%02u:01:02.345", i); }, - [] (auto i) { return Sprintf("%02u:01:02.345", i); }); - - testType(TIMESTAMPOID, true, - [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345", i); }, - [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(TIMETZOID, true, - [] (auto i) { return Sprintf("%02u:01:02.345-03", i); }, - [] (auto i) { return Sprintf("%02u:01:02.345-03", i); }); - - testType(TIMESTAMPTZOID, true, - [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345 -3:00", i); }, - [] (auto i) { return Sprintf("1970-01-01 %02u:01:02.345+00", i + 3); }, // TODO: investigate - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(INTERVALOID, true, - [] (auto i) { return Sprintf("P01-02-03T04:05:%02u", i); }, - [] (auto i) { return Sprintf("1 year 2 mons 3 days 04:05:%02u", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(BITOID, true, - [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }, - [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }); - - testType(VARBITOID, true, - [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }, - [] (auto i) { return Sprintf("%c%c%c%c", (i&8)?'1':'0', (i&4)?'1':'0', (i&2)?'1':'0', (i&1)?'1':'0'); }); - - testType(POINTOID, false, - [] (auto i) { return Sprintf("(10, %u)", i); }, - [] (auto i) { return Sprintf("(10,%u)", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(LINEOID, false, - [] (auto i) { return Sprintf("{1, 2, %u}", i); }, - [] (auto i) { return Sprintf("{1,2,%u}", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(LSEGOID, false, - [] (auto i) { return Sprintf("[(0, 0), (1, %u)]", i); }, - [] (auto i) { return Sprintf("[(0,0),(1,%u)]", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(BOXOID, false, - [] (auto i) { return Sprintf("(1, %u), (0, 0)", i + 1); }, - [] (auto i) { return Sprintf("(1,%u),(0,0)", i + 1); }, - [] (auto s) { return Sprintf("{%s;%s}", s.c_str(), s.c_str()); }); - - testType(PATHOID, false, - [] (auto i) { return Sprintf("((0, 1), (2, 3), (4, %u))", i); }, - [] (auto i) { return Sprintf("((0,1),(2,3),(4,%u))", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(POLYGONOID, false, - [] (auto i) { return Sprintf("((0, 1), (2, 3), (4, %u))", i); }, - [] (auto i) { return Sprintf("((0,1),(2,3),(4,%u))", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(CIRCLEOID, false, - [] (auto i) { return Sprintf("<(0, 1), %u>", i); }, - [] (auto i) { return Sprintf("<(0,1),%u>", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(INETOID, false, - [] (auto i) { return Sprintf("128.%u.0.0/16", i); }, - [] (auto i) { return Sprintf("128.%u.0.0/16", i); }); - - testType(CIDROID, false, - [] (auto i) { return Sprintf("128.%u.0.0/16", i); }, - [] (auto i) { return Sprintf("128.%u.0.0/16", i); }); - - testType(MACADDROID, false, - [] (auto i) { return Sprintf("08:00:2b:01:02:%02u", i); }, - [] (auto i) { return Sprintf("08:00:2b:01:02:%02u", i); }); - - testType(MACADDR8OID, false, - [] (auto i) { return Sprintf("08:00:2b:01:02:03:04:%02u", i); }, - [] (auto i) { return Sprintf("08:00:2b:01:02:03:04:%02u", i); }); - - testType(UUIDOID, false, - [] (auto i) { return Sprintf("00000000-0000-0000-0000-0000000000%02u", i); }, - [] (auto i) { return Sprintf("00000000-0000-0000-0000-0000000000%02u", i); }); - - testType(JSONOID, false, - [] (auto i) { return Sprintf("[%u]", i); }, - [] (auto i) { return Sprintf("[%u]", i); }); - - testType(JSONBOID, false, - [] (auto i) { return Sprintf("[%u]", i); }, - [] (auto i) { return Sprintf("[%u]", i); }); - - testType(JSONPATHOID, false, - [] (auto i) { return Sprintf("$[%u]", i); }, - [] (auto i) { return Sprintf("$[%u]", i); }); - - testType(XMLOID, false, - [] (auto i) { return Sprintf("<a>%u</a>", i); }, - [] (auto i) { return Sprintf("<a>%u</a>", i); }); - - testType(TSQUERYOID, false, - [] (auto i) { return Sprintf("a&b%u", i); }, - [] (auto i) { return Sprintf("'a' & 'b%u'", i); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(TSVECTOROID, false, - [] (auto i) { return Sprintf("a:1 b:%u", i + 2); }, - [] (auto i) { return Sprintf("'a':1 'b':%u", i + 2); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - - testType(INT2VECTOROID, false, - [] (auto i) { return Sprintf("%u %u %u", i, i + 1, i + 2); }, - [] (auto i) { return Sprintf("%u %u %u", i, i + 1, i + 2); }, - [] (auto s) { return Sprintf("{\"%s\",\"%s\"}", s.c_str(), s.c_str()); }); - + for (const auto& [oid, spec] : typeSpecs) { + testType(oid, spec); + } // TODO: varchar as a key // TODO: native range/multirange types (use get_range_io_data()) } @@ -349,6 +470,96 @@ Y_UNIT_TEST_SUITE(KqpPg) { ["3";"three"] ])", FormatResultSetYson(result.GetResultSet(0))); } + + Y_UNIT_TEST(TableSelect) { + auto kikimr = DefaultKikimrRunner(); + auto testSingleType = [&kikimr] (ui32 id, bool isKey, + std::function<TString(size_t)> textIn, + std::function<TString(size_t)> textOut) + { + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + auto tableName = createTable(db, session, id, isKey, textIn); + session.Close().GetValueSync(); + NYdb::NScripting::TScriptingClient client(kikimr.GetDriver()); + auto result = client.ExecuteYqlScript( + TStringBuilder() << R"( + --!syntax_pg + SELECT * FROM ")" + << tableName << "\"" + ).GetValueSync(); + + UNIT_ASSERT_VALUES_EQUAL_C(result.GetStatus(), EStatus::SUCCESS, result.GetIssues().ToString()); + + TResultSetParser parser(result.GetResultSetParser(0)); + for (size_t i = 0; parser.TryNextRow(); ++i) { + auto check = [&parser, &id] (const TString& column, const TString& expected) { + auto& c = parser.ColumnParser(column); + UNIT_ASSERT_VALUES_EQUAL(expected, c.GetPg().Content_); + Cerr << expected << Endl; + }; + auto expected = textOut(i); + if (isKey) { + check("key", expected); + } + check("value", expected); + } + }; + + + auto testType = [&] (ui32 id, const TPgTypeTestSpec& typeSpec) + { + testSingleType(id, typeSpec.IsKey, typeSpec.TextIn, typeSpec.TextOut); + + //arrays do not work for now due to postgress requesting null-terminated + //data in ReadArrayBinary + //KIKIMR-16501 + + // auto arrayId = NYql::NPg::LookupType(id).ArrayTypeId; + + // auto textInArray = [&typeSpec] (auto i) { + // auto str = typeSpec.TextIn(i); + // return typeSpec.ArrayPrint(str); + // }; + + // auto textOutArray = [&typeSpec] (auto i) { + // auto str = typeSpec.TextOut(i); + // return typeSpec.ArrayPrint(str); + // }; + + // testSingleType(arrayId, false, textInArray, textOutArray); + }; + + auto testByteaType = [&] () { + testSingleType(BYTEAOID, true, + [] (auto i) { return Sprintf("bytea %u", i); }, + [] (auto i) { return Sprintf("\\x627974656120%x", i + 48); }); + + testSingleType(BYTEAARRAYOID, false, + [] (auto i) { return Sprintf("{a%u, b%u}", i, i + 10); }, + [] (auto i) { return Sprintf("{\"\\\\x61%x\",\"\\\\x6231%x\"}", i + 48, i + 48); }); + }; + testByteaType(); + for (const auto& [oid, spec] : typeSpecs) { + Cerr << oid << Endl; + testType(oid, spec); + } + } + + Y_UNIT_TEST(ReadPgArray) { + NKikimr::NMiniKQL::TScopedAlloc alloc(__LOCATION__); + auto binaryStr = NPg::PgNativeBinaryFromNativeText("{1,1}", INT2ARRAYOID); + Y_ENSURE(binaryStr.Size() == 32); + auto value = NYql::NCommon::PgValueFromNativeBinary(binaryStr, INT2ARRAYOID); + } + + Y_UNIT_TEST(CreateNotNullPgColumn) { + auto kikimr = DefaultKikimrRunner(); + + TTableBuilder builder; + UNIT_ASSERT_EXCEPTION(builder.AddNonNullableColumn("key", makePgType(INT2OID)), yexception); + //add create table check here once create table YQL is supported + } } } // namespace NKqp diff --git a/ydb/core/tx/datashard/read_table_scan.cpp b/ydb/core/tx/datashard/read_table_scan.cpp index 33b2a65aad1..efd5640606a 100644 --- a/ydb/core/tx/datashard/read_table_scan.cpp +++ b/ydb/core/tx/datashard/read_table_scan.cpp @@ -261,8 +261,7 @@ private: col.HasTypeInfo() ? &col.GetTypeInfo() : nullptr); if (col.GetTypeId() == NScheme::NTypeIds::Pg) { - auto pgType = meta->mutable_type()->mutable_optional_type()->mutable_item() - ->mutable_pg_type(); + auto pgType = meta->mutable_type()->mutable_pg_type(); pgType->set_oid(NPg::PgTypeIdFromTypeDesc(typeInfo.GetTypeDesc())); } else { auto id = static_cast<NYql::NProto::TypeIds>(col.GetTypeId()); diff --git a/ydb/core/tx/tx_proxy/read_table_impl.cpp b/ydb/core/tx/tx_proxy/read_table_impl.cpp index 8356fda135f..b19a85de34b 100644 --- a/ydb/core/tx/tx_proxy/read_table_impl.cpp +++ b/ydb/core/tx/tx_proxy/read_table_impl.cpp @@ -1731,8 +1731,7 @@ private: meta->set_name(col.Name); if (col.PType.GetTypeId() == NScheme::NTypeIds::Pg) { - auto pgType = meta->mutable_type()->mutable_optional_type()->mutable_item() - ->mutable_pg_type(); + auto pgType = meta->mutable_type()->mutable_pg_type(); pgType->set_oid(NPg::PgTypeIdFromTypeDesc(col.PType.GetTypeDesc())); } else { auto id = static_cast<NYql::NProto::TypeIds>(col.PType.GetTypeId()); diff --git a/ydb/core/ydb_convert/table_description.cpp b/ydb/core/ydb_convert/table_description.cpp index ff8761c668c..69dc8547edc 100644 --- a/ydb/core/ydb_convert/table_description.cpp +++ b/ydb/core/ydb_convert/table_description.cpp @@ -44,7 +44,7 @@ static Ydb::Type* AddColumn(Ydb::Table::ColumnMeta* newColumn, const TColumn& co newColumn->set_name(column.GetName()); Ydb::Type* columnType = nullptr; - if (column.GetNotNull()) { + if (column.GetNotNull() || protoType == NScheme::NTypeIds::Pg) { columnType = newColumn->mutable_type(); } else { columnType = newColumn->mutable_type()->mutable_optional_type()->mutable_item(); @@ -103,6 +103,10 @@ void FillColumnDescriptionImpl(TYdbProto& out, for (const auto& column : in.GetColumns()) { auto newColumn = out.add_columns(); + Y_ENSURE( + column.GetTypeId() != NScheme::NTypeIds::Pg || !column.GetNotNull(), + "It is not allowed to create NOT NULL column with pg type" + ); Ydb::Type* columnType = AddColumn(newColumn, column); if (columnIdToKeyPos.count(column.GetId())) { @@ -136,6 +140,10 @@ void FillColumnDescription(Ydb::Table::DescribeTableResult& out, const NKikimrSc auto& schema = in.GetSchema(); for (const auto& column : schema.GetColumns()) { + Y_ENSURE( + column.GetTypeId() != NScheme::NTypeIds::Pg || !column.GetNotNull(), + "It is not allowed to create NOT NULL column with pg type" + ); auto newColumn = out.add_columns(); AddColumn(newColumn, column); } @@ -225,7 +233,9 @@ bool FillColumnDescription(NKikimrSchemeOp::TTableDescription& out, return false; } - cd->SetNotNull(true); + if (!column.type().has_pg_type()) { + cd->SetNotNull(true); + } } NScheme::TTypeInfo typeInfo; diff --git a/ydb/core/ydb_convert/ydb_convert.cpp b/ydb/core/ydb_convert/ydb_convert.cpp index 01c58c3109a..cb5047bbd34 100644 --- a/ydb/core/ydb_convert/ydb_convert.cpp +++ b/ydb/core/ydb_convert/ydb_convert.cpp @@ -210,6 +210,13 @@ void ConvertYdbTypeToMiniKQLType(const Ydb::Type& input, NKikimrMiniKQL::TType& } break; } + case Ydb::Type::kPgType: { + output.SetKind(NKikimrMiniKQL::ETypeKind::Pg); + const Ydb::PgType& pgType = input.pg_type(); + auto pgOut = output.MutablePg(); + pgOut->Setoid(pgType.Getoid()); + break; + } default: { ythrow yexception() << "Unknown protobuf type: " << input.DebugString(); @@ -689,7 +696,11 @@ void ConvertYdbValueToMiniKQLValue(const Ydb::Type& inputType, } break; } - + case Ydb::Type::kPgType: { + const auto& stringRef = inputValue.Gettext_value(); + output.SetText(stringRef.data(), stringRef.size()); + break; + } default: { throw yexception() << "Unknown protobuf type: " << inputType.DebugString(); diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp index e0ab361fb95..a88e95785d4 100644 --- a/ydb/library/mkql_proto/mkql_proto.cpp +++ b/ydb/library/mkql_proto/mkql_proto.cpp @@ -480,7 +480,7 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, NK return; } auto pgType = static_cast<TPgType*>(type); - auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId()); + auto textValue = NYql::NCommon::PgValueToNativeText(value, pgType->GetTypeId()); res.SetText(textValue); break; } @@ -587,7 +587,7 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd return; } auto pgType = static_cast<TPgType*>(type); - auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId()); + auto textValue = NYql::NCommon::PgValueToNativeText(value, pgType->GetTypeId()); res.set_text_value(textValue); break; } diff --git a/ydb/library/yql/minikql/CMakeLists.darwin.txt b/ydb/library/yql/minikql/CMakeLists.darwin.txt index 9ffb72aa024..2f676fafe37 100644 --- a/ydb/library/yql/minikql/CMakeLists.darwin.txt +++ b/ydb/library/yql/minikql/CMakeLists.darwin.txt @@ -43,6 +43,7 @@ target_link_libraries(library-yql-minikql PUBLIC public-udf-tz library-yql-utils ydb-library-uuid + public-lib-scheme_types ) target_sources(library-yql-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/aligned_page_pool.cpp diff --git a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt index fbcdd3b05ec..69b2929c681 100644 --- a/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/minikql/CMakeLists.linux-aarch64.txt @@ -44,6 +44,7 @@ target_link_libraries(library-yql-minikql PUBLIC public-udf-tz library-yql-utils ydb-library-uuid + public-lib-scheme_types ) target_sources(library-yql-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/aligned_page_pool.cpp diff --git a/ydb/library/yql/minikql/CMakeLists.linux.txt b/ydb/library/yql/minikql/CMakeLists.linux.txt index fbcdd3b05ec..69b2929c681 100644 --- a/ydb/library/yql/minikql/CMakeLists.linux.txt +++ b/ydb/library/yql/minikql/CMakeLists.linux.txt @@ -44,6 +44,7 @@ target_link_libraries(library-yql-minikql PUBLIC public-udf-tz library-yql-utils ydb-library-uuid + public-lib-scheme_types ) target_sources(library-yql-minikql PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/aligned_page_pool.cpp diff --git a/ydb/library/yql/minikql/mkql_node.cpp b/ydb/library/yql/minikql/mkql_node.cpp index f6cf9292b0c..0943f72a875 100644 --- a/ydb/library/yql/minikql/mkql_node.cpp +++ b/ydb/library/yql/minikql/mkql_node.cpp @@ -4,6 +4,7 @@ #include "mkql_node_visitor.h" #include "mkql_node_printer.h" #include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/public/lib/scheme_types/scheme_type_id.h> #include <util/stream/str.h> #include <util/string/join.h> @@ -362,6 +363,8 @@ TDataType::TDataType(NUdf::TDataTypeId schemeType, const TTypeEnvironment& env) TDataType* TDataType::Create(NUdf::TDataTypeId schemeType, const TTypeEnvironment& env) { MKQL_ENSURE(schemeType, "Null type isn't allowed."); MKQL_ENSURE(schemeType != NUdf::TDataType<NUdf::TDecimal>::Id, "Can't' create Decimal."); + MKQL_ENSURE(schemeType != NKikimr::NScheme::NTypeIds::Pg, "Can't create Pg."); + MKQL_ENSURE(schemeType != 0, "0 type"); return ::new(env.Allocate<TDataType>()) TDataType(schemeType, env); } @@ -479,6 +482,7 @@ TPgType::TPgType(ui32 typeId, const TTypeEnvironment& env) } TPgType* TPgType::Create(ui32 typeId, const TTypeEnvironment& env) { + MKQL_ENSURE(typeId != 0, "0 type"); return ::new(env.Allocate<TPgType>()) TPgType(typeId, env); } diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index f90e01991a0..476cf1cc083 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -2330,13 +2330,15 @@ bool ParsePgIntervalModifier(const TString& str, i32& ret) { } // NYql + namespace NKikimr { namespace NMiniKQL { using namespace NYql; -ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value) { - const auto& typeDesc = NYql::NPg::LookupType(type->GetTypeId()); +ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value) { + const auto& typeDesc = NYql::NPg::LookupType(pgTypeId); + if (typeDesc.TypeLen >= 0) { return typeDesc.TypeLen; } @@ -2351,6 +2353,10 @@ ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value) { } } +ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value) { + return PgValueSize(type->GetTypeId(), value); +} + void PGPackImpl(bool stable, const TPgType* type, const NUdf::TUnboxedValuePod& value, TBuffer& buf) { switch (type->GetTypeId()) { case BOOLOID: { diff --git a/ydb/library/yql/parser/pg_wrapper/interface/pack.h b/ydb/library/yql/parser/pg_wrapper/interface/pack.h index 72b598136d6..3847cec6cd1 100644 --- a/ydb/library/yql/parser/pg_wrapper/interface/pack.h +++ b/ydb/library/yql/parser/pg_wrapper/interface/pack.h @@ -17,7 +17,7 @@ NUdf::TUnboxedValue PGUnpackImpl(const TPgType* type, TStringBuf& buf); void EncodePresortPGValue(TPgType* type, const NUdf::TUnboxedValue& value, TVector<ui8>& output); NUdf::TUnboxedValue DecodePresortPGValue(TPgType* type, TStringBuf& input, TVector<ui8>& buffer); +ui64 PgValueSize(ui32 pgTypeId, const NUdf::TUnboxedValuePod& value); ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value); - } // namespace NMiniKQL } // namespace NKikimr diff --git a/ydb/library/yql/parser/pg_wrapper/parser.cpp b/ydb/library/yql/parser/pg_wrapper/parser.cpp index 3ad774eb722..2bf1b7ebd94 100644 --- a/ydb/library/yql/parser/pg_wrapper/parser.cpp +++ b/ydb/library/yql/parser/pg_wrapper/parser.cpp @@ -52,6 +52,7 @@ extern "C" { extern __thread Latch LocalLatchData; extern void destroy_timezone_hashtable(); +extern void free_current_locale_conv(); const char *progname; #define STDERR_BUFFER_LEN 4096 @@ -301,6 +302,7 @@ extern "C" void setup_pg_thread_cleanup() { struct TThreadCleanup { ~TThreadCleanup() { destroy_timezone_hashtable(); + free_current_locale_conv(); ResourceOwnerDelete(CurrentResourceOwner); MemoryContextDelete(TopMemoryContext); } diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/adt/pg_locale.c b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/adt/pg_locale.c index cf9b6e247e9..86f1d4aa401 100644 --- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/adt/pg_locale.c +++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/adt/pg_locale.c @@ -102,6 +102,8 @@ __thread char *localized_full_months[12 + 1]; /* indicates whether locale information cache is valid */ static __thread bool CurrentLocaleConvValid = false; static __thread bool CurrentLCTimeValid = false; +static __thread struct lconv CurrentLocaleConv; +static __thread bool CurrentLocaleConvAllocated = false; /* Cache for collation-related knowledge */ @@ -396,6 +398,14 @@ free_struct_lconv(struct lconv *s) free(s->negative_sign); } +void free_current_locale_conv() +{ + if (CurrentLocaleConvAllocated) + { + free_struct_lconv(&CurrentLocaleConv); + CurrentLocaleConvAllocated = false; + } +} /* * Check that all fields of a struct lconv (or at least, the ones we care * about) are non-NULL. The field list must match free_struct_lconv(). @@ -464,8 +474,6 @@ db_encoding_convert(int encoding, char **str) struct lconv * PGLC_localeconv(void) { - static __thread struct lconv CurrentLocaleConv; - static __thread bool CurrentLocaleConvAllocated = false; struct lconv *extlconv; struct lconv worklconv; char *save_lc_monetary; diff --git a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/pg_locale.h b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/pg_locale.h index 09ce306efb3..46dfffe1900 100644 --- a/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/pg_locale.h +++ b/ydb/library/yql/parser/pg_wrapper/postgresql/src/include/utils/pg_locale.h @@ -70,6 +70,12 @@ extern bool lc_ctype_is_c(Oid collation); */ extern struct lconv *PGLC_localeconv(void); +/* + * Free thread_local lconv struct + */ +extern void free_current_locale_conv(); + + extern void cache_locale_time(void); diff --git a/ydb/library/yql/sql/pg/pg_sql.cpp b/ydb/library/yql/sql/pg/pg_sql.cpp index e8b3d29ae99..3ab2e1a86e8 100644 --- a/ydb/library/yql/sql/pg/pg_sql.cpp +++ b/ydb/library/yql/sql/pg/pg_sql.cpp @@ -1172,13 +1172,14 @@ public: return {}; } - auto p = Settings.ClusterMapping.FindPtr(value->schemaname); + const char* schemaname = (value->schemaname) ? value->schemaname : Settings.DefaultCluster.Data(); + auto p = Settings.ClusterMapping.FindPtr(schemaname); if (!p) { - AddError(TStringBuilder() << "Unknown cluster: " << value->schemaname); + AddError(TStringBuilder() << "Unknown cluster: " << schemaname); return {}; } - auto sink = L(A("DataSink"), QAX(*p), QAX(value->schemaname)); + auto sink = L(A("DataSink"), QAX(*p), QAX(schemaname)); auto key = L(A("Key"), QL(QA("table"), L(A("String"), QAX(value->relname)))); return { sink, key }; } @@ -1204,14 +1205,10 @@ public: break; } } - if (!view) { auto viewIt = Views.find(value->relname); if (viewIt != Views.end()) { view = &viewIt->second; - } else { - AddError(TStringBuilder() << "View or CTE not found: '" << value->relname << "'"); - return {}; } } } @@ -1238,13 +1235,14 @@ public: return { s, alias, colnames, true }; } - auto p = Settings.ClusterMapping.FindPtr(value->schemaname); + const char* schemaname = (value->schemaname) ? value->schemaname : Settings.DefaultCluster.Data(); + auto p = Settings.ClusterMapping.FindPtr(schemaname); if (!p) { - AddError(TStringBuilder() << "Unknown cluster: " << value->schemaname); + AddError(TStringBuilder() << "Unknown cluster: " << schemaname); return {}; } - auto source = L(A("DataSource"), QAX(*p), QAX(value->schemaname)); + auto source = L(A("DataSource"), QAX(*p), QAX(schemaname)); return { L(A("Read!"), A("world"), source, L(A("Key"), QL(QA("table"), L(A("String"), QAX(TablePathPrefix + value->relname)))), L(A("Void")), @@ -1347,7 +1345,7 @@ public: AddError("RangeFunction: expected pair"); return {}; } - + TExprSettings settings; settings.AllowColumns = false; settings.AllowReturnSet = true; @@ -1678,7 +1676,7 @@ public: return VL(args.data(), args.size()); } - + TAstNode* ParseSubLinkExpr(const SubLink* value, const TExprSettings& settings) { AT_LOCATION(value); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 020bc146d5f..5b4376d75b7 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -130,6 +130,12 @@ void PgReleaseThreadContext(void* ctx) { Y_UNUSED(ctx); } +ui64 PgValueSize(ui32 type, const NUdf::TUnboxedValuePod& value) { + Y_UNUSED(type); + Y_UNUSED(value); + throw yexception() << "PG types are not supported"; +} + ui64 PgValueSize(const TPgType* type, const NUdf::TUnboxedValuePod& value) { Y_UNUSED(type); Y_UNUSED(value); diff --git a/ydb/public/sdk/cpp/client/ydb_table/table.cpp b/ydb/public/sdk/cpp/client/ydb_table/table.cpp index b12d65bed32..ee410153a83 100644 --- a/ydb/public/sdk/cpp/client/ydb_table/table.cpp +++ b/ydb/public/sdk/cpp/client/ydb_table/table.cpp @@ -1038,10 +1038,9 @@ TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const TDeci TTableBuilder& TTableBuilder::AddNullableColumn(const TString& name, const TPgType& type, const TString& family) { auto columnType = TTypeBuilder() - .BeginOptional() - .Pg(type) - .EndOptional() + .Pg(type) .Build(); + TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family); return *this; } @@ -1065,11 +1064,10 @@ TTableBuilder& TTableBuilder::AddNonNullableColumn(const TString& name, const TD } TTableBuilder& TTableBuilder::AddNonNullableColumn(const TString& name, const TPgType& type, const TString& family) { - auto columnType = TTypeBuilder() - .Pg(type) - .Build(); - - TableDescription_.AddColumn(name, TProtoAccessor::GetProto(columnType), family); + throw yexception() << "It is not allowed to create NOT NULL column with pg type"; + Y_UNUSED(name); + Y_UNUSED(type); + Y_UNUSED(family); return *this; } |