aboutsummaryrefslogtreecommitdiffstats
path: root/yql/essentials
diff options
context:
space:
mode:
author ziganshinmr <ziganshinmr@yandex-team.com> 2024-12-26 21:10:21 +0300
committer ziganshinmr <ziganshinmr@yandex-team.com> 2024-12-26 21:46:11 +0300
commit a472cb8a8fac0b378bbdcc1ae9a8b3314c78686e (patch)
tree ddb1eb7669dab4405a677b81540fcf4dc446528b /yql/essentials
parent bdc1680410d66d515192998b7aaf8a4679781dfa (diff)
download ydb-a472cb8a8fac0b378bbdcc1ae9a8b3314c78686e.tar.gz
BlockMapJoinCore: don't keep rows with the same keys in the block index in RightAny mode
commit_hash:da99f555af301f58cac1645447e931a57d28d738
Diffstat (limited to 'yql/essentials')
-rw-r--r-- yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 97
1 files changed, 64 insertions, 33 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
index 230509757b..827e295db6 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp
@@ -394,11 +394,11 @@ public:
}
EntryConsumed_ = true;
- return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
+ return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TIndexEntry>(Entry_) : Nothing();
case EIteratorType::LIST:
for (; Node_ != nullptr; Node_ = Node_->Next) {
- if (CheckEntry(Node_->Entry)) {
+ if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) {
auto entry = Node_->Entry;
Node_ = Node_->Next;
return entry;
@@ -431,18 +431,6 @@ public:
}
private:
- bool CheckEntry(const TIndexEntry& entry) {
- for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) {
- auto indexItem = BlockIndex_->GetItem(entry, BlockIndex_->KeyColumns_[i]);
- if (BlockIndex_->Comparators_[BlockIndex_->KeyColumns_[i]]->Equals(indexItem, ItemsToLookup_[i])) {
- return true;
- }
- }
-
- return false;
- }
-
- private:
EIteratorType Type_;
TBlockIndex* BlockIndex_ = nullptr;
@@ -462,13 +450,15 @@ public:
TMemoryUsageInfo* memInfo,
const TVector<TType*>& itemTypes,
const TVector<ui32>& keyColumns,
- NUdf::TUnboxedValue stream
+ NUdf::TUnboxedValue stream,
+ bool any
)
: TBase(memInfo)
, InputsDescr_(ToValueDescr(itemTypes))
, KeyColumns_(keyColumns)
, Stream_(stream)
, Inputs_(itemTypes.size())
+ , Any_(any)
{
TBlockTypeHelper helper;
for (size_t i = 0; i < itemTypes.size(); i++) {
@@ -489,17 +479,23 @@ public:
break;
}
- std::vector<arrow::Datum> block;
- for (size_t i = 0; i < Inputs_.size() - 1; i++) {
- auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
- ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
- block.push_back(std::move(datum));
+ {
+ std::vector<arrow::Datum> block;
+ for (size_t i = 0; i < Inputs_.size() - 1; i++) {
+ auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum();
+ ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr());
+ block.push_back(std::move(datum));
+ }
+ Data_.push_back(std::move(block));
}
+ const auto& block = Data_.back();
+ auto blockOffset = Data_.size() - 1;
auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]);
std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch;
std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries;
+ std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys;
ui32 insertBatchLen = 0;
auto processInsertBatch = [&]() {
@@ -510,6 +506,10 @@ public:
*value = TIndexMapValue(insertBatchEntries[i]);
Index_.CheckGrow();
} else {
+ if (Any_ && ContainsKey(value, insertBatchKeys[i])) {
+ return;
+ }
+
// Store as list
if (value->IsInplace()) {
*value = TIndexMapValue(InsertIndexNode(value->GetEntry()));
@@ -520,15 +520,15 @@ public:
});
};
- Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max());
+ Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max());
Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max());
- for (size_t i = 0; i < blockSize; i++) {
- ui64 keyHash = CalculateKeyHash(block, i);
+ for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) {
+ ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]);
if (!keyHash) {
continue;
}
- insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i);
+ insertBatchEntries[insertBatchLen] = TIndexEntry(blockOffset, itemOffset);
insertBatch[insertBatchLen].ConstructKey(keyHash);
insertBatchLen++;
@@ -542,7 +542,6 @@ public:
processInsertBatch();
}
- Data_.push_back(std::move(block));
return NUdf::EFetchStatus::Ok;
}
@@ -576,6 +575,7 @@ public:
}
TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) {
+ Y_ENSURE(entry.BlockOffset < Data_.size());
Y_ENSURE(columnIdx < Inputs_.size() - 1);
auto& datum = Data_[entry.BlockOffset][columnIdx];
@@ -590,20 +590,36 @@ public:
}
}
+// Returns true iff the indexed row referenced by `entry` has the same join key
+// as `keyItems`.
+//
+// `keyItems` must hold exactly one item per key column, in KeyColumns_ order
+// (enforced by Y_ENSURE). A composite key is equal only when EVERY key column
+// compares equal, so the scan bails out on the first mismatch. Matching on a
+// single column is not enough: entries reach this check because their combined
+// hashes collided, and distinct keys may still share one column value.
+bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+    Y_ENSURE(keyItems.size() == KeyColumns_.size());
+    for (size_t i = 0; i < KeyColumns_.size(); i++) {
+        const auto keyColumn = KeyColumns_[i];
+        auto indexItem = GetItem(entry, keyColumn);
+        if (!Comparators_[keyColumn]->Equals(indexItem, keyItems[i])) {
+            return false;
+        }
+    }
+
+    return true;
+}
+
private:
- ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const {
+// Reads the key columns of row `offset` in `block`, stores the read items into
+// `keyItems` (one per key column, in KeyColumns_ order) and returns the hash
+// combined over all of them.
+//
+// Returns 0 and leaves `keyItems` empty as soon as one key item is empty
+// (`!item`); the insertion loop treats a zero hash as "skip this row".
+// NOTE(review): a key whose combined hash happens to be 0 would be skipped the
+// same way - confirm CombineHashes cannot produce 0 for a valid key.
+ui64 GetKey(const std::vector<arrow::Datum>& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const {
ui64 keyHash = 0;
+// The output vector is reused between calls, so start from a clean state.
+keyItems.clear();
for (ui32 keyColumn : KeyColumns_) {
auto& datum = block[keyColumn];
MKQL_ENSURE(datum.is_array(), "Expecting array");
auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset);
if (!item) {
+// Drop the items collected so far so the caller never sees a partial key.
+keyItems.clear();
return 0;
}
keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item));
+keyItems.push_back(std::move(item));
}
+
return keyHash;
}
@@ -611,6 +627,22 @@ private:
return &IndexNodes_.emplace_back(entry, currentHead);
}
+// Tells whether the hash-map slot `chain` already stores an entry whose key
+// matches `keyItems` (as decided by IsKeyEquals).
+//
+// A slot either holds a single entry in place or points to a linked list of
+// TIndexNode; both representations are handled here.
+bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) {
+    if (chain->IsInplace()) {
+        return IsKeyEquals(chain->GetEntry(), keyItems);
+    }
+
+    // The cursor must advance only in the for-statement itself. Advancing it
+    // in the body as well would skip every second node and dereference
+    // nullptr once the body steps past the tail.
+    for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) {
+        if (IsKeyEquals(node->Entry, keyItems)) {
+            return true;
+        }
+    }
+
+    return false;
+}
+
private:
const std::vector<arrow::ValueDescr> InputsDescr_;
const TVector<ui32>& KeyColumns_;
@@ -626,6 +658,8 @@ private:
NUdf::TUnboxedValue Stream_;
TUnboxedValueVector Inputs_;
+
+ const bool Any_;
};
template <bool WithoutRight, bool RightRequired, bool RightAny>
@@ -670,7 +704,8 @@ public:
const auto indexState = ctx.HolderFactory.Create<TIndexState>(
RightItemTypes_,
RightKeyColumns_,
- std::move(RightStream_->GetValue(ctx))
+ std::move(RightStream_->GetValue(ctx)),
+ RightAny
);
return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory,
@@ -768,13 +803,9 @@ private:
auto key = iter.Next();
indexState.GetRow(*key, RightIOMap_, rightRow);
joinState.MakeRow(rightRow);
-
- if constexpr (RightAny) {
- break;
- }
}
- if (RightAny || iter.IsEmpty()) {
+ if (iter.IsEmpty()) {
joinState.NextRow();
LookupBatchCurrent_++;
}
@@ -844,7 +875,7 @@ private:
const TVector<ui32>& RightIOMap_;
bool RightStreamConsumed_ = false;
- std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_;
+ std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_;
ui32 LookupBatchCurrent_ = 0;
ui32 LookupBatchSize_ = 0;