diff options
author | theqwertiest <theqwertiest@yandex-team.com> | 2023-06-13 12:48:36 +0300 |
---|---|---|
committer | theqwertiest <theqwertiest@yandex-team.com> | 2023-06-13 12:48:36 +0300 |
commit | 27917da41ffbe0ad4f30d2824d9492798e57464d (patch) | |
tree | f7091c53c77d24c59fbb971db704b74c57dc2a4e | |
parent | 6a63f5c36674a7d5656ab2e5f008635bad2247f4 (diff) | |
download | ydb-27917da41ffbe0ad4f30d2824d9492798e57464d.tar.gz |
feat rps-read-rows: add pg type support
Drive-by:
- fix memory leaks
- fix bug with pg null handling
-rw-r--r-- | ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/CMakeLists.linux-aarch64.txt | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/CMakeLists.linux-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/CMakeLists.windows-x86_64.txt | 1 | ||||
-rw-r--r-- | ydb/core/grpc_services/rpc_read_rows.cpp | 112 | ||||
-rw-r--r-- | ydb/core/grpc_services/ya.make | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt | 4 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/kqp_kv_ut.cpp | 251 | ||||
-rw-r--r-- | ydb/core/kqp/ut/opt/ya.make | 8 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_value/value.cpp | 6 |
13 files changed, 351 insertions, 47 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt index 9ba2c54c10..c25d9faed5 100644 --- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt @@ -54,6 +54,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-library-dynumber ydb-library-mkql_proto library-persqueue-topic_parser + parser-pg_wrapper-interface yql-public-types api-grpc-draft api-protos diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt index 94522095a3..8d1841680a 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt @@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-library-dynumber ydb-library-mkql_proto library-persqueue-topic_parser + parser-pg_wrapper-interface yql-public-types api-grpc-draft api-protos diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt index 94522095a3..8d1841680a 100644 --- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt @@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-library-dynumber ydb-library-mkql_proto library-persqueue-topic_parser + parser-pg_wrapper-interface yql-public-types api-grpc-draft api-protos diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt index 9ba2c54c10..c25d9faed5 100644 --- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt +++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt @@ -54,6 +54,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC ydb-library-dynumber ydb-library-mkql_proto library-persqueue-topic_parser + parser-pg_wrapper-interface yql-public-types api-grpc-draft api-protos diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp index 4772547424..92b8bca53f 100644 --- a/ydb/core/grpc_services/rpc_read_rows.cpp +++ b/ydb/core/grpc_services/rpc_read_rows.cpp @@ -10,6 +10,7 @@ #include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h> #include <ydb/core/ydb_convert/ydb_convert.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h> #include <ydb/library/yql/public/udf/udf_types.h> #include <ydb/library/yql/minikql/dom/yson.h> #include <ydb/library/yql/minikql/dom/json.h> @@ -37,17 +38,17 @@ TVector<std::pair<TString, Ydb::Type>> GetRequestColumns(const Ydb::Table::ReadR TVector<std::pair<TString, Ydb::Type>> result; result.reserve(rowFields.size()); - for (i32 pos = 0; pos < rowFields.size(); ++pos) { - const auto& name = rowFields[pos].Getname(); - const auto& typeInProto = rowFields[pos].type().has_optional_type() ? - rowFields[pos].type().optional_type().item() : rowFields[pos].type(); + for (const auto& rowField: rowFields) { + const auto& name = rowField.Getname(); + const auto& typeInProto = rowField.type().has_optional_type() ? + rowField.type().optional_type().item() : rowField.type(); result.emplace_back(name, typeInProto); } return result; } -} +} // namespace using TEvReadRowsRequest = TGrpcRequestNoOperationCall<Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>; @@ -61,8 +62,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> { static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(60); public: - explicit TReadRowsRPC(IRequestNoOpCtx* request) - : Request(request) + explicit TReadRowsRPC(std::unique_ptr<IRequestNoOpCtx> request) + : Request(std::move(request)) , PipeCache(MakePipePeNodeCacheID(true)) {} @@ -79,11 +80,15 @@ public: THashSet<TString> keyColumnsLeft; THashMap<TString, ui32> columnByName; - for (const auto& [_, colInfo] : entry.Columns) { + for (const auto& [colId, colInfo] : entry.Columns) { ui32 id = colInfo.Id; - auto& name = colInfo.Name; - auto& type = colInfo.PType; - ColumnsMeta.emplace_back(TColumnMeta{name, type, id}); + const auto& name = colInfo.Name; + const auto& type = colInfo.PType; + + ColumnsMeta.emplace_back(TColumnMeta{.Id = id, + .Name = name, + .Type = type, + .PTypeMod = colInfo.PTypeMod}); columnByName[name] = id; i32 keyOrder = colInfo.KeyOrder; @@ -93,10 +98,6 @@ public: KeyColumnTypes[keyOrder] = type; keyColumnsLeft.insert(name); } - if (colInfo.PType.GetTypeId() == NScheme::NTypeIds::Pg) { - errorMessage = "Pg types are not supported yet"; - return false; - } } KeyColumnPositions.resize(KeyColumnTypes.size()); @@ -112,13 +113,13 @@ public: } i32 typmod = -1; ui32 colId = *cp; - auto& ci = *entry.Columns.FindPtr(colId); + auto& colInfo = *entry.Columns.FindPtr(colId); const auto& typeInProto = reqColumns[pos].second; if (typeInProto.type_id()) { // TODO check Arrow types - } else if (typeInProto.has_decimal_type() && ci.PType.GetTypeId() == NScheme::NTypeIds::Decimal) { + } else if (typeInProto.has_decimal_type() && colInfo.PType.GetTypeId() == NScheme::NTypeIds::Decimal) { int precision = typeInProto.decimal_type().precision(); int scale = typeInProto.decimal_type().scale(); if (precision != NScheme::DECIMAL_PRECISION || scale != NScheme::DECIMAL_SCALE) { @@ -137,18 +138,20 @@ public: name.c_str(), typeName.c_str()); return false; } - auto typeInRequest = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); - if (typeInRequest != ci.PType) { + + const auto typeInRequest = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc); + if (typeInRequest != colInfo.PType) { errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s", - name.c_str(), NScheme::TypeName(ci.PType).c_str(), + name.c_str(), NScheme::TypeName(colInfo.PType).c_str(), NScheme::TypeName(typeInRequest).c_str()); return false; } - if (!ci.PTypeMod.empty() && NPg::TypeDescNeedsCoercion(typeDesc)) { - auto result = NPg::BinaryTypeModFromTextTypeMod(ci.PTypeMod, typeDesc); + + if (!colInfo.PTypeMod.empty() && NPg::TypeDescNeedsCoercion(typeDesc)) { + const auto result = NPg::BinaryTypeModFromTextTypeMod(colInfo.PTypeMod, typeDesc); if (result.Error) { errorMessage = Sprintf("Invalid typemod for column %s: type %s, error %s", - name.c_str(), NScheme::TypeName(ci.PType, ci.PTypeMod).c_str(), + name.c_str(), NScheme::TypeName(colInfo.PType, colInfo.PTypeMod).c_str(), result.Error->c_str()); return false; } @@ -156,15 +159,15 @@ public: } } else { errorMessage = Sprintf("Unexpected type for column %s: expected %s", - name.c_str(), NScheme::TypeName(ci.PType).c_str()); + name.c_str(), NScheme::TypeName(colInfo.PType).c_str()); return false; } - bool notNull = entry.NotNullColumns.contains(ci.Name); + bool notNull = entry.NotNullColumns.contains(colInfo.Name); - if (ci.KeyOrder != -1) { - KeyColumnPositions[ci.KeyOrder] = NTxProxy::TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, typmod, notNull}; - keyColumnsLeft.erase(ci.Name); + if (colInfo.KeyOrder != -1) { + KeyColumnPositions[colInfo.KeyOrder] = NTxProxy::TFieldDescription{colInfo.Id, colInfo.Name, (ui32)pos, colInfo.PType, typmod, notNull}; + keyColumnsLeft.erase(colInfo.Name); } } @@ -329,17 +332,17 @@ public: Sprintf("Table '%s' is a system view. ReadRows is not supported.", GetTable().c_str())); } - auto* resolveNamesResult = ev->Get()->Request.Release(); + auto& resolveNamesResult = ev->Get()->Request; LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC going to create keys to read from proto: " << GetProto()->DebugString()); TString errorMessage; - if (!CheckAccess(resolveNamesResult, errorMessage)) { + if (!CheckAccess(resolveNamesResult.Get(), errorMessage)) { return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, errorMessage); } - if (!BuildSchema(resolveNamesResult, errorMessage)) { + if (!BuildSchema(resolveNamesResult.Get(), errorMessage)) { return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage); } if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindTable) { @@ -351,7 +354,7 @@ public: return; } - ResolveShards(resolveNamesResult); + ResolveShards(resolveNamesResult.Get()); } void ResolveShards(NSchemeCache::TSchemeCacheNavigate* resolveNamesResult) { @@ -359,9 +362,9 @@ public: // We are going to request only key columns TVector<TKeyDesc::TColumnOp> columns; - for (const auto& [_, ci] : entry.Columns) { - if (ci.KeyOrder != -1) { - TKeyDesc::TColumnOp op = { ci.Id, TKeyDesc::EColumnOperation::Set, ci.PType, 0, 0 }; + for (const auto& [colId, colInfo] : entry.Columns) { + if (colInfo.KeyOrder != -1) { + TKeyDesc::TColumnOp op = { colInfo.Id, TKeyDesc::EColumnOperation::Set, colInfo.PType, 0, 0 }; columns.push_back(op); } } @@ -457,8 +460,23 @@ public: NKqp::TProgressStatEntry stats; auto& ioStats = stats.ReadIOStat; + const auto getPgTypeFromColMeta = [](const auto &colMeta) { + return NYdb::TPgType(NPg::PgTypeNameFromTypeDesc(colMeta.Type.GetTypeDesc()), + colMeta.PTypeMod); + }; + + const auto getTypeFromColMeta = [&](const auto &colMeta) { + if (colMeta.Type.GetTypeId() == NScheme::NTypeIds::Pg) { + return NYdb::TTypeBuilder().Pg(getPgTypeFromColMeta(colMeta)).Build(); + } else { + return NYdb::TTypeBuilder() + .Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId()) + .Build(); + } + }; + for (const auto& colMeta : ColumnsMeta) { - auto type = NYdb::TTypeBuilder().Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId()).Build(); + const auto type = getTypeFromColMeta(colMeta); auto* col = resultSet->Addcolumns(); *col->mutable_type() = NYdb::TProtoAccessor::GetProto(type); *col->mutable_name() = colMeta.Name; @@ -471,13 +489,26 @@ public: vb.BeginStruct(); ui64 sz = 0; for (const auto& colMeta : ColumnsMeta) { - auto type = NYdb::TTypeBuilder().Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId()).Build(); + const auto type = getTypeFromColMeta(colMeta); LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC " << " name: " << colMeta.Name ); const auto& cell = row[colMeta.Id - 1]; vb.AddMember(colMeta.Name); - ProtoValueFromCell(vb, colMeta.Type, cell); + if (colMeta.Type.GetTypeId() == NScheme::NTypeIds::Pg) + { + const auto pgTypeId = NPg::PgTypeIdFromTypeDesc(colMeta.Type.GetTypeDesc()); + const NYdb::TPgValue pgValue{ + cell.IsNull() ? NYdb::TPgValue::VK_NULL : NYdb::TPgValue::VK_TEXT, + NPg::PgNativeTextFromNativeBinary({cell.AsBuf().data(), cell.AsBuf().size()}, pgTypeId).Str, + getPgTypeFromColMeta(colMeta) + }; + vb.Pg(pgValue); + } + else + { + ProtoValueFromCell(vb, colMeta.Type, cell); + } sz += cell.Size(); } vb.EndStruct(); @@ -554,9 +585,10 @@ private: TVector<NTxProxy::TFieldDescription> KeyColumnPositions; TVector<NScheme::TTypeInfo> KeyColumnTypes; struct TColumnMeta { + ui32 Id; TString Name; NScheme::TTypeInfo Type; - ui32 Id; + TString PTypeMod; }; TVector<TColumnMeta> ColumnsMeta; @@ -569,7 +601,7 @@ private: }; void DoReadRowsRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) { - f.RegisterActor(new TReadRowsRPC(p.release())); + f.RegisterActor(new TReadRowsRPC(std::move(p))); } } // namespace NKikimr::NGRpcService diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make index b415c68906..d6e169c22a 100644 --- a/ydb/core/grpc_services/ya.make +++ b/ydb/core/grpc_services/ya.make @@ -113,6 +113,7 @@ PEERDIR( ydb/library/dynumber ydb/library/mkql_proto ydb/library/persqueue/topic_parser + ydb/library/yql/parser/pg_wrapper/interface ydb/library/yql/public/types ydb/public/api/grpc/draft ydb/public/api/protos diff --git a/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt index c774741560..b4756b8e8f 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt @@ -10,9 +10,11 @@ add_executable(ydb-core-kqp-ut-opt) target_compile_options(ydb-core-kqp-ut-opt PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION + $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) target_include_directories(ydb-core-kqp-ut-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include ) target_link_libraries(ydb-core-kqp-ut-opt PUBLIC contrib-libs-cxxsupp @@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC cpp-testing-unittest_main ydb-core-kqp kqp-ut-common - yql-sql-pg_dummy + yql-sql-pg re2_udf ) target_link_options(ydb-core-kqp-ut-opt PRIVATE diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt index 9ce7258b7d..7e41ceeb34 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt @@ -10,9 +10,11 @@ add_executable(ydb-core-kqp-ut-opt) target_compile_options(ydb-core-kqp-ut-opt PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION + $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) target_include_directories(ydb-core-kqp-ut-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include ) target_link_libraries(ydb-core-kqp-ut-opt PUBLIC contrib-libs-linux-headers @@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC cpp-testing-unittest_main ydb-core-kqp kqp-ut-common - yql-sql-pg_dummy + yql-sql-pg re2_udf ) target_link_options(ydb-core-kqp-ut-opt PRIVATE diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt index ec94e29aa1..0ad33099d5 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt @@ -10,9 +10,11 @@ add_executable(ydb-core-kqp-ut-opt) target_compile_options(ydb-core-kqp-ut-opt PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION + $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) target_include_directories(ydb-core-kqp-ut-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include ) target_link_libraries(ydb-core-kqp-ut-opt PUBLIC contrib-libs-linux-headers @@ -22,7 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC cpp-testing-unittest_main ydb-core-kqp kqp-ut-common - yql-sql-pg_dummy + yql-sql-pg re2_udf ) target_link_options(ydb-core-kqp-ut-opt PRIVATE diff --git a/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt index cd8ab8da24..dd2804c278 100644 --- a/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt @@ -10,9 +10,11 @@ add_executable(ydb-core-kqp-ut-opt) target_compile_options(ydb-core-kqp-ut-opt PRIVATE -DUSE_CURRENT_UDF_ABI_VERSION + $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything> ) target_include_directories(ydb-core-kqp-ut-opt PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/kqp + ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include ) target_link_libraries(ydb-core-kqp-ut-opt PUBLIC contrib-libs-cxxsupp @@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC cpp-testing-unittest_main ydb-core-kqp kqp-ut-common - yql-sql-pg_dummy + yql-sql-pg re2_udf ) target_sources(ydb-core-kqp-ut-opt PRIVATE diff --git a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp index 4908d03b1b..f592fccd51 100644 --- a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp +++ b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp @@ -1,6 +1,58 @@ #include <ydb/core/kqp/ut/common/kqp_ut_common.h> +#include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h> +#include <ydb/library/yql/utils/log/log.h> #include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h> +#include <util/system/env.h> + + +extern "C" { +#include "postgres.h" +#include "catalog/pg_type_d.h" +} + +namespace +{ + +using namespace NYdb; +using namespace NYdb::NTable; + +struct ReadRowsPgParam +{ + ui32 TypeId; + TString TypeMod; + TString ValueContent; +}; + +} // namespace + +namespace +{ + +using namespace NYdb; +using namespace NYdb::NTable; + +template <typename T> +void ValidateSinglePgRowResult(T& result, const TString& columnName, const TPgValue& expectedValue) { + TResultSetParser parser{result.GetResultSet()}; + UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 1); + + bool gotRows = false; + while( parser.TryNextRow()) { + gotRows = true; + auto& col = parser.ColumnParser(columnName); + + const auto pgValue = col.GetPg(); + UNIT_ASSERT_VALUES_EQUAL(pgValue.Content_, expectedValue.Content_); + UNIT_ASSERT_VALUES_EQUAL(pgValue.Kind_, expectedValue.Kind_); + UNIT_ASSERT_VALUES_EQUAL(pgValue.PgType_.TypeName, expectedValue.PgType_.TypeName); + UNIT_ASSERT_VALUES_EQUAL(pgValue.PgType_.TypeModifier, expectedValue.PgType_.TypeModifier); + } + Y_ENSURE(gotRows, "empty select result"); +} + +} // namespace namespace NKikimr::NKqp { @@ -107,6 +159,205 @@ Y_UNIT_TEST_SUITE(KqpKv) { ] )", res); } + + TVector<::ReadRowsPgParam> readRowsPgParams + { + {.TypeId = BOOLOID, .TypeMod={}, .ValueContent="t"}, + {.TypeId = CHAROID, .TypeMod={}, .ValueContent="v"}, + {.TypeId = INT2OID, .TypeMod={}, .ValueContent="42"}, + {.TypeId = INT4OID, .TypeMod={}, .ValueContent="42"}, + {.TypeId = INT8OID, .TypeMod={}, .ValueContent="42"}, + {.TypeId = FLOAT4OID, .TypeMod={}, .ValueContent="42.42"}, + {.TypeId = FLOAT8OID, .TypeMod={}, .ValueContent="42.42"}, + {.TypeId = TEXTOID, .TypeMod={}, .ValueContent="i'm a text"}, + {.TypeId = BPCHAROID, .TypeMod={}, .ValueContent="i'm a text"}, + {.TypeId = VARCHAROID, .TypeMod={}, .ValueContent="i'm a text"}, + {.TypeId = NAMEOID, .TypeMod={}, .ValueContent="i'm a text"}, + {.TypeId = NUMERICOID, .TypeMod={}, .ValueContent="42.42"}, + {.TypeId = MONEYOID, .TypeMod={}, .ValueContent="$42.42"}, + {.TypeId = DATEOID, .TypeMod={}, .ValueContent="1999-01-01"}, + {.TypeId = TIMEOID, .TypeMod={}, .ValueContent="23:59:59.999"}, + {.TypeId = TIMESTAMPOID, .TypeMod={}, .ValueContent="1999-01-01 23:59:59.999"}, + {.TypeId = TIMETZOID, .TypeMod={}, .ValueContent="23:59:59.999+03"}, + {.TypeId = TIMESTAMPTZOID, .TypeMod={}, .ValueContent="1999-01-01 23:59:59.999+00"}, + {.TypeId = INTERVALOID, .TypeMod={}, .ValueContent="1 year 2 mons 3 days 01:02:03"}, + {.TypeId = BITOID, .TypeMod={}, .ValueContent="1011"}, + {.TypeId = VARBITOID, .TypeMod={}, .ValueContent="1011"}, + {.TypeId = POINTOID, .TypeMod={}, .ValueContent="(42,24)"}, + {.TypeId = LINEOID, .TypeMod={}, .ValueContent="{42,24,13}"}, + {.TypeId = LSEGOID, .TypeMod={}, .ValueContent="[(0,0),(42,42)]"}, + {.TypeId = BOXOID, .TypeMod={}, .ValueContent="(42,42),(0,0)"}, + {.TypeId = PATHOID, .TypeMod={}, .ValueContent="((0,0),(42,42),(13,13))"}, + {.TypeId = POLYGONOID, .TypeMod={}, .ValueContent="((0,0),(42,42),(13,13))"}, + {.TypeId = CIRCLEOID, .TypeMod={}, .ValueContent="<(0,0),42>"}, + {.TypeId = INETOID, .TypeMod={}, .ValueContent="127.0.0.1/16"}, + {.TypeId = CIDROID, .TypeMod={}, .ValueContent="16.0.0.0/8"}, + {.TypeId = MACADDROID, .TypeMod={}, .ValueContent="08:00:2b:01:02:03"}, + {.TypeId = MACADDR8OID, .TypeMod={}, .ValueContent="08:00:2b:01:02:03:04:05"}, + {.TypeId = UUIDOID, .TypeMod={}, .ValueContent="00000000-0000-0000-0000-000000000042"}, + {.TypeId = JSONOID, .TypeMod={}, .ValueContent="{\"value\": 42}"}, + {.TypeId = JSONBOID, .TypeMod={}, .ValueContent="{\"value\": 42}"}, + {.TypeId = JSONPATHOID, .TypeMod={}, .ValueContent="($.\"field\"[*] > 42)"}, + {.TypeId = XMLOID, .TypeMod={}, .ValueContent="<tag>42</tag>"}, + {.TypeId = TSQUERYOID, .TypeMod={}, .ValueContent="'a' & 'b1'"}, + {.TypeId = TSVECTOROID, .TypeMod={}, .ValueContent="'cat' 'fat' '|'"}, + {.TypeId = INT2VECTOROID, .TypeMod={}, .ValueContent="42 24 13"}, + {.TypeId = BYTEAOID, .TypeMod={}, .ValueContent="\\x627974656120ff"}, + {.TypeId = BYTEAARRAYOID, .TypeMod={}, .ValueContent="{\"\\\\x61ff\",\"\\\\x6231ff\"}"}, + {.TypeId = BPCHAROID, .TypeMod="4", .ValueContent="abcd"}, + {.TypeId = VARCHAROID, .TypeMod="4", .ValueContent="abcd"}, + {.TypeId = BITOID, .TypeMod="4", .ValueContent="1101"}, + {.TypeId = VARBITOID, .TypeMod="4", .ValueContent="1101"}, + {.TypeId = NUMERICOID, .TypeMod="4", .ValueContent="1234"}, + {.TypeId = TIMEOID, .TypeMod="4", .ValueContent="23:59:59.9999"}, + {.TypeId = TIMETZOID, .TypeMod="4", .ValueContent="23:59:59.9999+00"}, + {.TypeId = TIMESTAMPOID, .TypeMod="4", .ValueContent="1999-01-01 23:59:59.9999"}, + {.TypeId = TIMESTAMPTZOID, .TypeMod="4", .ValueContent="1999-01-01 23:59:59.9999+00"}, + }; + ::ReadRowsPgParam readRowsPgNullParam{.TypeId = BOOLOID, .TypeMod={}, .ValueContent=""}; + + Y_UNIT_TEST(ReadRowsPgValue) { + auto settings = TKikimrSettings() + .SetWithSampleTables(true); + auto kikimr = TKikimrRunner{settings}; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + const auto tableName = "/Root/TestTable"; + const auto keyColumnName = "Key"; + const auto valueColumnName = "Value"; + + const auto testSingle = [&](const ::ReadRowsPgParam& testParam, bool isNull) + { + auto* typeDesc = NPg::TypeDescFromPgTypeId(testParam.TypeId); + UNIT_ASSERT(!!typeDesc); + const auto typeName = NPg::PgTypeNameFromTypeDesc(typeDesc); + const auto& pgType = TPgType(typeName, testParam.TypeMod); + + Cout << Sprintf("TestParam: type: `%s`; mod: `%s`, is null: %s\n", typeName.data(), testParam.TypeMod.data(), isNull ? "+" : "-" ); + + TTableBuilder builder; + builder.AddNonNullableColumn(keyColumnName, EPrimitiveType::Uint64); + builder.SetPrimaryKeyColumn(keyColumnName); + builder.AddNullableColumn(valueColumnName, pgType); + + auto result = session.CreateTable(tableName, builder.Build()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + const ui64 keyValue = 1; + const TPgValue pgValue( + isNull ? TPgValue::VK_NULL : TPgValue::VK_TEXT, + testParam.ValueContent, + pgType + ); + NYdb::TValueBuilder rows; + rows.BeginList(); + rows.AddListItem() + .BeginStruct() + .AddMember(keyColumnName).Uint64(keyValue) + .AddMember(valueColumnName).Pg(pgValue) + .EndStruct(); + rows.EndList(); + auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString()); + + NYdb::TValueBuilder keys; + keys.BeginList(); + keys.AddListItem() + .BeginStruct() + .AddMember(keyColumnName).Uint64(keyValue) + .EndStruct(); + keys.EndList(); + auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync(); + UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString()); + + ValidateSinglePgRowResult(selectResult, valueColumnName, pgValue); + + result = session.DropTable(tableName).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + }; + + for (const auto& testParam: readRowsPgParams) + { + testSingle(testParam, false); + } + testSingle(readRowsPgNullParam, true); + } + + TVector<::ReadRowsPgParam> readRowsPgKeyParams + { + {.TypeId = TEXTOID, .TypeMod={}, .ValueContent="i'm a text"}, + {.TypeId = BITOID, .TypeMod="4", .ValueContent="0110"}, + }; + ::ReadRowsPgParam readRowsPgNullKeyParam{.TypeId = TEXTOID, .TypeMod={}, .ValueContent=""}; + + Y_UNIT_TEST(ReadRowsPgKey) { + auto settings = TKikimrSettings() + .SetWithSampleTables(true); + auto kikimr = TKikimrRunner{settings}; + + auto db = kikimr.GetTableClient(); + auto session = db.CreateSession().GetValueSync().GetSession(); + const auto tableName = "/Root/TestTable"; + const auto keyColumnName = "Key"; + const auto valueColumnName = "Value"; + + const auto testSingle = [&](const ::ReadRowsPgParam& testParam, bool isNull) + { + auto* typeDesc = NPg::TypeDescFromPgTypeId(testParam.TypeId); + UNIT_ASSERT(!!typeDesc); + const auto typeName = NPg::PgTypeNameFromTypeDesc(typeDesc); + const auto& pgType = TPgType(typeName, testParam.TypeMod); + + Cout << Sprintf("TestParam: type: `%s`; mod: `%s`, is null: %s\n", typeName.data(), testParam.TypeMod.data(), isNull ? "+" : "-" ); + + TTableBuilder builder; + builder.AddNullableColumn(keyColumnName, pgType); + builder.SetPrimaryKeyColumn(keyColumnName); + builder.AddNullableColumn(valueColumnName, EPrimitiveType::Uint64); + + auto result = session.CreateTable(tableName, builder.Build()).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + + const TPgValue pgValue( + isNull ? TPgValue::VK_NULL : TPgValue::VK_TEXT, + testParam.ValueContent, + pgType + ); + const ui64 value = 1; + NYdb::TValueBuilder rows; + rows.BeginList(); + rows.AddListItem() + .BeginStruct() + .AddMember(keyColumnName).Pg(pgValue) + .AddMember(valueColumnName).Uint64(value) + .EndStruct(); + rows.EndList(); + auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync(); + UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString()); + + NYdb::TValueBuilder keys; + keys.BeginList(); + keys.AddListItem() + .BeginStruct() + .AddMember(keyColumnName).Pg(pgValue) + .EndStruct(); + keys.EndList(); + auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync(); + UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString()); + + ValidateSinglePgRowResult(selectResult, keyColumnName, pgValue); + + result = session.DropTable(tableName).GetValueSync(); + UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString()); + }; + + for (const auto& testParam: readRowsPgKeyParams) + { + testSingle(testParam, false); + } + testSingle(readRowsPgNullKeyParam, true); + } } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/opt/ya.make b/ydb/core/kqp/ut/opt/ya.make index 18fede554b..f6b800a0b4 100644 --- a/ydb/core/kqp/ut/opt/ya.make +++ b/ydb/core/kqp/ut/opt/ya.make @@ -26,10 +26,16 @@ SRCS( PEERDIR( ydb/core/kqp ydb/core/kqp/ut/common - ydb/library/yql/sql/pg_dummy + ydb/library/yql/sql/pg ydb/library/yql/udfs/common/re2 ) +ADDINCL( + ydb/library/yql/parser/pg_wrapper/postgresql/src/include +) + +NO_COMPILER_WARNINGS() + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/public/sdk/cpp/client/ydb_value/value.cpp b/ydb/public/sdk/cpp/client/ydb_value/value.cpp index 153232692f..ef7a6179b9 100644 --- a/ydb/public/sdk/cpp/client/ydb_value/value.cpp +++ b/ydb/public/sdk/cpp/client/ydb_value/value.cpp @@ -2144,9 +2144,11 @@ public: void Pg(const TPgValue& value) { FillPgType(value.PgType_); - if (value.IsText()) { + if (value.IsNull()) { + GetValue().set_null_flag_value(::google::protobuf::NULL_VALUE); + } else if (value.IsText()) { GetValue().set_text_value(value.Content_); - } else if (!value.IsNull()) { + } else { GetValue().set_bytes_value(value.Content_); } } |