aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorvvvv <vvvv@ydb.tech>2023-05-04 18:16:08 +0300
committervvvv <vvvv@ydb.tech>2023-05-04 18:16:08 +0300
commit4e5d5c7da7468ccd5fbd7f7a7af5146939839635 (patch)
tree2221bc02de455943c643a12947d2e82f168d39b5
parent882f8bae704c932198c275721a1ad8c10c5d5ce7 (diff)
downloadydb-4e5d5c7da7468ccd5fbd7f7a7af5146939839635.tar.gz
Block implementation of ToPg/FromPg
-rw-r--r--ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.cpp55
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_blocks.h2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_core.cpp2
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.cpp103
-rw-r--r--ydb/library/yql/core/type_ann/type_ann_pg.h2
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.cpp20
-rw-r--r--ydb/library/yql/minikql/mkql_program_builder.h2
-rw-r--r--ydb/library/yql/minikql/mkql_runtime_version.h2
-rw-r--r--ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt2
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/library/yql/parser/pg_wrapper/comp_factory.cpp299
-rw-r--r--ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp12
-rw-r--r--ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt2
17 files changed, 464 insertions, 45 deletions
diff --git a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
index d4d0b837bb0..450e6c2f575 100644
--- a/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
+++ b/ydb/library/yql/core/peephole_opt/yql_opt_peephole_physical.cpp
@@ -4952,7 +4952,7 @@ bool CollectBlockRewrites(const TMultiExprType* multiInputType, bool keepInputCo
TExprNode::TListType funcArgs;
std::string_view arrowFunctionName;
- if (node->IsList() || node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If", "Just", "Nth"}))
+ if (node->IsList() || node->IsCallable({"And", "Or", "Xor", "Not", "Coalesce", "If", "Just", "Nth", "ToPg", "FromPg"}))
{
for (auto& child : node->ChildrenList()) {
if (!child->GetTypeAnn()->IsComputable()) {
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
index 21a11d2b9f7..73d51e281be 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.cpp
@@ -1,6 +1,7 @@
#include "type_ann_blocks.h"
#include "type_ann_list.h"
#include "type_ann_wide.h"
+#include "type_ann_pg.h"
#include <ydb/library/yql/core/expr_nodes/yql_expr_nodes.h>
#include <ydb/library/yql/core/yql_expr_type_annotation.h>
@@ -355,6 +356,60 @@ IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNo
return IGraphTransformer::TStatus::Ok;
}
+// Type annotation for the BlockToPg callable.
+// Expects exactly one argument of block or scalar type, derives the pg item type
+// via ToPgImpl and wraps it back into the same shape (scalar or block) as the input.
+IGraphTransformer::TStatus BlockToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+    Y_UNUSED(output);
+    if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    const auto arg = input->Child(0);
+    if (!EnsureBlockOrScalarType(*arg, ctx.Expr)) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    bool scalarInput = false;
+    const TTypeAnnotationNode* itemType = GetBlockItemType(*arg->GetTypeAnn(), scalarInput);
+
+    // ToPgImpl reports its own errors into ctx.Expr and signals failure with nullptr.
+    const TTypeAnnotationNode* pgItemType = ToPgImpl(input->Pos(), itemType, ctx.Expr);
+    if (!pgItemType) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    // The result keeps the shape of the argument.
+    if (scalarInput) {
+        input->SetTypeAnn(ctx.Expr.MakeType<TScalarExprType>(pgItemType));
+    } else {
+        input->SetTypeAnn(ctx.Expr.MakeType<TBlockExprType>(pgItemType));
+    }
+
+    return IGraphTransformer::TStatus::Ok;
+}
+
+// Type annotation for the BlockFromPg callable.
+// Expects exactly one argument of block or scalar type whose item type is a pg type,
+// derives the resulting item type via FromPgImpl and wraps it back into the same
+// shape (scalar or block) as the input.
+IGraphTransformer::TStatus BlockFromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+    Y_UNUSED(output);
+    if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    auto child = input->Child(0);
+    if (!EnsureBlockOrScalarType(*child, ctx.Expr)) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    bool isScalar = false;
+    const TTypeAnnotationNode* blockItemType = GetBlockItemType(*child->GetTypeAnn(), isScalar);
+
+    // FromPgImpl casts its argument to TPgExprType without checking the kind,
+    // so a non-pg item type must be rejected here with a regular error.
+    if (blockItemType->GetKind() != ETypeAnnotationKind::Pg) {
+        ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(child->Pos()),
+            TStringBuilder() << "Expected pg type, but got: " << *blockItemType));
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    // FromPgImpl reports its own errors into ctx.Expr and signals failure with nullptr.
+    auto resultType = FromPgImpl(input->Pos(), blockItemType, ctx.Expr);
+    if (!resultType) {
+        return IGraphTransformer::TStatus::Error;
+    }
+
+    // The result keeps the shape of the argument.
+    if (isScalar) {
+        resultType = ctx.Expr.MakeType<TScalarExprType>(resultType);
+    } else {
+        resultType = ctx.Expr.MakeType<TBlockExprType>(resultType);
+    }
+
+    input->SetTypeAnn(resultType);
+    return IGraphTransformer::TStatus::Ok;
+}
+
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
Y_UNUSED(output);
if (!EnsureMinArgsCount(*input, 2U, ctx.Expr)) {
diff --git a/ydb/library/yql/core/type_ann/type_ann_blocks.h b/ydb/library/yql/core/type_ann/type_ann_blocks.h
index 35f0c0d026e..1096c6b22fe 100644
--- a/ydb/library/yql/core/type_ann/type_ann_blocks.h
+++ b/ydb/library/yql/core/type_ann/type_ann_blocks.h
@@ -17,6 +17,8 @@ namespace NTypeAnnImpl {
IGraphTransformer::TStatus BlockJustWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockAsTupleWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockNthWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus BlockToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
+ IGraphTransformer::TStatus BlockFromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus BlockFuncWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockBitCastWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
IGraphTransformer::TStatus BlockCombineAllWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
diff --git a/ydb/library/yql/core/type_ann/type_ann_core.cpp b/ydb/library/yql/core/type_ann/type_ann_core.cpp
index fc95f85015c..e7831cdbe15 100644
--- a/ydb/library/yql/core/type_ann/type_ann_core.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_core.cpp
@@ -11937,6 +11937,8 @@ template <NKikimr::NUdf::EDataSlot DataSlot>
Functions["BlockJust"] = &BlockJustWrapper;
Functions["BlockAsTuple"] = &BlockAsTupleWrapper;
Functions["BlockNth"] = &BlockNthWrapper;
+ Functions["BlockToPg"] = &BlockToPgWrapper;
+ Functions["BlockFromPg"] = &BlockFromPgWrapper;
ExtFunctions["BlockFunc"] = &BlockFuncWrapper;
ExtFunctions["BlockBitCast"] = &BlockBitCastWrapper;
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.cpp b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
index 1facbd26829..89392486334 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.cpp
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.cpp
@@ -152,55 +152,35 @@ IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode
}
}
-IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
- if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (!EnsureComputable(input->Head(), ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
- }
-
- if (IsNull(input->Head())) {
- output = input->TailPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
- output = input->HeadPtr();
- return IGraphTransformer::TStatus::Repeat;
- }
-
- auto name = input->Head().GetTypeAnn()->Cast<TPgExprType>()->GetName();
+// Computes the result type of FromPg for the given pg type.
+// The pg type name is mapped to a data slot and the result is Optional<Data>.
+// On an unsupported pg type an error is added to ctx at position pos and nullptr is returned.
+// NOTE(review): type is cast to TPgExprType without a kind check here - callers are
+// presumably expected to pass only pg types; confirm at every call site.
+const TTypeAnnotationNode* FromPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx) {
+ auto name = type->Cast<TPgExprType>()->GetName();
const TDataExprType* dataType;
if (name == "bool") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Bool);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Bool);
} else if (name == "int2") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int16);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int16);
} else if (name == "int4") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int32);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int32);
} else if (name == "int8") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Int64);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Int64);
} else if (name == "float4") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Float);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Float);
} else if (name == "float8") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Double);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Double);
} else if (name == "text" || name == "varchar" || name == "cstring") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::Utf8);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::Utf8);
} else if (name == "bytea") {
- dataType = ctx.Expr.MakeType<TDataExprType>(EDataSlot::String);
+ dataType = ctx.MakeType<TDataExprType>(EDataSlot::String);
} else {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
TStringBuilder() << "Unsupported type: " << name));
- return IGraphTransformer::TStatus::Error;
+ return nullptr;
}
- auto result = ctx.Expr.MakeType<TOptionalExprType>(dataType);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
+ return ctx.MakeType<TOptionalExprType>(dataType);
}
-IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+IGraphTransformer::TStatus FromPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
@@ -214,15 +194,25 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::
return IGraphTransformer::TStatus::Repeat;
}
- if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
+ if (input->Head().GetTypeAnn()->GetKind() != ETypeAnnotationKind::Pg) {
output = input->HeadPtr();
return IGraphTransformer::TStatus::Repeat;
}
+ auto resultType = FromPgImpl(input->Pos(), input->Head().GetTypeAnn(), ctx.Expr);
+ if (!resultType) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(resultType);
+ return IGraphTransformer::TStatus::Ok;
+}
+
+const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx) {
bool isOptional;
const TDataExprType* dataType;
- if (!EnsureDataOrOptionalOfData(input->Head(), isOptional, dataType, ctx.Expr)) {
- return IGraphTransformer::TStatus::Error;
+ if (!EnsureDataOrOptionalOfData(pos, type, isOptional, dataType, ctx)) {
+ return nullptr;
}
TString pgType;
@@ -252,19 +242,46 @@ IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::
pgType = "text";
break;
default:
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()),
+ ctx.AddError(TIssue(ctx.GetPosition(pos),
TStringBuilder() << "Unsupported type: " << dataType->GetName()));
- return IGraphTransformer::TStatus::Error;
+ return nullptr;
}
try {
- auto result = ctx.Expr.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId);
- input->SetTypeAnn(result);
- return IGraphTransformer::TStatus::Ok;
+ auto result = ctx.MakeType<TPgExprType>(NPg::LookupType(pgType).TypeId);
+ return result;
} catch (const yexception& e) {
- ctx.Expr.AddError(TIssue(ctx.Expr.GetPosition(input->Pos()), e.what()));
+ ctx.AddError(TIssue(ctx.GetPosition(pos), e.what()));
+ return nullptr;
+ }
+}
+
+// Type annotation for the ToPg callable (non-block form).
+// Validates the single computable argument, short-circuits the trivial cases by
+// replacing the node and requesting a repeat, and otherwise delegates the type
+// computation to ToPgImpl.
+IGraphTransformer::TStatus ToPgWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx) {
+ if (!EnsureArgsCount(*input, 1, ctx.Expr)) {
return IGraphTransformer::TStatus::Error;
}
+
+ if (!EnsureComputable(input->Head(), ctx.Expr)) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ // A Null argument: the node is replaced by its (single) argument.
+ if (IsNull(input->Head())) {
+ output = input->TailPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ // The argument already has a pg type: the conversion is dropped.
+ if (input->Head().GetTypeAnn()->GetKind() == ETypeAnnotationKind::Pg) {
+ output = input->HeadPtr();
+ return IGraphTransformer::TStatus::Repeat;
+ }
+
+ // ToPgImpl adds its own errors to ctx.Expr and returns nullptr on failure.
+ auto resultType = ToPgImpl(input->Pos(), input->Head().GetTypeAnn(), ctx.Expr);
+ if (!resultType) {
+ return IGraphTransformer::TStatus::Error;
+ }
+
+ input->SetTypeAnn(resultType);
+ return IGraphTransformer::TStatus::Ok;
}
IGraphTransformer::TStatus PgOpWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx) {
diff --git a/ydb/library/yql/core/type_ann/type_ann_pg.h b/ydb/library/yql/core/type_ann/type_ann_pg.h
index cc7c6903169..a699db1cf5d 100644
--- a/ydb/library/yql/core/type_ann/type_ann_pg.h
+++ b/ydb/library/yql/core/type_ann/type_ann_pg.h
@@ -15,6 +15,8 @@ TStringBuf RemoveAlias(TStringBuf column, TStringBuf& alias);
const TItemExprType* RemoveAlias(const TItemExprType* item, TExprContext& ctx);
TMap<TString, ui32> ExtractExternalColumns(const TExprNode& select);
bool IsPlainMemberOverArg(const TExprNode& expr, TStringBuf& memberName);
+const TTypeAnnotationNode* ToPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx);
+const TTypeAnnotationNode* FromPgImpl(TPositionHandle pos, const TTypeAnnotationNode* type, TExprContext& ctx);
IGraphTransformer::TStatus PgStarWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TContext& ctx);
IGraphTransformer::TStatus PgCallWrapper(const TExprNode::TPtr& input, TExprNode::TPtr& output, TExtContext& ctx);
diff --git a/ydb/library/yql/minikql/mkql_program_builder.cpp b/ydb/library/yql/minikql/mkql_program_builder.cpp
index 4d48dec16b4..a76f51a9b0b 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.cpp
+++ b/ydb/library/yql/minikql/mkql_program_builder.cpp
@@ -1597,6 +1597,26 @@ TRuntimeNode TProgramBuilder::BlockAsTuple(const TArrayRef<const TRuntimeNode>&
return TRuntimeNode(callableBuilder.Build(), false);
}
+// Builds a runtime callable node that converts a block of values to a pg block.
+// The callable is named after this method (via __func__) and takes the input as its only argument.
+// Throws yexception when the runtime version is older than 37.
+TRuntimeNode TProgramBuilder::BlockToPg(TRuntimeNode input, TType* returnType) {
+    if constexpr (RuntimeVersion < 37U) {
+        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+    }
+
+    TCallableBuilder builder(Env, __func__, returnType);
+    builder.Add(input);
+    auto* const callable = builder.Build();
+    return TRuntimeNode(callable, false);
+}
+
+// Builds a runtime callable node that converts a pg block to a block of regular values.
+// The callable is named after this method (via __func__) and takes the input as its only argument.
+// Throws yexception when the runtime version is older than 37.
+TRuntimeNode TProgramBuilder::BlockFromPg(TRuntimeNode input, TType* returnType) {
+    if constexpr (RuntimeVersion < 37U) {
+        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+    }
+
+    TCallableBuilder builder(Env, __func__, returnType);
+    builder.Add(input);
+    auto* const callable = builder.Build();
+    return TRuntimeNode(callable, false);
+}
+
TRuntimeNode TProgramBuilder::BlockNot(TRuntimeNode data) {
auto dataType = AS_TYPE(TBlockType, data.GetStaticType());
diff --git a/ydb/library/yql/minikql/mkql_program_builder.h b/ydb/library/yql/minikql/mkql_program_builder.h
index 815af9df4f9..c0d84372cdd 100644
--- a/ydb/library/yql/minikql/mkql_program_builder.h
+++ b/ydb/library/yql/minikql/mkql_program_builder.h
@@ -261,6 +261,8 @@ public:
TRuntimeNode BlockCoalesce(TRuntimeNode first, TRuntimeNode second);
TRuntimeNode BlockNth(TRuntimeNode tuple, ui32 index);
TRuntimeNode BlockAsTuple(const TArrayRef<const TRuntimeNode>& args);
+ TRuntimeNode BlockToPg(TRuntimeNode input, TType* returnType);
+ TRuntimeNode BlockFromPg(TRuntimeNode input, TType* returnType);
//-- logical functions
TRuntimeNode BlockNot(TRuntimeNode data);
diff --git a/ydb/library/yql/minikql/mkql_runtime_version.h b/ydb/library/yql/minikql/mkql_runtime_version.h
index a61585f2204..257553f36e1 100644
--- a/ydb/library/yql/minikql/mkql_runtime_version.h
+++ b/ydb/library/yql/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 36U
+#define MKQL_RUNTIME_VERSION 37U
#endif
// History:
diff --git a/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt
index 7cb1d6dbb1e..0f0c5e58494 100644
--- a/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/minikql/ut/CMakeLists.darwin-x86_64.txt
@@ -30,6 +30,8 @@ target_link_options(ydb-library-yql-minikql-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-fPIC
-fPIC
+ -framework
+ CoreFoundation
)
target_sources(ydb-library-yql-minikql-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/minikql/compact_hash_ut.cpp
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
index fba1dca5f7e..595730ecc2f 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
@@ -117,6 +117,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-yql-core
yql-minikql-arrow
yql-minikql-computation
+ yql-minikql-comp_nodes
yql-parser-pg_catalog
providers-common-codec
yql-public-issue
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
index 23798855ab7..b71412a898b 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
@@ -116,6 +116,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-yql-core
yql-minikql-arrow
yql-minikql-computation
+ yql-minikql-comp_nodes
yql-parser-pg_catalog
providers-common-codec
yql-public-issue
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
index af9c454bfb7..d4cc64deba1 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
@@ -118,6 +118,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-yql-core
yql-minikql-arrow
yql-minikql-computation
+ yql-minikql-comp_nodes
yql-parser-pg_catalog
providers-common-codec
yql-public-issue
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
index 6b27385bbda..f6977015d93 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
@@ -133,6 +133,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-yql-core
yql-minikql-arrow
yql-minikql-computation
+ yql-minikql-comp_nodes
yql-parser-pg_catalog
providers-common-codec
yql-public-issue
diff --git a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
index 5f75abf925b..88cf0bda39e 100644
--- a/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/comp_factory.cpp
@@ -1,4 +1,5 @@
#include <ydb/library/yql/parser/pg_wrapper/interface/interface.h>
+#include <ydb/library/yql/minikql/comp_nodes/mkql_block_impl.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_impl.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_holders.h>
#include <ydb/library/yql/minikql/computation/mkql_computation_node_pack_impl.h>
@@ -10,6 +11,8 @@
#include <ydb/library/yql/minikql/mkql_node_builder.h>
#include <ydb/library/yql/minikql/mkql_string_util.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/public/udf/arrow/block_reader.h>
+#include <ydb/library/yql/public/udf/arrow/block_builder.cpp>
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <ydb/library/yql/providers/common/codec/yql_codec_buf.h>
#include <ydb/library/yql/providers/common/codec/yql_codec_results.h>
@@ -1523,6 +1526,284 @@ private:
bool MultiDims = false;
};
+// Arrow kernel body for BlockFromPg: converts an array of pg values of type SourceId
+// into the corresponding regular values.
+// Fixed-size pg types are read as 64-bit Datum slots and written into the output
+// buffer already present in *res; string-like types are rebuilt into a new array.
+struct TFromPgExec {
+    explicit TFromPgExec(ui32 sourceId)
+        : SourceId(sourceId)
+    {}
+
+    // Reads every 64-bit Datum of the input, converts it with `extract` and stores
+    // the result as TOut into the values buffer of the output array held by *res.
+    template <typename TOut, typename TExtract>
+    static void ConvertFixed(const arrow::ArrayData& input, arrow::Datum* res, TExtract&& extract) {
+        const ui64* src = input.GetValues<ui64>(1);
+        TOut* dst = res->array()->GetMutableValues<TOut>(1);
+        const size_t length = input.length;
+        for (size_t i = 0; i < length; ++i) {
+            dst[i] = extract(src[i]);
+        }
+    }
+
+    // Converts batch.values[0] (must be an array) and stores the result in *res.
+    // Throws on an unsupported SourceId or on a malformed varlena item.
+    arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+        arrow::Datum inputDatum = batch.values[0];
+        Y_ENSURE(inputDatum.is_array());
+        const auto& array = *inputDatum.array();
+        size_t length = array.length;
+        switch (SourceId) {
+        case BOOLOID:
+            ConvertFixed<ui8>(array, res, [](ui64 datum) -> ui8 { return DatumGetBool(datum) ? 1 : 0; });
+            break;
+        case INT2OID:
+            ConvertFixed<i16>(array, res, [](ui64 datum) -> i16 { return DatumGetInt16(datum); });
+            break;
+        case INT4OID:
+            ConvertFixed<i32>(array, res, [](ui64 datum) -> i32 { return DatumGetInt32(datum); });
+            break;
+        case INT8OID:
+            ConvertFixed<i64>(array, res, [](ui64 datum) -> i64 { return DatumGetInt64(datum); });
+            break;
+        case FLOAT4OID:
+            ConvertFixed<float>(array, res, [](ui64 datum) -> float { return DatumGetFloat4(datum); });
+            break;
+        case FLOAT8OID:
+            ConvertFixed<double>(array, res, [](ui64 datum) -> double { return DatumGetFloat8(datum); });
+            break;
+        case TEXTOID:
+        case VARCHAROID:
+        case BYTEAOID:
+        case CSTRINGOID: {
+            NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
+            NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+            for (size_t i = 0; i < length; ++i) {
+                auto item = reader.GetItem(array, i);
+                if (!item) {
+                    // Null item stays null.
+                    builder.Add(NUdf::TBlockItem());
+                    continue;
+                }
+
+                const auto ref = item.AsStringRef();
+                const char* ptr = ref.Data();
+                ui32 len;
+                if (SourceId == CSTRINGOID) {
+                    // Look for the terminator only inside the item: adjacent items of a
+                    // binary array are not separated, so an unbounded strlen could run
+                    // past the end of this item (or of the whole buffer).
+                    const void* terminator = ref.Size() ? memchr(ptr, 0, ref.Size()) : nullptr;
+                    len = terminator
+                        ? static_cast<ui32>(static_cast<const char*>(terminator) - ptr)
+                        : ref.Size();
+                } else {
+                    // The varlena header must be fully present before it is read.
+                    Y_ENSURE(ref.Size() >= static_cast<ui32>(VARHDRSZ));
+                    len = GetCleanVarSize((const text*)ptr);
+                    Y_ENSURE(len + VARHDRSZ == ref.Size());
+                    // Skip the header, the payload follows it.
+                    ptr += VARHDRSZ;
+                }
+
+                builder.Add(NUdf::TBlockItem(NUdf::TStringRef(ptr, len)));
+            }
+
+            *res = builder.Build(true);
+            break;
+        }
+        default:
+            ythrow yexception() << "Unsupported type: " << NPg::LookupType(SourceId).Name;
+        }
+        return arrow::Status::OK();
+    }
+
+    const ui32 SourceId;
+};
+
+// Creates the Arrow scalar kernel that executes TFromPgExec for the given pg type id.
+// String-like pg types get COMPUTED_NO_PREALLOCATE null handling (the exec builds
+// its own output array); the fixed-size types keep the kernel's default.
+// Throws yexception for any other type id.
+std::shared_ptr<arrow::compute::ScalarKernel> MakeFromPgKernel(TType* inputType, TType* resultType, ui32 sourceId) {
+    const TVector<TType*> kernelArgs = { inputType };
+
+    // Only checks that the result item type has an Arrow representation.
+    std::shared_ptr<arrow::DataType> resultArrowType;
+    MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), resultArrowType), "Unsupported arrow type");
+
+    auto executor = std::make_shared<TFromPgExec>(sourceId);
+    auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(kernelArgs), ConvertToOutputType(resultType),
+        [executor](arrow::compute::KernelContext* kernelCtx, const arrow::compute::ExecBatch& batch, arrow::Datum* out) {
+            return executor->Exec(kernelCtx, batch, out);
+        });
+
+    const bool isFixedSize = sourceId == BOOLOID || sourceId == INT2OID || sourceId == INT4OID
+        || sourceId == INT8OID || sourceId == FLOAT4OID || sourceId == FLOAT8OID;
+    const bool isStringLike = sourceId == TEXTOID || sourceId == VARCHAROID
+        || sourceId == BYTEAOID || sourceId == CSTRINGOID;
+
+    if (isStringLike) {
+        kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
+    } else if (!isFixedSize) {
+        ythrow yexception() << "Unsupported type: " << NPg::LookupType(sourceId).Name;
+    }
+
+    return kernel;
+}
+
+// Arrow kernel body for BlockToPg: converts an array of regular values into pg
+// values of type TargetId.
+// Fixed-size values are written as 64-bit Datum slots into the output buffer already
+// present in *res; string-like values are rebuilt into a new array, either with a
+// trailing zero byte (cstring) or with a varlena header in front of the payload.
+struct TToPgExec {
+    TToPgExec(ui32 targetId)
+        : TargetId(targetId)
+    {}
+
+    // Reads every TIn value of the input, converts it with `pack` and stores the
+    // result as a 64-bit slot into the values buffer of the output array held by *res.
+    template <typename TIn, typename TPack>
+    static void PackFixed(const arrow::ArrayData& input, arrow::Datum* res, TPack&& pack) {
+        const TIn* src = input.GetValues<TIn>(1);
+        ui64* dst = res->array()->GetMutableValues<ui64>(1);
+        const size_t count = input.length;
+        for (size_t row = 0; row < count; ++row) {
+            dst[row] = pack(src[row]);
+        }
+    }
+
+    // Converts batch.values[0] (must be an array) and stores the result in *res.
+    // Throws on an unsupported TargetId or when a string does not fit into 32-bit length.
+    arrow::Status Exec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) const {
+        arrow::Datum source = batch.values[0];
+        Y_ENSURE(source.is_array());
+        const auto& array = *source.array();
+        const size_t length = array.length;
+        switch (TargetId) {
+        case BOOLOID:
+            PackFixed<ui8>(array, res, [](ui8 value) { return BoolGetDatum(value); });
+            break;
+        case INT2OID:
+            PackFixed<i16>(array, res, [](i16 value) { return Int16GetDatum(value); });
+            break;
+        case INT4OID:
+            PackFixed<i32>(array, res, [](i32 value) { return Int32GetDatum(value); });
+            break;
+        case INT8OID:
+            PackFixed<i64>(array, res, [](i64 value) { return Int64GetDatum(value); });
+            break;
+        case FLOAT4OID:
+            PackFixed<float>(array, res, [](float value) { return Float4GetDatum(value); });
+            break;
+        case FLOAT8OID:
+            PackFixed<double>(array, res, [](double value) { return Float8GetDatum(value); });
+            break;
+        case TEXTOID:
+        case VARCHAROID:
+        case BYTEAOID:
+        case CSTRINGOID: {
+            NUdf::TStringBlockReader<arrow::BinaryType, true> reader;
+            NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
+            const bool asCString = (TargetId == CSTRINGOID);
+            // Scratch buffer reused for every row.
+            std::vector<char> scratch;
+            for (size_t row = 0; row < length; ++row) {
+                auto item = reader.GetItem(array, row);
+                if (!item) {
+                    // Null item stays null.
+                    builder.Add(NUdf::TBlockItem());
+                    continue;
+                }
+
+                const auto payload = item.AsStringRef();
+                // cstring: payload + terminator; otherwise: varlena header + payload.
+                ui32 total = (asCString ? 1 : VARHDRSZ) + payload.Size();
+                if (Y_UNLIKELY(total < payload.Size())) {
+                    ythrow yexception() << "Too long string";
+                }
+
+                if (scratch.capacity() < total) {
+                    scratch.reserve(Max<ui64>(total, scratch.capacity() * 2));
+                }
+
+                scratch.resize(total);
+                if (asCString) {
+                    memcpy(scratch.data(), payload.Data(), total - 1);
+                    scratch[total - 1] = 0;
+                } else {
+                    memcpy(scratch.data() + VARHDRSZ, payload.Data(), total - VARHDRSZ);
+                    UpdateCleanVarSize((text*)scratch.data(), payload.Size());
+                }
+
+                builder.Add(NUdf::TBlockItem(NUdf::TStringRef(scratch.data(), total)));
+            }
+
+            *res = builder.Build(true);
+            break;
+        }
+        default:
+            ythrow yexception() << "Unsupported type: " << NPg::LookupType(TargetId).Name;
+        }
+        return arrow::Status::OK();
+    }
+
+    const ui32 TargetId;
+};
+
+// Creates the Arrow scalar kernel that executes TToPgExec for the given pg type id.
+// String-like pg types get COMPUTED_NO_PREALLOCATE null handling (the exec builds
+// its own output array); the fixed-size types keep the kernel's default.
+// Throws yexception for any other type id.
+std::shared_ptr<arrow::compute::ScalarKernel> MakeToPgKernel(TType* inputType, TType* resultType, ui32 targetId) {
+    const TVector<TType*> kernelArgs = { inputType };
+
+    // Only checks that the result item type has an Arrow representation.
+    std::shared_ptr<arrow::DataType> resultArrowType;
+    MKQL_ENSURE(ConvertArrowType(AS_TYPE(TBlockType, resultType)->GetItemType(), resultArrowType), "Unsupported arrow type");
+
+    auto executor = std::make_shared<TToPgExec>(targetId);
+    auto kernel = std::make_shared<arrow::compute::ScalarKernel>(ConvertToInputTypes(kernelArgs), ConvertToOutputType(resultType),
+        [executor](arrow::compute::KernelContext* kernelCtx, const arrow::compute::ExecBatch& batch, arrow::Datum* out) {
+            return executor->Exec(kernelCtx, batch, out);
+        });
+
+    const bool isFixedSize = targetId == BOOLOID || targetId == INT2OID || targetId == INT4OID
+        || targetId == INT8OID || targetId == FLOAT4OID || targetId == FLOAT8OID;
+    const bool isStringLike = targetId == TEXTOID || targetId == VARCHAROID
+        || targetId == BYTEAOID || targetId == CSTRINGOID;
+
+    if (isStringLike) {
+        kernel->null_handling = arrow::compute::NullHandling::COMPUTED_NO_PREALLOCATE;
+    } else if (!isFixedSize) {
+        ythrow yexception() << "Unsupported type: " << NPg::LookupType(targetId).Name;
+    }
+
+    return kernel;
+}
+
TComputationNodeFactory GetPgFactory() {
return [] (TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* {
TStringBuf name = callable.GetType()->GetName();
@@ -1617,6 +1898,15 @@ TComputationNodeFactory GetPgFactory() {
}
}
+ // Block variant of FromPg: the source pg type id is taken from the item type of the
+ // input block, a matching Arrow kernel is built and wrapped into a TBlockFuncNode.
+ // NOTE(review): the kernel shared_ptr is passed as the last argument, presumably so the
+ // node keeps the kernel alive - confirm against TBlockFuncNode.
+ if (name == "BlockFromPg") {
+ auto arg = LocateNode(ctx.NodeLocator, callable, 0);
+ auto inputType = callable.GetInput(0).GetStaticType();
+ auto returnType = callable.GetType()->GetReturnType();
+ ui32 sourceId = AS_TYPE(TPgType, AS_TYPE(TBlockType, inputType)->GetItemType())->GetTypeId();
+ auto kernel = MakeFromPgKernel(inputType, returnType, sourceId);
+ return new TBlockFuncNode(ctx.Mutables, { arg }, { inputType }, *kernel, kernel);
+ }
+ }
+
if (name == "ToPg") {
auto arg = LocateNode(ctx.NodeLocator, callable, 0);
auto returnType = callable.GetType()->GetReturnType();
@@ -1643,6 +1933,15 @@ TComputationNodeFactory GetPgFactory() {
}
}
+ // Block variant of ToPg: the target pg type id is taken from the item type of the
+ // returned block, a matching Arrow kernel is built and wrapped into a TBlockFuncNode.
+ // NOTE(review): the kernel shared_ptr is passed as the last argument, presumably so the
+ // node keeps the kernel alive - confirm against TBlockFuncNode.
+ if (name == "BlockToPg") {
+ auto arg = LocateNode(ctx.NodeLocator, callable, 0);
+ auto inputType = callable.GetInput(0).GetStaticType();
+ auto returnType = callable.GetType()->GetReturnType();
+ auto targetId = AS_TYPE(TPgType, AS_TYPE(TBlockType, returnType)->GetItemType())->GetTypeId();
+ auto kernel = MakeToPgKernel(inputType, returnType, targetId);
+ return new TBlockFuncNode(ctx.Mutables, { arg }, { inputType }, *kernel, kernel);
+ }
+ }
+
if (name == "PgArray") {
TComputationNodePtrVector argNodes;
TVector<TType*> argTypes;
diff --git a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
index 60dd3adeefe..30e750ae63c 100644
--- a/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
+++ b/ydb/library/yql/providers/common/mkql/yql_provider_mkql.cpp
@@ -2463,6 +2463,18 @@ TMkqlCommonCallableCompiler::TShared::TShared() {
return ctx.ProgramBuilder.ToPg(input, returnType);
});
+    // BlockFromPg: compile the single argument, then build the callable with the
+    // return type derived from the node's type annotation.
+    AddCallable("BlockFromPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+        const auto argument = MkqlBuildExpr(*node.Child(0), ctx);
+        const auto resultType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+        return ctx.ProgramBuilder.BlockFromPg(argument, resultType);
+    });
+
+    // BlockToPg: compile the single argument, then build the callable with the
+    // return type derived from the node's type annotation.
+    AddCallable("BlockToPg", [](const TExprNode& node, TMkqlBuildContext& ctx) {
+        const auto argument = MkqlBuildExpr(*node.Child(0), ctx);
+        const auto resultType = BuildType(node, *node.GetTypeAnn(), ctx.ProgramBuilder);
+        return ctx.ProgramBuilder.BlockToPg(argument, resultType);
+    });
+
AddCallable("WithContext", [](const TExprNode& node, TMkqlBuildContext& ctx) {
auto input = MkqlBuildExpr(*node.Child(0), ctx);
return ctx.ProgramBuilder.WithContext(input, node.Child(1)->Content());
diff --git a/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt
index e815fce8c52..314a5e2d14f 100644
--- a/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/sql/pg/ut/CMakeLists.darwin-x86_64.txt
@@ -24,6 +24,8 @@ target_link_options(ydb-library-yql-sql-pg-ut PRIVATE
-Wl,-platform_version,macos,11.0,11.0
-fPIC
-fPIC
+ -framework
+ CoreFoundation
)
target_sources(ydb-library-yql-sql-pg-ut PRIVATE
${CMAKE_SOURCE_DIR}/ydb/library/yql/sql/pg/pg_sql_ut.cpp