diff options
author | vvvv <vvvv@ydb.tech> | 2023-05-04 18:16:08 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-05-04 18:16:08 +0300 |
commit | 4e5d5c7da7468ccd5fbd7f7a7af5146939839635 (patch) | |
tree | 2221bc02de455943c643a12947d2e82f168d39b5 | |
parent | 882f8bae704c932198c275721a1ad8c10c5d5ce7 (diff) | |
download | ydb-4e5d5c7da7468ccd5fbd7f7a7af5146939839635.tar.gz |
Block implementation of ToPg/FromPg
17 files changed, 464 insertions, 45 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp index d4d0b837bb0..450e6c2f575 100644 --- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp +++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp @@ -4952,7 +4952,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo TExprNode::TListType funcArgs; std::string_view arrowFunctionName; - if (node->IsList() || node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If", "Just", "Nth"})) + if (node->IsList() || node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If", "Just", "Nth", "ToPg", "FromPg"})) { for (auto& child : node->ChildrenList()) { if (!child->GetTypeAnn()->IsComputable()) { diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp index 21a11d2b9f7..73d51e281be 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp @@ -1,6 +1,7 @@ #include "type_ann_blocks.h" #include "type_ann_list.h" #include "type_ann_wide.h" +#include "type_ann_pg.h" #include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h> #include <ydb/library/yql/core/yql_expr_type_annotation.h> @@ -355,6 +356,60 @@ IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNo return IGraphTransformer::TStatus::Ok; } +IGraphTransformer::TStatus BlockToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto child = input->Child(0); + if (!EnsureBlockOrScalarType(*child, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + bool isScalar; + const TTypeAnnotationNode* blockItemType = GetBlockItemType(*child->GetTypeAnn(), isScalar); + auto resultType = ToPgImpl(input->Pos(), blockItemType, ctx.Expr); + if (!resultType) { + return IGraphTransformer::TStatus::Error; + } + + if (isScalar) { + resultType = ctx.Expr.MakeType<TScalarExprType>(resultType); + } else { + resultType = ctx.Expr.MakeType<TBlockExprType>(resultType); + } + + input->SetTypeAnn(resultType); + return IGraphTransformer::TStatus::Ok; +} + +IGraphTransformer::TStatus BlockFromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + Y_UNUSED(output); + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + auto child = input->Child(0); + if (!EnsureBlockOrScalarType(*child, ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + bool isScalar; + const TTypeAnnotationNode* blockItemType = GetBlockItemType(*child->GetTypeAnn(), isScalar); + auto resultType = FromPgImpl(input->Pos(), blockItemType, ctx.Expr); + if (!resultType) { + return IGraphTransformer::TStatus::Error; + } + + if (isScalar) { + resultType = ctx.Expr.MakeType<TScalarExprType>(resultType); + } else { + resultType = ctx.Expr.MakeType<TBlockExprType>(resultType); + } + + input->SetTypeAnn(resultType); + return IGraphTransformer::TStatus::Ok; +} + IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { Y_UNUSED(output); if (!EnsureMinArgsCount(*input, 2U, ctx.Expr)) { diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h index 35f0c0d026e..1096c6b22fe 100644 --- a/ydb/library/yql/core/type_ann/type_ann_blocks.h +++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h @@ -17,6 +17,8 @@ namespace NTypeAnnImpl { IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); + IGraphTransformer::TStatus BlockFromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp index fc95f85015c..e7831cdbe15 100644 --- a/ydb/library/yql/core/type_ann/type_ann_core.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp @@ -11937,6 +11937,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot> Functions["BlockJust"] = &BlockJustWrapper; Functions["BlockAsTuple"] = &BlockAsTupleWrapper; Functions["BlockNth"] = &BlockNthWrapper; + Functions["BlockToPg"] = &BlockToPgWrapper; + Functions["BlockFromPg"] = &BlockFromPgWrapper; ExtFunctions["BlockFunc"] = &BlockFuncWrapper; ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper; diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp index 1facbd26829..89392486334 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp +++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp @@ -152,55 +152,35 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode } } -IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { - if (!EnsureArgsCount(*input, 1, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (!EnsureComputable(input->Head(), ctx.Expr)) { - return IGraphTransformer::TStatus::Error; - } - - if (IsNull(input->Head())) { - output = input->TailPtr(); - return IGraphTransformer::TStatus::Repeat; - } - - if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) { - output = input->HeadPtr(); - return IGraphTransformer::TStatus::Repeat; - } - - auto name = input->Head().GetTypeAnn()->Cast<TPgExprType>()->GetName(); +const TTypeAnnotationNode* FromPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx) { + auto name = type->Cast<TPgExprType>()->GetName(); const TDataExprType* dataType; if (name == "bool") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Bool); } else if (name == "int2") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int16); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int16); } else if (name == "int4") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int32); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int32); } else if (name == "int8") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int64); } else if (name == "float4") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Float); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Float); } else if (name == "float8") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Double); } else if (name == "text" || name == "varchar" || name == "cstring") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::Utf8); } else if (name == "bytea") { - dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::String); + dataType = ctx.MakeType<TDataExprType>(EDataSlot::String); } else { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unsupported type: " << name)); - return IGraphTransformer::TStatus::Error; + return nullptr; } - auto result = ctx.Expr.MakeType<TOptionalExprType>(dataType); - input->SetTypeAnn(result); - return IGraphTransformer::TStatus::Ok; + return ctx.MakeType<TOptionalExprType>(dataType); } -IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { +IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { if (!EnsureArgsCount(*input, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } @@ -214,15 +194,25 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode:: return IGraphTransformer::TStatus::Repeat; } - if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) { + if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) { output = input->HeadPtr(); return IGraphTransformer::TStatus::Repeat; } + auto resultType = FromPgImpl(input->Pos(), input->Head().GetTypeAnn(), ctx.Expr); + if (!resultType) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(resultType); + return IGraphTransformer::TStatus::Ok; +} + +const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx) { bool isOptional; const TDataExprType* dataType; - if (!EnsureDataOrOptionalOfData(input->Head(), isOptional, dataType, ctx.Expr)) { - return IGraphTransformer::TStatus::Error; + if (!EnsureDataOrOptionalOfData(pos, type, isOptional, dataType, ctx)) { + return nullptr; } TString pgType; @@ -252,19 +242,46 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode:: pgType = "text"; break; default: - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), + ctx.AddError(TIssue(ctx.GetPosition(pos), TStringBuilder() << "Unsupported type: " << dataType->GetName())); - return IGraphTransformer::TStatus::Error; + return nullptr; } try { - auto result = ctx.Expr.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId); - input->SetTypeAnn(result); - return IGraphTransformer::TStatus::Ok; + auto result = ctx.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId); + return result; } catch (const yexception& e) { - ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), e.what())); + ctx.AddError(TIssue(ctx.GetPosition(pos), e.what())); + return nullptr; + } +} + +IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) { + if (!EnsureArgsCount(*input, 1, ctx.Expr)) { return IGraphTransformer::TStatus::Error; } + + if (!EnsureComputable(input->Head(), ctx.Expr)) { + return IGraphTransformer::TStatus::Error; + } + + if (IsNull(input->Head())) { + output = input->TailPtr(); + return IGraphTransformer::TStatus::Repeat; + } + + if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) { + output = input->HeadPtr(); + return IGraphTransformer::TStatus::Repeat; + } + + auto resultType = ToPgImpl(input->Pos(), input->Head().GetTypeAnn(), ctx.Expr); + if (!resultType) { + return IGraphTransformer::TStatus::Error; + } + + input->SetTypeAnn(resultType); + return IGraphTransformer::TStatus::Ok; } IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) { diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.h b/ydb/library/yql/core/type_ann/type_ann_pg.h index cc7c6903169..a699db1cf5d 100644 --- a/ydb/library/yql/core/type_ann/type_ann_pg.h +++ b/ydb/library/yql/core/type_ann/type_ann_pg.h @@ -15,6 +15,8 @@ TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias); const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx); TMap<TString, ui32> ExtractExternalColumns(const TExprNode& select); bool IsPlainMemberOverArg(const TExprNode& expr, TStringBuf& memberName); +const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx); +const TTypeAnnotationNode* FromPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx); IGraphTransformer::TStatus PgStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx); IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx); diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp index 4d48dec16b4..a76f51a9b0b 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.cpp +++ b/ydb/library/yql/minikql/mkql_program_builder.cpp @@ -1597,6 +1597,26 @@ TRuntimeNode TProgramBuilder::BlockAsTuple(const TArrayRef<const TRuntimeNode>& return TRuntimeNode(callableBuilder.Build(), false); } +TRuntimeNode TProgramBuilder::BlockToPg(TRuntimeNode input, TType* returnType) { + if constexpr (RuntimeVersion < 37U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(input); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode TProgramBuilder::BlockFromPg(TRuntimeNode input, TType* returnType) { + if constexpr (RuntimeVersion < 37U) { + THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__; + } + + TCallableBuilder callableBuilder(Env, __func__, returnType); + callableBuilder.Add(input); + return TRuntimeNode(callableBuilder.Build(), false); +} + TRuntimeNode TProgramBuilder::BlockNot(TRuntimeNode data) { auto dataType = AS_TYPE(TBlockType, data.GetStaticType()); diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h index 815af9df4f9..c0d84372cdd 100644 --- a/ydb/library/yql/minikql/mkql_program_builder.h +++ b/ydb/library/yql/minikql/mkql_program_builder.h @@ -261,6 +261,8 @@ public: TRuntimeNode BlockCoalesce(TRuntimeNode first, TRuntimeNode second); TRuntimeNode BlockNth(TRuntimeNode tuple, ui32 index); TRuntimeNode BlockAsTuple(const TArrayRef<const TRuntimeNode>& args); + TRuntimeNode BlockToPg(TRuntimeNode input, TType* returnType); + TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType); //-- logical functions TRuntimeNode BlockNot(TRuntimeNode data); diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h index a61585f2204..257553f36e1 100644 --- a/ydb/library/yql/minikql/mkql_runtime_version.h +++ b/ydb/library/yql/minikql/mkql_runtime_version.h @@ -24,7 +24,7 @@ namespace NMiniKQL { // 1. Bump this version every time incompatible runtime nodes are introduced. // 2. Make sure you provide runtime node generation for previous runtime versions. #ifndef MKQL_RUNTIME_VERSION -#define MKQL_RUNTIME_VERSION 36U +#define MKQL_RUNTIME_VERSION 37U #endif // History: diff --git a/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt index 7cb1d6dbb1e..0f0c5e58494 100644 --- a/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt @@ -30,6 +30,8 @@ target_link_options(ydb-library-yql-minikql-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 -fPIC -fPIC + -framework + CoreFoundation ) target_sources(ydb-library-yql-minikql-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/compact_hash_ut.cpp diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt index fba1dca5f7e..595730ecc2f 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt @@ -117,6 +117,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-yql-core yql-minikql-arrow yql-minikql-computation + yql-minikql-comp_nodes yql-parser-pg_catalog providers-common-codec yql-public-issue diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt index 23798855ab7..b71412a898b 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt @@ -116,6 +116,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-yql-core yql-minikql-arrow yql-minikql-computation + yql-minikql-comp_nodes yql-parser-pg_catalog providers-common-codec yql-public-issue diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt index af9c454bfb7..d4cc64deba1 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt @@ -118,6 +118,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-yql-core yql-minikql-arrow yql-minikql-computation + yql-minikql-comp_nodes yql-parser-pg_catalog providers-common-codec yql-public-issue diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt index 6b27385bbda..f6977015d93 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt @@ -133,6 +133,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-yql-core yql-minikql-arrow yql-minikql-computation + yql-minikql-comp_nodes yql-parser-pg_catalog providers-common-codec yql-public-issue diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp index 5f75abf925b..88cf0bda39e 100644 --- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp +++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp @@ -1,4 +1,5 @@ #include <ydb/library/yql/parser/pg_wrapper/interface/interface.h> +#include <ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h> #include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h> @@ -10,6 +11,8 @@ #include <ydb/library/yql/minikql/mkql_node_builder.h> #include <ydb/library/yql/minikql/mkql_string_util.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/public/udf/arrow/block_reader.h> +#include <ydb/library/yql/public/udf/arrow/block_builder.cpp> #include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <ydb/library/yql/providers/common/codec/yql_codec_buf.h> #include <ydb/library/yql/providers/common/codec/yql_codec_results.h> @@ -1523,6 +1526,284 @@ private: bool MultiDims = false; }; +struct TFromPgExec { + TFromPgExec(ui32 sourceId) + : SourceId(sourceId) + {} + + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + arrow::Datum inputDatum = batch.values[0]; + Y_ENSURE(inputDatum.is_array()); + const auto& array= *inputDatum.array(); + size_t length = array.length; + switch (SourceId) { + case BOOLOID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<ui8>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetBool(inputPtr[i]) ? 1 : 0; + } + break; + } + case INT2OID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<i16>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetInt16(inputPtr[i]); + } + break; + } + case INT4OID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<i32>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetInt32(inputPtr[i]); + } + break; + } + case INT8OID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<i64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetInt64(inputPtr[i]); + } + break; + } + case FLOAT4OID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<float>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetFloat4(inputPtr[i]); + } + break; + } + case FLOAT8OID: { + auto inputPtr = array.GetValues<ui64>(1); + auto outputPtr = res->array()->GetMutableValues<double>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = DatumGetFloat8(inputPtr[i]); + } + break; + } + case TEXTOID: + case VARCHAROID: + case BYTEAOID: + case CSTRINGOID: { + NUdf::TStringBlockReader<arrow::BinaryType, true> reader; + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + for (size_t i = 0; i < length; ++i) { + auto item = reader.GetItem(array, i); + if (!item) { + builder.Add(NUdf::TBlockItem()); + continue; + } + + ui32 len; + const char* ptr = item.AsStringRef().Data(); + if (SourceId == CSTRINGOID) { + len = strlen(item.AsStringRef().Data()); + } else { + len = GetCleanVarSize((const text*)item.AsStringRef().Data()); + Y_ENSURE(len + VARHDRSZ == item.AsStringRef().Size()); + ptr += VARHDRSZ; + } + + builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr, len))); + } + + *res = builder.Build(true); + break; + } + default: + ythrow yexception() << "Unsupported type: " << NPg::LookupType(SourceId).Name; + } + return arrow::Status::OK(); + } + + const ui32 SourceId; +}; + +std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType, TType* resultType, ui32 sourceId) { + const TVector<TType*> argTypes = { inputType }; + + std::shared_ptr<arrow::DataType> returnArrowType; + MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type"); + auto exec = std::make_shared<TFromPgExec>(sourceId); + auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), + [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + return exec->Exec(ctx, batch, res); + }); + + switch (sourceId) { + case BOOLOID: + case INT2OID: + case INT4OID: + case INT8OID: + case FLOAT4OID: + case FLOAT8OID: + break; + case TEXTOID: + case VARCHAROID: + case BYTEAOID: + case CSTRINGOID: + kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + break; + default: + ythrow yexception() << "Unsupported type: " << NPg::LookupType(sourceId).Name; + } + + return kernel; +} + +struct TToPgExec { + TToPgExec(ui32 targetId) + : TargetId(targetId) + {} + + arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const { + arrow::Datum inputDatum = batch.values[0]; + Y_ENSURE(inputDatum.is_array()); + const auto& array= *inputDatum.array(); + size_t length = array.length; + switch (TargetId) { + case BOOLOID: { + auto inputPtr = array.GetValues<ui8>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = BoolGetDatum(inputPtr[i]); + } + break; + } + case INT2OID: { + auto inputPtr = array.GetValues<i16>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = Int16GetDatum(inputPtr[i]); + } + break; + } + case INT4OID: { + auto inputPtr = array.GetValues<i32>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = Int32GetDatum(inputPtr[i]); + } + break; + } + case INT8OID: { + auto inputPtr = array.GetValues<i64>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = Int64GetDatum(inputPtr[i]); + } + break; + } + case FLOAT4OID: { + auto inputPtr = array.GetValues<float>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = Float4GetDatum(inputPtr[i]); + } + break; + } + case FLOAT8OID: { + auto inputPtr = array.GetValues<double>(1); + auto outputPtr = res->array()->GetMutableValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + outputPtr[i] = Float8GetDatum(inputPtr[i]); + } + break; + } + case TEXTOID: + case VARCHAROID: + case BYTEAOID: + case CSTRINGOID: { + NUdf::TStringBlockReader<arrow::BinaryType, true> reader; + NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); + std::vector<char> tmp; + for (size_t i = 0; i < length; ++i) { + auto item = reader.GetItem(array, i); + if (!item) { + builder.Add(NUdf::TBlockItem()); + continue; + } + + ui32 len; + if (TargetId == CSTRINGOID) { + len = 1 + item.AsStringRef().Size(); + if (Y_UNLIKELY(len < item.AsStringRef().Size())) { + ythrow yexception() << "Too long string"; + } + + if (tmp.capacity() < len) { + tmp.reserve(Max<ui64>(len, tmp.capacity() * 2)); + } + + tmp.resize(len); + memcpy(tmp.data(), item.AsStringRef().Data(), len - 1); + tmp[len - 1] = 0; + } else { + len = VARHDRSZ + item.AsStringRef().Size(); + if (Y_UNLIKELY(len < item.AsStringRef().Size())) { + ythrow yexception() << "Too long string"; + } + + if (tmp.capacity() < len) { + tmp.reserve(Max<ui64>(len, tmp.capacity() * 2)); + } + + tmp.resize(len); + memcpy(tmp.data() + VARHDRSZ, item.AsStringRef().Data(), len - VARHDRSZ); + UpdateCleanVarSize((text*)tmp.data(), item.AsStringRef().Size()); + } + + builder.Add(NUdf::TBlockItem(NUdf::TStringRef(tmp.data(), len))); + } + + *res = builder.Build(true); + break; + } + default: + ythrow yexception() << "Unsupported type: " << NPg::LookupType(TargetId).Name; + } + return arrow::Status::OK(); + } + + const ui32 TargetId; +}; + +std::shared_ptr<arrow::compute::ScalarKernel> MakeToPgKernel(TType* inputType, TType* resultType, ui32 targetId) { + const TVector<TType*> argTypes = { inputType }; + + std::shared_ptr<arrow::DataType> returnArrowType; + MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), returnArrowType), "Unsupported arrow type"); + auto exec = std::make_shared<TToPgExec>(targetId); + auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(argTypes), ConvertToOutputType(resultType), + [exec](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + return exec->Exec(ctx, batch, res); + }); + + switch (targetId) { + case BOOLOID: + case INT2OID: + case INT4OID: + case INT8OID: + case FLOAT4OID: + case FLOAT8OID: + break; + case TEXTOID: + case VARCHAROID: + case BYTEAOID: + case CSTRINGOID: + kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE; + break; + default: + ythrow yexception() << "Unsupported type: " << NPg::LookupType(targetId).Name; + } + + return kernel; +} + TComputationNodeFactory GetPgFactory() { return [] (TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { TStringBuf name = callable.GetType()->GetName(); @@ -1617,6 +1898,15 @@ TComputationNodeFactory GetPgFactory() { } } + if (name == "BlockFromPg") { + auto arg = LocateNode(ctx.NodeLocator, callable, 0); + auto inputType = callable.GetInput(0).GetStaticType(); + auto returnType = callable.GetType()->GetReturnType(); + ui32 sourceId = AS_TYPE(TPgType, AS_TYPE(TBlockType, inputType)->GetItemType())->GetTypeId(); + auto kernel = MakeFromPgKernel(inputType, returnType, sourceId); + return new TBlockFuncNode(ctx.Mutables, { arg }, { inputType }, *kernel, kernel); + } + if (name == "ToPg") { auto arg = LocateNode(ctx.NodeLocator, callable, 0); auto returnType = callable.GetType()->GetReturnType(); @@ -1643,6 +1933,15 @@ TComputationNodeFactory GetPgFactory() { } } + if (name == "BlockToPg") { + auto arg = LocateNode(ctx.NodeLocator, callable, 0); + auto inputType = callable.GetInput(0).GetStaticType(); + auto returnType = callable.GetType()->GetReturnType(); + auto targetId = AS_TYPE(TPgType, AS_TYPE(TBlockType, returnType)->GetItemType())->GetTypeId(); + auto kernel = MakeToPgKernel(inputType, returnType, targetId); + return new TBlockFuncNode(ctx.Mutables, { arg }, { inputType }, *kernel, kernel); + } + if (name == "PgArray") { TComputationNodePtrVector argNodes; TVector<TType*> argTypes; diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp index 60dd3adeefe..30e750ae63c 100644 --- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp +++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp @@ -2463,6 +2463,18 @@ TMkqlCommonCallableCompiler::TShared::TShared() { return ctx.ProgramBuilder.ToPg(input, returnType); }); + AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) { + auto input = MkqlBuildExpr(*node.Child(0), ctx); + auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockFromPg(input, returnType); + }); + + AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) { + auto input = MkqlBuildExpr(*node.Child(0), ctx); + auto returnType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder); + return ctx.ProgramBuilder.BlockToPg(input, returnType); + }); + AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) { auto input = MkqlBuildExpr(*node.Child(0), ctx); return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content()); diff --git a/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt index e815fce8c52..314a5e2d14f 100644 --- a/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt @@ -24,6 +24,8 @@ target_link_options(ydb-library-yql-sql-pg-ut PRIVATE -Wl,-platform_version,macos,11.0,11.0 -fPIC -fPIC + -framework + CoreFoundation ) target_sources(ydb-library-yql-sql-pg-ut PRIVATE ${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/pg/pg_sql_ut.cpp |