diff options
author | Sergey Uzhakov <uzhastik@gmail.com> | 2022-06-17 21:08:04 +0300 |
---|---|---|
committer | Sergey Uzhakov <uzhastik@gmail.com> | 2022-06-17 21:08:04 +0300 |
commit | c28bb661e30b37933da2962f11091f9a918211b3 (patch) | |
tree | b379a873df1cc576dd50ad1ab105430c91e19dc3 | |
parent | 91899b785a0b8408c0265385f98aa95101633440 (diff) | |
download | ydb-c28bb661e30b37933da2962f11091f9a918211b3.tar.gz |
YQ-1154: support pg types in yq results
ref:0c1fb8697ed1f7d184838f77a5bdf85424f68540
-rw-r--r-- | ydb/core/driver_lib/run/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/factories.h | 4 | ||||
-rw-r--r-- | ydb/core/driver_lib/run/kikimr_services_initializers.cpp | 4 | ||||
-rw-r--r-- | ydb/core/kqp/provider/yql_kikimr_results.cpp | 3 | ||||
-rw-r--r-- | ydb/core/testlib/test_client.cpp | 3 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.cpp | 10 | ||||
-rw-r--r-- | ydb/core/yq/libs/init/init.h | 3 | ||||
-rw-r--r-- | ydb/library/mkql_proto/CMakeLists.txt | 1 | ||||
-rw-r--r-- | ydb/library/mkql_proto/mkql_proto.cpp | 70 | ||||
-rw-r--r-- | ydb/library/mkql_proto/mkql_proto_ut.cpp | 15 | ||||
-rw-r--r-- | ydb/library/mkql_proto/protos/minikql.proto | 7 | ||||
-rw-r--r-- | ydb/library/yql/minikql/mkql_program_builder.cpp | 4 | ||||
-rw-r--r-- | ydb/library/yql/parser/pg_wrapper/comp_factory.cpp | 26 | ||||
-rw-r--r-- | ydb/library/yql/providers/common/codec/yql_pg_codec.h | 1 | ||||
-rw-r--r-- | ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp | 24 |
15 files changed, 136 insertions, 40 deletions
diff --git a/ydb/core/driver_lib/run/CMakeLists.txt b/ydb/core/driver_lib/run/CMakeLists.txt index 0b81a7dff4..dd1326dc76 100644 --- a/ydb/core/driver_lib/run/CMakeLists.txt +++ b/ydb/core/driver_lib/run/CMakeLists.txt @@ -108,6 +108,7 @@ target_link_libraries(run PUBLIC ydb-library-pdisk_io ydb-library-security yql-minikql-comp_nodes + yql-minikql-computation udf-service-exception_policy public-lib-base lib-deprecated-client diff --git a/ydb/core/driver_lib/run/factories.h b/ydb/core/driver_lib/run/factories.h index 884eb75cc8..76ca0a91d9 100644 --- a/ydb/core/driver_lib/run/factories.h +++ b/ydb/core/driver_lib/run/factories.h @@ -15,6 +15,7 @@ #include <ydb/library/pdisk_io/aio.h> #include <ydb/core/yq/libs/config/protos/audit.pb.h> +#include <ydb/library/yql/minikql/computation/mkql_computation_node.h> #include <ydb/library/yql/providers/pq/cm_client/interface/client.h> #include <library/cpp/actors/core/actorsystem.h> @@ -36,7 +37,7 @@ struct TModuleFactories { std::shared_ptr<NKqp::IQueryReplayBackendFactory> QueryReplayBackendFactory; // std::shared_ptr<NMsgBusProxy::IPersQueueGetReadSessionsInfoWorkerFactory> PQReadSessionsInfoWorkerFactory; - // Can be nullptr. In that case there would be no ability to work with Yandex Logbroker in Yandex Query. + // Can be nullptr. In that case there would be no ability to work with internal configuration manager. NPq::NConfigurationManager::IConnections::TPtr PqCmConnections; // Export implementation for Data Shards std::shared_ptr<NDataShard::IExportFactory> DataShardExportFactory; @@ -59,6 +60,7 @@ struct TModuleFactories { std::shared_ptr<NSQS::IAuthFactory> SqsAuthFactory; std::shared_ptr<NHttpProxy::IAuthFactory> DataStreamsAuthFactory; + std::vector<NKikimr::NMiniKQL::TComputationNodeFactory> AdditionalComputationNodeFactories; ~TModuleFactories(); }; diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index 375b6702ec..8967e8c88c 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -142,6 +142,7 @@ #include <ydb/library/folder_service/proto/config.pb.h> #include <ydb/library/yql/minikql/comp_nodes/mkql_factories.h> +#include <ydb/library/yql/parser/pg_wrapper/comp_factory.h> #include <library/cpp/actors/protos/services_common.pb.h> @@ -2332,7 +2333,8 @@ void TYandexQueryInitializer::InitializeServices(TActorSystemSetup* setup, const Factories->FolderServiceFactory, Factories->YqAuditServiceFactory, Factories->YdbCredentialProviderFactory, - IcPort + IcPort, + Factories->AdditionalComputationNodeFactories ); } diff --git a/ydb/core/kqp/provider/yql_kikimr_results.cpp b/ydb/core/kqp/provider/yql_kikimr_results.cpp index 323df77a3d..b1a5f90ea9 100644 --- a/ydb/core/kqp/provider/yql_kikimr_results.cpp +++ b/ydb/core/kqp/provider/yql_kikimr_results.cpp @@ -1212,8 +1212,9 @@ bool IsSameType(const NKikimrMiniKQL::TType& actual, const NKikimrMiniKQL::TType return IsSameType(actual.GetVariant(), expected.GetVariant()); case NKikimrMiniKQL::ETypeKind::Null: return true; + case NKikimrMiniKQL::ETypeKind::Pg: + return actual.GetPg().Getoid() == expected.GetPg().Getoid(); case NKikimrMiniKQL::ETypeKind::Unknown: - case NKikimrMiniKQL::ETypeKind::Reserved_10: case NKikimrMiniKQL::ETypeKind::Reserved_11: case NKikimrMiniKQL::ETypeKind::Reserved_12: case NKikimrMiniKQL::ETypeKind::Reserved_13: diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 6909485339..a9fb212f6a 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -818,7 +818,8 @@ namespace Tests { NKikimr::NFolderService::CreateMockFolderServiceActor, NYq::CreateMockYqAuditServiceActor, ydbCredFactory, - /*IcPort = */0 + /*IcPort = */0, + {} ); NYq::InitTest(Runtime.Get(), port, Settings->GrpcPort, YqSharedResources); } diff --git a/ydb/core/yq/libs/init/init.cpp b/ydb/core/yq/libs/init/init.cpp index 9f2b32a25f..4826f78de9 100644 --- a/ydb/core/yq/libs/init/init.cpp +++ b/ydb/core/yq/libs/init/init.cpp @@ -57,7 +57,8 @@ void Init( const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory, const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const ui32& icPort + ui32 icPort, + const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories ) { Y_VERIFY(iyqSharedResources, "No YQ shared resources created"); @@ -104,11 +105,14 @@ void Init( auto yqCounters = appData->Counters->GetSubgroup("counters", "yq"); auto workerManagerCounters = NYql::NDqs::TWorkerManagerCounters(yqCounters->GetSubgroup("subsystem", "worker_manager")); - NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory({ + TVector<NKikimr::NMiniKQL::TComputationNodeFactory> compNodeFactories = { NYql::GetCommonDqFactory(), NYql::GetDqYdbFactory(yqSharedResources->UserSpaceYdbDriver), NKikimr::NMiniKQL::GetYqlFactory() - }); + }; + + compNodeFactories.insert(compNodeFactories.end(), additionalCompNodeFactories.begin(), additionalCompNodeFactories.end()); + NKikimr::NMiniKQL::TComputationNodeFactory dqCompFactory = NKikimr::NMiniKQL::GetCompositeWithBuiltinFactory(std::move(compNodeFactories)); NYql::TTaskTransformFactory dqTaskTransformFactory = NYql::CreateCompositeTaskTransformFactory({ NYql::CreateCommonDqTaskTransformFactory(), diff --git a/ydb/core/yq/libs/init/init.h b/ydb/core/yq/libs/init/init.h index 8cbc1a6a8c..f826d0398f 100644 --- a/ydb/core/yq/libs/init/init.h +++ b/ydb/core/yq/libs/init/init.h @@ -37,7 +37,8 @@ void Init( const std::function<IActor*(const NKikimrProto::NFolderService::TFolderServiceConfig& authConfig)>& folderServiceFactory, const std::function<IActor*(const NYq::NConfig::TAuditConfig& auditConfig, const NMonitoring::TDynamicCounterPtr& counters)>& auditServiceFactory, const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory, - const ui32& icPort + ui32 icPort, + const std::vector<NKikimr::NMiniKQL::TComputationNodeFactory>& additionalCompNodeFactories ); } // NYq diff --git a/ydb/library/mkql_proto/CMakeLists.txt b/ydb/library/mkql_proto/CMakeLists.txt index 7ddbde41f4..ccfabd6744 100644 --- a/ydb/library/mkql_proto/CMakeLists.txt +++ b/ydb/library/mkql_proto/CMakeLists.txt @@ -19,6 +19,7 @@ target_link_libraries(ydb-library-mkql_proto PUBLIC api-protos library-yql-minikql yql-minikql-computation + providers-common-codec ) target_sources(ydb-library-mkql_proto PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/mkql_proto/mkql_proto.cpp diff --git a/ydb/library/mkql_proto/mkql_proto.cpp b/ydb/library/mkql_proto/mkql_proto.cpp index bfb6b1bf4a..a20febb305 100644 --- a/ydb/library/mkql_proto/mkql_proto.cpp +++ b/ydb/library/mkql_proto/mkql_proto.cpp @@ -6,6 +6,7 @@ #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/mkql_type_ops.h> #include <ydb/library/yql/public/decimal/yql_decimal.h> +#include <ydb/library/yql/providers/common/codec/yql_pg_codec.h> #include <library/cpp/containers/stack_vector/stack_vec.h> @@ -148,6 +149,13 @@ void ExportTypeToProtoImpl(TType* type, NKikimrMiniKQL::TType& res) { break; } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType *>(type); + res.SetKind(NKikimrMiniKQL::ETypeKind::Pg); + res.MutablePg()->set_oid(pgType->GetTypeId()); + break; + } + case TType::EKind::Optional: { auto optionalType = static_cast<TOptionalType *>(type); res.SetKind(NKikimrMiniKQL::ETypeKind::Optional); @@ -255,6 +263,13 @@ void ExportTypeToProtoImpl(TType* type, Ydb::Type& res) { break; } + case TType::EKind::Pg: { + auto pgType = static_cast<TPgType*>(type); + auto t = res.mutable_pg_type(); + t->set_oid(pgType->GetTypeId()); + break; + } + case TType::EKind::Optional: { auto optionalType = static_cast<TOptionalType*>(type); ExportTypeToProtoImpl(optionalType->GetItemType(), *res.mutable_optional_type()->mutable_item()); @@ -439,6 +454,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, NK break; } + case TType::EKind::Pg: { + if (!value) { + // do not set Text and Bytes fields + return; + } + auto pgType = static_cast<TPgType*>(type); + auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId()); + res.SetText(textValue); + break; + } + case TType::EKind::Optional: { auto optionalType = static_cast<TOptionalType*>(type); if (value) { @@ -523,7 +549,7 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd case TType::EKind::Void: case TType::EKind::EmptyList: case TType::EKind::EmptyDict: - break; + break; case TType::EKind::Null: { res.set_null_flag_value(::google::protobuf::NULL_VALUE); @@ -535,6 +561,17 @@ void ExportValueToProtoImpl(TType* type, const NUdf::TUnboxedValuePod& value, Yd break; } + case TType::EKind::Pg: { + if (!value) { + // do not set Text and Bytes fields + return; + } + auto pgType = static_cast<TPgType*>(type); + auto textValue = NYql::NCommon::PgValueToString(value, pgType->GetTypeId()); + res.set_text_value(textValue); + break; + } + case TType::EKind::Optional: { auto optionalType = static_cast<TOptionalType*>(type); if (value.HasValue()) { @@ -951,6 +988,10 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) { return TDataType::Create(schemeType, env); } } + case NKikimrMiniKQL::ETypeKind::Pg: { + const NKikimrMiniKQL::TPgType& protoPgType = type.GetPg(); + return TPgType::Create(protoPgType.Getoid(), env); + } case NKikimrMiniKQL::ETypeKind::Optional: { const NKikimrMiniKQL::TOptionalType& protoOptionalType = type.GetOptional(); TType* child = ImportTypeFromProto(protoOptionalType.GetItem()); @@ -1010,13 +1051,13 @@ TType* TProtoImporter::ImportTypeFromProto(const NKikimrMiniKQL::TType& type) { TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TValue& value) { switch (type->GetKind()) { - case TCallableType::EKind::Void: { + case TType::EKind::Void: { return env.GetVoid(); } - case TCallableType::EKind::Null: { + case TType::EKind::Null: { return env.GetNull(); } - case TCallableType::EKind::Data: { + case TType::EKind::Data: { TDataType* dataType = static_cast<TDataType*>(type); TDataLiteral* dataNode = nullptr; switch (const auto schemeType = dataType->GetSchemeType()) { @@ -1114,7 +1155,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV } return dataNode; } - case TCallableType::EKind::Optional: { + case TType::EKind::Optional: { TOptionalType* optionalType = static_cast<TOptionalType*>(type); TOptionalLiteral* optionalNode; if (value.HasOptional()) { @@ -1126,7 +1167,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV } return optionalNode; } - case TCallableType::EKind::List: { + case TType::EKind::List: { TListType* listType = static_cast<TListType*>(type); TType* itemType = listType->GetItemType(); TVector<TRuntimeNode> items; @@ -1140,7 +1181,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV TListLiteral* listNode = TListLiteral::Create(items.data(), items.size(), listType, env); return listNode; } - case TCallableType::EKind::Tuple: { + case TType::EKind::Tuple: { TTupleType* tupleType = static_cast<TTupleType*>(type); ui32 elementsCount = tupleType->GetElementsCount(); MKQL_ENSURE(elementsCount == value.TupleSize(), "Invalid protobuf format, tuple size mismatch between Type and Value"); @@ -1154,7 +1195,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV TTupleLiteral* tupleNode = TTupleLiteral::Create(elements.size(), elements.data(), tupleType, env); return tupleNode; } - case TCallableType::EKind::Struct: { + case TType::EKind::Struct: { TStructType* structType = static_cast<TStructType*>(type); ui32 membersCount = structType->GetMembersCount(); MKQL_ENSURE(membersCount == value.StructSize(), "Invalid protobuf format, struct size mismatch between Type and Value"); @@ -1170,7 +1211,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV TStructLiteral* structNode = TStructLiteral::Create(members.size(), members.data(), structType, env); return structNode; } - case TCallableType::EKind::Dict: { + case TType::EKind::Dict: { TDictType* dictType = static_cast<TDictType*>(type); ui32 dictSize = value.DictSize(); @@ -1185,7 +1226,7 @@ TNode* TProtoImporter::ImportNodeFromProto(TType* type, const NKikimrMiniKQL::TV TDictLiteral* dictNode = TDictLiteral::Create(items.size(), items.data(), dictType, env); return dictNode; } - case TCallableType::EKind::Variant: { + case TType::EKind::Variant: { TVariantType* variantType = static_cast<TVariantType*>(type); auto variantIndex = value.GetVariantIndex(); TType* innerType = variantType->GetAlternativeType(variantIndex); @@ -1218,6 +1259,15 @@ NUdf::TUnboxedValue TProtoImporter::ImportValueFromProto(const TType* type, cons case TType::EKind::Data: return HandleKindDataImport(type, value); + case TType::EKind::Pg: { + auto pgType = static_cast<const TPgType*>(type); + MKQL_ENSURE(!value.HasBytes(), "Pg binary format is not supported"); + if (!value.HasText() && !value.HasBytes()) { + return NUdf::TUnboxedValue(); + } + return NYql::NCommon::PgValueFromString(value.GetText(), pgType->GetTypeId()); + } + case TType::EKind::Optional: { auto optionalType = static_cast<const TOptionalType*>(type); if (value.HasOptional()) { diff --git a/ydb/library/mkql_proto/mkql_proto_ut.cpp b/ydb/library/mkql_proto/mkql_proto_ut.cpp index fdda085f18..2a589ba0b3 100644 --- a/ydb/library/mkql_proto/mkql_proto_ut.cpp +++ b/ydb/library/mkql_proto/mkql_proto_ut.cpp @@ -16,6 +16,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) { UNIT_ASSERT(!CanExportType(pgmBuilder.NewVoid().GetStaticType()->GetType(), env)); UNIT_ASSERT(CanExportType(pgmBuilder.NewVoid().GetStaticType(), env)); + UNIT_ASSERT(CanExportType(pgmBuilder.NewPgType(16), env)); auto dtype = pgmBuilder.NewDataType(NUdf::TDataType<ui32>::Id); UNIT_ASSERT(CanExportType(dtype, env)); UNIT_ASSERT(CanExportType(pgmBuilder.NewOptionalType(dtype), env)); @@ -45,7 +46,7 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) { "}\n"); } - Y_UNIT_TEST(TestExportDecimalType) { + Y_UNIT_TEST(TestExportDecimalType) { TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) { NYql::NDecimal::TInt128 x; ui64* p = (ui64*)&x; @@ -64,6 +65,18 @@ Y_UNIT_TEST_SUITE(TMiniKQLProtoTest) { "}\n"); } + Y_UNIT_TEST(TestExportPgType) { + TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) { + auto pgType = static_cast<TPgType*>(pgmBuilder.NewPgType(16)); + auto pgmReturn = pgmBuilder.PgConst(pgType, "true"); + return pgmReturn; + }, + "Kind: Pg\n" + "Pg {\n" + " oid: 16\n" + "}\n"); + } + Y_UNIT_TEST(TestExportUuidType) { TestExportType<NKikimrMiniKQL::TType>([](TProgramBuilder& pgmBuilder) { auto pgmReturn = pgmBuilder.NewDataLiteral<NUdf::EDataSlot::Uuid>(TStringBuf("\1\0\0\0\0\0\0\0\0\0\0\0\0\0\0\0"sv)); diff --git a/ydb/library/mkql_proto/protos/minikql.proto b/ydb/library/mkql_proto/protos/minikql.proto index 2a4243855c..b07ca18d4c 100644 --- a/ydb/library/mkql_proto/protos/minikql.proto +++ b/ydb/library/mkql_proto/protos/minikql.proto @@ -15,7 +15,7 @@ enum ETypeKind { Dict = 7; Variant = 8; Null = 9; - Reserved_10 = 10; + Pg = 10; Reserved_11 = 11; Reserved_12 = 12; Reserved_13 = 13; @@ -65,6 +65,10 @@ message TDictType { required TType Payload = 2; } +message TPgType { + required uint32 oid = 1; +} + message TType { required ETypeKind Kind = 1; oneof type_type { @@ -75,6 +79,7 @@ message TType { TStructType Struct = 6; TDictType Dict = 7; TVariantType Variant = 8; + TPgType Pg = 9; } } diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 816682fe4c..0e2157e466 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -5154,6 +5154,10 @@ bool CanExportType(TType* type, const TTypeEnvironment& env) { node->SetCookie(1); break; + case TType::EKind::Pg: + node->SetCookie(1); + break; + case TType::EKind::Optional: { auto optionalType = static_cast<TOptionalType*>(node); if (!optionalType->GetItemType()->GetCookie()) { diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 035c4bf217..3cd4591f8c 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1768,15 +1768,8 @@ NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(TPgType* type, char cmd, TInput } } -NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { - using namespace NYson::NDetail; - if (cmd == EntitySymbol) { - return NUdf::TUnboxedValuePod(); - } - - CHECK_EXPECTED(cmd, StringMarker); - auto s = buf.ReadYtString(); - switch (type->GetTypeId()) { +NUdf::TUnboxedValue PgValueFromString(const TStringBuf s, ui32 pgTypeId) { + switch (pgTypeId) { case BOOLOID: { return ScalarDatumToPod(BoolGetDatum(FromString<bool>(s))); } @@ -1806,10 +1799,10 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { return PointerDatumToPod((Datum)ret); } default: - TString str{s}; + TString str{ s }; TPAllocScope call; - const auto& typeInfo = NPg::LookupType(type->GetTypeId()); + const auto& typeInfo = NPg::LookupType(pgTypeId); auto typeIOParam = MakeTypeIOParam(typeInfo); auto inFuncId = typeInfo.InFuncId; if (typeInfo.TypeId == typeInfo.ArrayTypeId) { @@ -1839,6 +1832,17 @@ NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { } } +NUdf::TUnboxedValue ReadYsonValuePg(TPgType* type, char cmd, TInputBuf& buf) { + using namespace NYson::NDetail; + if (cmd == EntitySymbol) { + return NUdf::TUnboxedValuePod(); + } + + CHECK_EXPECTED(cmd, StringMarker); + auto s = buf.ReadYtString(); + return PgValueFromString(s, type->GetTypeId()); +} + NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) { auto marker = buf.Read(); if (!marker) { diff --git a/ydb/library/yql/providers/common/codec/yql_pg_codec.h b/ydb/library/yql/providers/common/codec/yql_pg_codec.h index 36a09bec4f..aed53d4faf 100644 --- a/ydb/library/yql/providers/common/codec/yql_pg_codec.h +++ b/ydb/library/yql/providers/common/codec/yql_pg_codec.h @@ -13,6 +13,7 @@ namespace NYql { namespace NCommon { TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId); +NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId); void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, const TVector<ui32>* structPositions); diff --git a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp index 3f97980d3e..514f02f330 100644 --- a/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp +++ b/ydb/library/yql/sql/pg_dummy/pg_sql_dummy.cpp @@ -23,7 +23,13 @@ namespace NCommon { TString PgValueToString(const NUdf::TUnboxedValuePod& value, ui32 pgTypeId) { Y_UNUSED(value); Y_UNUSED(pgTypeId); - throw yexception() << "PG types are not supported"; + throw yexception() << "PgValueToString: PG types are not supported"; +} + +NUdf::TUnboxedValue PgValueFromString(const TStringBuf text, ui32 pgTypeId) { + Y_UNUSED(text); + Y_UNUSED(pgTypeId); + throw yexception() << "PgValueFromString: PG types are not supported"; } void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& value, NKikimr::NMiniKQL::TPgType* type, @@ -32,55 +38,55 @@ void WriteYsonValuePg(TYsonResultWriter& writer, const NUdf::TUnboxedValuePod& v Y_UNUSED(value); Y_UNUSED(type); Y_UNUSED(structPositions); - throw yexception() << "PG types are not supported"; + throw yexception() << "WriteYsonValuePg: PG types are not supported"; } void WriteYsonValueInTableFormatPg(TOutputBuf& buf, NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value) { Y_UNUSED(buf); Y_UNUSED(type); Y_UNUSED(value); - throw yexception() << "PG types are not supported"; + throw yexception() << "WriteYsonValueInTableFormatPg: PG types are not supported"; } NUdf::TUnboxedValue ReadYsonValueInTableFormatPg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) { Y_UNUSED(type); Y_UNUSED(cmd); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "ReadYsonValueInTableFormatPg: PG types are not supported"; } NUdf::TUnboxedValue ReadYsonValuePg(NKikimr::NMiniKQL::TPgType* type, char cmd, TInputBuf& buf) { Y_UNUSED(type); Y_UNUSED(cmd); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "ReadYsonValuePg: PG types are not supported"; } NKikimr::NUdf::TUnboxedValue ReadSkiffPg(NKikimr::NMiniKQL::TPgType* type, NCommon::TInputBuf& buf) { Y_UNUSED(type); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "ReadSkiffPg: PG types are not supported"; } void WriteSkiffPg(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { Y_UNUSED(type); Y_UNUSED(value); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "WriteSkiffPg: PG types are not supported"; } extern "C" void ReadSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, NKikimr::NUdf::TUnboxedValue& value, NCommon::TInputBuf& buf) { Y_UNUSED(type); Y_UNUSED(value); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "ReadSkiffPgValue: PG types are not supported"; } extern "C" void WriteSkiffPgValue(NKikimr::NMiniKQL::TPgType* type, const NKikimr::NUdf::TUnboxedValuePod& value, NCommon::TOutputBuf& buf) { Y_UNUSED(type); Y_UNUSED(value); Y_UNUSED(buf); - throw yexception() << "PG types are not supported"; + throw yexception() << "WriteSkiffPgValue: PG types are not supported"; } } // namespace NCommon |