aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <ziganshinmr@yandex-team.com>2025-03-25 16:04:58 +0300
committerziganshinmr <ziganshinmr@yandex-team.com>2025-03-25 16:32:13 +0300
commitae51283471d012a56713063d298be892ef732b68 (patch)
tree90aacf0876ff932c9c9f4880887fb3d5f094071a
parent30c40b21dc1527fb15426ef6c9fe742aff8dadb0 (diff)
downloadydb-ae51283471d012a56713063d298be892ef732b68.tar.gz
ListToBlocks computation node
Семантика ноды ListToBlocks - преобразование списка структур в список блочных структур: `List<Struct<a:T1,...>> -> List<Struct<a:Block<T1>,...,_yql_block_length:Scalar<Uint64>>>` (как WideToBlocks, но для списков). На вход ожидается ленивый список, на выход также выдается ленивый список. commit_hash:0b364f55810a492fdcce7dc06b1e69701a1db01f
-rw-r--r--yql/essentials/core/sql_types/block.h7
-rw-r--r--yql/essentials/core/sql_types/ya.make1
-rw-r--r--yql/essentials/core/yql_expr_type_annotation.h3
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_blocks.cpp236
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_blocks.h1
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_factory.cpp1
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp119
-rw-r--r--yql/essentials/minikql/comp_nodes/ya.make.inc1
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.cpp28
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.h6
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt2
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp37
-rw-r--r--yql/essentials/minikql/mkql_program_builder.h2
-rw-r--r--yql/essentials/minikql/mkql_runtime_version.h2
14 files changed, 430 insertions, 16 deletions
diff --git a/yql/essentials/core/sql_types/block.h b/yql/essentials/core/sql_types/block.h
new file mode 100644
index 00000000000..ac08ce538d9
--- /dev/null
+++ b/yql/essentials/core/sql_types/block.h
@@ -0,0 +1,7 @@
+#pragma once
+
+#include <util/generic/strbuf.h>
+
+namespace NYql {
+ inline constexpr TStringBuf BlockLengthColumnName = "_yql_block_length"; // name of the struct member / column that carries the block length
+}
diff --git a/yql/essentials/core/sql_types/ya.make b/yql/essentials/core/sql_types/ya.make
index 136ff07733a..88eb76fb511 100644
--- a/yql/essentials/core/sql_types/ya.make
+++ b/yql/essentials/core/sql_types/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ block.h
match_recognize.h
match_recognize.cpp
simple_types.h
diff --git a/yql/essentials/core/yql_expr_type_annotation.h b/yql/essentials/core/yql_expr_type_annotation.h
index 13907df704a..b429f46f394 100644
--- a/yql/essentials/core/yql_expr_type_annotation.h
+++ b/yql/essentials/core/yql_expr_type_annotation.h
@@ -5,6 +5,7 @@
#include <yql/essentials/ast/yql_expr.h>
#include <yql/essentials/core/expr_nodes/yql_expr_nodes.h>
+#include <yql/essentials/core/sql_types/block.h>
#include <yql/essentials/minikql/mkql_type_ops.h>
#include <yql/essentials/parser/pg_catalog/catalog.h>
@@ -348,8 +349,6 @@ const TTypeAnnotationNode* GetOriginalResultType(TPositionHandle pos, bool isMan
bool ApplyOriginalType(TExprNode::TPtr input, bool isMany, const TTypeAnnotationNode* originalExtractorType, TExprContext& ctx);
TExprNode::TPtr ConvertToMultiLambda(const TExprNode::TPtr& lambda, TExprContext& ctx);
-const TStringBuf BlockLengthColumnName = "_yql_block_length";
-
TStringBuf NormalizeCallableName(TStringBuf name);
void CheckExpectedTypeAndColumnOrder(const TExprNode& node, TExprContext& ctx, TTypeAnnotationContext& typesCtx);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
index a0ae63138c4..5f54771ab7f 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
@@ -9,9 +9,11 @@
#include <yql/essentials/minikql/arrow/arrow_util.h>
#include <yql/essentials/minikql/mkql_type_builder.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_codegen.h> // Y_IGNORE
+#include <yql/essentials/minikql/computation/mkql_custom_list.h>
#include <yql/essentials/minikql/mkql_node_builder.h>
#include <yql/essentials/minikql/mkql_node_cast.h>
+#include <yql/essentials/core/sql_types/block.h>
#include <yql/essentials/parser/pg_wrapper/interface/arrow.h>
#include <arrow/scalar.h>
@@ -461,6 +463,223 @@ private:
const size_t MaxLength_;
};
+class TListToBlocksState : public TBlockState { // Accumulates input struct rows in per-column array builders and converts them into Arrow blocks.
+public:
+ TListToBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t blockLengthIndex, size_t maxLength) // types: one entry per output column; the entry at blockLengthIndex is not used
+ : TBlockState(memInfo, types.size(), blockLengthIndex)
+ , Builders_(types.size())
+ , BlockLengthIndex_(blockLengthIndex)
+ , MaxLength_(maxLength)
+ {
+ for (size_t i = 0; i < types.size(); ++i) {
+ if (i == blockLengthIndex) { // the block length column gets no builder; its Builders_ slot stays null
+ continue;
+ }
+ Builders_[i] = MakeArrayBuilder(TTypeInfoHelper(), types[i], ctx.ArrowMemoryPool, maxLength, &ctx.Builder->GetPgBuilder(), &BuilderAllocatedSize_); // presumably the builder reports its allocations through the last pointer - TODO confirm
+ }
+ MaxBuilderAllocatedSize_ = MaxAllocatedFactor_ * BuilderAllocatedSize_; // size cap used by IsNotFull(): factor times the value observed right after builder creation
+ }
+
+ void AddRow(const NUdf::TUnboxedValuePod& row) { // Appends one input struct row to the column builders.
+ auto items = row.GetElements();
+ size_t inputStructIdx = 0; // input rows have no block length member, so input and output indexes diverge after that column
+ for (size_t i = 0; i < Builders_.size(); i++) {
+ if (i == BlockLengthIndex_) {
+ continue;
+ }
+ Builders_[i]->Add(items[inputStructIdx++]);
+ }
+ Rows_++;
+ }
+
+ void MakeBlocks(const THolderFactory& holderFactory) { // Turns the accumulated rows into block values and resets the accumulation counters.
+ Values[BlockLengthIndex_] = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(Rows_))); // block length is stored as a UInt64 scalar
+ Rows_ = 0;
+ BuilderAllocatedSize_ = 0;
+
+ for (size_t i = 0; i < Builders_.size(); ++i) {
+ if (i == BlockLengthIndex_) {
+ continue;
+ }
+ Values[i] = holderFactory.CreateArrowBlock(Builders_[i]->Build(IsFinished_));
+ }
+ FillArrays();
+ }
+
+ void Finish() { // Marks the input as exhausted.
+ IsFinished_ = true;
+ }
+
+ bool IsNotFull() const { // True while both the row limit and the builder size cap allow adding another row.
+ return Rows_ < MaxLength_ && BuilderAllocatedSize_ <= MaxBuilderAllocatedSize_;
+ }
+
+ bool IsFinished() const {
+ return IsFinished_;
+ }
+
+ bool HasBlocks() const { // True while the inherited Count field is non-zero.
+ return Count > 0;
+ }
+
+ bool IsEmpty() const { // True when no rows have been accumulated since the last MakeBlocks().
+ return Rows_ == 0;
+ }
+
+private:
+ size_t Rows_ = 0; // rows added since the last MakeBlocks()
+ bool IsFinished_ = false;
+
+ size_t BuilderAllocatedSize_ = 0;
+ size_t MaxBuilderAllocatedSize_ = 0;
+
+ std::vector<std::unique_ptr<IArrayBuilder>> Builders_; // indexed by output column; null at BlockLengthIndex_
+
+ const size_t BlockLengthIndex_;
+ const size_t MaxLength_; // maximum number of rows per built block
+ static const size_t MaxAllocatedFactor_ = 4;
+};
+
+class TListToBlocksWrapper : public TMutableComputationNode<TListToBlocksWrapper> // Computation node that wraps an input list into a list value producing block structs.
+{
+ using TBaseComputation = TMutableComputationNode<TListToBlocksWrapper>;
+
+public:
+ TListToBlocksWrapper(TComputationMutables& mutables,
+ IComputationNode* list,
+ TStructType* structType // output item struct type: block members plus the block length member
+ )
+ : TBaseComputation(mutables, EValueRepresentation::Boxed)
+ , List_(list)
+ {
+ for (size_t i = 0; i < structType->GetMembersCount(); i++) {
+ if (structType->GetMemberName(i) == NYql::BlockLengthColumnName) { // NOTE(review): if no member has this name, BlockLengthIndex_ silently stays 0
+ BlockLengthIndex_ = i;
+ Types_.push_back(nullptr); // placeholder keeps Types_ aligned with output column indexes
+ continue;
+ }
+ Types_.push_back(AS_TYPE(TBlockType, structType->GetMemberType(i))->GetItemType());
+ }
+
+ MaxLength_ = CalcBlockLen(std::accumulate(Types_.cbegin(), Types_.cend(), 0ULL, [](size_t max, const TType* type){ // block length is derived from the largest per-item size among the columns
+ if (!type) {
+ return max;
+ }
+ return std::max(max, CalcMaxBlockItemSize(type));
+ }));
+ }
+
+ NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const // Creates the list value; rows are converted to blocks later, inside its iterator.
+ {
+ return ctx.HolderFactory.Create<TListToBlocksValue>(
+ ctx,
+ Types_,
+ BlockLengthIndex_,
+ List_->GetValue(ctx),
+ MaxLength_
+ );
+ }
+
+private:
+ class TListToBlocksValue : public TCustomListValue { // List value whose iterators convert rows of the wrapped list into block structs.
+ using TState = TListToBlocksState;
+
+ public:
+ class TIterator : public TComputationValue<TIterator> {
+ public:
+ TIterator(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& iter) // blockState must hold a boxed TState; iter is the input list iterator
+ : TComputationValue<TIterator>(memInfo)
+ , HolderFactory_(holderFactory)
+ , BlockState_(std::move(blockState))
+ , Iter_(std::move(iter))
+ {}
+
+ private:
+ bool Next(NUdf::TUnboxedValue& value) final { // Produces the next block struct; returns false when there is nothing left to emit.
+ auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
+ const size_t structSize = blockState.Values.size();
+
+ if (!blockState.HasBlocks()) { // nothing pending: accumulate rows and build new blocks
+ while (!blockState.IsFinished() && blockState.IsNotFull()) {
+ if (Iter_.Next(Row_)) {
+ blockState.AddRow(Row_);
+ } else {
+ blockState.Finish();
+ }
+ }
+ if (blockState.IsEmpty()) { // no rows were accumulated, so no block is emitted
+ return false;
+ }
+ blockState.MakeBlocks(HolderFactory_);
+ }
+
+ NUdf::TUnboxedValue* items = nullptr;
+ value = HolderFactory_.CreateDirectArrayHolder(structSize, items);
+
+ const auto sliceSize = blockState.Slice();
+ for (size_t i = 0; i < structSize; i++) {
+ items[i] = blockState.Get(sliceSize, HolderFactory_, i);
+ }
+
+ return true;
+ }
+
+ private:
+ const THolderFactory& HolderFactory_;
+
+ const NUdf::TUnboxedValue BlockState_;
+ const NUdf::TUnboxedValue Iter_;
+
+ NUdf::TUnboxedValue Row_; // reused buffer for the row read from Iter_
+ };
+
+ TListToBlocksValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
+ const TVector<TType*>& types, ui32 blockLengthIndex, NUdf::TUnboxedValue&& list, size_t maxLength // NOTE(review): blockLengthIndex is ui32 here but size_t in the wrapper and in TListToBlocksState
+ )
+ : TCustomListValue(memInfo)
+ , CompCtx_(ctx)
+ , Types_(types)
+ , BlockLengthIndex_(blockLengthIndex)
+ , List_(std::move(list))
+ , MaxLength_(maxLength)
+ {}
+
+ private:
+ NUdf::TUnboxedValue GetListIterator() const final { // Each iterator gets its own freshly created block state.
+ auto state = CompCtx_.HolderFactory.Create<TState>(CompCtx_, Types_, BlockLengthIndex_, MaxLength_);
+ return CompCtx_.HolderFactory.Create<TIterator>(CompCtx_.HolderFactory, std::move(state), List_.GetListIterator());
+ }
+
+ bool HasListItems() const final { // Caches the answer of the wrapped list; HasItems is not declared here, presumably inherited from TCustomListValue.
+ if (!HasItems.has_value()) {
+ HasItems = List_.HasListItems();
+ }
+ return *HasItems;
+ }
+
+ private:
+ TComputationContext& CompCtx_;
+
+ const TVector<TType*>& Types_; // non-owning reference; DoCalculate passes the wrapper's Types_ member
+ size_t BlockLengthIndex_ = 0;
+
+ NUdf::TUnboxedValue List_; // the wrapped input list
+ const size_t MaxLength_;
+ };
+
+ void RegisterDependencies() const final {
+ this->DependsOn(List_);
+ }
+
+private:
+ TVector<TType*> Types_; // item type per output column, nullptr at BlockLengthIndex_
+ size_t BlockLengthIndex_ = 0;
+
+ IComputationNode* const List_;
+
+ size_t MaxLength_ = 0;
+};
+
class TFromBlocksWrapper : public TStatefulFlowCodegeneratorNode<TFromBlocksWrapper> {
using TBaseComputation = TStatefulFlowCodegeneratorNode<TFromBlocksWrapper>;
public:
@@ -1373,6 +1592,23 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa
return new TWideToBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items));
}
+IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) { // Factory for the ListToBlocks callable: checks the input/return types and creates the computation node.
+ MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+ const auto inputType = callable.GetInput(0).GetStaticType();
+ MKQL_ENSURE(inputType->IsList(), "Expected List as an input");
+ const auto outputType = callable.GetType()->GetReturnType();
+ MKQL_ENSURE(outputType->IsList(), "Expected List as an output");
+
+ const auto inputItemType = AS_TYPE(TListType, inputType)->GetItemType();
+ MKQL_ENSURE(inputItemType->IsStruct(), "Expected List of Struct as an input");
+ const auto outputItemType = AS_TYPE(TListType, outputType)->GetItemType();
+ MKQL_ENSURE(outputItemType->IsStruct(), "Expected List of Struct as an output");
+
+ const auto list = LocateNode(ctx.NodeLocator, callable, 0);
+ return new TListToBlocksWrapper(ctx.Mutables, list, AS_TYPE(TStructType, outputItemType)); // the node is configured from the output (block) struct type, not from the input one
+}
+
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.h b/yql/essentials/minikql/comp_nodes/mkql_blocks.h
index 46f39e71643..326e25e57d9 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_blocks.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.h
@@ -7,6 +7,7 @@ namespace NMiniKQL {
IComputationNode* WrapToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
index c6b198463bf..9615a5d4d78 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
@@ -290,6 +290,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"WideTopBlocks", &WrapWideTopBlocks},
{"WideTopSortBlocks", &WrapWideTopSortBlocks},
{"WideSortBlocks", &WrapWideSortBlocks},
+ {"ListToBlocks", &WrapListToBlocks},
{"AsScalar", &WrapAsScalar},
{"ReplicateScalar", &WrapReplicateScalar},
{"BlockCoalesce", &WrapBlockCoalesce},
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index a873fbeb5b9..15dd02bf30f 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -5,6 +5,7 @@
#include <arrow/compute/exec_internal.h>
#include <arrow/array/builder_primitive.h>
+#include <yql/essentials/core/sql_types/block.h>
#include <yql/essentials/public/udf/udf_helpers.h>
#include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h>
@@ -61,6 +62,24 @@ namespace {
datums[i + topology->InputArgsCount] = output;
}
}
+
+ // Hand-made variant using WideFromBlocks (in order to test ListToBlocks by well-tested nodes rather than actual ListFromBlocks)
+ TRuntimeNode ListFromBlocks(TProgramBuilder& pb, TRuntimeNode blockList) { // Expects block structs with "key" and "value" members plus the block length member.
+ const auto wideBlocksStream = pb.FromFlow(pb.ExpandMap(pb.ToFlow(blockList), [&](TRuntimeNode item) -> TRuntimeNode::TList { // expand every block struct into wide columns, block length last
+ return {
+ pb.Member(item, "key"),
+ pb.Member(item, "value"),
+ pb.Member(item, NYql::BlockLengthColumnName)
+ };
+ }));
+
+ return pb.ForwardList(pb.NarrowMap(pb.ToFlow(pb.WideFromBlocks(wideBlocksStream)), [&](TRuntimeNode::TList items) -> TRuntimeNode { // pack the unblocked columns back into {key, value} structs
+ return pb.NewStruct({
+ {"key", items[0]},
+ {"value", items[1]}
+ });
+ }));
+ }
}
Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) {
@@ -150,6 +169,106 @@ Y_UNIT_TEST_LLVM(TestWideToBlocks) {
UNIT_ASSERT(!iterator.Next(item));
}
+Y_UNIT_TEST(TestListToBlocks) { // Round trip: list -> ListToBlocks -> ListFromBlocks must reproduce every row in order.
+ constexpr size_t TEST_SIZE = 1 << 16;
+ const TString hugeString(128, '1');
+
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id);
+
+ const auto structType = pb.NewStructType({
+ {"key", ui64Type},
+ {"value", strType},
+ });
+
+ TVector<TRuntimeNode> listItems;
+ for (size_t i = 0; i < TEST_SIZE; i++) {
+ const auto str = hugeString + ToString(i);
+ listItems.push_back(pb.NewStruct({
+ {"key", pb.NewDataLiteral<ui64>(i)},
+ // Huge string is used to make fewer rows fit into one block (in order to test output slicing)
+ {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)},
+ }));
+ }
+
+ const auto list = pb.NewList(structType, listItems);
+ const auto blockList = pb.ListToBlocks(list);
+
+ const auto graph = setup.BuildGraph(ListFromBlocks(pb, blockList));
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue structValue;
+ for (size_t i = 0; i < TEST_SIZE; i++) {
+ const auto str = hugeString + ToString(i);
+ UNIT_ASSERT(iterator.Next(structValue));
+
+ const auto key = structValue.GetElement(0);
+ UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i);
+ const auto value = structValue.GetElement(1);
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str);
+ }
+
+ UNIT_ASSERT(!iterator.Next(structValue)); // the iterator must keep reporting the end on repeated calls
+ UNIT_ASSERT(!iterator.Next(structValue));
+}
+
+Y_UNIT_TEST(TestListToBlocksMultiUsage) { // Two ListToBlocks nodes over the same source list must both yield the full, identical row sequence.
+ constexpr size_t TEST_SIZE = 1 << 10;
+
+ TSetup<false> setup;
+ TProgramBuilder& pb = *setup.PgmBuilder;
+
+ const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+ const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id);
+
+ const auto structType = pb.NewStructType({
+ {"key", ui64Type},
+ {"value", strType},
+ });
+
+ TVector<TRuntimeNode> listItems;
+ for (size_t i = 0; i < TEST_SIZE; i++) {
+ const auto str = ToString(i);
+ listItems.push_back(pb.NewStruct({
+ {"key", pb.NewDataLiteral<ui64>(i)},
+ {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)},
+ }));
+ }
+
+ const auto list = pb.NewList(structType, listItems);
+ const auto blockList1 = pb.ListToBlocks(list);
+ const auto blockList2 = pb.ListToBlocks(list);
+
+ const auto result = pb.Zip({ListFromBlocks(pb, blockList1), ListFromBlocks(pb, blockList2)}); // each result item is a tuple of the two unblocked structs
+
+ const auto graph = setup.BuildGraph(result);
+ const auto iterator = graph->GetValue().GetListIterator();
+
+ NUdf::TUnboxedValue tupleValue;
+ for (size_t i = 0; i < TEST_SIZE; i++) {
+ const auto str = ToString(i);
+ UNIT_ASSERT(iterator.Next(tupleValue));
+
+ auto structValue = tupleValue.GetElement(0); // row coming from blockList1
+ auto key = structValue.GetElement(0);
+ UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i);
+ auto value = structValue.GetElement(1);
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str);
+
+ structValue = tupleValue.GetElement(1); // row coming from blockList2
+ key = structValue.GetElement(0);
+ UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i);
+ value = structValue.GetElement(1);
+ UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str);
+ }
+
+ UNIT_ASSERT(!iterator.Next(tupleValue)); // the iterator must keep reporting the end on repeated calls
+ UNIT_ASSERT(!iterator.Next(tupleValue));
+}
+
namespace {
template<bool LLVM>
void TestChunked(bool withBlockExpand) {
diff --git a/yql/essentials/minikql/comp_nodes/ya.make.inc b/yql/essentials/minikql/comp_nodes/ya.make.inc
index 29656cf1285..01f4e8aac3f 100644
--- a/yql/essentials/minikql/comp_nodes/ya.make.inc
+++ b/yql/essentials/minikql/comp_nodes/ya.make.inc
@@ -162,6 +162,7 @@ PEERDIR(
yql/essentials/types/binary_json
yql/essentials/minikql
yql/essentials/minikql/arrow
+ yql/essentials/core/sql_types
yql/essentials/public/udf/arrow
yql/essentials/parser/pg_wrapper/interface
yql/essentials/utils
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp
index 49d52eb0b88..ba9238e45a7 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp
@@ -125,7 +125,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(scalar);
}
case NUdf::EDataSlot::TzDate: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::UInt16Scalar>(value.template Get<ui16>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
@@ -133,7 +133,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate>()));
}
case NUdf::EDataSlot::TzDatetime: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::UInt32Scalar>(value.template Get<ui32>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
@@ -141,7 +141,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime>()));
}
case NUdf::EDataSlot::TzTimestamp: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::UInt64Scalar>(value.template Get<ui64>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
@@ -149,7 +149,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp>()));
}
case NUdf::EDataSlot::TzDate32: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::Int32Scalar>(value.template Get<i32>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
@@ -157,7 +157,7 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDate32>()));
}
case NUdf::EDataSlot::TzDatetime64: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
@@ -165,13 +165,13 @@ arrow::Datum DoConvertScalar(TType* type, const T& value, arrow::MemoryPool& poo
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzDatetime64>()));
}
case NUdf::EDataSlot::TzTimestamp64: {
- auto items = arrow::StructScalar::ValueType{
+ auto items = arrow::StructScalar::ValueType{
std::make_shared<arrow::Int64Scalar>(value.template Get<i64>()),
std::make_shared<arrow::UInt16Scalar>(value.GetTimezoneId())
};
return arrow::Datum(std::make_shared<arrow::StructScalar>(items, MakeTzDateArrowType<NUdf::EDataSlot::TzTimestamp64>()));
- }
+ }
case NUdf::EDataSlot::Decimal: {
std::shared_ptr<arrow::Buffer> buffer(ARROW_RESULT(arrow::AllocateBuffer(16, &pool)));
*reinterpret_cast<NYql::NDecimal::TInt128*>(buffer->mutable_data()) = value.GetInt128();
@@ -338,9 +338,11 @@ const IComputationNode* TBlockFuncNode::TArrowNode::GetArgument(ui32 index) cons
return Parent_->ArgsNodes[index];
}
-TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width)
- : TBase(memInfo), Values(width), Deques(width - 1ULL), Arrays(width - 1ULL)
+TBlockState::TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex)
+ : TBase(memInfo), Values(width), Deques(width), Arrays(width)
+ , BlockLengthIndex_(blockLengthIndex == LAST_COLUMN_MARKER ? width - 1 : blockLengthIndex)
{
+ MKQL_ENSURE(blockLengthIndex == LAST_COLUMN_MARKER || (0 <= blockLengthIndex && size_t(blockLengthIndex) < width), "Bad blockLengthIndex");
Pointer_ = Values.data();
}
@@ -350,13 +352,17 @@ void TBlockState::ClearValues() {
void TBlockState::FillArrays() {
MKQL_ENSURE(Count == 0, "All existing arrays have to be processed");
- auto& counterDatum = TArrowBlock::From(Values.back()).GetDatum();
+ auto& counterDatum = TArrowBlock::From(Values[BlockLengthIndex_]).GetDatum();
MKQL_ENSURE(counterDatum.is_scalar(), "Unexpected block length type (expecting scalar)");
Count = counterDatum.scalar_as<arrow::UInt64Scalar>().value;
if (!Count)
return;
for (size_t i = 0U; i < Deques.size(); ++i) {
+ if (i == BlockLengthIndex_) {
+ continue;
+ }
+
Deques[i].clear();
if (const auto& value = Values[i]) {
const auto& datum = TArrowBlock::From(value).GetDatum();
@@ -399,7 +405,7 @@ ui64 TBlockState::Slice() {
}
NUdf::TUnboxedValuePod TBlockState::Get(const ui64 sliceSize, const THolderFactory& holderFactory, const size_t idx) const {
- if (idx >= Deques.size())
+ if (idx == BlockLengthIndex_)
return holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(sliceSize)));
if (auto array = Arrays[idx])
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.h b/yql/essentials/minikql/computation/mkql_block_impl.h
index c94047aa049..61964c030e0 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.h
+++ b/yql/essentials/minikql/computation/mkql_block_impl.h
@@ -90,6 +90,8 @@ private:
};
struct TBlockState : public TComputationValue<TBlockState> {
+ static constexpr i64 LAST_COLUMN_MARKER = -1;
+
using TBase = TComputationValue<TBlockState>;
ui64 Count = 0;
@@ -99,7 +101,9 @@ struct TBlockState : public TComputationValue<TBlockState> {
std::vector<std::deque<std::shared_ptr<arrow::ArrayData>>> Deques;
std::vector<std::shared_ptr<arrow::ArrayData>> Arrays;
- TBlockState(TMemoryUsageInfo* memInfo, size_t width);
+ ui64 BlockLengthIndex_ = 0;
+
+ TBlockState(TMemoryUsageInfo* memInfo, size_t width, i64 blockLengthIndex = LAST_COLUMN_MARKER);
void ClearValues();
diff --git a/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt b/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt
index afc435fb443..66c4d23bc22 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt
+++ b/yql/essentials/minikql/computation/mkql_block_impl_codegen.h.txt
@@ -38,7 +38,7 @@ namespace NKikimr::NMiniKQL {
: TBase(context)
, CountType(llvm::Type::getInt64Ty(Context))
, PointerType(llvm::PointerType::getUnqual(llvm::ArrayType::get(llvm::Type::getInt128Ty(Context), width)))
- , SkipSpaceType(llvm::ArrayType::get(llvm::Type::getInt64Ty(Context), 9U)) // Skip std::vectors Values & Arrays
+ , SkipSpaceType(llvm::ArrayType::get(llvm::Type::getInt64Ty(Context), 10U)) // Skip Values, Deques, Arrays std::vectors and BlockLengthIndex ui64
{}
};
#endif
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index a0b92b2ead4..27c4229f996 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -6,6 +6,7 @@
#include "yql/essentials/minikql/mkql_function_registry.h"
#include "yql/essentials/minikql/mkql_utils.h"
#include "yql/essentials/minikql/mkql_type_builder.h"
+#include "yql/essentials/core/sql_types/block.h"
#include "yql/essentials/core/sql_types/match_recognize.h"
#include "yql/essentials/core/sql_types/time_order_recover.h"
#include <yql/essentials/parser/pg_catalog/catalog.h>
@@ -1493,6 +1494,42 @@ TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+TType* TProgramBuilder::BuildBlockStructType(const TStructType* structType) { // Builds the block counterpart of a struct type: every member wrapped in Block, plus a block length member.
+ std::vector<std::pair<std::string_view, TType*>> blockStructItems;
+ blockStructItems.reserve(structType->GetMembersCount() + 1); // +1 for the block length member appended below
+ for (size_t i = 0; i < structType->GetMembersCount(); i++) {
+ auto itemType = structType->GetMemberType(i);
+ MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here");
+ blockStructItems.emplace_back(
+ structType->GetMemberName(i),
+ NewBlockType(itemType, TBlockType::EShape::Many)
+ );
+ }
+ blockStructItems.emplace_back( // the block length member is a scalar block of ui64
+ NYql::BlockLengthColumnName,
+ NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)
+ );
+ return NewStructType(blockStructItems);
+}
+
+TRuntimeNode TProgramBuilder::ListToBlocks(TRuntimeNode list) { // Builds a ListToBlocks callable: List<Struct<...>> in, List of the corresponding block struct out.
+ if constexpr (RuntimeVersion < 60U) { // the node is only available starting from runtime version 60
+ THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+ }
+
+ MKQL_ENSURE(list.GetStaticType()->IsList(), "Expected List as input type");
+ const auto listType = AS_TYPE(TListType, list.GetStaticType());
+
+ MKQL_ENSURE(listType->GetItemType()->IsStruct(), "Expected List of Struct as input type");
+ const auto itemStructType = AS_TYPE(TStructType, listType->GetItemType());
+
+ const auto itemBlockStructType = BuildBlockStructType(itemStructType);
+
+ TCallableBuilder callableBuilder(Env, __func__, NewListType(itemBlockStructType)); // the callable is named after this function, i.e. "ListToBlocks"
+ callableBuilder.Add(list);
+ return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::FromBlocks(TRuntimeNode flow) {
auto* flowType = AS_TYPE(TFlowType, flow.GetStaticType());
auto* blockType = AS_TYPE(TBlockType, flowType->GetItemType());
diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h
index 01c02232933..fb0b097dfaa 100644
--- a/yql/essentials/minikql/mkql_program_builder.h
+++ b/yql/essentials/minikql/mkql_program_builder.h
@@ -241,6 +241,7 @@ public:
TRuntimeNode ToBlocks(TRuntimeNode flow);
TRuntimeNode WideToBlocks(TRuntimeNode flow);
+ TRuntimeNode ListToBlocks(TRuntimeNode list);
TRuntimeNode FromBlocks(TRuntimeNode flow);
TRuntimeNode WideFromBlocks(TRuntimeNode flow);
TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count);
@@ -861,6 +862,7 @@ private:
TType* ChooseCommonType(TType* type1, TType* type2);
TType* BuildArithmeticCommonType(TType* type1, TType* type2);
TType* BuildWideBlockType(const TArrayRef<TType* const>& wideComponents);
+ TType* BuildBlockStructType(const TStructType* structType);
bool IsNull(TRuntimeNode arg);
protected:
diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h
index d0dc62b9390..df026ed9015 100644
--- a/yql/essentials/minikql/mkql_runtime_version.h
+++ b/yql/essentials/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 59U
+#define MKQL_RUNTIME_VERSION 60U
#endif
// History: