about | summary | refs | log | tree | commit | diff | stats
diff options
context:
space:
mode:
author: vvvv <vvvv@ydb.tech> 2023-04-27 19:32:08 +0300
committer: vvvv <vvvv@ydb.tech> 2023-04-27 19:32:08 +0300
commit: bf8606832efaa409859a241235f10c6100a20bb9 (patch)
tree: 081f473627391531d0ee1a81c26ef641044068c9
parent: a66c59109292f9e0fb44ede41adfdebe569e4df3 (diff)
download: ydb-bf8606832efaa409859a241235f10c6100a20bb9.tar.gz
Autovectorization on fixed arguments and results in PG kernel
старый вариант (old version):
%%
<----- TPgCodegen
[exec] TPgCodegen::PgFunc...
begin...
done, elapsed: 2.628702s
[good] TPgCodegen::PgFunc
[exec] TPgCodegen::PgFuncBC...
begin...
done, elapsed: 1.265912s
[good] TPgCodegen::PgFuncBC
-----> TPgCodegen -> ok: 2
[DONE] ok: 2
%%
новый (new version):
%%
<----- TPgCodegen
[exec] TPgCodegen::PgFuncIdeal...
begin...
done, elapsed: 0.206223s
with const arg
begin...
done, elapsed: 0.143428s
[good] TPgCodegen::PgFuncIdeal
[exec] TPgCodegen::PgFuncCpp...
begin...
done, elapsed: 0.417580s
with const arg
begin...
done, elapsed: 0.435337s
[good] TPgCodegen::PgFuncCpp
[exec] TPgCodegen::PgFuncDefArg...
begin...
done, elapsed: 0.893608s
with const arg
begin...
done, elapsed: 0.940435s
[good] TPgCodegen::PgFuncDefArg
[exec] TPgCodegen::PgFuncBC...
begin...
done, elapsed: 0.192867s
with const arg
begin...
done, elapsed: 0.158422s
[good] TPgCodegen::PgFuncBC
-----> TPgCodegen -> ok: 4
[DONE] ok: 4
%%
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt 1
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt 1
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt 1
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt 1
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/arrow.h 170
-rw-r--r-- ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp 101
-rw-r--r-- ydb/library/yql/public/udf/arrow/block_builder.h 62
7 files changed, 264 insertions, 73 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
index 3d35afa7464..72c6e0c44c7 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt
@@ -114,6 +114,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-cpp-resource
library-cpp-yson
library-yql-core
+ yql-minikql-arrow
yql-minikql-computation
yql-parser-pg_catalog
providers-common-codec
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
index ee8484bb873..b16ecbe2e8c 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt
@@ -113,6 +113,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-cpp-resource
library-cpp-yson
library-yql-core
+ yql-minikql-arrow
yql-minikql-computation
yql-parser-pg_catalog
providers-common-codec
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
index f733093a4aa..58e4727d26c 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt
@@ -115,6 +115,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-cpp-resource
library-cpp-yson
library-yql-core
+ yql-minikql-arrow
yql-minikql-computation
yql-parser-pg_catalog
providers-common-codec
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
index e91e91ac701..0e1d5a27f8b 100644
--- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
+++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt
@@ -130,6 +130,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC
library-cpp-resource
library-cpp-yson
library-yql-core
+ yql-minikql-arrow
yql-minikql-computation
yql-parser-pg_catalog
providers-common-codec
diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h
index 9940752b475..02db8daca57 100644
--- a/ydb/library/yql/parser/pg_wrapper/arrow.h
+++ b/ydb/library/yql/parser/pg_wrapper/arrow.h
@@ -4,6 +4,7 @@
#include <ydb/library/yql/public/udf/arrow/block_builder.cpp>
#include <arrow/compute/kernel.h>
#include <ydb/library/yql/minikql/mkql_type_builder.h>
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
extern "C" {
#include "postgres.h"
@@ -97,24 +98,95 @@ constexpr bool constexpr_for_tuple(F&& f, Tuple&& tuple) {
});
}
-template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, typename TBuilder>
-void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder, FunctionCallInfo fcinfo) {
+enum class EScalarArgBinary {
+ Unknown,
+ First,
+ Second
+};
+
+template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, EScalarArgBinary ScalarArgBinary, typename TBuilder>
+Y_NO_INLINE arrow::Datum GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder) {
+ LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+ fcinfo->flinfo = state.flinfo;
+ fcinfo->context = state.context;
+ fcinfo->resultinfo = state.resultinfo;
+ fcinfo->fncollation = state.fncollation;
+ fcinfo->nargs = batch.values.size();
+
+ std::array<NullableDatum, TArgsPolicy::IsFixedArg.size()> scalars;
+ std::array<bool, TArgsPolicy::IsFixedArg.size()> isScalar;
+ std::array<ui64, TArgsPolicy::IsFixedArg.size()> offsets;
+ std::array<const ui8*, TArgsPolicy::IsFixedArg.size()> validMasks;
+ std::array<ui64, TArgsPolicy::IsFixedArg.size()> validOffsetMask;
+ ui8 fakeValidByte = 0xFF;
+ std::array<const ui64*, TArgsPolicy::IsFixedArg.size()> fixedArrays;
+ std::array<const ui32*, TArgsPolicy::IsFixedArg.size()> stringOffsetsArrays;
+ std::array<const ui8*, TArgsPolicy::IsFixedArg.size()> stringDataArrays;
+ if constexpr (!TArgsPolicy::VarArgs) {
+ for (size_t j = 0; j < TArgsPolicy::IsFixedArg.size(); ++j) {
+ isScalar[j] = batch.values[j].is_scalar();
+ if (isScalar[j]) {
+ const auto& scalar = *batch.values[j].scalar();
+ if (!scalar.is_valid) {
+ scalars[j].isnull = true;
+ } else {
+ scalars[j].isnull = false;
+ if (TArgsPolicy::IsFixedArg[j]) {
+ scalars[j].value = (Datum)*static_cast<const ui64*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data());
+ } else {
+ auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value;
+ scalars[j].value = (Datum)buffer->data();
+ }
+ }
+ } else {
+ const auto& array = *batch.values[j].array();
+ offsets[j] = array.offset;
+ validMasks[j] = array.GetValues<ui8>(0, 0);
+ if (validMasks[j]) {
+ validOffsetMask[j] = ~0ull;
+ } else {
+ validOffsetMask[j] = 0ull;
+ validMasks[j] = &fakeValidByte;
+ }
+ if (TArgsPolicy::IsFixedArg[j]) {
+ fixedArrays[j] = array.GetValues<ui64>(1);
+ } else {
+ stringOffsetsArrays[j] = array.GetValues<ui32>(1);
+ stringDataArrays[j] = array.GetValues<ui8>(2);
+ }
+ }
+ }
+ }
+
+ ui64* fixedResultData = nullptr;
+ ui8* fixedResultValidMask = nullptr;
+ if constexpr (IsFixedResult) {
+ builder.UnsafeReserve(length);
+ fixedResultData = builder.MutableData();
+ fixedResultValidMask = builder.MutableValidMask();
+ }
+
for (size_t i = 0; i < length; ++i) {
Datum ret;
if constexpr (!TArgsPolicy::VarArgs) {
if (!constexpr_for_tuple([&](auto const& j, auto const& v) {
NullableDatum d;
- if (HasScalars && batch.values[j].is_scalar()) {
- if (v) {
- FillScalarItem<HasNulls, true>(*batch.values[j].scalar(), d);
- } else {
- FillScalarItem<HasNulls, false>(*batch.values[j].scalar(), d);
- }
+ if (HasScalars && (
+ (ScalarArgBinary == EScalarArgBinary::First && j == 0) ||
+ (ScalarArgBinary == EScalarArgBinary::Second && j == 1) ||
+ isScalar[j])) {
+ d = scalars[j];
} else {
+ d.isnull = false;
+ if constexpr (HasNulls) {
+ ui64 fullIndex = (i + offsets[j]) & validOffsetMask[j];
+ d.isnull = ((validMasks[j][fullIndex >> 3] >> (fullIndex & 0x07)) & 1) == 0;
+ }
+
if (v) {
- FillArrayItem<HasNulls, true>(*batch.values[j].array(), i, d);
+ d.value = (Datum)fixedArrays[j][i];
} else {
- FillArrayItem<HasNulls, false>(*batch.values[j].array(), i, d);
+ d.value = (Datum)(stringOffsetsArrays[j][i] + stringDataArrays[j]);
}
}
@@ -125,10 +197,15 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons
fcinfo->args[j] = d;
return true;
}, TArgsPolicy::IsFixedArg)) {
+ if constexpr (IsFixedResult) {
+ fixedResultValidMask[i] = 0;
+ } else {
+ builder.Add(NUdf::TBlockItem{});
+ }
goto SkipCall;
}
} else {
- for (size_t j = 0; j < batch.values.size(); ++i) {
+ for (size_t j = 0; j < batch.values.size(); ++j) {
NullableDatum d;
if (HasScalars && batch.values[j].is_scalar()) {
if (state.IsFixedArg[j]) {
@@ -145,7 +222,11 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons
}
if (HasNulls && IsStrict && d.isnull) {
- builder.Add(NUdf::TBlockItem{});
+ if constexpr (IsFixedResult) {
+ fixedResultValidMask[i] = 0;
+ } else {
+ builder.Add(NUdf::TBlockItem{});
+ }
goto SkipCall;
}
@@ -156,11 +237,8 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons
fcinfo->isnull = false;
ret = TFunc()(fcinfo);
if constexpr (IsFixedResult) {
- if (fcinfo->isnull) {
- builder.Add(NUdf::TBlockItem{});
- } else {
- builder.Add(NUdf::TBlockItem(ui64(ret)));
- }
+ fixedResultData[i] = ui64(ret);
+ fixedResultValidMask[i] = !fcinfo->isnull;
} else {
if (fcinfo->isnull) {
builder.Add(NUdf::TBlockItem{});
@@ -172,54 +250,64 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons
}
SkipCall:;
}
+
+ return builder.Build(true);
+}
+
+template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, typename TBuilder>
+Y_NO_INLINE arrow::Datum GenericExecImpl3(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder) {
+ if constexpr (!TArgsPolicy::VarArgs) {
+ if (TArgsPolicy::IsFixedArg.size() == 2) {
+ if (batch.values[0].is_scalar()) {
+ return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::First, TBuilder>(batch, length, state, builder);
+ }
+
+ if (batch.values[1].is_scalar()) {
+ return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::Second, TBuilder>(batch, length, state, builder);
+ }
+ }
+ }
+
+ return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::Unknown, TBuilder>(batch, length, state, builder);
}
template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy>
-void GenericExecImpl2(bool hasScalars, bool hasNulls, arrow::compute::KernelContext* ctx,
- const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state,
- FunctionCallInfo fcinfo, arrow::Datum* res) {
+Y_NO_INLINE void GenericExecImpl2(bool hasScalars, bool hasNulls, arrow::compute::KernelContext* ctx,
+ const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, arrow::Datum* res) {
if (hasScalars) {
if (hasNulls) {
if constexpr (IsFixedResult) {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder);
} else {
NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder);
}
} else {
if constexpr (IsFixedResult) {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder);
} else {
NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder);
}
}
} else {
if (hasNulls) {
if constexpr (IsFixedResult) {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder);
} else {
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder);
}
} else {
if constexpr (IsFixedResult) {
NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder);
} else {
NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length);
- GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder, fcinfo);
- *res = builder.Build(true);
+ *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder);
}
}
}
@@ -233,14 +321,8 @@ struct TDefaultArgsPolicy {
extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx);
template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy = TDefaultArgsPolicy>
-arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
- LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS);
+Y_NO_INLINE arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
const auto& state = GetPGKernelState(ctx);
- fcinfo->flinfo = state.flinfo;
- fcinfo->context = state.context;
- fcinfo->resultinfo = state.resultinfo;
- fcinfo->fncollation = state.fncollation;
- fcinfo->nargs = batch.values.size();
if constexpr (!TArgsPolicy::VarArgs) {
Y_ENSURE(batch.values.size() == TArgsPolicy::IsFixedArg.size());
Y_ENSURE(state.IsFixedArg.size() == TArgsPolicy::IsFixedArg.size());
@@ -272,7 +354,7 @@ arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compu
Y_ENSURE(hasArrays);
Y_ENSURE(state.flinfo->fn_strict == IsStrict);
Y_ENSURE(state.IsFixedResult == IsFixedResult);
- GenericExecImpl2<TFunc, IsStrict, IsFixedResult, TArgsPolicy>(hasScalars, hasNulls, ctx, batch, length, state, fcinfo, res);
+ GenericExecImpl2<TFunc, IsStrict, IsFixedResult, TArgsPolicy>(hasScalars, hasNulls, ctx, batch, length, state, res);
return arrow::Status::OK();
}
diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
index a8fc9732575..aa7d1f0357e 100644
--- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
+++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp
@@ -9,8 +9,14 @@
#include <llvm/IR/Module.h>
#include <ydb/library/yql/parser/pg_wrapper/arrow.h>
+
+extern "C" {
#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/catalog/pg_collation_d.h>
+#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/fmgrprotos.h>
+}
+
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
+#include <ydb/library/yql/minikql/arrow/arrow_util.h>
#include <util/datetime/cputimer.h>
@@ -21,11 +27,27 @@ extern "C" {
extern TExecFunc arrow_date_eq();
}
+enum class EKernelFlavor {
+ DefArg,
+ Cpp,
+ BitCode,
+ Ideal
+};
+
Y_UNIT_TEST_SUITE(TPgCodegen) {
- void PgFuncImpl(bool useBC) {
+ void PgFuncImpl(EKernelFlavor flavor, bool constArg) {
TExecFunc execFunc;
ICodegen::TPtr codegen;
- if (useBC) {
+ switch (flavor) {
+ case EKernelFlavor::DefArg: {
+ execFunc = &GenericExec<TPgDirectFunc<&date_eq>, true, true, TDefaultArgsPolicy>;
+ break;
+ }
+ case EKernelFlavor::Cpp: {
+ execFunc = arrow_date_eq();
+ break;
+ }
+ case EKernelFlavor::BitCode: {
codegen = ICodegen::Make(ETarget::Native);
auto bitcode = NResource::Find("/llvm_bc/PgFuncs1");
codegen->LoadBitCode(bitcode, "Funcs");
@@ -38,8 +60,53 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
typedef TExecFunc (*TFunc)();
auto funcPtr = (TFunc)codegen->GetPointerToFunction(func);
execFunc = funcPtr();
- } else {
- execFunc = arrow_date_eq();
+ break;
+ }
+ case EKernelFlavor::Ideal: {
+ execFunc = [](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) {
+ size_t length = batch.values[0].length();
+ //NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length);
+ NUdf::TTypedBufferBuilder<ui64> dataBuilder(arrow::default_memory_pool());
+ NUdf::TTypedBufferBuilder<ui8> nullBuilder(arrow::default_memory_pool());
+ dataBuilder.Reserve(length);
+ nullBuilder.Reserve(length);
+ auto out = dataBuilder.MutableData();
+ auto outNulls = nullBuilder.MutableData();
+ NUdf::TFixedSizeBlockReader<ui64, false> reader1;
+ NUdf::TFixedSizeBlockReader<ui64, false> reader2;
+ const auto& array1 = *batch.values[0].array();
+ const auto ptr1 = array1.GetValues<ui64>(1);
+ if (batch.values[1].is_array()) {
+ const auto& array2 = *batch.values[1].array();
+ const auto ptr2 = array2.GetValues<ui64>(1);
+ for (size_t i = 0; i < length; ++i) {
+ //auto x = reader1.GetItem(array1, i).As<ui64>();
+ //auto y = reader2.GetItem(array2, i).As<ui64>();
+ auto x = ptr1[i];
+ auto y = ptr2[i];
+ out[i] = x == y ? 1 : 0;
+ outNulls[i] = false;
+ }
+ } else {
+ ui64 yConst = reader2.GetScalarItem(*batch.values[1].scalar()).As<ui64>();
+ for (size_t i = 0; i < length; ++i) {
+ auto x = ptr1[i];
+ out[i] = x == yConst ? 1 : 0;
+ outNulls[i] = false;
+ }
+ }
+
+ std::shared_ptr<arrow::Buffer> nulls;
+ nulls = nullBuilder.Finish();
+ nulls = NUdf::MakeDenseBitmap(nulls->data(), length, arrow::default_memory_pool());
+ std::shared_ptr<arrow::Buffer> data = dataBuilder.Finish();
+
+ *res = arrow::ArrayData::Make(arrow::uint64(), length ,{ data, nulls});
+ return arrow::Status::OK();
+ };
+
+ break;
+ }
}
Y_ENSURE(execFunc);
@@ -67,7 +134,13 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
std::shared_ptr<arrow::ArrayData> out;
ARROW_OK(builder.FinishInternal(&out));
- arrow::Datum arg1(out), arg2(out);
+ arrow::Datum arg1(out), arg2;
+ if (constArg) {
+ Cout << "with const arg\n";
+ arg2 = NKikimr::NMiniKQL::MakeScalarDatum<ui64>(0);
+ } else {
+ arg2 = out;
+ }
{
Cout << "begin...\n";
@@ -84,13 +157,25 @@ Y_UNIT_TEST_SUITE(TPgCodegen) {
}
}
- Y_UNIT_TEST(PgFunc) {
- PgFuncImpl(false);
+ Y_UNIT_TEST(PgFuncIdeal) {
+ PgFuncImpl(EKernelFlavor::Ideal, false);
+ PgFuncImpl(EKernelFlavor::Ideal, true);
+ }
+
+ Y_UNIT_TEST(PgFuncCpp) {
+ PgFuncImpl(EKernelFlavor::Cpp, false);
+ PgFuncImpl(EKernelFlavor::Cpp, true);
}
+ Y_UNIT_TEST(PgFuncDefArg) {
+ PgFuncImpl(EKernelFlavor::DefArg, false);
+ PgFuncImpl(EKernelFlavor::DefArg, true);
+ }
+
#if defined(NDEBUG) && !defined(_asan_enabled_)
Y_UNIT_TEST(PgFuncBC) {
- PgFuncImpl(true);
+ PgFuncImpl(EKernelFlavor::BitCode, false);
+ PgFuncImpl(EKernelFlavor::BitCode, true);
}
#endif
}
diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h
index ca1be1df58c..3178569aa9f 100644
--- a/ydb/library/yql/public/udf/arrow/block_builder.h
+++ b/ydb/library/yql/public/udf/arrow/block_builder.h
@@ -179,6 +179,7 @@ public:
if (arrayCount == 1) {
Y_VERIFY(arrays->Data);
DoAddMany(*arrays->Data, indexes, count);
+ CurrLen += count;
} else {
const IArrayBuilder::TArrayDataItem* currData = nullptr;
TVector<ui64> currDataIndexes;
@@ -191,6 +192,7 @@ public:
if (data != currData) {
DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
+ CurrLen += currDataIndexes.size();
currDataIndexes.clear();
currData = data;
}
@@ -198,9 +200,9 @@ public:
}
if (!currDataIndexes.empty()) {
DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size());
+ CurrLen += currDataIndexes.size();
}
}
- CurrLen += count;
}
arrow::Datum Build(bool finish) final {
@@ -287,6 +289,11 @@ protected:
return CurrLen;
}
+ void SetCurrLen(size_t len) {
+ Y_VERIFY(len <= MaxLen);
+ CurrLen = len;
+ }
+
const std::shared_ptr<arrow::DataType> ArrowType;
arrow::MemoryPool* const Pool;
const size_t MaxLen;
@@ -310,6 +317,18 @@ public:
Reserve();
}
+ void UnsafeReserve(size_t length) {
+ SetCurrLen(length);
+ }
+
+ T* MutableData() {
+ return DataPtr;
+ }
+
+ ui8* MutableValidMask() {
+ return NullPtr;
+ }
+
void DoAdd(NUdf::TUnboxedValuePod value) final {
if constexpr (Nullable) {
if (!value) {
@@ -322,14 +341,14 @@ public:
void DoAdd(TBlockItem value) final {
if constexpr (Nullable) {
if (!value) {
- NullBuilder->UnsafeAppend(0);
- DataBuilder->UnsafeAppend(T{});
+ NullPtr[GetCurrLen()] = 0;
+ DataPtr[GetCurrLen()] = T{};
return;
}
- NullBuilder->UnsafeAppend(1);
+ NullPtr[GetCurrLen()] = 1;
}
- DataBuilder->UnsafeAppend(value.As<T>());
+ DataPtr[GetCurrLen()] = value.As<T>();
}
void DoAdd(TInputBuffer& input) final {
@@ -343,68 +362,65 @@ public:
void DoAddDefault() final {
if constexpr (Nullable) {
- NullBuilder->UnsafeAppend(1);
+ NullPtr[GetCurrLen()] = 1;
}
- DataBuilder->UnsafeAppend(T{});
+ DataPtr[GetCurrLen()] = T{};
}
void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final {
Y_VERIFY(array.buffers.size() > 1);
if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
if (array.buffers.front()) {
- ui8* dstBitmap = NullBuilder->End();
+ ui8* dstBitmap = NullPtr + GetCurrLen();
CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length);
- NullBuilder->UnsafeAdvance(popCount);
} else {
- NullBuilder->UnsafeAppend(popCount, 1);
+ ui8* dstBitmap = NullPtr + GetCurrLen();
+ std::fill_n(dstBitmap, popCount, 1);
}
}
const T* src = array.GetValues<T>(1);
- T* dst = DataBuilder->End();
+ T* dst = DataPtr + GetCurrLen();
CompressArray(src, sparseBitmap, dst, array.length);
- DataBuilder->UnsafeAdvance(popCount);
}
void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final {
Y_VERIFY(array.buffers.size() > 1);
if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
for (size_t i = beginIndex; i < beginIndex + count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, i));
+ NullPtr[GetCurrLen() + i - beginIndex] = !IsNull(array, i);
}
}
const T* values = array.GetValues<T>(1);
for (size_t i = beginIndex; i < beginIndex + count; ++i) {
- DataBuilder->UnsafeAppend(T(values[i]));
+ DataPtr[GetCurrLen() + i - beginIndex] = T(values[i]);
}
}
void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final {
Y_VERIFY(array.buffers.size() > 1);
if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == DataBuilder->Length());
for (size_t i = 0; i < count; ++i) {
- NullBuilder->UnsafeAppend(!IsNull(array, indexes[i]));
+ NullPtr[GetCurrLen() + i] = !IsNull(array, indexes[i]);
}
}
const T* values = array.GetValues<T>(1);
for (size_t i = 0; i < count; ++i) {
- DataBuilder->UnsafeAppend(T(values[indexes[i]]));
+ DataPtr[GetCurrLen() + i] = T(values[indexes[i]]);
}
}
TBlockArrayTree::Ptr DoBuildTree(bool finish) final {
- const size_t len = DataBuilder->Length();
+ const size_t len = GetCurrLen();
std::shared_ptr<arrow::Buffer> nulls;
if constexpr (Nullable) {
- Y_VERIFY(NullBuilder->Length() == len);
+ NullBuilder->UnsafeAdvance(len);
nulls = NullBuilder->Finish();
nulls = MakeDenseBitmap(nulls->data(), len, Pool);
}
+ DataBuilder->UnsafeAdvance(len);
std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish();
TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>();
@@ -422,14 +438,18 @@ private:
void Reserve() {
DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool);
DataBuilder->Reserve(MaxLen + 1);
+ DataPtr = DataBuilder->MutableData();
if constexpr (Nullable) {
NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool);
NullBuilder->Reserve(MaxLen + 1);
+ NullPtr = NullBuilder->MutableData();
}
}
std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder;
std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder;
+ ui8* NullPtr = nullptr;
+ T* DataPtr = nullptr;
};
template<typename TStringType, bool Nullable, EPgStringType PgString = EPgStringType::None>