aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorSergey Uzhakov <uzhastik@gmail.com>2022-06-17 21:08:04 +0300
committerSergey Uzhakov <uzhastik@gmail.com>2022-06-17 21:08:04 +0300
commitc28bb661e30b37933da2962f11091f9a918211b3 (patch)
treeb379a873df1cc576dd50ad1ab105430c91e19dc3
parent91899b785a0b8408c0265385f98aa95101633440 (diff)
downloadydb-c28bb661e30b37933da2962f11091f9a918211b3.tar.gz
YQ-1154: support pg types in yq results
ref:0c1fb8697ed1f7d184838f77a5bdf85424f68540
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.txt1
-rw-r--r--ydb/core/driver_lib/run/factories.h4
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp4
-rw-r--r--ydb/core/kqp/provider/yql_kikimr_results.cpp3
-rw-r--r--ydb/core/testlib/test_client.cpp3
-rw-r--r--ydb/core/yq/libs/init/init.cpp10
-rw-r--r--ydb/core/yq/libs/init/init.h3
-rw-r--r--ydb/library/mkql_proto/CMakeLists.txt1
-rw-r--r--ydb/library/mkql_proto/mkql_proto.cpp70
-rw-r--r--ydb/library/mkql_proto/mkql_proto_ut.cpp15
-rw-r--r--ydb/library/mkql_proto/protos/minikql.proto7
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp4
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp26
-rw-r--r--ydb/library/yql/providers/common/codec/yql_pg_codec.h1
-rw-r--r--ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp24
15 files changed, 136 insertions, 40 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt
index 0b81a7dff4..dd1326dc76 100644
--- a/ydb/core/driver_lib/run/CMakeLists.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.txt
@@ -108,6 +108,7 @@ target_link_libraries(run PUBLIC
ydb-library-pdisk_io
ydb-library-security
yql-minikql-comp_nodes
+ yql-minikql-computation
udf-service-exception_policy
public-lib-base
lib-deprecated-client
diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h
index 884eb75cc8..76ca0a91d9 100644
--- a/ydb/core/driver_lib/run/factories.h
+++ b/ydb/core/driver_lib/run/factories.h
@@ -15,6 +15,7 @@
#include <ydb/library/pdisk_io/aio.h>
#include <ydb/core/yq/libs/config/protos/audit.pb.h>
+#include <ydb/library/yql/minikql/computation/mkql_computation_node.h>
#include <ydb/library/yql/providers/pq/cm_client/interface/client.h>
#include <library/cpp/actors/core/actorsystem.h>
@@ -36,7 +37,7 @@ struct TModuleFactories {
std::shared_ptr<NKqp::IQueryReplayBackendFactory> QueryReplayBackendFactory;
//
std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory;
- // Can be nullptr. In that case there would be no ability to work with Yandex Logbroker in Yandex Query.
+ // Can be nullptr. In that case there would be no ability to work with internal configuration manager.
NPq::NConfigurationManager::IConnections::TPtr PqCmConnections;
// Export implementation for Data Shards
std::shared_ptr<NDataShard::IExportFactory> DataShardExportFactory;
@@ -59,6 +60,7 @@ struct TModuleFactories {
std::shared_ptr<NSQS::IAuthFactory> SqsAuthFactory;
std::shared_ptr<NHttpProxy::IAuthFactory> DataStreamsAuthFactory;
+ std::vector<NKikimr::NMiniKQL::TComputationNodeFactory> AdditionalComputationNodeFactories;
~TModuleFactories();
};
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index 375b6702ec..8967e8c88c 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -142,6 +142,7 @@
#include <ydb/library/folder_service/proto/config.pb.h>
#include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h>
+#include <ydb/library/yql/parser/pg_wrapper/comp_factory.h>
#include <library/cpp/actors/protos/services_common.pb.h>
@@ -2332,7 +2333,8 @@ void TYandexQueryInitializer::InitializeServices(TActorSystemSetup* setup, const
Factories->FolderServiceFactory,
Factories->YqAuditServiceFactory,
Factories->YdbCredentialProviderFactory,
- IcPort
+ IcPort,
+ Factories->AdditionalComputationNodeFactories
);
}
diff --git a/ydb/core/kqp/provider/yql_kikimr_results.cpp b/ydb/core/kqp/provider/yql_kikimr_results.cpp
index 323df77a3d..b1a5f90ea9 100644
--- a/ydb/core/kqp/provider/yql_kikimr_results.cpp
+++ b/ydb/core/kqp/provider/yql_kikimr_results.cpp
@@ -1212,8 +1212,9 @@ bool IsSameType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType
return IsSameType(actual.GetVariant(), expected.GetVariant());
case NKikimrMiniKQL::ETypeKind::Null:
return true;
+ case NKikimrMiniKQL::ETypeKind::Pg:
+ return actual.GetPg().Getoid() == expected.GetPg().Getoid();
case NKikimrMiniKQL::ETypeKind::Unknown:
- case NKikimrMiniKQL::ETypeKind::Reserved_10:
case NKikimrMiniKQL::ETypeKind::Reserved_11:
case NKikimrMiniKQL::ETypeKind::Reserved_12:
case NKikimrMiniKQL::ETypeKind::Reserved_13:
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 6909485339..a9fb212f6a 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -818,7 +818,8 @@ namespace Tests {
NKikimr::NFolderService::CreateMockFolderServiceActor,
NYq::CreateMockYqAuditServiceActor,
ydbCredFactory,
- /*IcPort = */0
+ /*IcPort = */0,
+ {}
);
NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources);
}
diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp
index 9f2b32a25f..4826f78de9 100644
--- a/ydb/core/yq/libs/init/init.cpp
+++ b/ydb/core/yq/libs/init/init.cpp
@@ -57,7 +57,8 @@ void Init(
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const ui32& icPort
+ ui32 icPort,
+ const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
)
{
Y_VERIFY(iyqSharedResources, "No YQ shared resources created");
@@ -104,11 +105,14 @@ void Init(
auto yqCounters = appData->Counters->GetSubgroup("counters", "yq");
auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager"));
- NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({
+ TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = {
NYql::GetCommonDqFactory(),
NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver),
NKikimr::NMiniKQL::GetYqlFactory()
- });
+ };
+
+ compNodeFactories.insert(compNodeFactories.end(), additionalCompNodeFactories.begin(), additionalCompNodeFactories.end());
+ NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(std::move(compNodeFactories));
NYql::TTaskTransformFactory dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({
NYql::CreateCommonDqTaskTransformFactory(),
diff --git a/ydb/core/yq/libs/init/init.h b/ydb/core/yq/libs/init/init.h
index 8cbc1a6a8c..f826d0398f 100644
--- a/ydb/core/yq/libs/init/init.h
+++ b/ydb/core/yq/libs/init/init.h
@@ -37,7 +37,8 @@ void Init(
const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory,
const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory,
const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory,
- const ui32& icPort
+ ui32 icPort,
+ const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories
);
} // NYq
diff --git a/ydb/library/mkql_proto/CMakeLists.txt b/ydb/library/mkql_proto/CMakeLists.txt
index 7ddbde41f4..ccfabd6744 100644
--- a/ydb/library/mkql_proto/CMakeLists.txt
+++ b/ydb/library/mkql_proto/CMakeLists.txt
@@ -19,6 +19,7 @@ target_link_libraries(ydb-library-mkql_proto PUBLIC
api-protos
library-yql-minikql
yql-minikql-computation
+ providers-common-codec
)
target_sources(ydb-library-mkql_proto PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/mkql_proto/mkql_proto.cpp
diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp
index bfb6b1bf4a..a20febb305 100644
--- a/ydb/library/mkql_proto/mkql_proto.cpp
+++ b/ydb/library/mkql_proto/mkql_proto.cpp
@@ -6,6 +6,7 @@
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/mkql_type_ops.h>
#include <ydb/library/yql/public/decimal/yql_decimal.h>
+#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h>
#include <library/cpp/containers/stack_vector/stack_vec.h>
@@ -148,6 +149,13 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) {
break;
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType *>(type);
+ res.SetKind(NKikimrMiniKQL::ETypeKind::Pg);
+ res.MutablePg()->set_oid(pgType->GetTypeId());
+ break;
+ }
+
case TType::EKind::Optional: {
auto optionalType = static_cast<TOptionalType *>(type);
res.SetKind(NKikimrMiniKQL::ETypeKind::Optional);
@@ -255,6 +263,13 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) {
break;
}
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<TPgType*>(type);
+ auto t = res.mutable_pg_type();
+ t->set_oid(pgType->GetTypeId());
+ break;
+ }
+
case TType::EKind::Optional: {
auto optionalType = static_cast<TOptionalType*>(type);
ExportTypeToProtoImpl(optionalType->GetItemType(), *res.mutable_optional_type()->mutable_item());
@@ -439,6 +454,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, NK
break;
}
+ case TType::EKind::Pg: {
+ if (!value) {
+ // do not set Text and Bytes fields
+ return;
+ }
+ auto pgType = static_cast<TPgType*>(type);
+ auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId());
+ res.SetText(textValue);
+ break;
+ }
+
case TType::EKind::Optional: {
auto optionalType = static_cast<TOptionalType*>(type);
if (value) {
@@ -523,7 +549,7 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd
case TType::EKind::Void:
case TType::EKind::EmptyList:
case TType::EKind::EmptyDict:
- break;
+ break;
case TType::EKind::Null: {
res.set_null_flag_value(::google::protobuf::NULL_VALUE);
@@ -535,6 +561,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd
break;
}
+ case TType::EKind::Pg: {
+ if (!value) {
+ // do not set Text and Bytes fields
+ return;
+ }
+ auto pgType = static_cast<TPgType*>(type);
+ auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId());
+ res.set_text_value(textValue);
+ break;
+ }
+
case TType::EKind::Optional: {
auto optionalType = static_cast<TOptionalType*>(type);
if (value.HasValue()) {
@@ -951,6 +988,10 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) {
return TDataType::Create(schemeType, env);
}
}
+ case NKikimrMiniKQL::ETypeKind::Pg: {
+ const NKikimrMiniKQL::TPgType& protoPgType = type.GetPg();
+ return TPgType::Create(protoPgType.Getoid(), env);
+ }
case NKikimrMiniKQL::ETypeKind::Optional: {
const NKikimrMiniKQL::TOptionalType& protoOptionalType = type.GetOptional();
TType* child = ImportTypeFromProto(protoOptionalType.GetItem());
@@ -1010,13 +1051,13 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) {
TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TValue& value) {
switch (type->GetKind()) {
- case TCallableType::EKind::Void: {
+ case TType::EKind::Void: {
return env.GetVoid();
}
- case TCallableType::EKind::Null: {
+ case TType::EKind::Null: {
return env.GetNull();
}
- case TCallableType::EKind::Data: {
+ case TType::EKind::Data: {
TDataType* dataType = static_cast<TDataType*>(type);
TDataLiteral* dataNode = nullptr;
switch (const auto schemeType = dataType->GetSchemeType()) {
@@ -1114,7 +1155,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
}
return dataNode;
}
- case TCallableType::EKind::Optional: {
+ case TType::EKind::Optional: {
TOptionalType* optionalType = static_cast<TOptionalType*>(type);
TOptionalLiteral* optionalNode;
if (value.HasOptional()) {
@@ -1126,7 +1167,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
}
return optionalNode;
}
- case TCallableType::EKind::List: {
+ case TType::EKind::List: {
TListType* listType = static_cast<TListType*>(type);
TType* itemType = listType->GetItemType();
TVector<TRuntimeNode> items;
@@ -1140,7 +1181,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
TListLiteral* listNode = TListLiteral::Create(items.data(), items.size(), listType, env);
return listNode;
}
- case TCallableType::EKind::Tuple: {
+ case TType::EKind::Tuple: {
TTupleType* tupleType = static_cast<TTupleType*>(type);
ui32 elementsCount = tupleType->GetElementsCount();
MKQL_ENSURE(elementsCount == value.TupleSize(), "Invalid protobuf format, tuple size mismatch between Type and Value");
@@ -1154,7 +1195,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
TTupleLiteral* tupleNode = TTupleLiteral::Create(elements.size(), elements.data(), tupleType, env);
return tupleNode;
}
- case TCallableType::EKind::Struct: {
+ case TType::EKind::Struct: {
TStructType* structType = static_cast<TStructType*>(type);
ui32 membersCount = structType->GetMembersCount();
MKQL_ENSURE(membersCount == value.StructSize(), "Invalid protobuf format, struct size mismatch between Type and Value");
@@ -1170,7 +1211,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
TStructLiteral* structNode = TStructLiteral::Create(members.size(), members.data(), structType, env);
return structNode;
}
- case TCallableType::EKind::Dict: {
+ case TType::EKind::Dict: {
TDictType* dictType = static_cast<TDictType*>(type);
ui32 dictSize = value.DictSize();
@@ -1185,7 +1226,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV
TDictLiteral* dictNode = TDictLiteral::Create(items.size(), items.data(), dictType, env);
return dictNode;
}
- case TCallableType::EKind::Variant: {
+ case TType::EKind::Variant: {
TVariantType* variantType = static_cast<TVariantType*>(type);
auto variantIndex = value.GetVariantIndex();
TType* innerType = variantType->GetAlternativeType(variantIndex);
@@ -1218,6 +1259,15 @@ NUdf::TUnboxedValue TProtoImporter::ImportValueFromProto(const TType* type, cons
case TType::EKind::Data:
return HandleKindDataImport(type, value);
+ case TType::EKind::Pg: {
+ auto pgType = static_cast<const TPgType*>(type);
+ MKQL_ENSURE(!value.HasBytes(), "Pg binary format is not supported");
+ if (!value.HasText() && !value.HasBytes()) {
+ return NUdf::TUnboxedValue();
+ }
+ return NYql::NCommon::PgValueFromString(value.GetText(), pgType->GetTypeId());
+ }
+
case TType::EKind::Optional: {
auto optionalType = static_cast<const TOptionalType*>(type);
if (value.HasOptional()) {
diff --git a/ydb/library/mkql_proto/mkql_proto_ut.cpp b/ydb/library/mkql_proto/mkql_proto_ut.cpp
index fdda085f18..2a589ba0b3 100644
--- a/ydb/library/mkql_proto/mkql_proto_ut.cpp
+++ b/ydb/library/mkql_proto/mkql_proto_ut.cpp
@@ -16,6 +16,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
UNIT_ASSERT(!CanExportType(pgmBuilder.NewVoid().GetStaticType()->GetType(), env));
UNIT_ASSERT(CanExportType(pgmBuilder.NewVoid().GetStaticType(), env));
+ UNIT_ASSERT(CanExportType(pgmBuilder.NewPgType(16), env));
auto dtype = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id);
UNIT_ASSERT(CanExportType(dtype, env));
UNIT_ASSERT(CanExportType(pgmBuilder.NewOptionalType(dtype), env));
@@ -45,7 +46,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
"}\n");
}
- Y_UNIT_TEST(TestExportDecimalType) {
+ Y_UNIT_TEST(TestExportDecimalType) {
TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
NYql::NDecimal::TInt128 x;
ui64* p = (ui64*)&x;
@@ -64,6 +65,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) {
"}\n");
}
+ Y_UNIT_TEST(TestExportPgType) {
+ TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
+ auto pgType = static_cast<TPgType*>(pgmBuilder.NewPgType(16));
+ auto pgmReturn = pgmBuilder.PgConst(pgType, "true");
+ return pgmReturn;
+ },
+ "Kind: Pg\n"
+ "Pg {\n"
+ " oid: 16\n"
+ "}\n");
+ }
+
Y_UNIT_TEST(TestExportUuidType) {
TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) {
auto pgmReturn = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(TStringBuf("\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"sv));
diff --git a/ydb/library/mkql_proto/protos/minikql.proto b/ydb/library/mkql_proto/protos/minikql.proto
index 2a4243855c..b07ca18d4c 100644
--- a/ydb/library/mkql_proto/protos/minikql.proto
+++ b/ydb/library/mkql_proto/protos/minikql.proto
@@ -15,7 +15,7 @@ enum ETypeKind {
Dict = 7;
Variant = 8;
Null = 9;
- Reserved_10 = 10;
+ Pg = 10;
Reserved_11 = 11;
Reserved_12 = 12;
Reserved_13 = 13;
@@ -65,6 +65,10 @@ message TDictType {
required TType Payload = 2;
}
+message TPgType {
+ required uint32 oid = 1;
+}
+
message TType {
required ETypeKind Kind = 1;
oneof type_type {
@@ -75,6 +79,7 @@ message TType {
TStructType Struct = 6;
TDictType Dict = 7;
TVariantType Variant = 8;
+ TPgType Pg = 9;
}
}
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 816682fe4c..0e2157e466 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -5154,6 +5154,10 @@ bool CanExportType(TType* type, const TTypeEnvironment& env) {
node->SetCookie(1);
break;
+ case TType::EKind::Pg:
+ node->SetCookie(1);
+ break;
+
case TType::EKind::Optional: {
auto optionalType = static_cast<TOptionalType*>(node);
if (!optionalType->GetItemType()->GetCookie()) {
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 035c4bf217..3cd4591f8c 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -1768,15 +1768,8 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput
}
}
-NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
- using namespace NYson::NDetail;
- if (cmd == EntitySymbol) {
- return NUdf::TUnboxedValuePod();
- }
-
- CHECK_EXPECTED(cmd, StringMarker);
- auto s = buf.ReadYtString();
- switch (type->GetTypeId()) {
+NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) {
+ switch (pgTypeId) {
case BOOLOID: {
return ScalarDatumToPod(BoolGetDatum(FromString<bool>(s)));
}
@@ -1806,10 +1799,10 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
return PointerDatumToPod((Datum)ret);
}
default:
- TString str{s};
+ TString str{ s };
TPAllocScope call;
- const auto& typeInfo = NPg::LookupType(type->GetTypeId());
+ const auto& typeInfo = NPg::LookupType(pgTypeId);
auto typeIOParam = MakeTypeIOParam(typeInfo);
auto inFuncId = typeInfo.InFuncId;
if (typeInfo.TypeId == typeInfo.ArrayTypeId) {
@@ -1839,6 +1832,17 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
}
}
+NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) {
+ using namespace NYson::NDetail;
+ if (cmd == EntitySymbol) {
+ return NUdf::TUnboxedValuePod();
+ }
+
+ CHECK_EXPECTED(cmd, StringMarker);
+ auto s = buf.ReadYtString();
+ return PgValueFromString(s, type->GetTypeId());
+}
+
NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) {
auto marker = buf.Read();
if (!marker) {
diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
index 36a09bec4f..aed53d4faf 100644
--- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h
+++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h
@@ -13,6 +13,7 @@ namespace NYql {
namespace NCommon {
TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId);
+NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId);
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
const TVector<ui32>* structPositions);
diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
index 3f97980d3e..514f02f330 100644
--- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
+++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp
@@ -23,7 +23,13 @@ namespace NCommon {
TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) {
Y_UNUSED(value);
Y_UNUSED(pgTypeId);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "PgValueToString: PG types are not supported";
+}
+
+NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId) {
+ Y_UNUSED(text);
+ Y_UNUSED(pgTypeId);
+ throw yexception() << "PgValueFromString: PG types are not supported";
}
void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type,
@@ -32,55 +38,55 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v
Y_UNUSED(value);
Y_UNUSED(type);
Y_UNUSED(structPositions);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "WriteYsonValuePg: PG types are not supported";
}
void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value) {
Y_UNUSED(buf);
Y_UNUSED(type);
Y_UNUSED(value);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "WriteYsonValueInTableFormatPg: PG types are not supported";
}
NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(cmd);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "ReadYsonValueInTableFormatPg: PG types are not supported";
}
NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(cmd);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "ReadYsonValuePg: PG types are not supported";
}
NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "ReadSkiffPg: PG types are not supported";
}
void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(value);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "WriteSkiffPg: PG types are not supported";
}
extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(value);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "ReadSkiffPgValue: PG types are not supported";
}
extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) {
Y_UNUSED(type);
Y_UNUSED(value);
Y_UNUSED(buf);
- throw yexception() << "PG types are not supported";
+ throw yexception() << "WriteSkiffPgValue: PG types are not supported";
}
} // namespace NCommon