diff options
author | vvvv <vvvv@ydb.tech> | 2023-04-27 19:32:08 +0300 |
---|---|---|
committer | vvvv <vvvv@ydb.tech> | 2023-04-27 19:32:08 +0300 |
commit | bf8606832efaa409859a241235f10c6100a20bb9 (patch) | |
tree | 081f473627391531d0ee1a81c26ef641044068c9 | |
parent | a66c59109292f9e0fb44ede41adfdebe569e4df3 (diff) | |
download | ydb-bf8606832efaa409859a241235f10c6100a20bb9.tar.gz |
Autovectorization on fixed arguments and results in PG kernel
старый вариант
%%
<----- TPgCodegen
[exec] TPgCodegen::PgFunc...
begin...
done, elapsed: 2.628702s
[good] TPgCodegen::PgFunc
[exec] TPgCodegen::PgFuncBC...
begin...
done, elapsed: 1.265912s
[good] TPgCodegen::PgFuncBC
-----> TPgCodegen -> ok: 2
[DONE] ok: 2
%%
новый
%%
<----- TPgCodegen
[exec] TPgCodegen::PgFuncIdeal...
begin...
done, elapsed: 0.206223s
with const arg
begin...
done, elapsed: 0.143428s
[good] TPgCodegen::PgFuncIdeal
[exec] TPgCodegen::PgFuncCpp...
begin...
done, elapsed: 0.417580s
with const arg
begin...
done, elapsed: 0.435337s
[good] TPgCodegen::PgFuncCpp
[exec] TPgCodegen::PgFuncDefArg...
begin...
done, elapsed: 0.893608s
with const arg
begin...
done, elapsed: 0.940435s
[good] TPgCodegen::PgFuncDefArg
[exec] TPgCodegen::PgFuncBC...
begin...
done, elapsed: 0.192867s
with const arg
begin...
done, elapsed: 0.158422s
[good] TPgCodegen::PgFuncBC
-----> TPgCodegen -> ok: 4
[DONE] ok: 4
%%
7 files changed, 264 insertions, 73 deletions
diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt index 3d35afa7464..72c6e0c44c7 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.darwin-x86_64.txt @@ -114,6 +114,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-cpp-resource library-cpp-yson library-yql-core + yql-minikql-arrow yql-minikql-computation yql-parser-pg_catalog providers-common-codec diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt index ee8484bb873..b16ecbe2e8c 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-aarch64.txt @@ -113,6 +113,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-cpp-resource library-cpp-yson library-yql-core + yql-minikql-arrow yql-minikql-computation yql-parser-pg_catalog providers-common-codec diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt index f733093a4aa..58e4727d26c 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.linux-x86_64.txt @@ -115,6 +115,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-cpp-resource library-cpp-yson library-yql-core + yql-minikql-arrow yql-minikql-computation yql-parser-pg_catalog providers-common-codec diff --git a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt index e91e91ac701..0e1d5a27f8b 100644 --- a/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt +++ b/ydb/library/yql/parser/pg_wrapper/CMakeLists.windows-x86_64.txt @@ -130,6 +130,7 @@ target_link_libraries(yql-parser-pg_wrapper PUBLIC library-cpp-resource library-cpp-yson library-yql-core + yql-minikql-arrow yql-minikql-computation yql-parser-pg_catalog providers-common-codec diff --git a/ydb/library/yql/parser/pg_wrapper/arrow.h b/ydb/library/yql/parser/pg_wrapper/arrow.h index 9940752b475..02db8daca57 100644 --- a/ydb/library/yql/parser/pg_wrapper/arrow.h +++ b/ydb/library/yql/parser/pg_wrapper/arrow.h @@ -4,6 +4,7 @@ #include <ydb/library/yql/public/udf/arrow/block_builder.cpp> #include <arrow/compute/kernel.h> #include <ydb/library/yql/minikql/mkql_type_builder.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> extern "C" { #include "postgres.h" @@ -97,24 +98,95 @@ constexpr bool constexpr_for_tuple(F&& f, Tuple&& tuple) { }); } -template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, typename TBuilder> -void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder, FunctionCallInfo fcinfo) { +enum class EScalarArgBinary { + Unknown, + First, + Second +}; + +template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, EScalarArgBinary ScalarArgBinary, typename TBuilder> +Y_NO_INLINE arrow::Datum GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder) { + LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); + fcinfo->flinfo = state.flinfo; + fcinfo->context = state.context; + fcinfo->resultinfo = state.resultinfo; + fcinfo->fncollation = state.fncollation; + fcinfo->nargs = batch.values.size(); + + std::array<NullableDatum, TArgsPolicy::IsFixedArg.size()> scalars; + std::array<bool, TArgsPolicy::IsFixedArg.size()> isScalar; + std::array<ui64, TArgsPolicy::IsFixedArg.size()> offsets; + std::array<const ui8*, TArgsPolicy::IsFixedArg.size()> validMasks; + std::array<ui64, TArgsPolicy::IsFixedArg.size()> validOffsetMask; + ui8 fakeValidByte = 0xFF; + std::array<const ui64*, TArgsPolicy::IsFixedArg.size()> fixedArrays; + std::array<const ui32*, TArgsPolicy::IsFixedArg.size()> stringOffsetsArrays; + std::array<const ui8*, TArgsPolicy::IsFixedArg.size()> stringDataArrays; + if constexpr (!TArgsPolicy::VarArgs) { + for (size_t j = 0; j < TArgsPolicy::IsFixedArg.size(); ++j) { + isScalar[j] = batch.values[j].is_scalar(); + if (isScalar[j]) { + const auto& scalar = *batch.values[j].scalar(); + if (!scalar.is_valid) { + scalars[j].isnull = true; + } else { + scalars[j].isnull = false; + if (TArgsPolicy::IsFixedArg[j]) { + scalars[j].value = (Datum)*static_cast<const ui64*>(arrow::internal::checked_cast<const arrow::internal::PrimitiveScalarBase&>(scalar).data()); + } else { + auto buffer = arrow::internal::checked_cast<const arrow::BaseBinaryScalar&>(scalar).value; + scalars[j].value = (Datum)buffer->data(); + } + } + } else { + const auto& array = *batch.values[j].array(); + offsets[j] = array.offset; + validMasks[j] = array.GetValues<ui8>(0, 0); + if (validMasks[j]) { + validOffsetMask[j] = ~0ull; + } else { + validOffsetMask[j] = 0ull; + validMasks[j] = &fakeValidByte; + } + if (TArgsPolicy::IsFixedArg[j]) { + fixedArrays[j] = array.GetValues<ui64>(1); + } else { + stringOffsetsArrays[j] = array.GetValues<ui32>(1); + stringDataArrays[j] = array.GetValues<ui8>(2); + } + } + } + } + + ui64* fixedResultData = nullptr; + ui8* fixedResultValidMask = nullptr; + if constexpr (IsFixedResult) { + builder.UnsafeReserve(length); + fixedResultData = builder.MutableData(); + fixedResultValidMask = builder.MutableValidMask(); + } + for (size_t i = 0; i < length; ++i) { Datum ret; if constexpr (!TArgsPolicy::VarArgs) { if (!constexpr_for_tuple([&](auto const& j, auto const& v) { NullableDatum d; - if (HasScalars && batch.values[j].is_scalar()) { - if (v) { - FillScalarItem<HasNulls, true>(*batch.values[j].scalar(), d); - } else { - FillScalarItem<HasNulls, false>(*batch.values[j].scalar(), d); - } + if (HasScalars && ( + (ScalarArgBinary == EScalarArgBinary::First && j == 0) || + (ScalarArgBinary == EScalarArgBinary::Second && j == 1) || + isScalar[j])) { + d = scalars[j]; } else { + d.isnull = false; + if constexpr (HasNulls) { + ui64 fullIndex = (i + offsets[j]) & validOffsetMask[j]; + d.isnull = ((validMasks[j][fullIndex >> 3] >> (fullIndex & 0x07)) & 1) == 0; + } + if (v) { - FillArrayItem<HasNulls, true>(*batch.values[j].array(), i, d); + d.value = (Datum)fixedArrays[j][i]; } else { - FillArrayItem<HasNulls, false>(*batch.values[j].array(), i, d); + d.value = (Datum)(stringOffsetsArrays[j][i] + stringDataArrays[j]); } } @@ -125,10 +197,15 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons fcinfo->args[j] = d; return true; }, TArgsPolicy::IsFixedArg)) { + if constexpr (IsFixedResult) { + fixedResultValidMask[i] = 0; + } else { + builder.Add(NUdf::TBlockItem{}); + } goto SkipCall; } } else { - for (size_t j = 0; j < batch.values.size(); ++i) { + for (size_t j = 0; j < batch.values.size(); ++j) { NullableDatum d; if (HasScalars && batch.values[j].is_scalar()) { if (state.IsFixedArg[j]) { @@ -145,7 +222,11 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons } if (HasNulls && IsStrict && d.isnull) { - builder.Add(NUdf::TBlockItem{}); + if constexpr (IsFixedResult) { + fixedResultValidMask[i] = 0; + } else { + builder.Add(NUdf::TBlockItem{}); + } goto SkipCall; } @@ -156,11 +237,8 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons fcinfo->isnull = false; ret = TFunc()(fcinfo); if constexpr (IsFixedResult) { - if (fcinfo->isnull) { - builder.Add(NUdf::TBlockItem{}); - } else { - builder.Add(NUdf::TBlockItem(ui64(ret))); - } + fixedResultData[i] = ui64(ret); + fixedResultValidMask[i] = !fcinfo->isnull; } else { if (fcinfo->isnull) { builder.Add(NUdf::TBlockItem{}); @@ -172,54 +250,64 @@ void GenericExecImpl(const arrow::compute::ExecBatch& batch, size_t length, cons } SkipCall:; } + + return builder.Build(true); +} + +template <typename TFunc, bool IsStrict, bool IsFixedResult, bool HasScalars, bool HasNulls, typename TArgsPolicy, typename TBuilder> +Y_NO_INLINE arrow::Datum GenericExecImpl3(const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, TBuilder& builder) { + if constexpr (!TArgsPolicy::VarArgs) { + if (TArgsPolicy::IsFixedArg.size() == 2) { + if (batch.values[0].is_scalar()) { + return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::First, TBuilder>(batch, length, state, builder); + } + + if (batch.values[1].is_scalar()) { + return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::Second, TBuilder>(batch, length, state, builder); + } + } + } + + return GenericExecImpl<TFunc, IsStrict, IsFixedResult, HasScalars, HasNulls, TArgsPolicy, EScalarArgBinary::Unknown, TBuilder>(batch, length, state, builder); } template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy> -void GenericExecImpl2(bool hasScalars, bool hasNulls, arrow::compute::KernelContext* ctx, - const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, - FunctionCallInfo fcinfo, arrow::Datum* res) { +Y_NO_INLINE void GenericExecImpl2(bool hasScalars, bool hasNulls, arrow::compute::KernelContext* ctx, + const arrow::compute::ExecBatch& batch, size_t length, const TPgKernelState& state, arrow::Datum* res) { if (hasScalars) { if (hasNulls) { if constexpr (IsFixedResult) { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder); } else { NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, true, TArgsPolicy>(batch, length, state, builder); } } else { if constexpr (IsFixedResult) { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder); } else { NUdf::TStringArrayBuilder<arrow::BinaryType, true, NUdf::EPgStringType::None> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, true, false, TArgsPolicy>(batch, length, state, builder); } } } else { if (hasNulls) { if constexpr (IsFixedResult) { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder); } else { NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, true, TArgsPolicy>(batch, length, state, builder); } } else { if constexpr (IsFixedResult) { NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder); } else { NUdf::TStringArrayBuilder<arrow::BinaryType, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *ctx->memory_pool(), length); - GenericExecImpl<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder, fcinfo); - *res = builder.Build(true); + *res = GenericExecImpl3<TFunc, IsStrict, IsFixedResult, false, false, TArgsPolicy>(batch, length, state, builder); } } } @@ -233,14 +321,8 @@ struct TDefaultArgsPolicy { extern "C" TPgKernelState& GetPGKernelState(arrow::compute::KernelContext* ctx); template <typename TFunc, bool IsStrict, bool IsFixedResult, typename TArgsPolicy = TDefaultArgsPolicy> -arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { - LOCAL_FCINFO(fcinfo, FUNC_MAX_ARGS); +Y_NO_INLINE arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { const auto& state = GetPGKernelState(ctx); - fcinfo->flinfo = state.flinfo; - fcinfo->context = state.context; - fcinfo->resultinfo = state.resultinfo; - fcinfo->fncollation = state.fncollation; - fcinfo->nargs = batch.values.size(); if constexpr (!TArgsPolicy::VarArgs) { Y_ENSURE(batch.values.size() == TArgsPolicy::IsFixedArg.size()); Y_ENSURE(state.IsFixedArg.size() == TArgsPolicy::IsFixedArg.size()); @@ -272,7 +354,7 @@ arrow::Status GenericExec(arrow::compute::KernelContext* ctx, const arrow::compu Y_ENSURE(hasArrays); Y_ENSURE(state.flinfo->fn_strict == IsStrict); Y_ENSURE(state.IsFixedResult == IsFixedResult); - GenericExecImpl2<TFunc, IsStrict, IsFixedResult, TArgsPolicy>(hasScalars, hasNulls, ctx, batch, length, state, fcinfo, res); + GenericExecImpl2<TFunc, IsStrict, IsFixedResult, TArgsPolicy>(hasScalars, hasNulls, ctx, batch, length, state, res); return arrow::Status::OK(); } diff --git a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp index a8fc9732575..aa7d1f0357e 100644 --- a/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp +++ b/ydb/library/yql/parser/pg_wrapper/ut/codegen_ut.cpp @@ -9,8 +9,14 @@ #include <llvm/IR/Module.h> #include <ydb/library/yql/parser/pg_wrapper/arrow.h> + +extern "C" { #include <ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/catalog/pg_collation_d.h> +#include <ydb/library/yql/parser/pg_wrapper/postgresql/src/backend/utils/fmgrprotos.h> +} + #include <ydb/library/yql/parser/pg_catalog/catalog.h> +#include <ydb/library/yql/minikql/arrow/arrow_util.h> #include <util/datetime/cputimer.h> @@ -21,11 +27,27 @@ extern "C" { extern TExecFunc arrow_date_eq(); } +enum class EKernelFlavor { + DefArg, + Cpp, + BitCode, + Ideal +}; + Y_UNIT_TEST_SUITE(TPgCodegen) { - void PgFuncImpl(bool useBC) { + void PgFuncImpl(EKernelFlavor flavor, bool constArg) { TExecFunc execFunc; ICodegen::TPtr codegen; - if (useBC) { + switch (flavor) { + case EKernelFlavor::DefArg: { + execFunc = &GenericExec<TPgDirectFunc<&date_eq>, true, true, TDefaultArgsPolicy>; + break; + } + case EKernelFlavor::Cpp: { + execFunc = arrow_date_eq(); + break; + } + case EKernelFlavor::BitCode: { codegen = ICodegen::Make(ETarget::Native); auto bitcode = NResource::Find("/llvm_bc/PgFuncs1"); codegen->LoadBitCode(bitcode, "Funcs"); @@ -38,8 +60,53 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { typedef TExecFunc (*TFunc)(); auto funcPtr = (TFunc)codegen->GetPointerToFunction(func); execFunc = funcPtr(); - } else { - execFunc = arrow_date_eq(); + break; + } + case EKernelFlavor::Ideal: { + execFunc = [](arrow::compute::KernelContext* ctx, const arrow::compute::ExecBatch& batch, arrow::Datum* res) { + size_t length = batch.values[0].length(); + //NUdf::TFixedSizeArrayBuilder<ui64, true> builder(NKikimr::NMiniKQL::TTypeInfoHelper(), arrow::uint64(), *arrow::default_memory_pool(), length); + NUdf::TTypedBufferBuilder<ui64> dataBuilder(arrow::default_memory_pool()); + NUdf::TTypedBufferBuilder<ui8> nullBuilder(arrow::default_memory_pool()); + dataBuilder.Reserve(length); + nullBuilder.Reserve(length); + auto out = dataBuilder.MutableData(); + auto outNulls = nullBuilder.MutableData(); + NUdf::TFixedSizeBlockReader<ui64, false> reader1; + NUdf::TFixedSizeBlockReader<ui64, false> reader2; + const auto& array1 = *batch.values[0].array(); + const auto ptr1 = array1.GetValues<ui64>(1); + if (batch.values[1].is_array()) { + const auto& array2 = *batch.values[1].array(); + const auto ptr2 = array2.GetValues<ui64>(1); + for (size_t i = 0; i < length; ++i) { + //auto x = reader1.GetItem(array1, i).As<ui64>(); + //auto y = reader2.GetItem(array2, i).As<ui64>(); + auto x = ptr1[i]; + auto y = ptr2[i]; + out[i] = x == y ? 1 : 0; + outNulls[i] = false; + } + } else { + ui64 yConst = reader2.GetScalarItem(*batch.values[1].scalar()).As<ui64>(); + for (size_t i = 0; i < length; ++i) { + auto x = ptr1[i]; + out[i] = x == yConst ? 1 : 0; + outNulls[i] = false; + } + } + + std::shared_ptr<arrow::Buffer> nulls; + nulls = nullBuilder.Finish(); + nulls = NUdf::MakeDenseBitmap(nulls->data(), length, arrow::default_memory_pool()); + std::shared_ptr<arrow::Buffer> data = dataBuilder.Finish(); + + *res = arrow::ArrayData::Make(arrow::uint64(), length ,{ data, nulls}); + return arrow::Status::OK(); + }; + + break; + } } Y_ENSURE(execFunc); @@ -67,7 +134,13 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { std::shared_ptr<arrow::ArrayData> out; ARROW_OK(builder.FinishInternal(&out)); - arrow::Datum arg1(out), arg2(out); + arrow::Datum arg1(out), arg2; + if (constArg) { + Cout << "with const arg\n"; + arg2 = NKikimr::NMiniKQL::MakeScalarDatum<ui64>(0); + } else { + arg2 = out; + } { Cout << "begin...\n"; @@ -84,13 +157,25 @@ Y_UNIT_TEST_SUITE(TPgCodegen) { } } - Y_UNIT_TEST(PgFunc) { - PgFuncImpl(false); + Y_UNIT_TEST(PgFuncIdeal) { + PgFuncImpl(EKernelFlavor::Ideal, false); + PgFuncImpl(EKernelFlavor::Ideal, true); + } + + Y_UNIT_TEST(PgFuncCpp) { + PgFuncImpl(EKernelFlavor::Cpp, false); + PgFuncImpl(EKernelFlavor::Cpp, true); } + Y_UNIT_TEST(PgFuncDefArg) { + PgFuncImpl(EKernelFlavor::DefArg, false); + PgFuncImpl(EKernelFlavor::DefArg, true); + } + #if defined(NDEBUG) && !defined(_asan_enabled_) Y_UNIT_TEST(PgFuncBC) { - PgFuncImpl(true); + PgFuncImpl(EKernelFlavor::BitCode, false); + PgFuncImpl(EKernelFlavor::BitCode, true); } #endif } diff --git a/ydb/library/yql/public/udf/arrow/block_builder.h b/ydb/library/yql/public/udf/arrow/block_builder.h index ca1be1df58c..3178569aa9f 100644 --- a/ydb/library/yql/public/udf/arrow/block_builder.h +++ b/ydb/library/yql/public/udf/arrow/block_builder.h @@ -179,6 +179,7 @@ public: if (arrayCount == 1) { Y_VERIFY(arrays->Data); DoAddMany(*arrays->Data, indexes, count); + CurrLen += count; } else { const IArrayBuilder::TArrayDataItem* currData = nullptr; TVector<ui64> currDataIndexes; @@ -191,6 +192,7 @@ public: if (data != currData) { DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size()); + CurrLen += currDataIndexes.size(); currDataIndexes.clear(); currData = data; } @@ -198,9 +200,9 @@ public: } if (!currDataIndexes.empty()) { DoAddMany(*currData->Data, currDataIndexes.data(), currDataIndexes.size()); + CurrLen += currDataIndexes.size(); } } - CurrLen += count; } arrow::Datum Build(bool finish) final { @@ -287,6 +289,11 @@ protected: return CurrLen; } + void SetCurrLen(size_t len) { + Y_VERIFY(len <= MaxLen); + CurrLen = len; + } + const std::shared_ptr<arrow::DataType> ArrowType; arrow::MemoryPool* const Pool; const size_t MaxLen; @@ -310,6 +317,18 @@ public: Reserve(); } + void UnsafeReserve(size_t length) { + SetCurrLen(length); + } + + T* MutableData() { + return DataPtr; + } + + ui8* MutableValidMask() { + return NullPtr; + } + void DoAdd(NUdf::TUnboxedValuePod value) final { if constexpr (Nullable) { if (!value) { @@ -322,14 +341,14 @@ public: void DoAdd(TBlockItem value) final { if constexpr (Nullable) { if (!value) { - NullBuilder->UnsafeAppend(0); - DataBuilder->UnsafeAppend(T{}); + NullPtr[GetCurrLen()] = 0; + DataPtr[GetCurrLen()] = T{}; return; } - NullBuilder->UnsafeAppend(1); + NullPtr[GetCurrLen()] = 1; } - DataBuilder->UnsafeAppend(value.As<T>()); + DataPtr[GetCurrLen()] = value.As<T>(); } void DoAdd(TInputBuffer& input) final { @@ -343,68 +362,65 @@ public: void DoAddDefault() final { if constexpr (Nullable) { - NullBuilder->UnsafeAppend(1); + NullPtr[GetCurrLen()] = 1; } - DataBuilder->UnsafeAppend(T{}); + DataPtr[GetCurrLen()] = T{}; } void DoAddMany(const arrow::ArrayData& array, const ui8* sparseBitmap, size_t popCount) final { Y_VERIFY(array.buffers.size() > 1); if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); if (array.buffers.front()) { - ui8* dstBitmap = NullBuilder->End(); + ui8* dstBitmap = NullPtr + GetCurrLen(); CompressAsSparseBitmap(array.GetValues<ui8>(0, 0), array.offset, sparseBitmap, dstBitmap, array.length); - NullBuilder->UnsafeAdvance(popCount); } else { - NullBuilder->UnsafeAppend(popCount, 1); + ui8* dstBitmap = NullPtr + GetCurrLen(); + std::fill_n(dstBitmap, popCount, 1); } } const T* src = array.GetValues<T>(1); - T* dst = DataBuilder->End(); + T* dst = DataPtr + GetCurrLen(); CompressArray(src, sparseBitmap, dst, array.length); - DataBuilder->UnsafeAdvance(popCount); } void DoAddMany(const arrow::ArrayData& array, ui64 beginIndex, size_t count) final { Y_VERIFY(array.buffers.size() > 1); if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); for (size_t i = beginIndex; i < beginIndex + count; ++i) { - NullBuilder->UnsafeAppend(!IsNull(array, i)); + NullPtr[GetCurrLen() + i - beginIndex] = !IsNull(array, i); } } const T* values = array.GetValues<T>(1); for (size_t i = beginIndex; i < beginIndex + count; ++i) { - DataBuilder->UnsafeAppend(T(values[i])); + DataPtr[GetCurrLen() + i - beginIndex] = T(values[i]); } } void DoAddMany(const arrow::ArrayData& array, const ui64* indexes, size_t count) final { Y_VERIFY(array.buffers.size() > 1); if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == DataBuilder->Length()); for (size_t i = 0; i < count; ++i) { - NullBuilder->UnsafeAppend(!IsNull(array, indexes[i])); + NullPtr[GetCurrLen() + i] = !IsNull(array, indexes[i]); } } const T* values = array.GetValues<T>(1); for (size_t i = 0; i < count; ++i) { - DataBuilder->UnsafeAppend(T(values[indexes[i]])); + DataPtr[GetCurrLen() + i] = T(values[indexes[i]]); } } TBlockArrayTree::Ptr DoBuildTree(bool finish) final { - const size_t len = DataBuilder->Length(); + const size_t len = GetCurrLen(); std::shared_ptr<arrow::Buffer> nulls; if constexpr (Nullable) { - Y_VERIFY(NullBuilder->Length() == len); + NullBuilder->UnsafeAdvance(len); nulls = NullBuilder->Finish(); nulls = MakeDenseBitmap(nulls->data(), len, Pool); } + DataBuilder->UnsafeAdvance(len); std::shared_ptr<arrow::Buffer> data = DataBuilder->Finish(); TBlockArrayTree::Ptr result = std::make_shared<TBlockArrayTree>(); @@ -422,14 +438,18 @@ private: void Reserve() { DataBuilder = std::make_unique<TTypedBufferBuilder<T>>(Pool); DataBuilder->Reserve(MaxLen + 1); + DataPtr = DataBuilder->MutableData(); if constexpr (Nullable) { NullBuilder = std::make_unique<TTypedBufferBuilder<ui8>>(Pool); NullBuilder->Reserve(MaxLen + 1); + NullPtr = NullBuilder->MutableData(); } } std::unique_ptr<TTypedBufferBuilder<ui8>> NullBuilder; std::unique_ptr<TTypedBufferBuilder<T>> DataBuilder; + ui8* NullPtr = nullptr; + T* DataPtr = nullptr; }; template<typename TStringType, bool Nullable, EPgStringType PgString = EPgStringType::None> |