summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorziganshinmr <[email protected]>2025-03-27 14:59:42 +0300
committerziganshinmr <[email protected]>2025-03-27 15:16:27 +0300
commit3098b5d7f2fe6c0bbee56a7b57371dec369a9e1a (patch)
tree81c68ec3e758783d71e03baa8f8ec509e89f8aa1
parente925ea9f73b6f4eda560bf872f261e671c82b2d2 (diff)
ListFromBlocks computation node
commit_hash:bae79a39ae78ceed103c460f7949d1a2483e0b73
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_blocks.cpp219
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_blocks.h1
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_factory.cpp1
-rw-r--r--yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp131
-rw-r--r--yql/essentials/minikql/computation/mkql_block_impl.cpp6
-rw-r--r--yql/essentials/minikql/mkql_program_builder.cpp37
-rw-r--r--yql/essentials/minikql/mkql_program_builder.h2
-rw-r--r--yql/essentials/minikql/mkql_runtime_version.h2
-rw-r--r--yql/essentials/minikql/mkql_type_builder.cpp42
-rw-r--r--yql/essentials/minikql/mkql_type_builder.h4
10 files changed, 382 insertions, 63 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
index 5f54771ab7f..6bf6bc38894 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.cpp
@@ -1186,6 +1186,208 @@ private:
const TVector<TType*> Types_;
};
+// Row cursor over one block struct, used by the ListFromBlocks iterator.
+// Keeps the block that is currently being unpacked together with one
+// reader/converter pair per data column. The slot at blockLengthIndex stays
+// empty: that column is only used to obtain the number of rows in the block.
+struct TListFromBlocksState : public TComputationValue<TListFromBlocksState> {
+public:
+    // types: one entry per member of the block struct; the entry at
+    // blockLengthIndex is not dereferenced here.
+    TListFromBlocksState(TMemoryUsageInfo* memInfo, TComputationContext& ctx, const TVector<TType*>& types, size_t blockLengthIndex)
+        : TComputationValue(memInfo)
+        , HolderFactory_(ctx.HolderFactory)
+        , BlockLengthIndex_(blockLengthIndex)
+        , Readers_(types.size())
+        , Converters_(types.size())
+        , ValuesDescr_(ToValueDescr(types))
+    {
+        const auto& pgBuilder = ctx.Builder->GetPgBuilder();
+        for (size_t i = 0; i < types.size(); ++i) {
+            if (i == blockLengthIndex) {
+                continue;
+            }
+            const TType* blockItemType = AS_TYPE(TBlockType, types[i])->GetItemType();
+            Readers_[i] = MakeBlockReader(TTypeInfoHelper(), blockItemType);
+            Converters_[i] = MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder);
+        }
+    }
+
+    // Materializes the row at the current position as a struct value and
+    // advances the cursor. Fails if no rows are pending (see HasRows()).
+    NUdf::TUnboxedValue GetRow() {
+        MKQL_ENSURE(CurrentRow_ < RowCount_, "Rows out of range");
+
+        NUdf::TUnboxedValue* outItems = nullptr;
+        // One output member per input column except the block length column.
+        auto row = HolderFactory_.CreateDirectArrayHolder(Readers_.size() - 1, outItems);
+
+        size_t outputStructIdx = 0;
+        for (size_t i = 0; i < Readers_.size(); i++) {
+            if (i == BlockLengthIndex_) {
+                continue;
+            }
+
+            // Owns the column when it had to be fetched through GetElement().
+            NUdf::TUnboxedValue fetched;
+            const auto& datum = TArrowBlock::From(GetColumn(i, fetched)).GetDatum();
+            ARROW_DEBUG_CHECK_DATUM_TYPES(ValuesDescr_[i], datum.descr());
+
+            TBlockItem item;
+            if (datum.is_scalar()) {
+                item = Readers_[i]->GetScalarItem(*datum.scalar());
+            } else {
+                MKQL_ENSURE(datum.is_array(), "Expecting array");
+                item = Readers_[i]->GetItem(*datum.array(), CurrentRow_);
+            }
+
+            outItems[outputStructIdx++] = Converters_[i]->MakeValue(item, HolderFactory_);
+        }
+
+        CurrentRow_++;
+        return row;
+    }
+
+    // Installs a new block struct and rewinds the cursor to its first row.
+    void SetBlock(NUdf::TUnboxedValue block) {
+        // GetElements() may yield nullptr when the struct does not expose its
+        // members as a plain array; GetColumn() falls back to GetElement() then.
+        BlockItems_ = block.GetElements();
+        Block_ = std::move(block);
+
+        CurrentRow_ = 0;
+        NUdf::TUnboxedValue fetched;
+        RowCount_ = GetBlockCount(GetColumn(BlockLengthIndex_, fetched));
+    }
+
+    // True while the current block still has rows that GetRow() has not returned.
+    bool HasRows() const {
+        return CurrentRow_ < RowCount_;
+    }
+
+private:
+    // Returns the index-th member of the current block struct. Uses the direct
+    // element array when available; otherwise stores the member in `holder`
+    // (which must outlive the returned reference) and returns that.
+    const NUdf::TUnboxedValuePod& GetColumn(size_t index, NUdf::TUnboxedValue& holder) const {
+        if (BlockItems_) {
+            return BlockItems_[index];
+        }
+        holder = Block_.GetElement(index);
+        return holder;
+    }
+
+private:
+    const THolderFactory& HolderFactory_;
+
+    size_t CurrentRow_ = 0;
+    size_t RowCount_ = 0;
+
+    size_t BlockLengthIndex_ = 0;
+
+    // Block_ keeps the struct alive; BlockItems_ points into it (or is nullptr).
+    NUdf::TUnboxedValue Block_;
+    const NUdf::TUnboxedValue* BlockItems_ = nullptr;
+
+    std::vector<std::unique_ptr<IBlockReader>> Readers_;
+    std::vector<std::unique_ptr<IBlockItemConverter>> Converters_;
+    const std::vector<arrow::ValueDescr> ValuesDescr_;
+};
+
+// Computation node that turns a list of block structs into a lazy list of
+// plain row structs. The block length member is dropped from the output rows.
+class TListFromBlocksWrapper : public TMutableComputationNode<TListFromBlocksWrapper>
+{
+    using TBaseComputation = TMutableComputationNode<TListFromBlocksWrapper>;
+
+public:
+    // structType: item type of the input list (struct of blocks plus the block
+    // length member). Fails if the block length member is absent.
+    TListFromBlocksWrapper(TComputationMutables& mutables,
+        IComputationNode* list,
+        TStructType* structType
+    )
+        : TBaseComputation(mutables, EValueRepresentation::Boxed)
+        , List_(list)
+    {
+        bool hasBlockLengthColumn = false;
+        Types_.reserve(structType->GetMembersCount());
+        for (size_t i = 0; i < structType->GetMembersCount(); i++) {
+            if (structType->GetMemberName(i) == NYql::BlockLengthColumnName) {
+                BlockLengthIndex_ = i;
+                hasBlockLengthColumn = true;
+                // Placeholder keeps Types_ indices aligned with struct member indices.
+                Types_.push_back(nullptr);
+                continue;
+            }
+            Types_.push_back(structType->GetMemberType(i));
+        }
+        // Without this check a missing column would silently leave index 0 selected.
+        MKQL_ENSURE(hasBlockLengthColumn, "Block struct must contain block length column");
+    }
+
+    NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const
+    {
+        return ctx.HolderFactory.Create<TListFromBlocksValue>(
+            ctx,
+            Types_,
+            BlockLengthIndex_,
+            List_->GetValue(ctx)
+        );
+    }
+
+private:
+    // List value wrapping the block list; rows are produced on iteration.
+    class TListFromBlocksValue : public TCustomListValue {
+        using TState = TListFromBlocksState;
+
+    public:
+        // Iterator pairing an iterator over the block list with its own row cursor.
+        class TIterator : public TComputationValue<TIterator> {
+        public:
+            TIterator(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& iter)
+                : TComputationValue<TIterator>(memInfo)
+                , BlockState_(std::move(blockState))
+                , Iter_(std::move(iter))
+            {}
+
+        private:
+            bool Next(NUdf::TUnboxedValue& value) final {
+                auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get());
+                // Loop rather than a single fetch: a block with zero rows must be
+                // skipped, since GetRow() requires at least one pending row.
+                while (!blockState.HasRows()) {
+                    NUdf::TUnboxedValue block;
+                    if (!Iter_.Next(block)) {
+                        return false;
+                    }
+                    blockState.SetBlock(std::move(block));
+                }
+
+                value = blockState.GetRow();
+                return true;
+            }
+
+        private:
+            const NUdf::TUnboxedValue BlockState_;
+            const NUdf::TUnboxedValue Iter_;
+        };
+
+        TListFromBlocksValue(TMemoryUsageInfo* memInfo, TComputationContext& ctx,
+            const TVector<TType*>& types, size_t blockLengthIndex, NUdf::TUnboxedValue&& list
+        )
+            : TCustomListValue(memInfo)
+            , CompCtx_(ctx)
+            , Types_(types)
+            , BlockLengthIndex_(blockLengthIndex)
+            , List_(std::move(list))
+        {}
+
+    private:
+        // Each iterator gets a fresh state, so iterators do not share a cursor.
+        NUdf::TUnboxedValue GetListIterator() const final {
+            auto state = CompCtx_.HolderFactory.Create<TState>(CompCtx_, Types_, BlockLengthIndex_);
+            return CompCtx_.HolderFactory.Create<TIterator>(std::move(state), List_.GetListIterator());
+        }
+
+        // Cached; reports whether the underlying block list has items.
+        bool HasListItems() const final {
+            if (!HasItems.has_value()) {
+                HasItems = List_.HasListItems();
+            }
+            return *HasItems;
+        }
+
+        // Cached; sums the block length member over all blocks of the input list.
+        ui64 GetListLength() const final {
+            if (!Length.has_value()) {
+                auto iter = List_.GetListIterator();
+
+                Length = 0;
+                NUdf::TUnboxedValue block;
+                while (iter.Next(block)) {
+                    auto blockLengthValue = block.GetElement(BlockLengthIndex_);
+                    *Length += GetBlockCount(blockLengthValue);
+                }
+            }
+
+            return *Length;
+        }
+
+    private:
+        TComputationContext& CompCtx_;
+
+        // Refers to the vector owned by the enclosing wrapper node.
+        const TVector<TType*>& Types_;
+        size_t BlockLengthIndex_ = 0;
+
+        NUdf::TUnboxedValue List_;
+    };
+
+    void RegisterDependencies() const final {
+        this->DependsOn(List_);
+    }
+
+private:
+    // Member types by struct index; nullptr at BlockLengthIndex_.
+    TVector<TType*> Types_;
+    size_t BlockLengthIndex_ = 0;
+
+    IComputationNode* const List_;
+};
+
class TPrecomputedArrowNode : public IArrowKernelComputationNode {
public:
TPrecomputedArrowNode(const arrow::Datum& datum, TStringBuf kernelName)
@@ -1645,6 +1847,23 @@ IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNode
return new TWideFromBlocksFlowWrapper(ctx.Mutables, wideFlow, std::move(items));
}
+// Factory for the "ListFromBlocks" callable: verifies that both the argument
+// and the result are List<Struct> and creates the computation node.
+IComputationNode* WrapListFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
+    MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
+
+    const auto srcType = callable.GetInput(0).GetStaticType();
+    MKQL_ENSURE(srcType->IsList(), "Expected List as an input");
+    const auto dstType = callable.GetType()->GetReturnType();
+    MKQL_ENSURE(dstType->IsList(), "Expected List as an output");
+
+    const auto srcItemType = AS_TYPE(TListType, srcType)->GetItemType();
+    MKQL_ENSURE(srcItemType->IsStruct(), "Expected List of Struct as an input");
+    const auto dstItemType = AS_TYPE(TListType, dstType)->GetItemType();
+    MKQL_ENSURE(dstItemType->IsStruct(), "Expected List of Struct as an output");
+
+    // The node itself only needs the input (block) struct layout.
+    const auto blockStructType = AS_TYPE(TStructType, srcItemType);
+    const auto listNode = LocateNode(ctx.NodeLocator, callable, 0);
+    return new TListFromBlocksWrapper(ctx.Mutables, listNode, blockStructType);
+}
+
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx) {
MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 args, got " << callable.GetInputsCount());
diff --git a/yql/essentials/minikql/comp_nodes/mkql_blocks.h b/yql/essentials/minikql/comp_nodes/mkql_blocks.h
index 326e25e57d9..fc718212091 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_blocks.h
+++ b/yql/essentials/minikql/comp_nodes/mkql_blocks.h
@@ -10,6 +10,7 @@ IComputationNode* WrapWideToBlocks(TCallable& callable, const TComputationNodeFa
IComputationNode* WrapListToBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapWideFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
+IComputationNode* WrapListFromBlocks(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapAsScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapReplicateScalar(TCallable& callable, const TComputationNodeFactoryContext& ctx);
IComputationNode* WrapBlockExpandChunked(TCallable& callable, const TComputationNodeFactoryContext& ctx);
diff --git a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
index 9615a5d4d78..e32aaf1fd79 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_factory.cpp
@@ -291,6 +291,7 @@ struct TCallableComputationNodeBuilderFuncMapFiller {
{"WideTopSortBlocks", &WrapWideTopSortBlocks},
{"WideSortBlocks", &WrapWideSortBlocks},
{"ListToBlocks", &WrapListToBlocks},
+ {"ListFromBlocks", &WrapListFromBlocks},
{"AsScalar", &WrapAsScalar},
{"ReplicateScalar", &WrapReplicateScalar},
{"BlockCoalesce", &WrapBlockCoalesce},
diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
index 15dd02bf30f..c2faa29bed5 100644
--- a/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
+++ b/yql/essentials/minikql/comp_nodes/ut/mkql_blocks_ut.cpp
@@ -80,6 +80,71 @@ namespace {
});
}));
}
+
+    // Test-only stand-in for pb.ListToBlocks, assembled from WideToBlocks so that
+    // ListFromBlocks can be checked against well-tested nodes instead of the real ListToBlocks.
+    TRuntimeNode ListToBlocks(TProgramBuilder& pb, TRuntimeNode list) {
+        // Struct rows -> wide flow of (key, value).
+        const auto wideFlow = pb.ExpandMap(pb.ToFlow(list), [&](TRuntimeNode row) -> TRuntimeNode::TList {
+            return {
+                pb.Member(row, "key"),
+                pb.Member(row, "value")
+            };
+        });
+        const auto wideBlocksStream = pb.WideToBlocks(pb.FromFlow(wideFlow));
+
+        // Wide blocks -> block structs; the third wide item is the block length.
+        const auto blockStructs = pb.NarrowMap(pb.ToFlow(wideBlocksStream), [&](TRuntimeNode::TList columns) -> TRuntimeNode {
+            return pb.NewStruct({
+                {"key", columns[0]},
+                {"value", columns[1]},
+                {NYql::BlockLengthColumnName, columns[2]}
+            });
+        });
+        return pb.Collect(blockStructs);
+    }
+
+ using TListTransformer = std::function<TRuntimeNode(TProgramBuilder&, TRuntimeNode)>;
+
+    // Round-trips a generated List<Struct<key:Uint64, value:String>> through the
+    // supplied to-blocks and from-blocks builders and checks every row in order.
+    void DoTestListToAndFromBlocks(TSetup<false>& setup, TListTransformer listToBlocksImpl, TListTransformer listFromBlocksImpl) {
+        constexpr size_t TEST_SIZE = 1 << 16;
+        const TString hugeString(128, '1');
+        // Payload expected in the "value" member of the row with the given index.
+        const auto expectedValue = [&hugeString](size_t index) {
+            return hugeString + ToString(index);
+        };
+
+        TProgramBuilder& pb = *setup.PgmBuilder;
+
+        const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
+        const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id);
+        const auto structType = pb.NewStructType({
+            {"key", ui64Type},
+            {"value", strType},
+        });
+
+        TVector<TRuntimeNode> listItems;
+        for (size_t index = 0; index < TEST_SIZE; ++index) {
+            const auto payload = expectedValue(index);
+            // A huge string makes fewer rows fit into one block, which exercises output slicing.
+            listItems.push_back(pb.NewStruct({
+                {"key", pb.NewDataLiteral<ui64>(index)},
+                {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(payload)},
+            }));
+        }
+
+        const auto sourceList = pb.NewList(structType, listItems);
+        const auto blockList = listToBlocksImpl(pb, sourceList);
+        const auto restoredList = listFromBlocksImpl(pb, blockList);
+
+        const auto graph = setup.BuildGraph(restoredList);
+        const auto iterator = graph->GetValue().GetListIterator();
+
+        NUdf::TUnboxedValue structValue;
+        for (size_t index = 0; index < TEST_SIZE; ++index) {
+            const auto payload = expectedValue(index);
+            UNIT_ASSERT(iterator.Next(structValue));
+
+            const auto key = structValue.GetElement(0);
+            UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), index);
+            const auto value = structValue.GetElement(1);
+            UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), payload);
+        }
+
+        // The exhausted iterator must keep reporting the end of the list.
+        UNIT_ASSERT(!iterator.Next(structValue));
+        UNIT_ASSERT(!iterator.Next(structValue));
+    }
}
Y_UNIT_TEST_SUITE(TMiniKQLBlocksTest) {
@@ -170,49 +235,13 @@ Y_UNIT_TEST_LLVM(TestWideToBlocks) {
}
Y_UNIT_TEST(TestListToBlocks) {
- constexpr size_t TEST_SIZE = 1 << 16;
- const TString hugeString(128, '1');
-
TSetup<false> setup;
- TProgramBuilder& pb = *setup.PgmBuilder;
- const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id);
- const auto strType = pb.NewDataType(NUdf::TDataType<char*>::Id);
-
- const auto structType = pb.NewStructType({
- {"key", ui64Type},
- {"value", strType},
- });
-
- TVector<TRuntimeNode> listItems;
- for (size_t i = 0; i < TEST_SIZE; i++) {
- const auto str = hugeString + ToString(i);
- listItems.push_back(pb.NewStruct({
- {"key", pb.NewDataLiteral<ui64>(i)},
- // Huge string is used to make less rows fit into one block (in order to test output slicing)
- {"value", pb.NewDataLiteral<NUdf::EDataSlot::String>(str)},
- }));
- }
-
- const auto list = pb.NewList(structType, listItems);
- const auto blockList = pb.ListToBlocks(list);
-
- const auto graph = setup.BuildGraph(ListFromBlocks(pb, blockList));
- const auto iterator = graph->GetValue().GetListIterator();
-
- NUdf::TUnboxedValue structValue;
- for (size_t i = 0; i < TEST_SIZE; i++) {
- const auto str = hugeString + ToString(i);
- UNIT_ASSERT(iterator.Next(structValue));
-
- const auto key = structValue.GetElement(0);
- UNIT_ASSERT_VALUES_EQUAL(key.Get<ui64>(), i);
- const auto value = structValue.GetElement(1);
- UNIT_ASSERT_VALUES_EQUAL(std::string_view(value.AsStringRef()), str);
- }
-
- UNIT_ASSERT(!iterator.Next(structValue));
- UNIT_ASSERT(!iterator.Next(structValue));
+ DoTestListToAndFromBlocks(
+ setup,
+ [] (TProgramBuilder& pb, TRuntimeNode list) { return pb.ListToBlocks(list); },
+ ListFromBlocks
+ );
}
Y_UNIT_TEST(TestListToBlocksMultiUsage) {
@@ -700,6 +729,16 @@ Y_UNIT_TEST_LLVM(TestWideFromBlocks) {
UNIT_ASSERT(!iterator.Next(item));
}
+// Real ListFromBlocks node fed by the hand-made (WideToBlocks-based) block producer.
+Y_UNIT_TEST(TestListFromBlocks) {
+    TSetup<false> setup;
+
+    const auto fromBlocks = [] (TProgramBuilder& pb, TRuntimeNode list) {
+        return pb.ListFromBlocks(list);
+    };
+    DoTestListToAndFromBlocks(setup, ListToBlocks, fromBlocks);
+}
+
Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) {
TSetup<LLVM> setup;
TProgramBuilder& pb = *setup.PgmBuilder;
@@ -741,6 +780,16 @@ Y_UNIT_TEST_LLVM(TestWideToAndFromBlocks) {
UNIT_ASSERT(!iterator.Next(item));
UNIT_ASSERT(!iterator.Next(item));
}
+
+// Round trip through both real nodes: ListToBlocks followed by ListFromBlocks.
+Y_UNIT_TEST(TestListToAndFromBlocks) {
+    TSetup<false> setup;
+
+    const auto toBlocks = [] (TProgramBuilder& pb, TRuntimeNode list) {
+        return pb.ListToBlocks(list);
+    };
+    const auto fromBlocks = [] (TProgramBuilder& pb, TRuntimeNode list) {
+        return pb.ListFromBlocks(list);
+    };
+    DoTestListToAndFromBlocks(setup, toBlocks, fromBlocks);
+}
}
Y_UNIT_TEST_SUITE(TMiniKQLDirectKernelTest) {
diff --git a/yql/essentials/minikql/computation/mkql_block_impl.cpp b/yql/essentials/minikql/computation/mkql_block_impl.cpp
index ba9238e45a7..e960e8562e3 100644
--- a/yql/essentials/minikql/computation/mkql_block_impl.cpp
+++ b/yql/essentials/minikql/computation/mkql_block_impl.cpp
@@ -220,7 +220,11 @@ std::vector<arrow::ValueDescr> ToValueDescr(const TVector<TType*>& types) {
std::vector<arrow::ValueDescr> res;
res.reserve(types.size());
for (const auto& type : types) {
- res.emplace_back(ToValueDescr(type));
+ if (type) {
+ res.emplace_back(ToValueDescr(type));
+ } else {
+ res.emplace_back();
+ }
}
return res;
diff --git a/yql/essentials/minikql/mkql_program_builder.cpp b/yql/essentials/minikql/mkql_program_builder.cpp
index b11a2924c32..5e8e89da4ca 100644
--- a/yql/essentials/minikql/mkql_program_builder.cpp
+++ b/yql/essentials/minikql/mkql_program_builder.cpp
@@ -6,7 +6,6 @@
#include "yql/essentials/minikql/mkql_function_registry.h"
#include "yql/essentials/minikql/mkql_utils.h"
#include "yql/essentials/minikql/mkql_type_builder.h"
-#include "yql/essentials/core/sql_types/block.h"
#include "yql/essentials/core/sql_types/match_recognize.h"
#include "yql/essentials/core/sql_types/time_order_recover.h"
#include <yql/essentials/parser/pg_catalog/catalog.h>
@@ -1494,24 +1493,6 @@ TRuntimeNode TProgramBuilder::WideToBlocks(TRuntimeNode stream) {
return TRuntimeNode(callableBuilder.Build(), false);
}
-TType* TProgramBuilder::BuildBlockStructType(const TStructType* structType) {
- std::vector<std::pair<std::string_view, TType*>> blockStructItems;
- blockStructItems.reserve(structType->GetMembersCount() + 1);
- for (size_t i = 0; i < structType->GetMembersCount(); i++) {
- auto itemType = structType->GetMemberType(i);
- MKQL_ENSURE(!itemType->IsBlock() , "Block types are not allowed here");
- blockStructItems.emplace_back(
- structType->GetMemberName(i),
- NewBlockType(itemType, TBlockType::EShape::Many)
- );
- }
- blockStructItems.emplace_back(
- NYql::BlockLengthColumnName,
- NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)
- );
- return NewStructType(blockStructItems);
-}
-
TRuntimeNode TProgramBuilder::ListToBlocks(TRuntimeNode list) {
if constexpr (RuntimeVersion < 60U) {
THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
@@ -1564,6 +1545,24 @@ TRuntimeNode TProgramBuilder::WideFromBlocks(TRuntimeNode stream) {
return TRuntimeNode(callableBuilder.Build(), false);
}
+// Builds a ListFromBlocks callable: List<Struct of blocks + block length> -> List<Struct of items>.
+// Throws when the runtime version predates the node or the input type does not fit.
+TRuntimeNode TProgramBuilder::ListFromBlocks(TRuntimeNode list) {
+    if constexpr (RuntimeVersion < 61U) {
+        THROW yexception() << "Runtime version (" << RuntimeVersion << ") too old for " << __func__;
+    }
+
+    const auto inputType = list.GetStaticType();
+    MKQL_ENSURE(inputType->IsList(), "Expected List as input type");
+
+    const auto blockItemType = AS_TYPE(TListType, inputType)->GetItemType();
+    MKQL_ENSURE(blockItemType->IsStruct(), "Expected List of Struct as input type");
+
+    // Validation also yields the struct type of the resulting plain rows.
+    const auto rowStructType = ValidateBlockStructType(AS_TYPE(TStructType, blockItemType));
+
+    TCallableBuilder callableBuilder(Env, __func__, NewListType(rowStructType));
+    callableBuilder.Add(list);
+    return TRuntimeNode(callableBuilder.Build(), false);
+}
+
TRuntimeNode TProgramBuilder::WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count) {
return BuildWideSkipTakeBlocks(__func__, flow, count);
}
diff --git a/yql/essentials/minikql/mkql_program_builder.h b/yql/essentials/minikql/mkql_program_builder.h
index fb0b097dfaa..f71d47a3872 100644
--- a/yql/essentials/minikql/mkql_program_builder.h
+++ b/yql/essentials/minikql/mkql_program_builder.h
@@ -244,6 +244,7 @@ public:
TRuntimeNode ListToBlocks(TRuntimeNode list);
TRuntimeNode FromBlocks(TRuntimeNode flow);
TRuntimeNode WideFromBlocks(TRuntimeNode flow);
+ TRuntimeNode ListFromBlocks(TRuntimeNode list);
TRuntimeNode WideSkipBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode WideTakeBlocks(TRuntimeNode flow, TRuntimeNode count);
TRuntimeNode WideTopBlocks(TRuntimeNode flow, TRuntimeNode count, const std::vector<std::pair<ui32, TRuntimeNode>>& keys);
@@ -862,7 +863,6 @@ private:
TType* ChooseCommonType(TType* type1, TType* type2);
TType* BuildArithmeticCommonType(TType* type1, TType* type2);
TType* BuildWideBlockType(const TArrayRef<TType* const>& wideComponents);
- TType* BuildBlockStructType(const TStructType* structType);
bool IsNull(TRuntimeNode arg);
protected:
diff --git a/yql/essentials/minikql/mkql_runtime_version.h b/yql/essentials/minikql/mkql_runtime_version.h
index df026ed9015..ed228c7b03d 100644
--- a/yql/essentials/minikql/mkql_runtime_version.h
+++ b/yql/essentials/minikql/mkql_runtime_version.h
@@ -24,7 +24,7 @@ namespace NMiniKQL {
// 1. Bump this version every time incompatible runtime nodes are introduced.
// 2. Make sure you provide runtime node generation for previous runtime versions.
#ifndef MKQL_RUNTIME_VERSION
-#define MKQL_RUNTIME_VERSION 60U
+#define MKQL_RUNTIME_VERSION 61U
#endif
// History:
diff --git a/yql/essentials/minikql/mkql_type_builder.cpp b/yql/essentials/minikql/mkql_type_builder.cpp
index e37bf91b91f..c9d6e363b4f 100644
--- a/yql/essentials/minikql/mkql_type_builder.cpp
+++ b/yql/essentials/minikql/mkql_type_builder.cpp
@@ -2806,6 +2806,48 @@ TType* TTypeBuilder::NewVariantType(TType* underlyingType) const {
return TVariantType::Create(underlyingType, Env);
}
+// Checks that structType is a struct of blocks with exactly one scalar Uint64
+// block length member, and returns the struct type made of the remaining
+// members with their block wrappers removed.
+TType* TTypeBuilder::ValidateBlockStructType(const TStructType* structType) const {
+    const size_t membersCount = structType->GetMembersCount();
+    MKQL_ENSURE(membersCount > 0, "Expected at least one column");
+
+    std::vector<std::pair<std::string_view, TType*>> rowMembers;
+    rowMembers.reserve(membersCount - 1);
+
+    bool lengthColumnSeen = false;
+    for (size_t idx = 0; idx < membersCount; ++idx) {
+        // Every member, including the length column, has to be a block.
+        const auto blockType = AS_TYPE(TBlockType, structType->GetMemberType(idx));
+        const bool scalarShape = blockType->GetShape() == TBlockType::EShape::Scalar;
+        const auto unwrappedType = blockType->GetItemType();
+
+        if (structType->GetMemberName(idx) != NYql::BlockLengthColumnName) {
+            rowMembers.emplace_back(structType->GetMemberName(idx), unwrappedType);
+            continue;
+        }
+
+        MKQL_ENSURE(scalarShape, "Block length column should be scalar");
+        MKQL_ENSURE(AS_TYPE(TDataType, unwrappedType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64");
+
+        MKQL_ENSURE(!lengthColumnSeen, "Block struct must contain only one block length column");
+        lengthColumnSeen = true;
+    }
+
+    MKQL_ENSURE(lengthColumnSeen, "Block struct must contain block length column");
+    return NewStructType(rowMembers);
+}
+
+// Produces the block counterpart of structType: each member is wrapped into a
+// Many-shaped block and a scalar Uint64 block length member is appended.
+// Members that are already blocks are rejected.
+TType* TTypeBuilder::BuildBlockStructType(const TStructType* structType) const {
+    const size_t membersCount = structType->GetMembersCount();
+
+    std::vector<std::pair<std::string_view, TType*>> blockMembers;
+    blockMembers.reserve(membersCount + 1);
+
+    for (size_t idx = 0; idx < membersCount; ++idx) {
+        const auto memberType = structType->GetMemberType(idx);
+        MKQL_ENSURE(!memberType->IsBlock() , "Block types are not allowed here");
+
+        const auto wrappedType = NewBlockType(memberType, TBlockType::EShape::Many);
+        blockMembers.emplace_back(structType->GetMemberName(idx), wrappedType);
+    }
+
+    const auto lengthType = NewBlockType(NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar);
+    blockMembers.emplace_back(NYql::BlockLengthColumnName, lengthType);
+
+    return NewStructType(blockMembers);
+}
+
void RebuildTypeIndex() {
HugeSingleton<TPgTypeIndex>()->Rebuild();
}
diff --git a/yql/essentials/minikql/mkql_type_builder.h b/yql/essentials/minikql/mkql_type_builder.h
index f1ec8a9e4f3..678db9157ca 100644
--- a/yql/essentials/minikql/mkql_type_builder.h
+++ b/yql/essentials/minikql/mkql_type_builder.h
@@ -2,6 +2,7 @@
#include "mkql_node.h"
+#include <yql/essentials/core/sql_types/block.h>
#include <yql/essentials/public/udf/udf_type_builder.h>
#include <yql/essentials/public/udf/arrow/block_type_helper.h>
#include <yql/essentials/parser/pg_wrapper/interface/compare.h>
@@ -325,6 +326,9 @@ public:
TType* NewResourceType(const std::string_view& tag) const;
TType* NewVariantType(TType* underlyingType) const;
+ TType* BuildBlockStructType(const TStructType* structType) const;
+ TType* ValidateBlockStructType(const TStructType* structType) const;
+
protected:
const TTypeEnvironment& Env;
bool UseNullType = true;