summary | refs | log | tree | commit | diff | stats
path: root/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
diff options
context:
space:
mode:
author: Alexander Smirnov <[email protected]> 2025-01-11 00:21:49 +0000
committer: Alexander Smirnov <[email protected]> 2025-01-11 00:21:49 +0000
commit: 457aacf7daabd8837feef98d1edcfe62420a1f47 (patch)
tree: 3f8ca7735aac2ab4574833bf4ea5e1881a02ef84 /yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
parent: af411bb10f1133d6e7f4c6324a89dde2f745d675 (diff)
parent: 2d3b7f1966f9716551a0d7db72a9608addab8ecf (diff)
Merge branch 'rightlib' into merge-libs-250111-0020
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp')
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp 54
1 files changed, 34 insertions, 20 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 827e295db6f..847e4409b6c 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -3,6 +3,7 @@
#include <yql/essentials/minikql/computation/mkql_block_builder.h>
#include <yql/essentials/minikql/computation/mkql_block_impl.h>
#include <yql/essentials/minikql/computation/mkql_block_reader.h>
+#include <yql/essentials/minikql/computation/mkql_block_trimmer.h>
#include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h>
#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h>
#include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h>
@@ -329,12 +330,12 @@ public:
public:
TIterator() = default;
- TIterator(TBlockIndex* blockIndex)
+ TIterator(const TBlockIndex* blockIndex)
: Type_(EIteratorType::EMPTY)
, BlockIndex_(blockIndex)
{}
- TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::INPLACE)
, BlockIndex_(blockIndex)
, Entry_(entry)
@@ -342,7 +343,7 @@ public:
, ItemsToLookup_(std::move(itemsToLookup))
{}
- TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
+ TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup)
: Type_(EIteratorType::LIST)
, BlockIndex_(blockIndex)
, Node_(node)
@@ -432,7 +433,7 @@ public:
private:
EIteratorType Type_;
- TBlockIndex* BlockIndex_ = nullptr;
+ const TBlockIndex* BlockIndex_ = nullptr;
union {
TIndexNode* Node_;
@@ -451,7 +452,8 @@ public:
const TVector<TType*>& itemTypes,
const TVector<ui32>& keyColumns,
NUdf::TUnboxedValue stream,
- bool any
+ bool any,
+ arrow::MemoryPool* pool
)
: TBase(memInfo)
, InputsDescr_(ToValueDescr(itemTypes))
@@ -466,6 +468,7 @@ public:
Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType));
Hashers_.push_back(helper.MakeHasher(blockItemType));
Comparators_.push_back(helper.MakeComparator(blockItemType));
+ Trimmers_.push_back(MakeBlockTrimmer(TTypeInfoHelper(), blockItemType, pool));
}
}
@@ -484,7 +487,12 @@ public:
for (size_t i = 0; i < Inputs_.size() - 1; i++) {
auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
- block.push_back(std::move(datum));
+ if (datum.is_scalar()) {
+ block.push_back(datum);
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ block.push_back(Trimmers_[i]->Trim(datum.array()));
+ }
}
Data_.push_back(std::move(block));
}
@@ -565,7 +573,7 @@ public:
return;
}
- auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter));
+ auto value = static_cast<const TIndexMapValue*>(Index_.GetPayload(iter));
if (value->IsInplace()) {
iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i]));
} else {
@@ -574,23 +582,20 @@ public:
});
}
- TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+ TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) const {
Y_ENSURE(entry.BlockOffset < Data_.size());
Y_ENSURE(columnIdx < Inputs_.size() - 1);
-
- auto& datum = Data_[entry.BlockOffset][columnIdx];
- MKQL_ENSURE(datum.is_array(), "Expecting array");
- return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset);
+ return GetItemFromBlock(Data_[entry.BlockOffset], columnIdx, entry.ItemOffset);
}
- void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) {
+ void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const {
Y_ENSURE(row.size() == ioMap.size());
for (size_t i = 0; i < row.size(); i++) {
row[i] = GetItem(entry, ioMap[i]);
}
}
- bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+ bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
Y_ENSURE(keyItems.size() == KeyColumns_.size());
for (size_t i = 0; i < KeyColumns_.size(); i++) {
auto indexItem = GetItem(entry, KeyColumns_[i]);
@@ -607,10 +612,7 @@ private:
ui64 keyHash = 0;
keyItems.clear();
for (ui32 keyColumn : KeyColumns_) {
- auto& datum = block[keyColumn];
- MKQL_ENSURE(datum.is_array(), "Expecting array");
-
- auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
+ auto item = GetItemFromBlock(block, keyColumn, offset);
if (!item) {
keyItems.clear();
return 0;
@@ -623,11 +625,21 @@ private:
return keyHash;
}
+ TBlockItem GetItemFromBlock(const std::vector<arrow::Datum>& block, ui32 columnIdx, size_t offset) const {
+ const auto& datum = block[columnIdx];
+ if (datum.is_scalar()) {
+ return Readers_[columnIdx]->GetScalarItem(*datum.scalar());
+ } else {
+ MKQL_ENSURE(datum.is_array(), "Expecting array");
+ return Readers_[columnIdx]->GetItem(*datum.array(), offset);
+ }
+ }
+
TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) {
return &IndexNodes_.emplace_back(entry, currentHead);
}
- bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+ bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
if (chain->IsInplace()) {
return IsKeyEquals(chain->GetEntry(), keyItems);
} else {
@@ -650,6 +662,7 @@ private:
TVector<std::unique_ptr<IBlockReader>> Readers_;
TVector<NUdf::IBlockItemHasher::TPtr> Hashers_;
TVector<NUdf::IBlockItemComparator::TPtr> Comparators_;
+ TVector<IBlockTrimmer::TPtr> Trimmers_;
std::vector<std::vector<arrow::Datum>> Data_;
@@ -705,7 +718,8 @@ public:
RightItemTypes_,
RightKeyColumns_,
std::move(RightStream_->GetValue(ctx)),
- RightAny
+ RightAny,
+ &ctx.ArrowMemoryPool
);
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,