diff options
author | ziganshinmr <ziganshinmr@yandex-team.com> | 2024-12-26 21:10:21 +0300 |
---|---|---|
committer | ziganshinmr <ziganshinmr@yandex-team.com> | 2024-12-26 21:46:11 +0300 |
commit | a472cb8a8fac0b378bbdcc1ae9a8b3314c78686e (patch) | |
tree | ddb1eb7669dab4405a677b81540fcf4dc446528b /yql/essentials | |
parent | bdc1680410d66d515192998b7aaf8a4679781dfa (diff) | |
download | ydb-a472cb8a8fac0b378bbdcc1ae9a8b3314c78686e.tar.gz |
BlockMapJoinCore: don't keep rows with the same keys in the block index in RightAny mode
commit_hash:da99f555af301f58cac1645447e931a57d28d738
Diffstat (limited to 'yql/essentials')
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 97 |
1 files changed, 64 insertions, 33 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 230509757b..827e295db6 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -394,11 +394,11 @@ public: } EntryConsumed_ = true; - return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing(); + return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TIndexEntry>(Entry_) : Nothing(); case EIteratorType::LIST: for (; Node_ != nullptr; Node_ = Node_->Next) { - if (CheckEntry(Node_->Entry)) { + if (BlockIndex_->IsKeyEquals(Node_->Entry, ItemsToLookup_)) { auto entry = Node_->Entry; Node_ = Node_->Next; return entry; @@ -431,18 +431,6 @@ public: } private: - bool CheckEntry(const TIndexEntry& entry) { - for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) { - auto indexItem = BlockIndex_->GetItem(entry, BlockIndex_->KeyColumns_[i]); - if (BlockIndex_->Comparators_[BlockIndex_->KeyColumns_[i]]->Equals(indexItem, ItemsToLookup_[i])) { - return true; - } - } - - return false; - } - - private: EIteratorType Type_; TBlockIndex* BlockIndex_ = nullptr; @@ -462,13 +450,15 @@ public: TMemoryUsageInfo* memInfo, const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, - NUdf::TUnboxedValue stream + NUdf::TUnboxedValue stream, + bool any ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) , KeyColumns_(keyColumns) , Stream_(stream) , Inputs_(itemTypes.size()) + , Any_(any) { TBlockTypeHelper helper; for (size_t i = 0; i < itemTypes.size(); i++) { @@ -489,17 +479,23 @@ public: break; } - std::vector<arrow::Datum> block; - for (size_t i = 0; i < Inputs_.size() - 1; i++) { - auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); - ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); - block.push_back(std::move(datum)); + { + std::vector<arrow::Datum> block; + for (size_t i = 0; i < Inputs_.size() - 1; i++) { + auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); + ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); + block.push_back(std::move(datum)); + } + Data_.push_back(std::move(block)); } + const auto& block = Data_.back(); + auto blockOffset = Data_.size() - 1; auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]); std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch; std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries; + std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys; ui32 insertBatchLen = 0; auto processInsertBatch = [&]() { @@ -510,6 +506,10 @@ public: *value = TIndexMapValue(insertBatchEntries[i]); Index_.CheckGrow(); } else { + if (Any_ && ContainsKey(value, insertBatchKeys[i])) { + return; + } + // Store as list if (value->IsInplace()) { *value = TIndexMapValue(InsertIndexNode(value->GetEntry())); @@ -520,15 +520,15 @@ public: }); }; - Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max()); + Y_ENSURE(blockOffset <= std::numeric_limits<ui32>::max()); Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max()); - for (size_t i = 0; i < blockSize; i++) { - ui64 keyHash = CalculateKeyHash(block, i); + for (size_t itemOffset = 0; itemOffset < blockSize; itemOffset++) { + ui64 keyHash = GetKey(block, itemOffset, insertBatchKeys[insertBatchLen]); if (!keyHash) { continue; } - insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i); + insertBatchEntries[insertBatchLen] = TIndexEntry(blockOffset, itemOffset); insertBatch[insertBatchLen].ConstructKey(keyHash); insertBatchLen++; @@ -542,7 +542,6 @@ public: processInsertBatch(); } - Data_.push_back(std::move(block)); return NUdf::EFetchStatus::Ok; } @@ -576,6 +575,7 @@ public: } TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) { + Y_ENSURE(entry.BlockOffset < Data_.size()); Y_ENSURE(columnIdx < Inputs_.size() - 1); auto& datum = Data_[entry.BlockOffset][columnIdx]; @@ -590,20 +590,36 @@ public: } } + bool IsKeyEquals(TIndexEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + Y_ENSURE(keyItems.size() == KeyColumns_.size()); + for (size_t i = 0; i < KeyColumns_.size(); i++) { + auto indexItem = GetItem(entry, KeyColumns_[i]); + if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { + return true; + } + } + + return false; + } + private: - ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const { + ui64 GetKey(const std::vector<arrow::Datum>& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { ui64 keyHash = 0; + keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { auto& datum = block[keyColumn]; MKQL_ENSURE(datum.is_array(), "Expecting array"); auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset); if (!item) { + keyItems.clear(); return 0; } keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item)); + keyItems.push_back(std::move(item)); } + return keyHash; } @@ -611,6 +627,22 @@ private: return &IndexNodes_.emplace_back(entry, currentHead); } + bool ContainsKey(const TIndexMapValue* chain, const std::vector<NYql::NUdf::TBlockItem>& keyItems) { + if (chain->IsInplace()) { + return IsKeyEquals(chain->GetEntry(), keyItems); + } else { + for (TIndexNode* node = chain->GetList(); node != nullptr; node = node->Next) { + if (IsKeyEquals(node->Entry, keyItems)) { + return true; + } + + node = node->Next; + } + + return false; + } + } + private: const std::vector<arrow::ValueDescr> InputsDescr_; const TVector<ui32>& KeyColumns_; @@ -626,6 +658,8 @@ private: NUdf::TUnboxedValue Stream_; TUnboxedValueVector Inputs_; + + const bool Any_; }; template <bool WithoutRight, bool RightRequired, bool RightAny> @@ -670,7 +704,8 @@ public: const auto indexState = ctx.HolderFactory.Create<TIndexState>( RightItemTypes_, RightKeyColumns_, - std::move(RightStream_->GetValue(ctx)) + std::move(RightStream_->GetValue(ctx)), + RightAny ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, @@ -768,13 +803,9 @@ private: auto key = iter.Next(); indexState.GetRow(*key, RightIOMap_, rightRow); joinState.MakeRow(rightRow); - - if constexpr (RightAny) { - break; - } } - if (RightAny || iter.IsEmpty()) { + if (iter.IsEmpty()) { joinState.NextRow(); LookupBatchCurrent_++; } @@ -844,7 +875,7 @@ private: const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; - std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_; + std::array<typename TIndexState::TIterator, PrefetchBatchSize> LookupBatchIterators_; ui32 LookupBatchCurrent_ = 0; ui32 LookupBatchSize_ = 0; |