diff options
| author | vvvv <[email protected]> | 2025-10-09 12:25:18 +0300 |
|---|---|---|
| committer | vvvv <[email protected]> | 2025-10-09 12:57:17 +0300 |
| commit | cb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch) | |
| tree | 7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_block_impl.cpp | |
| parent | d58a8990d353b051c27e1069141117fdfde64358 (diff) | |
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_block_impl.cpp')
| -rw-r--r-- | yql/essentials/minikql/computation/mkql_block_impl.cpp | 242 |
1 files changed, 128 insertions, 114 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp index cd042d46c3d..2daef9dbf57 100644 --- a/yql/essentials/minikql/computation/mkql_block_impl.cpp +++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp @@ -36,7 +36,7 @@ namespace NKikimr::NMiniKQL { namespace { // TODO(YQL): This must be rewrited via traits dispatcher. -template<typename T> +template <typename T> arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& pool) { type = SkipTaggedType(type); std::shared_ptr<arrow::DataType> arrowType; @@ -61,7 +61,10 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo auto structType = AS_TYPE(TStructType, type); std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; for (ui32 i = 0; i < structType->GetMembersCount(); ++i) { - arrowValue.emplace_back(DoConvertScalar(SkipTaggedType(structType->GetMemberType(i)), value.GetElement(i), pool).scalar()); + arrowValue.emplace_back( + DoConvertScalar( + SkipTaggedType(structType->GetMemberType(i)), value.GetElement(i), pool) + .scalar()); } return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType)); @@ -71,7 +74,10 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo auto tupleType = AS_TYPE(TTupleType, type); std::vector<std::shared_ptr<arrow::Scalar>> arrowValue; for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) { - arrowValue.emplace_back(DoConvertScalar(SkipTaggedType(tupleType->GetElementType(i)), value.GetElement(i), pool).scalar()); + arrowValue.emplace_back( + DoConvertScalar( + SkipTaggedType(tupleType->GetElementType(i)), value.GetElement(i), pool) + .scalar()); } return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType)); @@ -80,108 +86,102 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo if (type->IsData()) { auto slot = *AS_TYPE(TDataType, type)->GetDataSlot(); switch (slot) { - case NUdf::EDataSlot::Int8: - return arrow::Datum(static_cast<int8_t>(value.template Get<i8>())); - case NUdf::EDataSlot::Bool: - case NUdf::EDataSlot::Uint8: - return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>())); - case NUdf::EDataSlot::Int16: - return arrow::Datum(static_cast<int16_t>(value.template Get<i16>())); - case NUdf::EDataSlot::Uint16: - case NUdf::EDataSlot::Date: - return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>())); - case NUdf::EDataSlot::Int32: - case NUdf::EDataSlot::Date32: - return arrow::Datum(static_cast<int32_t>(value.template Get<i32>())); - case NUdf::EDataSlot::Uint32: - case NUdf::EDataSlot::Datetime: - return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>())); - case NUdf::EDataSlot::Int64: - case NUdf::EDataSlot::Interval: - case NUdf::EDataSlot::Interval64: - case NUdf::EDataSlot::Datetime64: - case NUdf::EDataSlot::Timestamp64: - return arrow::Datum(static_cast<int64_t>(value.template Get<i64>())); - case NUdf::EDataSlot::Uint64: - case NUdf::EDataSlot::Timestamp: - return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>())); - case NUdf::EDataSlot::Float: - return arrow::Datum(static_cast<float>(value.template Get<float>())); - case NUdf::EDataSlot::Double: - return arrow::Datum(static_cast<double>(value.template Get<double>())); - case NUdf::EDataSlot::String: - case NUdf::EDataSlot::Utf8: - case NUdf::EDataSlot::Yson: - case NUdf::EDataSlot::Json: - case NUdf::EDataSlot::JsonDocument: { - const auto& str = value.AsStringRef(); - std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool))); - std::memcpy(buffer->mutable_data(), str.Data(), str.Size()); - std::shared_ptr<arrow::Scalar> scalar; - if (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) { - scalar = std::make_shared<arrow::BinaryScalar>(buffer, arrow::binary()); - } else { - // NOTE: Do not use |arrow::BinaryScalar| for utf8 and json types directly. - // This is necessary so that the type of the scalar is clearly preserved at runtime. - scalar = std::make_shared<arrow::StringScalar>(buffer); + case NUdf::EDataSlot::Int8: + return arrow::Datum(static_cast<int8_t>(value.template Get<i8>())); + case NUdf::EDataSlot::Bool: + case NUdf::EDataSlot::Uint8: + return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>())); + case NUdf::EDataSlot::Int16: + return arrow::Datum(static_cast<int16_t>(value.template Get<i16>())); + case NUdf::EDataSlot::Uint16: + case NUdf::EDataSlot::Date: + return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>())); + case NUdf::EDataSlot::Int32: + case NUdf::EDataSlot::Date32: + return arrow::Datum(static_cast<int32_t>(value.template Get<i32>())); + case NUdf::EDataSlot::Uint32: + case NUdf::EDataSlot::Datetime: + return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>())); + case NUdf::EDataSlot::Int64: + case NUdf::EDataSlot::Interval: + case NUdf::EDataSlot::Interval64: + case NUdf::EDataSlot::Datetime64: + case NUdf::EDataSlot::Timestamp64: + return arrow::Datum(static_cast<int64_t>(value.template Get<i64>())); + case NUdf::EDataSlot::Uint64: + case NUdf::EDataSlot::Timestamp: + return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>())); + case NUdf::EDataSlot::Float: + return arrow::Datum(static_cast<float>(value.template Get<float>())); + case NUdf::EDataSlot::Double: + return arrow::Datum(static_cast<double>(value.template Get<double>())); + case NUdf::EDataSlot::String: + case NUdf::EDataSlot::Utf8: + case NUdf::EDataSlot::Yson: + case NUdf::EDataSlot::Json: + case NUdf::EDataSlot::JsonDocument: { + const auto& str = value.AsStringRef(); + std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool))); + std::memcpy(buffer->mutable_data(), str.Data(), str.Size()); + std::shared_ptr<arrow::Scalar> scalar; + if (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) { + scalar = std::make_shared<arrow::BinaryScalar>(buffer, arrow::binary()); + } else { + // NOTE: Do not use |arrow::BinaryScalar| for utf8 and json types directly. + // This is necessary so that the type of the scalar is clearly preserved at runtime. + scalar = std::make_shared<arrow::StringScalar>(buffer); + } + return arrow::Datum(scalar); } - return arrow::Datum(scalar); - } - case NUdf::EDataSlot::TzDate: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + case NUdf::EDataSlot::TzDate: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>())); - } - case NUdf::EDataSlot::TzDatetime: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>())); + } + case NUdf::EDataSlot::TzDatetime: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>())); - } - case NUdf::EDataSlot::TzTimestamp: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>())); + } + case NUdf::EDataSlot::TzTimestamp: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>())); - } - case NUdf::EDataSlot::TzDate32: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>())); + } + case NUdf::EDataSlot::TzDate32: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>())); - } - case NUdf::EDataSlot::TzDatetime64: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>())); + } + case NUdf::EDataSlot::TzDatetime64: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>())); - } - case NUdf::EDataSlot::TzTimestamp64: { - auto items = arrow::StructScalar::ValueType{ - std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), - std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId()) - }; + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>())); + } + case NUdf::EDataSlot::TzTimestamp64: { + auto items = arrow::StructScalar::ValueType{ + std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()), + std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())}; - return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>())); - } - case NUdf::EDataSlot::Decimal: { - std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool))); - *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128(); - return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer)); - } - default: - MKQL_ENSURE(false, "Unsupported data slot " << slot); + return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>())); + } + case NUdf::EDataSlot::Decimal: { + std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool))); + *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128(); + return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer)); + } + default: + MKQL_ENSURE(false, "Unsupported data slot " << slot); } } @@ -254,8 +254,12 @@ NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const return holderFactory.CreateArrowBlock(arrow::Datum(count)); } -TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes, - const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel, +TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, + NYql::NUdf::EValidateDatumMode validateDatumMode, + TStringBuf name, TComputationNodePtrVector&& argsNodes, + const TVector<TType*>& argsTypes, + TType* outputType, + const arrow::compute::ScalarKernel& kernel, std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder, const arrow::compute::FunctionOptions* functionOptions) : TMutableComputationNode(mutables) @@ -284,7 +288,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con if (ScalarOutput_) { auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); - ARROW_OK(executor->Init(&state.KernelContext, { &Kernel_, ArgsValuesDescr_, Options_ })); + ARROW_OK(executor->Init(&state.KernelContext, {&Kernel_, ArgsValuesDescr_, Options_})); auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>(); ARROW_OK(executor->Execute(argDatums, listener.get())); @@ -298,7 +302,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con while (dechunker.Next(chunk)) { auto executor = arrow::compute::detail::KernelExecutor::MakeScalar(); - ARROW_OK(executor->Init(&state.KernelContext, { &Kernel_, ArgsValuesDescr_, Options_ })); + ARROW_OK(executor->Init(&state.KernelContext, {&Kernel_, ArgsValuesDescr_, Options_})); arrow::compute::detail::DatumAccumulator listener; ARROW_OK(executor->Execute(chunk, &listener)); @@ -311,7 +315,6 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con return ctx.HolderFactory.CreateArrowBlock(std::move(resultArray)); } - void TBlockFuncNode::RegisterDependencies() const { for (const auto& arg : ArgsNodes_) { DependsOn(arg); @@ -333,7 +336,8 @@ std::unique_ptr<IArrowKernelComputationNode> TBlockFuncNode::PrepareArrowKernelC TBlockFuncNode::TArrowNode::TArrowNode(const TBlockFuncNode* parent) : Parent_(parent) -{} +{ +} TStringBuf TBlockFuncNode::TArrowNode::GetKernelName() const { return Parent_->Name_; @@ -353,10 +357,14 @@ const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) cons } TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex) - : TBase(memInfo), Values(width), Deques(width), Arrays(width) + : TBase(memInfo) + , Values(width) + , Deques(width) + , Arrays(width) , BlockLengthIndex(blockLengthIndex == LAST_COLUMN_MARKER ? width - 1 : blockLengthIndex) { - MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER || (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex"); + MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER || + (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex"); Pointer = Values.data(); } @@ -369,8 +377,9 @@ void TBlockState::FillArrays() { auto& counterDatum = TArrowBlock::From(Values[BlockLengthIndex]).GetDatum(); MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)"); Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value; - if (!Count) + if (!Count) { return; + } for (size_t i = 0U; i < Deques.size(); ++i) { if (i == BlockLengthIndex) { @@ -395,8 +404,9 @@ ui64 TBlockState::Slice() { auto sliceSize = Count; for (size_t i = 0; i < Deques.size(); ++i) { const auto& arr = Deques[i]; - if (arr.empty()) + if (arr.empty()) { continue; + } Y_ABORT_UNLESS(ui64(arr.front()->length) <= Count); MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i); @@ -405,13 +415,15 @@ ui64 TBlockState::Slice() { for (size_t i = 0; i < Arrays.size(); ++i) { auto& arr = Deques[i]; - if (arr.empty()) + if (arr.empty()) { continue; + } if (auto& array = arr.front(); ui64(array->length) == sliceSize) { Arrays[i] = std::move(array); Deques[i].pop_front(); - } else + } else { Arrays[i] = Chop(array, sliceSize); + } } Count -= sliceSize; @@ -419,13 +431,15 @@ ui64 TBlockState::Slice() { } NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const { - if (idx == BlockLengthIndex) + if (idx == BlockLengthIndex) { return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize))); + } - if (auto array = Arrays[idx]) + if (auto array = Arrays[idx]) { return holderFactory.CreateArrowBlock(std::move(array)); - else + } else { return Values[idx]; + } } -} +} // namespace NKikimr::NMiniKQL |
