summaryrefslogtreecommitdiffstats
path: root/yql/essentials/minikql/computation/mkql_block_impl.cpp
diff options
context:
space:
mode:
authorvvvv <[email protected]>2025-10-09 12:25:18 +0300
committervvvv <[email protected]>2025-10-09 12:57:17 +0300
commitcb77d014972b2cdb27d2e6d979fc3a2772b27ad4 (patch)
tree7f3bcd8ce71c6bd0f3ccc11e31b9f665475b819e /yql/essentials/minikql/computation/mkql_block_impl.cpp
parentd58a8990d353b051c27e1069141117fdfde64358 (diff)
YQL-20086 minikql
commit_hash:e96f7390db5fcbe7e9f64f898141a263ad522daa
Diffstat (limited to 'yql/essentials/minikql/computation/mkql_block_impl.cpp')
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.cpp242
1 files changed, 128 insertions, 114 deletions
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp
index cd042d46c3d..2daef9dbf57 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp
@@ -36,7 +36,7 @@ namespace NKikimr::NMiniKQL {
namespace {
// TODO(YQL): This must be rewrited via traits dispatcher.
-template<typename T>
+template <typename T>
arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& pool) {
type = SkipTaggedType(type);
std::shared_ptr<arrow::DataType> arrowType;
@@ -61,7 +61,10 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
auto structType = AS_TYPE(TStructType, type);
std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
for (ui32 i = 0; i < structType->GetMembersCount(); ++i) {
- arrowValue.emplace_back(DoConvertScalar(SkipTaggedType(structType->GetMemberType(i)), value.GetElement(i), pool).scalar());
+ arrowValue.emplace_back(
+ DoConvertScalar(
+ SkipTaggedType(structType->GetMemberType(i)), value.GetElement(i), pool)
+ .scalar());
}
return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
@@ -71,7 +74,10 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
auto tupleType = AS_TYPE(TTupleType, type);
std::vector<std::shared_ptr<arrow::Scalar>> arrowValue;
for (ui32 i = 0; i < tupleType->GetElementsCount(); ++i) {
- arrowValue.emplace_back(DoConvertScalar(SkipTaggedType(tupleType->GetElementType(i)), value.GetElement(i), pool).scalar());
+ arrowValue.emplace_back(
+ DoConvertScalar(
+ SkipTaggedType(tupleType->GetElementType(i)), value.GetElement(i), pool)
+ .scalar());
}
return arrow::Datum(std::make_shared<arrow::StructScalar>(arrowValue, arrowType));
@@ -80,108 +86,102 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
if (type->IsData()) {
auto slot = *AS_TYPE(TDataType, type)->GetDataSlot();
switch (slot) {
- case NUdf::EDataSlot::Int8:
- return arrow::Datum(static_cast<int8_t>(value.template Get<i8>()));
- case NUdf::EDataSlot::Bool:
- case NUdf::EDataSlot::Uint8:
- return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>()));
- case NUdf::EDataSlot::Int16:
- return arrow::Datum(static_cast<int16_t>(value.template Get<i16>()));
- case NUdf::EDataSlot::Uint16:
- case NUdf::EDataSlot::Date:
- return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>()));
- case NUdf::EDataSlot::Int32:
- case NUdf::EDataSlot::Date32:
- return arrow::Datum(static_cast<int32_t>(value.template Get<i32>()));
- case NUdf::EDataSlot::Uint32:
- case NUdf::EDataSlot::Datetime:
- return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>()));
- case NUdf::EDataSlot::Int64:
- case NUdf::EDataSlot::Interval:
- case NUdf::EDataSlot::Interval64:
- case NUdf::EDataSlot::Datetime64:
- case NUdf::EDataSlot::Timestamp64:
- return arrow::Datum(static_cast<int64_t>(value.template Get<i64>()));
- case NUdf::EDataSlot::Uint64:
- case NUdf::EDataSlot::Timestamp:
- return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>()));
- case NUdf::EDataSlot::Float:
- return arrow::Datum(static_cast<float>(value.template Get<float>()));
- case NUdf::EDataSlot::Double:
- return arrow::Datum(static_cast<double>(value.template Get<double>()));
- case NUdf::EDataSlot::String:
- case NUdf::EDataSlot::Utf8:
- case NUdf::EDataSlot::Yson:
- case NUdf::EDataSlot::Json:
- case NUdf::EDataSlot::JsonDocument: {
- const auto& str = value.AsStringRef();
- std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool)));
- std::memcpy(buffer->mutable_data(), str.Data(), str.Size());
- std::shared_ptr<arrow::Scalar> scalar;
- if (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) {
- scalar = std::make_shared<arrow::BinaryScalar>(buffer, arrow::binary());
- } else {
- // NOTE: Do not use |arrow::BinaryScalar| for utf8 and json types directly.
- // This is necessary so that the type of the scalar is clearly preserved at runtime.
- scalar = std::make_shared<arrow::StringScalar>(buffer);
+ case NUdf::EDataSlot::Int8:
+ return arrow::Datum(static_cast<int8_t>(value.template Get<i8>()));
+ case NUdf::EDataSlot::Bool:
+ case NUdf::EDataSlot::Uint8:
+ return arrow::Datum(static_cast<uint8_t>(value.template Get<ui8>()));
+ case NUdf::EDataSlot::Int16:
+ return arrow::Datum(static_cast<int16_t>(value.template Get<i16>()));
+ case NUdf::EDataSlot::Uint16:
+ case NUdf::EDataSlot::Date:
+ return arrow::Datum(static_cast<uint16_t>(value.template Get<ui16>()));
+ case NUdf::EDataSlot::Int32:
+ case NUdf::EDataSlot::Date32:
+ return arrow::Datum(static_cast<int32_t>(value.template Get<i32>()));
+ case NUdf::EDataSlot::Uint32:
+ case NUdf::EDataSlot::Datetime:
+ return arrow::Datum(static_cast<uint32_t>(value.template Get<ui32>()));
+ case NUdf::EDataSlot::Int64:
+ case NUdf::EDataSlot::Interval:
+ case NUdf::EDataSlot::Interval64:
+ case NUdf::EDataSlot::Datetime64:
+ case NUdf::EDataSlot::Timestamp64:
+ return arrow::Datum(static_cast<int64_t>(value.template Get<i64>()));
+ case NUdf::EDataSlot::Uint64:
+ case NUdf::EDataSlot::Timestamp:
+ return arrow::Datum(static_cast<uint64_t>(value.template Get<ui64>()));
+ case NUdf::EDataSlot::Float:
+ return arrow::Datum(static_cast<float>(value.template Get<float>()));
+ case NUdf::EDataSlot::Double:
+ return arrow::Datum(static_cast<double>(value.template Get<double>()));
+ case NUdf::EDataSlot::String:
+ case NUdf::EDataSlot::Utf8:
+ case NUdf::EDataSlot::Yson:
+ case NUdf::EDataSlot::Json:
+ case NUdf::EDataSlot::JsonDocument: {
+ const auto& str = value.AsStringRef();
+ std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(str.Size(), &pool)));
+ std::memcpy(buffer->mutable_data(), str.Data(), str.Size());
+ std::shared_ptr<arrow::Scalar> scalar;
+ if (slot == NUdf::EDataSlot::String || slot == NUdf::EDataSlot::Yson || slot == NUdf::EDataSlot::JsonDocument) {
+ scalar = std::make_shared<arrow::BinaryScalar>(buffer, arrow::binary());
+ } else {
+ // NOTE: Do not use |arrow::BinaryScalar| for utf8 and json types directly.
+ // This is necessary so that the type of the scalar is clearly preserved at runtime.
+ scalar = std::make_shared<arrow::StringScalar>(buffer);
+ }
+ return arrow::Datum(scalar);
}
- return arrow::Datum(scalar);
- }
- case NUdf::EDataSlot::TzDate: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ case NUdf::EDataSlot::TzDate: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>()));
- }
- case NUdf::EDataSlot::TzDatetime: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>()));
+ }
+ case NUdf::EDataSlot::TzDatetime: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>()));
- }
- case NUdf::EDataSlot::TzTimestamp: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>()));
+ }
+ case NUdf::EDataSlot::TzTimestamp: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>()));
- }
- case NUdf::EDataSlot::TzDate32: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>()));
+ }
+ case NUdf::EDataSlot::TzDate32: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>()));
- }
- case NUdf::EDataSlot::TzDatetime64: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>()));
+ }
+ case NUdf::EDataSlot::TzDatetime64: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>()));
- }
- case NUdf::EDataSlot::TzTimestamp64: {
- auto items = arrow::StructScalar::ValueType{
- std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
- std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
- };
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>()));
+ }
+ case NUdf::EDataSlot::TzTimestamp64: {
+ auto items = arrow::StructScalar::ValueType{
+ std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
+ std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())};
- return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>()));
- }
- case NUdf::EDataSlot::Decimal: {
- std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
- *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128();
- return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer));
- }
- default:
- MKQL_ENSURE(false, "Unsupported data slot " << slot);
+ return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>()));
+ }
+ case NUdf::EDataSlot::Decimal: {
+ std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
+ *reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128();
+ return arrow::Datum(std::make_shared<TPrimitiveDataType<NYql::NDecimal::TInt128>::TScalarResult>(buffer));
+ }
+ default:
+ MKQL_ENSURE(false, "Unsupported data slot " << slot);
}
}
@@ -254,8 +254,12 @@ NUdf::TUnboxedValuePod MakeBlockCount(const THolderFactory& holderFactory, const
return holderFactory.CreateArrowBlock(arrow::Datum(count));
}
-TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables, NYql::NUdf::EValidateDatumMode validateDatumMode, TStringBuf name, TComputationNodePtrVector&& argsNodes,
- const TVector<TType*>& argsTypes, TType* outputType, const arrow::compute::ScalarKernel& kernel,
+TBlockFuncNode::TBlockFuncNode(TComputationMutables& mutables,
+ NYql::NUdf::EValidateDatumMode validateDatumMode,
+ TStringBuf name, TComputationNodePtrVector&& argsNodes,
+ const TVector<TType*>& argsTypes,
+ TType* outputType,
+ const arrow::compute::ScalarKernel& kernel,
std::shared_ptr<arrow::compute::ScalarKernel> kernelHolder,
const arrow::compute::FunctionOptions* functionOptions)
: TMutableComputationNode(mutables)
@@ -284,7 +288,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
if (ScalarOutput_) {
auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
- ARROW_OK(executor->Init(&state.KernelContext, { &Kernel_, ArgsValuesDescr_, Options_ }));
+ ARROW_OK(executor->Init(&state.KernelContext, {&Kernel_, ArgsValuesDescr_, Options_}));
auto listener = std::make_shared<arrow::compute::detail::DatumAccumulator>();
ARROW_OK(executor->Execute(argDatums, listener.get()));
@@ -298,7 +302,7 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
while (dechunker.Next(chunk)) {
auto executor = arrow::compute::detail::KernelExecutor::MakeScalar();
- ARROW_OK(executor->Init(&state.KernelContext, { &Kernel_, ArgsValuesDescr_, Options_ }));
+ ARROW_OK(executor->Init(&state.KernelContext, {&Kernel_, ArgsValuesDescr_, Options_}));
arrow::compute::detail::DatumAccumulator listener;
ARROW_OK(executor->Execute(chunk, &listener));
@@ -311,7 +315,6 @@ NUdf::TUnboxedValuePod TBlockFuncNode::DoCalculate(TComputationContext& ctx) con
return ctx.HolderFactory.CreateArrowBlock(std::move(resultArray));
}
-
void TBlockFuncNode::RegisterDependencies() const {
for (const auto& arg : ArgsNodes_) {
DependsOn(arg);
@@ -333,7 +336,8 @@ std::unique_ptr<IArrowKernelComputationNode> TBlockFuncNode::PrepareArrowKernelC
TBlockFuncNode::TArrowNode::TArrowNode(const TBlockFuncNode* parent)
: Parent_(parent)
-{}
+{
+}
TStringBuf TBlockFuncNode::TArrowNode::GetKernelName() const {
return Parent_->Name_;
@@ -353,10 +357,14 @@ const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) cons
}
TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex)
- : TBase(memInfo), Values(width), Deques(width), Arrays(width)
+ : TBase(memInfo)
+ , Values(width)
+ , Deques(width)
+ , Arrays(width)
, BlockLengthIndex(blockLengthIndex == LAST_COLUMN_MARKER ? width - 1 : blockLengthIndex)
{
- MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER || (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex");
+ MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER ||
+ (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex");
Pointer = Values.data();
}
@@ -369,8 +377,9 @@ void TBlockState::FillArrays() {
auto& counterDatum = TArrowBlock::From(Values[BlockLengthIndex]).GetDatum();
MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;
- if (!Count)
+ if (!Count) {
return;
+ }
for (size_t i = 0U; i < Deques.size(); ++i) {
if (i == BlockLengthIndex) {
@@ -395,8 +404,9 @@ ui64 TBlockState::Slice() {
auto sliceSize = Count;
for (size_t i = 0; i < Deques.size(); ++i) {
const auto& arr = Deques[i];
- if (arr.empty())
+ if (arr.empty()) {
continue;
+ }
Y_ABORT_UNLESS(ui64(arr.front()->length) <= Count);
MKQL_ENSURE(ui64(arr.front()->length) <= Count, "Unexpected array length at column #" << i);
@@ -405,13 +415,15 @@ ui64 TBlockState::Slice() {
for (size_t i = 0; i < Arrays.size(); ++i) {
auto& arr = Deques[i];
- if (arr.empty())
+ if (arr.empty()) {
continue;
+ }
if (auto& array = arr.front(); ui64(array->length) == sliceSize) {
Arrays[i] = std::move(array);
Deques[i].pop_front();
- } else
+ } else {
Arrays[i] = Chop(array, sliceSize);
+ }
}
Count -= sliceSize;
@@ -419,13 +431,15 @@ ui64 TBlockState::Slice() {
}
NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
- if (idx == BlockLengthIndex)
+ if (idx == BlockLengthIndex) {
return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
+ }
- if (auto array = Arrays[idx])
+ if (auto array = Arrays[idx]) {
return holderFactory.CreateArrowBlock(std::move(array));
- else
+ } else {
return Values[idx];
+ }
}
-}
+} // namespace NKikimr::NMiniKQL