diff options
author | Alexander Smirnov <[email protected]> | 2025-01-11 00:21:49 +0000 |
---|---|---|
committer | Alexander Smirnov <[email protected]> | 2025-01-11 00:21:49 +0000 |
commit | 457aacf7daabd8837feef98d1edcfe62420a1f47 (patch) | |
tree | 3f8ca7735aac2ab4574833bf4ea5e1881a02ef84 /yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | |
parent | af411bb10f1133d6e7f4c6324a89dde2f745d675 (diff) | |
parent | 2d3b7f1966f9716551a0d7db72a9608addab8ecf (diff) |
Merge branch 'rightlib' into merge-libs-250111-0020
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp')
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 54 |
1 files changed, 34 insertions, 20 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 827e295db6f..847e4409b6c 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -3,6 +3,7 @@ #include <yql/essentials/minikql/computation/mkql_block_builder.h> #include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/minikql/computation/mkql_block_reader.h> +#include <yql/essentials/minikql/computation/mkql_block_trimmer.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> #include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> @@ -329,12 +330,12 @@ public: public: TIterator() = default; - TIterator(TBlockIndex* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -342,7 +343,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -432,7 +433,7 @@ public: private: EIteratorType Type_; - TBlockIndex* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; @@ -451,7 +452,8 @@ public: const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, NUdf::TUnboxedValue stream, - bool any + bool any, + arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) @@ -466,6 +468,7 @@ public: Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); Hashers_.push_back(helper.MakeHasher(blockItemType)); Comparators_.push_back(helper.MakeComparator(blockItemType)); + Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool)); } } @@ -484,7 +487,12 @@ public: for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); - block.push_back(std::move(datum)); + if (datum.is_scalar()) { + block.push_back(datum); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + block.push_back(Trimmers_[i]->Trim(datum.array())); + } } Data_.push_back(std::move(block)); } @@ -565,7 +573,7 @@ public: return; } - auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter)); + auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter)); if (value->IsInplace()) { iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i])); } else { @@ -574,23 +582,20 @@ public: }); } - TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) { + TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const { Y_ENSURE(entry.BlockOffset < Data_.size()); Y_ENSURE(columnIdx < Inputs_.size() - 1); - - auto& datum = Data_[entry.BlockOffset][columnIdx]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset); + return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset); } - void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) { + void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { Y_ENSURE(row.size() == ioMap.size()); for (size_t i = 0; i < row.size(); i++) { row[i] = GetItem(entry, ioMap[i]); } } - bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { auto indexItem = GetItem(entry, KeyColumns_[i]); @@ -607,10 +612,7 @@ private: ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto& datum = block[keyColumn]; - MKQL_ENSURE(datum.is_array(), "Expecting array"); - - auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset); + auto item = GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; @@ -623,11 +625,21 @@ private: return keyHash; } + TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const { + const auto& datum = block[columnIdx]; + if (datum.is_scalar()) { + return Readers_[columnIdx]->GetScalarItem(*datum.scalar()); + } else { + MKQL_ENSURE(datum.is_array(), "Expecting array"); + return Readers_[columnIdx]->GetItem(*datum.array(), offset); + } + } + TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } - bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { if (chain->IsInplace()) { return IsKeyEquals(chain->GetEntry(), keyItems); } else { @@ -650,6 +662,7 @@ private: TVector<std::unique_ptr<IBlockReader>> Readers_; TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; TVector<NUdf::IBlockItemComparator::TPtr> Comparators_; + TVector<IBlockTrimmer::TPtr> Trimmers_; std::vector<std::vector<arrow::Datum>> Data_; @@ -705,7 +718,8 @@ public: RightItemTypes_, RightKeyColumns_, std::move(RightStream_->GetValue(ctx)), - RightAny + RightAny, + &ctx.ArrowMemoryPool ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, |