aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authortheqwertiest <theqwertiest@yandex-team.com>2023-06-13 12:48:36 +0300
committertheqwertiest <theqwertiest@yandex-team.com>2023-06-13 12:48:36 +0300
commit27917da41ffbe0ad4f30d2824d9492798e57464d (patch)
treef7091c53c77d24c59fbb971db704b74c57dc2a4e
parent6a63f5c36674a7d5656ab2e5f008635bad2247f4 (diff)
downloadydb-27917da41ffbe0ad4f30d2824d9492798e57464d.tar.gz
feat rps-read-rows: add pg type support
Drive-by: - fix memory leaks - fix bug with pg null handling
-rw-r--r--ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/grpc_services/rpc_read_rows.cpp112
-rw-r--r--ydb/core/grpc_services/ya.make1
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt4
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt4
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt4
-rw-r--r--ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt4
-rw-r--r--ydb/core/kqp/ut/opt/kqp_kv_ut.cpp251
-rw-r--r--ydb/core/kqp/ut/opt/ya.make8
-rw-r--r--ydb/public/sdk/cpp/client/ydb_value/value.cpp6
13 files changed, 351 insertions, 47 deletions
diff --git a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
index 9ba2c54c10..c25d9faed5 100644
--- a/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.darwin-x86_64.txt
@@ -54,6 +54,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-library-dynumber
ydb-library-mkql_proto
library-persqueue-topic_parser
+ parser-pg_wrapper-interface
yql-public-types
api-grpc-draft
api-protos
diff --git a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
index 94522095a3..8d1841680a 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-aarch64.txt
@@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-library-dynumber
ydb-library-mkql_proto
library-persqueue-topic_parser
+ parser-pg_wrapper-interface
yql-public-types
api-grpc-draft
api-protos
diff --git a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
index 94522095a3..8d1841680a 100644
--- a/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.linux-x86_64.txt
@@ -55,6 +55,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-library-dynumber
ydb-library-mkql_proto
library-persqueue-topic_parser
+ parser-pg_wrapper-interface
yql-public-types
api-grpc-draft
api-protos
diff --git a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
index 9ba2c54c10..c25d9faed5 100644
--- a/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/grpc_services/CMakeLists.windows-x86_64.txt
@@ -54,6 +54,7 @@ target_link_libraries(ydb-core-grpc_services PUBLIC
ydb-library-dynumber
ydb-library-mkql_proto
library-persqueue-topic_parser
+ parser-pg_wrapper-interface
yql-public-types
api-grpc-draft
api-protos
diff --git a/ydb/core/grpc_services/rpc_read_rows.cpp b/ydb/core/grpc_services/rpc_read_rows.cpp
index 4772547424..92b8bca53f 100644
--- a/ydb/core/grpc_services/rpc_read_rows.cpp
+++ b/ydb/core/grpc_services/rpc_read_rows.cpp
@@ -10,6 +10,7 @@
#include <ydb/core/tx/tx_proxy/upload_rows_common_impl.h>
#include <ydb/core/ydb_convert/ydb_convert.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/type_desc.h>
#include <ydb/library/yql/public/udf/udf_types.h>
#include <ydb/library/yql/minikql/dom/yson.h>
#include <ydb/library/yql/minikql/dom/json.h>
@@ -37,17 +38,17 @@ TVector<std::pair<TString, Ydb::Type>> GetRequestColumns(const Ydb::Table::ReadR
TVector<std::pair<TString, Ydb::Type>> result;
result.reserve(rowFields.size());
- for (i32 pos = 0; pos < rowFields.size(); ++pos) {
- const auto& name = rowFields[pos].Getname();
- const auto& typeInProto = rowFields[pos].type().has_optional_type() ?
- rowFields[pos].type().optional_type().item() : rowFields[pos].type();
+ for (const auto& rowField: rowFields) {
+ const auto& name = rowField.Getname();
+ const auto& typeInProto = rowField.type().has_optional_type() ?
+ rowField.type().optional_type().item() : rowField.type();
result.emplace_back(name, typeInProto);
}
return result;
}
-}
+} // namespace
using TEvReadRowsRequest = TGrpcRequestNoOperationCall<Ydb::Table::ReadRowsRequest, Ydb::Table::ReadRowsResponse>;
@@ -61,8 +62,8 @@ class TReadRowsRPC : public TActorBootstrapped<TReadRowsRPC> {
static constexpr TDuration DEFAULT_TIMEOUT = TDuration::Seconds(60);
public:
- explicit TReadRowsRPC(IRequestNoOpCtx* request)
- : Request(request)
+ explicit TReadRowsRPC(std::unique_ptr<IRequestNoOpCtx> request)
+ : Request(std::move(request))
, PipeCache(MakePipePeNodeCacheID(true))
{}
@@ -79,11 +80,15 @@ public:
THashSet<TString> keyColumnsLeft;
THashMap<TString, ui32> columnByName;
- for (const auto& [_, colInfo] : entry.Columns) {
+ for (const auto& [colId, colInfo] : entry.Columns) {
ui32 id = colInfo.Id;
- auto& name = colInfo.Name;
- auto& type = colInfo.PType;
- ColumnsMeta.emplace_back(TColumnMeta{name, type, id});
+ const auto& name = colInfo.Name;
+ const auto& type = colInfo.PType;
+
+ ColumnsMeta.emplace_back(TColumnMeta{.Id = id,
+ .Name = name,
+ .Type = type,
+ .PTypeMod = colInfo.PTypeMod});
columnByName[name] = id;
i32 keyOrder = colInfo.KeyOrder;
@@ -93,10 +98,6 @@ public:
KeyColumnTypes[keyOrder] = type;
keyColumnsLeft.insert(name);
}
- if (colInfo.PType.GetTypeId() == NScheme::NTypeIds::Pg) {
- errorMessage = "Pg types are not supported yet";
- return false;
- }
}
KeyColumnPositions.resize(KeyColumnTypes.size());
@@ -112,13 +113,13 @@ public:
}
i32 typmod = -1;
ui32 colId = *cp;
- auto& ci = *entry.Columns.FindPtr(colId);
+ auto& colInfo = *entry.Columns.FindPtr(colId);
const auto& typeInProto = reqColumns[pos].second;
if (typeInProto.type_id()) {
// TODO check Arrow types
- } else if (typeInProto.has_decimal_type() && ci.PType.GetTypeId() == NScheme::NTypeIds::Decimal) {
+ } else if (typeInProto.has_decimal_type() && colInfo.PType.GetTypeId() == NScheme::NTypeIds::Decimal) {
int precision = typeInProto.decimal_type().precision();
int scale = typeInProto.decimal_type().scale();
if (precision != NScheme::DECIMAL_PRECISION || scale != NScheme::DECIMAL_SCALE) {
@@ -137,18 +138,20 @@ public:
name.c_str(), typeName.c_str());
return false;
}
- auto typeInRequest = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
- if (typeInRequest != ci.PType) {
+
+ const auto typeInRequest = NScheme::TTypeInfo(NScheme::NTypeIds::Pg, typeDesc);
+ if (typeInRequest != colInfo.PType) {
errorMessage = Sprintf("Type mismatch for column %s: expected %s, got %s",
- name.c_str(), NScheme::TypeName(ci.PType).c_str(),
+ name.c_str(), NScheme::TypeName(colInfo.PType).c_str(),
NScheme::TypeName(typeInRequest).c_str());
return false;
}
- if (!ci.PTypeMod.empty() && NPg::TypeDescNeedsCoercion(typeDesc)) {
- auto result = NPg::BinaryTypeModFromTextTypeMod(ci.PTypeMod, typeDesc);
+
+ if (!colInfo.PTypeMod.empty() && NPg::TypeDescNeedsCoercion(typeDesc)) {
+ const auto result = NPg::BinaryTypeModFromTextTypeMod(colInfo.PTypeMod, typeDesc);
if (result.Error) {
errorMessage = Sprintf("Invalid typemod for column %s: type %s, error %s",
- name.c_str(), NScheme::TypeName(ci.PType, ci.PTypeMod).c_str(),
+ name.c_str(), NScheme::TypeName(colInfo.PType, colInfo.PTypeMod).c_str(),
result.Error->c_str());
return false;
}
@@ -156,15 +159,15 @@ public:
}
} else {
errorMessage = Sprintf("Unexpected type for column %s: expected %s",
- name.c_str(), NScheme::TypeName(ci.PType).c_str());
+ name.c_str(), NScheme::TypeName(colInfo.PType).c_str());
return false;
}
- bool notNull = entry.NotNullColumns.contains(ci.Name);
+ bool notNull = entry.NotNullColumns.contains(colInfo.Name);
- if (ci.KeyOrder != -1) {
- KeyColumnPositions[ci.KeyOrder] = NTxProxy::TFieldDescription{ci.Id, ci.Name, (ui32)pos, ci.PType, typmod, notNull};
- keyColumnsLeft.erase(ci.Name);
+ if (colInfo.KeyOrder != -1) {
+ KeyColumnPositions[colInfo.KeyOrder] = NTxProxy::TFieldDescription{colInfo.Id, colInfo.Name, (ui32)pos, colInfo.PType, typmod, notNull};
+ keyColumnsLeft.erase(colInfo.Name);
}
}
@@ -329,17 +332,17 @@ public:
Sprintf("Table '%s' is a system view. ReadRows is not supported.", GetTable().c_str()));
}
- auto* resolveNamesResult = ev->Get()->Request.Release();
+ auto& resolveNamesResult = ev->Get()->Request;
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST,
"TReadRowsRPC going to create keys to read from proto: " << GetProto()->DebugString());
TString errorMessage;
- if (!CheckAccess(resolveNamesResult, errorMessage)) {
+ if (!CheckAccess(resolveNamesResult.Get(), errorMessage)) {
return ReplyWithError(Ydb::StatusIds::UNAUTHORIZED, errorMessage);
}
- if (!BuildSchema(resolveNamesResult, errorMessage)) {
+ if (!BuildSchema(resolveNamesResult.Get(), errorMessage)) {
return ReplyWithError(Ydb::StatusIds::SCHEME_ERROR, errorMessage);
}
if (entry.Kind != NSchemeCache::TSchemeCacheNavigate::KindTable) {
@@ -351,7 +354,7 @@ public:
return;
}
- ResolveShards(resolveNamesResult);
+ ResolveShards(resolveNamesResult.Get());
}
void ResolveShards(NSchemeCache::TSchemeCacheNavigate* resolveNamesResult) {
@@ -359,9 +362,9 @@ public:
// We are going to request only key columns
TVector<TKeyDesc::TColumnOp> columns;
- for (const auto& [_, ci] : entry.Columns) {
- if (ci.KeyOrder != -1) {
- TKeyDesc::TColumnOp op = { ci.Id, TKeyDesc::EColumnOperation::Set, ci.PType, 0, 0 };
+ for (const auto& [colId, colInfo] : entry.Columns) {
+ if (colInfo.KeyOrder != -1) {
+ TKeyDesc::TColumnOp op = { colInfo.Id, TKeyDesc::EColumnOperation::Set, colInfo.PType, 0, 0 };
columns.push_back(op);
}
}
@@ -457,8 +460,23 @@ public:
NKqp::TProgressStatEntry stats;
auto& ioStats = stats.ReadIOStat;
+ const auto getPgTypeFromColMeta = [](const auto &colMeta) {
+ return NYdb::TPgType(NPg::PgTypeNameFromTypeDesc(colMeta.Type.GetTypeDesc()),
+ colMeta.PTypeMod);
+ };
+
+ const auto getTypeFromColMeta = [&](const auto &colMeta) {
+ if (colMeta.Type.GetTypeId() == NScheme::NTypeIds::Pg) {
+ return NYdb::TTypeBuilder().Pg(getPgTypeFromColMeta(colMeta)).Build();
+ } else {
+ return NYdb::TTypeBuilder()
+ .Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId())
+ .Build();
+ }
+ };
+
for (const auto& colMeta : ColumnsMeta) {
- auto type = NYdb::TTypeBuilder().Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId()).Build();
+ const auto type = getTypeFromColMeta(colMeta);
auto* col = resultSet->Addcolumns();
*col->mutable_type() = NYdb::TProtoAccessor::GetProto(type);
*col->mutable_name() = colMeta.Name;
@@ -471,13 +489,26 @@ public:
vb.BeginStruct();
ui64 sz = 0;
for (const auto& colMeta : ColumnsMeta) {
- auto type = NYdb::TTypeBuilder().Primitive((NYdb::EPrimitiveType)colMeta.Type.GetTypeId()).Build();
+ const auto type = getTypeFromColMeta(colMeta);
LOG_DEBUG_S(TlsActivationContext->AsActorContext(), NKikimrServices::RPC_REQUEST, "TReadRowsRPC "
<< " name: " << colMeta.Name
);
const auto& cell = row[colMeta.Id - 1];
vb.AddMember(colMeta.Name);
- ProtoValueFromCell(vb, colMeta.Type, cell);
+ if (colMeta.Type.GetTypeId() == NScheme::NTypeIds::Pg)
+ {
+ const auto pgTypeId = NPg::PgTypeIdFromTypeDesc(colMeta.Type.GetTypeDesc());
+ const NYdb::TPgValue pgValue{
+ cell.IsNull() ? NYdb::TPgValue::VK_NULL : NYdb::TPgValue::VK_TEXT,
+ NPg::PgNativeTextFromNativeBinary({cell.AsBuf().data(), cell.AsBuf().size()}, pgTypeId).Str,
+ getPgTypeFromColMeta(colMeta)
+ };
+ vb.Pg(pgValue);
+ }
+ else
+ {
+ ProtoValueFromCell(vb, colMeta.Type, cell);
+ }
sz += cell.Size();
}
vb.EndStruct();
@@ -554,9 +585,10 @@ private:
TVector<NTxProxy::TFieldDescription> KeyColumnPositions;
TVector<NScheme::TTypeInfo> KeyColumnTypes;
struct TColumnMeta {
+ ui32 Id;
TString Name;
NScheme::TTypeInfo Type;
- ui32 Id;
+ TString PTypeMod;
};
TVector<TColumnMeta> ColumnsMeta;
@@ -569,7 +601,7 @@ private:
};
void DoReadRowsRequest(std::unique_ptr<IRequestNoOpCtx> p, const IFacilityProvider& f) {
- f.RegisterActor(new TReadRowsRPC(p.release()));
+ f.RegisterActor(new TReadRowsRPC(std::move(p)));
}
} // namespace NKikimr::NGRpcService
diff --git a/ydb/core/grpc_services/ya.make b/ydb/core/grpc_services/ya.make
index b415c68906..d6e169c22a 100644
--- a/ydb/core/grpc_services/ya.make
+++ b/ydb/core/grpc_services/ya.make
@@ -113,6 +113,7 @@ PEERDIR(
ydb/library/dynumber
ydb/library/mkql_proto
ydb/library/persqueue/topic_parser
+ ydb/library/yql/parser/pg_wrapper/interface
ydb/library/yql/public/types
ydb/public/api/grpc/draft
ydb/public/api/protos
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt
index c774741560..b4756b8e8f 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.darwin-x86_64.txt
@@ -10,9 +10,11 @@
add_executable(ydb-core-kqp-ut-opt)
target_compile_options(ydb-core-kqp-ut-opt PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
+ $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
target_include_directories(ydb-core-kqp-ut-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include
)
target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
contrib-libs-cxxsupp
@@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
cpp-testing-unittest_main
ydb-core-kqp
kqp-ut-common
- yql-sql-pg_dummy
+ yql-sql-pg
re2_udf
)
target_link_options(ydb-core-kqp-ut-opt PRIVATE
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
index 9ce7258b7d..7e41ceeb34 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-aarch64.txt
@@ -10,9 +10,11 @@
add_executable(ydb-core-kqp-ut-opt)
target_compile_options(ydb-core-kqp-ut-opt PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
+ $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
target_include_directories(ydb-core-kqp-ut-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include
)
target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
contrib-libs-linux-headers
@@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
cpp-testing-unittest_main
ydb-core-kqp
kqp-ut-common
- yql-sql-pg_dummy
+ yql-sql-pg
re2_udf
)
target_link_options(ydb-core-kqp-ut-opt PRIVATE
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt
index ec94e29aa1..0ad33099d5 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.linux-x86_64.txt
@@ -10,9 +10,11 @@
add_executable(ydb-core-kqp-ut-opt)
target_compile_options(ydb-core-kqp-ut-opt PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
+ $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
target_include_directories(ydb-core-kqp-ut-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include
)
target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
contrib-libs-linux-headers
@@ -22,7 +24,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
cpp-testing-unittest_main
ydb-core-kqp
kqp-ut-common
- yql-sql-pg_dummy
+ yql-sql-pg
re2_udf
)
target_link_options(ydb-core-kqp-ut-opt PRIVATE
diff --git a/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt
index cd8ab8da24..dd2804c278 100644
--- a/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/ut/opt/CMakeLists.windows-x86_64.txt
@@ -10,9 +10,11 @@
add_executable(ydb-core-kqp-ut-opt)
target_compile_options(ydb-core-kqp-ut-opt PRIVATE
-DUSE_CURRENT_UDF_ABI_VERSION
+ $<IF:$<CXX_COMPILER_ID:MSVC>,,-Wno-everything>
)
target_include_directories(ydb-core-kqp-ut-opt PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/kqp
+ ${CMAKE_SOURCE_DIR}/ydb/library/yql/parser/pg_wrapper/postgresql/src/include
)
target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
contrib-libs-cxxsupp
@@ -21,7 +23,7 @@ target_link_libraries(ydb-core-kqp-ut-opt PUBLIC
cpp-testing-unittest_main
ydb-core-kqp
kqp-ut-common
- yql-sql-pg_dummy
+ yql-sql-pg
re2_udf
)
target_sources(ydb-core-kqp-ut-opt PRIVATE
diff --git a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp
index 4908d03b1b..f592fccd51 100644
--- a/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp
+++ b/ydb/core/kqp/ut/opt/kqp_kv_ut.cpp
@@ -1,6 +1,58 @@
#include <ydb/core/kqp/ut/common/kqp_ut_common.h>
+#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+#include <ydb/library/yql/parser/pg_wrapper/interface/codec.h>
+#include <ydb/library/yql/utils/log/log.h>
#include <ydb/public/sdk/cpp/client/ydb_proto/accessor.h>
+#include <util/system/env.h>
+
+
+extern "C" {
+#include "postgres.h"
+#include "catalog/pg_type_d.h"
+}
+
+namespace
+{
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+struct ReadRowsPgParam
+{
+ ui32 TypeId;
+ TString TypeMod;
+ TString ValueContent;
+};
+
+} // namespace
+
+namespace
+{
+
+using namespace NYdb;
+using namespace NYdb::NTable;
+
+template <typename T>
+void ValidateSinglePgRowResult(T& result, const TString& columnName, const TPgValue& expectedValue) {
+ TResultSetParser parser{result.GetResultSet()};
+ UNIT_ASSERT_VALUES_EQUAL(parser.RowsCount(), 1);
+
+ bool gotRows = false;
+ while( parser.TryNextRow()) {
+ gotRows = true;
+ auto& col = parser.ColumnParser(columnName);
+
+ const auto pgValue = col.GetPg();
+ UNIT_ASSERT_VALUES_EQUAL(pgValue.Content_, expectedValue.Content_);
+ UNIT_ASSERT_VALUES_EQUAL(pgValue.Kind_, expectedValue.Kind_);
+ UNIT_ASSERT_VALUES_EQUAL(pgValue.PgType_.TypeName, expectedValue.PgType_.TypeName);
+ UNIT_ASSERT_VALUES_EQUAL(pgValue.PgType_.TypeModifier, expectedValue.PgType_.TypeModifier);
+ }
+ Y_ENSURE(gotRows, "empty select result");
+}
+
+} // namespace
namespace NKikimr::NKqp {
@@ -107,6 +159,205 @@ Y_UNIT_TEST_SUITE(KqpKv) {
]
)", res);
}
+
+ TVector<::ReadRowsPgParam> readRowsPgParams
+ {
+ {.TypeId = BOOLOID, .TypeMod={}, .ValueContent="t"},
+ {.TypeId = CHAROID, .TypeMod={}, .ValueContent="v"},
+ {.TypeId = INT2OID, .TypeMod={}, .ValueContent="42"},
+ {.TypeId = INT4OID, .TypeMod={}, .ValueContent="42"},
+ {.TypeId = INT8OID, .TypeMod={}, .ValueContent="42"},
+ {.TypeId = FLOAT4OID, .TypeMod={}, .ValueContent="42.42"},
+ {.TypeId = FLOAT8OID, .TypeMod={}, .ValueContent="42.42"},
+ {.TypeId = TEXTOID, .TypeMod={}, .ValueContent="i'm a text"},
+ {.TypeId = BPCHAROID, .TypeMod={}, .ValueContent="i'm a text"},
+ {.TypeId = VARCHAROID, .TypeMod={}, .ValueContent="i'm a text"},
+ {.TypeId = NAMEOID, .TypeMod={}, .ValueContent="i'm a text"},
+ {.TypeId = NUMERICOID, .TypeMod={}, .ValueContent="42.42"},
+ {.TypeId = MONEYOID, .TypeMod={}, .ValueContent="$42.42"},
+ {.TypeId = DATEOID, .TypeMod={}, .ValueContent="1999-01-01"},
+ {.TypeId = TIMEOID, .TypeMod={}, .ValueContent="23:59:59.999"},
+ {.TypeId = TIMESTAMPOID, .TypeMod={}, .ValueContent="1999-01-01 23:59:59.999"},
+ {.TypeId = TIMETZOID, .TypeMod={}, .ValueContent="23:59:59.999+03"},
+ {.TypeId = TIMESTAMPTZOID, .TypeMod={}, .ValueContent="1999-01-01 23:59:59.999+00"},
+ {.TypeId = INTERVALOID, .TypeMod={}, .ValueContent="1 year 2 mons 3 days 01:02:03"},
+ {.TypeId = BITOID, .TypeMod={}, .ValueContent="1011"},
+ {.TypeId = VARBITOID, .TypeMod={}, .ValueContent="1011"},
+ {.TypeId = POINTOID, .TypeMod={}, .ValueContent="(42,24)"},
+ {.TypeId = LINEOID, .TypeMod={}, .ValueContent="{42,24,13}"},
+ {.TypeId = LSEGOID, .TypeMod={}, .ValueContent="[(0,0),(42,42)]"},
+ {.TypeId = BOXOID, .TypeMod={}, .ValueContent="(42,42),(0,0)"},
+ {.TypeId = PATHOID, .TypeMod={}, .ValueContent="((0,0),(42,42),(13,13))"},
+ {.TypeId = POLYGONOID, .TypeMod={}, .ValueContent="((0,0),(42,42),(13,13))"},
+ {.TypeId = CIRCLEOID, .TypeMod={}, .ValueContent="<(0,0),42>"},
+ {.TypeId = INETOID, .TypeMod={}, .ValueContent="127.0.0.1/16"},
+ {.TypeId = CIDROID, .TypeMod={}, .ValueContent="16.0.0.0/8"},
+ {.TypeId = MACADDROID, .TypeMod={}, .ValueContent="08:00:2b:01:02:03"},
+ {.TypeId = MACADDR8OID, .TypeMod={}, .ValueContent="08:00:2b:01:02:03:04:05"},
+ {.TypeId = UUIDOID, .TypeMod={}, .ValueContent="00000000-0000-0000-0000-000000000042"},
+ {.TypeId = JSONOID, .TypeMod={}, .ValueContent="{\"value\": 42}"},
+ {.TypeId = JSONBOID, .TypeMod={}, .ValueContent="{\"value\": 42}"},
+ {.TypeId = JSONPATHOID, .TypeMod={}, .ValueContent="($.\"field\"[*] > 42)"},
+ {.TypeId = XMLOID, .TypeMod={}, .ValueContent="<tag>42</tag>"},
+ {.TypeId = TSQUERYOID, .TypeMod={}, .ValueContent="'a' & 'b1'"},
+ {.TypeId = TSVECTOROID, .TypeMod={}, .ValueContent="'cat' 'fat' '|'"},
+ {.TypeId = INT2VECTOROID, .TypeMod={}, .ValueContent="42 24 13"},
+ {.TypeId = BYTEAOID, .TypeMod={}, .ValueContent="\\x627974656120ff"},
+ {.TypeId = BYTEAARRAYOID, .TypeMod={}, .ValueContent="{\"\\\\x61ff\",\"\\\\x6231ff\"}"},
+ {.TypeId = BPCHAROID, .TypeMod="4", .ValueContent="abcd"},
+ {.TypeId = VARCHAROID, .TypeMod="4", .ValueContent="abcd"},
+ {.TypeId = BITOID, .TypeMod="4", .ValueContent="1101"},
+ {.TypeId = VARBITOID, .TypeMod="4", .ValueContent="1101"},
+ {.TypeId = NUMERICOID, .TypeMod="4", .ValueContent="1234"},
+ {.TypeId = TIMEOID, .TypeMod="4", .ValueContent="23:59:59.9999"},
+ {.TypeId = TIMETZOID, .TypeMod="4", .ValueContent="23:59:59.9999+00"},
+ {.TypeId = TIMESTAMPOID, .TypeMod="4", .ValueContent="1999-01-01 23:59:59.9999"},
+ {.TypeId = TIMESTAMPTZOID, .TypeMod="4", .ValueContent="1999-01-01 23:59:59.9999+00"},
+ };
+ ::ReadRowsPgParam readRowsPgNullParam{.TypeId = BOOLOID, .TypeMod={}, .ValueContent=""};
+
+ Y_UNIT_TEST(ReadRowsPgValue) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(true);
+ auto kikimr = TKikimrRunner{settings};
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ const auto tableName = "/Root/TestTable";
+ const auto keyColumnName = "Key";
+ const auto valueColumnName = "Value";
+
+ const auto testSingle = [&](const ::ReadRowsPgParam& testParam, bool isNull)
+ {
+ auto* typeDesc = NPg::TypeDescFromPgTypeId(testParam.TypeId);
+ UNIT_ASSERT(!!typeDesc);
+ const auto typeName = NPg::PgTypeNameFromTypeDesc(typeDesc);
+ const auto& pgType = TPgType(typeName, testParam.TypeMod);
+
+ Cout << Sprintf("TestParam: type: `%s`; mod: `%s`, is null: %s\n", typeName.data(), testParam.TypeMod.data(), isNull ? "+" : "-" );
+
+ TTableBuilder builder;
+ builder.AddNonNullableColumn(keyColumnName, EPrimitiveType::Uint64);
+ builder.SetPrimaryKeyColumn(keyColumnName);
+ builder.AddNullableColumn(valueColumnName, pgType);
+
+ auto result = session.CreateTable(tableName, builder.Build()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ const ui64 keyValue = 1;
+ const TPgValue pgValue(
+ isNull ? TPgValue::VK_NULL : TPgValue::VK_TEXT,
+ testParam.ValueContent,
+ pgType
+ );
+ NYdb::TValueBuilder rows;
+ rows.BeginList();
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember(keyColumnName).Uint64(keyValue)
+ .AddMember(valueColumnName).Pg(pgValue)
+ .EndStruct();
+ rows.EndList();
+ auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync();
+ UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
+
+ NYdb::TValueBuilder keys;
+ keys.BeginList();
+ keys.AddListItem()
+ .BeginStruct()
+ .AddMember(keyColumnName).Uint64(keyValue)
+ .EndStruct();
+ keys.EndList();
+ auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync();
+ UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString());
+
+ ValidateSinglePgRowResult(selectResult, valueColumnName, pgValue);
+
+ result = session.DropTable(tableName).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ };
+
+ for (const auto& testParam: readRowsPgParams)
+ {
+ testSingle(testParam, false);
+ }
+ testSingle(readRowsPgNullParam, true);
+ }
+
+ TVector<::ReadRowsPgParam> readRowsPgKeyParams
+ {
+ {.TypeId = TEXTOID, .TypeMod={}, .ValueContent="i'm a text"},
+ {.TypeId = BITOID, .TypeMod="4", .ValueContent="0110"},
+ };
+ ::ReadRowsPgParam readRowsPgNullKeyParam{.TypeId = TEXTOID, .TypeMod={}, .ValueContent=""};
+
+ Y_UNIT_TEST(ReadRowsPgKey) {
+ auto settings = TKikimrSettings()
+ .SetWithSampleTables(true);
+ auto kikimr = TKikimrRunner{settings};
+
+ auto db = kikimr.GetTableClient();
+ auto session = db.CreateSession().GetValueSync().GetSession();
+ const auto tableName = "/Root/TestTable";
+ const auto keyColumnName = "Key";
+ const auto valueColumnName = "Value";
+
+ const auto testSingle = [&](const ::ReadRowsPgParam& testParam, bool isNull)
+ {
+ auto* typeDesc = NPg::TypeDescFromPgTypeId(testParam.TypeId);
+ UNIT_ASSERT(!!typeDesc);
+ const auto typeName = NPg::PgTypeNameFromTypeDesc(typeDesc);
+ const auto& pgType = TPgType(typeName, testParam.TypeMod);
+
+ Cout << Sprintf("TestParam: type: `%s`; mod: `%s`, is null: %s\n", typeName.data(), testParam.TypeMod.data(), isNull ? "+" : "-" );
+
+ TTableBuilder builder;
+ builder.AddNullableColumn(keyColumnName, pgType);
+ builder.SetPrimaryKeyColumn(keyColumnName);
+ builder.AddNullableColumn(valueColumnName, EPrimitiveType::Uint64);
+
+ auto result = session.CreateTable(tableName, builder.Build()).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+
+ const TPgValue pgValue(
+ isNull ? TPgValue::VK_NULL : TPgValue::VK_TEXT,
+ testParam.ValueContent,
+ pgType
+ );
+ const ui64 value = 1;
+ NYdb::TValueBuilder rows;
+ rows.BeginList();
+ rows.AddListItem()
+ .BeginStruct()
+ .AddMember(keyColumnName).Pg(pgValue)
+ .AddMember(valueColumnName).Uint64(value)
+ .EndStruct();
+ rows.EndList();
+ auto upsertResult = db.BulkUpsert(tableName, rows.Build()).GetValueSync();
+ UNIT_ASSERT_C(upsertResult.IsSuccess(), upsertResult.GetIssues().ToString());
+
+ NYdb::TValueBuilder keys;
+ keys.BeginList();
+ keys.AddListItem()
+ .BeginStruct()
+ .AddMember(keyColumnName).Pg(pgValue)
+ .EndStruct();
+ keys.EndList();
+ auto selectResult = db.ReadRows(tableName, keys.Build()).GetValueSync();
+ UNIT_ASSERT_C(selectResult.IsSuccess(), selectResult.GetIssues().ToString());
+
+ ValidateSinglePgRowResult(selectResult, keyColumnName, pgValue);
+
+ result = session.DropTable(tableName).GetValueSync();
+ UNIT_ASSERT_C(result.IsSuccess(), result.GetIssues().ToString());
+ };
+
+ for (const auto& testParam: readRowsPgKeyParams)
+ {
+ testSingle(testParam, false);
+ }
+ testSingle(readRowsPgNullKeyParam, true);
+ }
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/opt/ya.make b/ydb/core/kqp/ut/opt/ya.make
index 18fede554b..f6b800a0b4 100644
--- a/ydb/core/kqp/ut/opt/ya.make
+++ b/ydb/core/kqp/ut/opt/ya.make
@@ -26,10 +26,16 @@ SRCS(
PEERDIR(
ydb/core/kqp
ydb/core/kqp/ut/common
- ydb/library/yql/sql/pg_dummy
+ ydb/library/yql/sql/pg
ydb/library/yql/udfs/common/re2
)
+ADDINCL(
+ ydb/library/yql/parser/pg_wrapper/postgresql/src/include
+)
+
+NO_COMPILER_WARNINGS()
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/ydb/public/sdk/cpp/client/ydb_value/value.cpp b/ydb/public/sdk/cpp/client/ydb_value/value.cpp
index 153232692f..ef7a6179b9 100644
--- a/ydb/public/sdk/cpp/client/ydb_value/value.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_value/value.cpp
@@ -2144,9 +2144,11 @@ public:
void Pg(const TPgValue& value) {
FillPgType(value.PgType_);
- if (value.IsText()) {
+ if (value.IsNull()) {
+ GetValue().set_null_flag_value(::google::protobuf::NULL_VALUE);
+ } else if (value.IsText()) {
GetValue().set_text_value(value.Content_);
- } else if (!value.IsNull()) {
+ } else {
GetValue().set_bytes_value(value.Content_);
}
}