diff options
author | Maxim Yurchuk <maxim-yurchuk@ydb.tech> | 2024-12-12 15:00:43 +0000 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-12-12 15:00:43 +0000 |
commit | 42701242eaf5be980cb935631586d0e90b82641c (patch) | |
tree | 6dbf5fcd37d3c16591e196c4a69d166e3ab3a398 /yql/essentials/minikql/comp_nodes | |
parent | 7f5a9f394dbd9ac290cabbb7977538656b3a541e (diff) | |
parent | f7c04b5876af3d16849ab5e3079c0eabbd4e3a00 (diff) | |
download | ydb-42701242eaf5be980cb935631586d0e90b82641c.tar.gz |
Merge pull request #12554 from vitalyisaev2/YQ-3839.with_rightlib.3
Import from Arcadia + YDB FQ: turning gateways_config.proto into a file without external dependencies
Diffstat (limited to 'yql/essentials/minikql/comp_nodes')
7 files changed, 2045 insertions, 2029 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index e5b53bee5c..230509757b 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -1,9 +1,10 @@ -#include "mkql_map_join.h" +#include "mkql_block_map_join.h" #include <yql/essentials/minikql/computation/mkql_block_builder.h> #include <yql/essentials/minikql/computation/mkql_block_impl.h> #include <yql/essentials/minikql/computation/mkql_block_reader.h> #include <yql/essentials/minikql/computation/mkql_computation_node_holders_codegen.h> +#include <yql/essentials/minikql/comp_nodes/mkql_rh_hash.h> #include <yql/essentials/minikql/invoke_builtins/mkql_builtins.h> #include <yql/essentials/minikql/mkql_node_cast.h> #include <yql/essentials/minikql/mkql_program_builder.h> @@ -23,6 +24,19 @@ size_t CalcMaxBlockLength(const TVector<TType*>& items) { })); } +ui64 CalculateTupleHash(const std::vector<ui64>& hashes) { + ui64 hash = 0; + for (size_t i = 0; i < hashes.size(); i++) { + if (!hashes[i]) { + return 0; + } + + hash = CombineHashes(hash, hashes[i]); + } + + return hash; +} + template <bool RightRequired> class TBlockJoinState : public TBlockState { public: @@ -39,10 +53,12 @@ public: { const auto& pgBuilder = ctx.Builder->GetPgBuilder(); MaxLength_ = CalcMaxBlockLength(outputItems); + TBlockTypeHelper helper; for (size_t i = 0; i < inputItems.size(); i++) { - const TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType(); + TType* blockItemType = AS_TYPE(TBlockType, inputItems[i])->GetItemType(); Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); Converters_.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, pgBuilder)); + Hashers_.push_back(helper.MakeHasher(blockItemType)); } // The last output column (i.e. block length) doesn't require a block builder. for (size_t i = 0; i < OutputWidth_; i++) { @@ -91,6 +107,27 @@ public: OutputRows_++; } + void MakeRow(const std::vector<NYql::NUdf::TBlockItem>& rightColumns) { + size_t builderIndex = 0; + + for (size_t i = 0; i < LeftIOMap_.size(); i++, builderIndex++) { + AddItem(GetItem(LeftIOMap_[i]), builderIndex); + } + + if (!rightColumns.empty()) { + Y_ENSURE(LeftIOMap_.size() + rightColumns.size() == OutputWidth_); + for (size_t i = 0; i < rightColumns.size(); i++) { + AddItem(rightColumns[i], builderIndex++); + } + } else { + while (builderIndex < OutputWidth_) { + AddItem(TBlockItem(), builderIndex++); + } + } + + OutputRows_++; + } + void MakeBlocks(const THolderFactory& holderFactory) { Values.back() = holderFactory.CreateArrowBlock(arrow::Datum(std::make_shared<arrow::UInt64Scalar>(OutputRows_))); OutputRows_ = 0; @@ -102,14 +139,21 @@ public: FillArrays(); } - TBlockItem GetItem(size_t idx) const { + TBlockItem GetItem(size_t idx, size_t offset = 0) const { + Y_ENSURE(Current_ + offset < InputRows_); const auto& datum = TArrowBlock::From(Inputs_[idx]).GetDatum(); ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[idx], datum.descr()); if (datum.is_scalar()) { return Readers_[idx]->GetScalarItem(*datum.scalar()); } MKQL_ENSURE(datum.is_array(), "Expecting array"); - return Readers_[idx]->GetItem(*datum.array(), Current_); + return Readers_[idx]->GetItem(*datum.array(), Current_ + offset); + } + + std::pair<TBlockItem, ui64> GetItemWithHash(size_t idx, size_t offset) const { + auto item = GetItem(idx, offset); + ui64 hash = Hashers_[idx]->Hash(item); + return std::make_pair(item, hash); } NUdf::TUnboxedValuePod GetValue(const THolderFactory& holderFactory, size_t idx) const { @@ -117,7 +161,7 @@ public: } void Reset() { - Next_ = 0; + Current_ = 0; InputRows_ = GetBlockCount(Inputs_.back()); } @@ -125,12 +169,8 @@ public: IsFinished_ = true; } - bool NextRow() { - if (Next_ >= InputRows_) { - return false; - } - Current_ = Next_++; - return true; + void NextRow() { + Current_++; } bool HasBlocks() { @@ -150,6 +190,11 @@ public: return IsFinished_; } + size_t RemainingRowsCount() const { + Y_ENSURE(InputRows_ >= Current_); + return InputRows_ - Current_; + } + NUdf::TUnboxedValue* GetRawInputFields() { return Inputs_.data(); } @@ -174,7 +219,6 @@ private: } size_t Current_ = 0; - size_t Next_ = 0; bool IsFinished_ = false; size_t MaxLength_; size_t BuilderAllocatedSize_ = 0; @@ -190,307 +234,649 @@ private: TVector<std::unique_ptr<IBlockReader>> Readers_; TVector<std::unique_ptr<IBlockItemConverter>> Converters_; TVector<std::unique_ptr<IArrayBuilder>> Builders_; + TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_; }; -template <bool WithoutRight, bool RightRequired, bool IsTuple> -class TBlockWideMapJoinWrapper : public TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>> -{ -using TBaseComputation = TMutableComputationNode<TBlockWideMapJoinWrapper<WithoutRight, RightRequired, IsTuple>>; -using TState = TBlockJoinState<RightRequired>; -public: - TBlockWideMapJoinWrapper(TComputationMutables& mutables, - const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems, - const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - IComputationNode* stream, IComputationNode* dict) - : TBaseComputation(mutables, EValueRepresentation::Boxed) - , ResultJoinItems_(std::move(resultJoinItems)) - , LeftStreamItems_(std::move(leftStreamItems)) - , LeftKeyColumns_(std::move(leftKeyColumns)) - , LeftIOMap_(std::move(leftIOMap)) - , Stream_(stream) - , Dict_(dict) - , KeyTupleCache_(mutables) - {} +class TBlockIndex : public TComputationValue<TBlockIndex> { + struct TIndexEntry { + ui32 BlockOffset; + ui32 ItemOffset; - NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - NUdf::TUnboxedValue* items = nullptr; - const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items); - const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_, - LeftIOMap_, ResultJoinItems_); - return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, - std::move(state), - std::move(Stream_->GetValue(ctx)), - std::move(Dict_->GetValue(ctx)), - LeftKeyColumns_, - std::move(keys), items); - } + TIndexEntry() = default; + TIndexEntry(ui32 blockOffset, ui32 itemOffset) + : BlockOffset(blockOffset) + , ItemOffset(itemOffset) + {} + }; -private: - class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + struct TIndexNode { + TIndexEntry Entry; + TIndexNode* Next; + + TIndexNode() = delete; + TIndexNode(TIndexEntry entry, TIndexNode* next = nullptr) + : Entry(entry) + , Next(next) + {} + }; + + class TIndexMapValue { public: - TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, - NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream, - NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns, - NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems) - : TBase(memInfo) - , BlockState_(blockState) - , Stream_(stream) - , Dict_(dict) - , KeyValue_(keyValue) - , KeyItems_(keyItems) - , LeftKeyColumns_(leftKeyColumns) - , HolderFactory_(holderFactory) + TIndexMapValue() + : Raw(0) + {} + + TIndexMapValue(TIndexEntry entry) { + TIndexEntryUnion un; + un.Entry = entry; + + Y_ENSURE(((un.Raw << 1) >> 1) == un.Raw); + Raw = (un.Raw << 1) | 1; + } + + TIndexMapValue(TIndexNode* entryList) + : EntryList(entryList) {} + bool IsInplace() const { + return Raw & 1; + } + + TIndexNode* GetList() const { + Y_ENSURE(!IsInplace()); + return EntryList; + } + + TIndexEntry GetEntry() const { + Y_ENSURE(IsInplace()); + + TIndexEntryUnion un; + un.Raw = Raw >> 1; + return un.Entry; + } + private: - NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { - auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); - auto* inputFields = blockState.GetRawInputFields(); - const size_t inputWidth = blockState.GetInputWidth(); - const size_t outputWidth = blockState.GetOutputWidth(); + union TIndexEntryUnion { + TIndexEntry Entry; + ui64 Raw; + }; + + union { + TIndexNode* EntryList; + ui64 Raw; + }; + }; - MKQL_ENSURE(width == outputWidth, - "The given width doesn't equal to the result type size"); + static_assert(sizeof(TIndexMapValue) == 8); - while (!blockState.HasBlocks()) { - while (blockState.IsNotFull() && blockState.NextRow()) { - const auto key = MakeKeysTuple(blockState); - if constexpr (WithoutRight) { - if ((key && Dict_.Contains(key)) == RightRequired) { - blockState.CopyRow(); - } - } else if (NUdf::TUnboxedValue lookup; key && (lookup = Dict_.Lookup(key))) { - blockState.MakeRow(lookup); - } else if constexpr (!RightRequired) { - blockState.MakeRow(NUdf::TUnboxedValue()); - } + using TBase = TComputationValue<TBlockIndex>; + using TIndexMap = TRobinHoodHashFixedMap< + ui64, + TIndexMapValue, + std::equal_to<ui64>, + std::hash<ui64>, + TMKQLAllocator<char> + >; + +public: + class TIterator { + enum class EIteratorType { + EMPTY, + INPLACE, + LIST + }; + + public: + TIterator() = default; + + TIterator(TBlockIndex* blockIndex) + : Type_(EIteratorType::EMPTY) + , BlockIndex_(blockIndex) + {} + + TIterator(TBlockIndex* blockIndex, TIndexEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + : Type_(EIteratorType::INPLACE) + , BlockIndex_(blockIndex) + , Entry_(entry) + , EntryConsumed_(false) + , ItemsToLookup_(std::move(itemsToLookup)) + {} + + TIterator(TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + : Type_(EIteratorType::LIST) + , BlockIndex_(blockIndex) + , Node_(node) + , ItemsToLookup_(std::move(itemsToLookup)) + {} + + TIterator(const TIterator&) = delete; + TIterator& operator=(const TIterator&) = delete; + + TIterator(TIterator&& other) { + *this = std::move(other); + } + + TIterator& operator=(TIterator&& other) { + if (this != &other) { + Type_ = other.Type_; + BlockIndex_ = other.BlockIndex_; + ItemsToLookup_ = std::move(other.ItemsToLookup_); + + switch (Type_) { + case EIteratorType::EMPTY: + break; + + case EIteratorType::INPLACE: + Entry_ = other.Entry_; + EntryConsumed_ = other.EntryConsumed_; + break; + + case EIteratorType::LIST: + Node_ = other.Node_; + break; } - if (blockState.IsNotFull() && !blockState.IsFinished()) { - switch (Stream_.WideFetch(inputFields, inputWidth)) { - case NUdf::EFetchStatus::Yield: - return NUdf::EFetchStatus::Yield; - case NUdf::EFetchStatus::Ok: - blockState.Reset(); - continue; - case NUdf::EFetchStatus::Finish: - blockState.Finish(); - break; + + other.BlockIndex_ = nullptr; + } + return *this; + } + + TMaybe<TIndexEntry> Next() { + Y_ENSURE(IsValid()); + + switch (Type_) { + case EIteratorType::EMPTY: + return Nothing(); + + case EIteratorType::INPLACE: + if (EntryConsumed_) { + return Nothing(); + } + + EntryConsumed_ = true; + return CheckEntry(Entry_) ? TMaybe<TIndexEntry>(Entry_) : Nothing(); + + case EIteratorType::LIST: + for (; Node_ != nullptr; Node_ = Node_->Next) { + if (CheckEntry(Node_->Entry)) { + auto entry = Node_->Entry; + Node_ = Node_->Next; + return entry; } - // Leave the loop, if no values left in the stream. - Y_DEBUG_ABORT_UNLESS(blockState.IsFinished()); } - if (blockState.IsEmpty()) { - return NUdf::EFetchStatus::Finish; + + return Nothing(); + } + } + + bool IsValid() const { + return BlockIndex_; + } + + bool IsEmpty() const { + Y_ENSURE(IsValid()); + + switch (Type_) { + case EIteratorType::EMPTY: + return true; + case EIteratorType::INPLACE: + return EntryConsumed_; + case EIteratorType::LIST: + return Node_ == nullptr; + } + } + + void Reset() { + *this = TIterator(); + } + + private: + bool CheckEntry(const TIndexEntry& entry) { + for (size_t i = 0; i < BlockIndex_->KeyColumns_.size(); i++) { + auto indexItem = BlockIndex_->GetItem(entry, BlockIndex_->KeyColumns_[i]); + if (BlockIndex_->Comparators_[BlockIndex_->KeyColumns_[i]]->Equals(indexItem, ItemsToLookup_[i])) { + return true; } - blockState.MakeBlocks(HolderFactory_); } - const auto sliceSize = blockState.Slice(); + return false; + } - for (size_t i = 0; i < outputWidth; i++) { - output[i] = blockState.Get(sliceSize, HolderFactory_, i); + private: + EIteratorType Type_; + TBlockIndex* BlockIndex_ = nullptr; + + union { + TIndexNode* Node_; + struct { + TIndexEntry Entry_; + bool EntryConsumed_; + }; + }; + + std::vector<NYql::NUdf::TBlockItem> ItemsToLookup_; + }; + +public: + TBlockIndex( + TMemoryUsageInfo* memInfo, + const TVector<TType*>& itemTypes, + const TVector<ui32>& keyColumns, + NUdf::TUnboxedValue stream + ) + : TBase(memInfo) + , InputsDescr_(ToValueDescr(itemTypes)) + , KeyColumns_(keyColumns) + , Stream_(stream) + , Inputs_(itemTypes.size()) + { + TBlockTypeHelper helper; + for (size_t i = 0; i < itemTypes.size(); i++) { + TType* blockItemType = AS_TYPE(TBlockType, itemTypes[i])->GetItemType(); + Readers_.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); + Hashers_.push_back(helper.MakeHasher(blockItemType)); + Comparators_.push_back(helper.MakeComparator(blockItemType)); + } + } + + NUdf::EFetchStatus FetchStream() { + switch (Stream_.WideFetch(Inputs_.data(), Inputs_.size())) { + case NUdf::EFetchStatus::Yield: + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Finish: + return NUdf::EFetchStatus::Finish; + case NUdf::EFetchStatus::Ok: + break; + } + + std::vector<arrow::Datum> block; + for (size_t i = 0; i < Inputs_.size() - 1; i++) { + auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); + ARROW_DEBUG_CHECK_DATUM_TYPES(InputsDescr_[i], datum.descr()); + block.push_back(std::move(datum)); + } + + auto blockSize = GetBlockCount(Inputs_[Inputs_.size() - 1]); + + std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch; + std::array<TIndexEntry, PrefetchBatchSize> insertBatchEntries; + ui32 insertBatchLen = 0; + + auto processInsertBatch = [&]() { + Index_.BatchInsert({insertBatch.data(), insertBatchLen}, [&](size_t i, TIndexMap::iterator iter, bool isNew) { + auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter)); + if (isNew) { + // Store single entry inplace + *value = TIndexMapValue(insertBatchEntries[i]); + Index_.CheckGrow(); + } else { + // Store as list + if (value->IsInplace()) { + *value = TIndexMapValue(InsertIndexNode(value->GetEntry())); + } + + *value = TIndexMapValue(InsertIndexNode(insertBatchEntries[i], value->GetList())); + } + }); + }; + + Y_ENSURE(Data_.size() <= std::numeric_limits<ui32>::max()); + Y_ENSURE(blockSize <= std::numeric_limits<ui32>::max()); + for (size_t i = 0; i < blockSize; i++) { + ui64 keyHash = CalculateKeyHash(block, i); + if (!keyHash) { + continue; } - return NUdf::EFetchStatus::Ok; + insertBatchEntries[insertBatchLen] = TIndexEntry(Data_.size(), i); + insertBatch[insertBatchLen].ConstructKey(keyHash); + insertBatchLen++; + + if (insertBatchLen == PrefetchBatchSize) { + processInsertBatch(); + insertBatchLen = 0; + } + } + + if (insertBatchLen > 0) { + processInsertBatch(); } - NUdf::TUnboxedValue MakeKeysTuple(const TState& blockState) const { - // TODO: Handle converters. - if constexpr (!IsTuple) { - return blockState.GetValue(HolderFactory_, LeftKeyColumns_.front()); + Data_.push_back(std::move(block)); + return NUdf::EFetchStatus::Ok; + } + + template<typename TGetKey> + void BatchLookup(size_t batchSize, std::array<TBlockIndex::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { + Y_ENSURE(batchSize <= PrefetchBatchSize); + + std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch; + std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> itemsBatch; + + for (size_t i = 0; i < batchSize; i++) { + const auto& [items, keyHash] = getKey(i); + lookupBatch[i].ConstructKey(keyHash); + itemsBatch[i] = items; + } + + Index_.BatchLookup({lookupBatch.data(), batchSize}, [&](size_t i, TIndexMap::iterator iter) { + if (!iter) { + // Empty iterator + iterators[i] = TIterator(this); + return; } - Y_ABORT_IF(KeyItems_ == nullptr); - for (size_t i = 0; i < LeftKeyColumns_.size(); i++) { - KeyItems_[i] = blockState.GetValue(HolderFactory_, LeftKeyColumns_[i]); + auto value = static_cast<TIndexMapValue*>(Index_.GetMutablePayload(iter)); + if (value->IsInplace()) { + iterators[i] = TIterator(this, value->GetEntry(), std::move(itemsBatch[i])); + } else { + iterators[i] = TIterator(this, value->GetList(), std::move(itemsBatch[i])); } - return KeyValue_; + }); + } + + TBlockItem GetItem(TIndexEntry entry, ui32 columnIdx) { + Y_ENSURE(columnIdx < Inputs_.size() - 1); + + auto& datum = Data_[entry.BlockOffset][columnIdx]; + MKQL_ENSURE(datum.is_array(), "Expecting array"); + return Readers_[columnIdx]->GetItem(*datum.array(), entry.ItemOffset); + } + + void GetRow(TIndexEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) { + Y_ENSURE(row.size() == ioMap.size()); + for (size_t i = 0; i < row.size(); i++) { + row[i] = GetItem(entry, ioMap[i]); } + } - NUdf::TUnboxedValue BlockState_; - NUdf::TUnboxedValue Stream_; - NUdf::TUnboxedValue Dict_; - NUdf::TUnboxedValue KeyValue_; - NUdf::TUnboxedValue* KeyItems_; +private: + ui64 CalculateKeyHash(const std::vector<arrow::Datum>& block, size_t offset) const { + ui64 keyHash = 0; + for (ui32 keyColumn : KeyColumns_) { + auto& datum = block[keyColumn]; + MKQL_ENSURE(datum.is_array(), "Expecting array"); + + auto item = Readers_[keyColumn]->GetItem(*datum.array(), offset); + if (!item) { + return 0; + } - const TVector<ui32>& LeftKeyColumns_; - const THolderFactory& HolderFactory_; - }; + keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item)); + } + return keyHash; + } - void RegisterDependencies() const final { - this->DependsOn(Stream_); - this->DependsOn(Dict_); + TIndexNode* InsertIndexNode(TIndexEntry entry, TIndexNode* currentHead = nullptr) { + return &IndexNodes_.emplace_back(entry, currentHead); } - const TVector<TType*> ResultJoinItems_; - const TVector<TType*> LeftStreamItems_; - const TVector<ui32> LeftKeyColumns_; - const TVector<ui32> LeftIOMap_; - IComputationNode* const Stream_; - IComputationNode* const Dict_; - const TContainerCacheOnContext KeyTupleCache_; +private: + const std::vector<arrow::ValueDescr> InputsDescr_; + const TVector<ui32>& KeyColumns_; + + TVector<std::unique_ptr<IBlockReader>> Readers_; + TVector<NUdf::IBlockItemHasher::TPtr> Hashers_; + TVector<NUdf::IBlockItemComparator::TPtr> Comparators_; + + std::vector<std::vector<arrow::Datum>> Data_; + + TIndexMap Index_; + std::deque<TIndexNode> IndexNodes_; + + NUdf::TUnboxedValue Stream_; + TUnboxedValueVector Inputs_; }; -template<bool RightRequired, bool IsTuple> -class TBlockWideMultiMapJoinWrapper : public TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>> +template <bool WithoutRight, bool RightRequired, bool RightAny> +class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>> { -using TBaseComputation = TMutableComputationNode<TBlockWideMultiMapJoinWrapper<RightRequired, IsTuple>>; -using TState = TBlockJoinState<RightRequired>; +using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>; +using TJoinState = TBlockJoinState<RightRequired>; +using TIndexState = TBlockIndex; public: - TBlockWideMultiMapJoinWrapper(TComputationMutables& mutables, - const TVector<TType*>&& resultJoinItems, const TVector<TType*>&& leftStreamItems, - const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - IComputationNode* stream, IComputationNode* dict) + TBlockMapJoinCoreWraper( + TComputationMutables& mutables, + const TVector<TType*>&& resultItemTypes, + const TVector<TType*>&& leftItemTypes, + const TVector<ui32>&& leftKeyColumns, + const TVector<ui32>&& leftIOMap, + const TVector<TType*>&& rightItemTypes, + const TVector<ui32>&& rightKeyColumns, + const TVector<ui32>&& rightIOMap, + IComputationNode* leftStream, + IComputationNode* rightStream + ) : TBaseComputation(mutables, EValueRepresentation::Boxed) - , ResultJoinItems_(std::move(resultJoinItems)) - , LeftStreamItems_(std::move(leftStreamItems)) + , ResultItemTypes_(std::move(resultItemTypes)) + , LeftItemTypes_(std::move(leftItemTypes)) , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , Stream_(stream) - , Dict_(dict) + , RightItemTypes_(std::move(rightItemTypes)) + , RightKeyColumns_(std::move(rightKeyColumns)) + , RightIOMap_(std::move(rightIOMap)) + , LeftStream_(std::move(leftStream)) + , RightStream_(std::move(rightStream)) , KeyTupleCache_(mutables) {} NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { - NUdf::TUnboxedValue* items = nullptr; - const auto keys = KeyTupleCache_.NewArray(ctx, LeftKeyColumns_.size(), items); - const auto state = ctx.HolderFactory.Create<TState>(ctx, LeftStreamItems_, - LeftIOMap_, ResultJoinItems_); + const auto joinState = ctx.HolderFactory.Create<TJoinState>( + ctx, + LeftItemTypes_, + LeftIOMap_, + ResultItemTypes_ + ); + const auto indexState = ctx.HolderFactory.Create<TIndexState>( + RightItemTypes_, + RightKeyColumns_, + std::move(RightStream_->GetValue(ctx)) + ); + return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, - std::move(state), - std::move(Stream_->GetValue(ctx)), - std::move(Dict_->GetValue(ctx)), + std::move(joinState), + std::move(indexState), + std::move(LeftStream_->GetValue(ctx)), + LeftItemTypes_, LeftKeyColumns_, - std::move(keys), items); + std::move(RightStream_->GetValue(ctx)), + RightItemTypes_, + RightKeyColumns_, + RightIOMap_ + ); } private: class TStreamValue : public TComputationValue<TStreamValue> { using TBase = TComputationValue<TStreamValue>; public: - TStreamValue(TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, - NUdf::TUnboxedValue&& blockState, NUdf::TUnboxedValue&& stream, - NUdf::TUnboxedValue&& dict, const TVector<ui32>& leftKeyColumns, - NUdf::TUnboxedValue&& keyValue, NUdf::TUnboxedValue* keyItems) + TStreamValue( + TMemoryUsageInfo* memInfo, + const THolderFactory& holderFactory, + NUdf::TUnboxedValue&& joinState, + NUdf::TUnboxedValue&& indexState, + NUdf::TUnboxedValue&& leftStream, + const TVector<TType*>& leftTypes, + const TVector<ui32>& leftKeyColumns, + NUdf::TUnboxedValue&& rightStream, + const TVector<TType*>& rightTypes, + const TVector<ui32>& rightKeyColumns, + const TVector<ui32>& rightIOMap + ) : TBase(memInfo) - , BlockState_(blockState) - , Stream_(stream) - , Dict_(dict) - , KeyValue_(keyValue) - , KeyItems_(keyItems) - , List_(NUdf::TUnboxedValue::Invalid()) - , Iterator_(NUdf::TUnboxedValue::Invalid()) - , Current_(NUdf::TUnboxedValue::Invalid()) + , JoinState_(joinState) + , IndexState_(indexState) + , LeftStream_(leftStream) + , LeftItemTypes_(leftTypes) , LeftKeyColumns_(leftKeyColumns) + , RightStream_(rightStream) + , RightItemTypes_(rightTypes) + , RightKeyColumns_(rightKeyColumns) + , RightIOMap_(rightIOMap) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { - auto& blockState = *static_cast<TState*>(BlockState_.AsBoxed().Get()); - auto* inputFields = blockState.GetRawInputFields(); - const size_t inputWidth = blockState.GetInputWidth(); - const size_t outputWidth = blockState.GetOutputWidth(); + auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); + auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get()); + + if (!RightStreamConsumed_) { + auto fetchStatus = NUdf::EFetchStatus::Ok; + while (fetchStatus != NUdf::EFetchStatus::Finish) { + fetchStatus = indexState.FetchStream(); + if (fetchStatus == NUdf::EFetchStatus::Yield) { + return NUdf::EFetchStatus::Yield; + } + } + + RightStreamConsumed_ = true; + } + + auto* inputFields = joinState.GetRawInputFields(); + const size_t inputWidth = joinState.GetInputWidth(); + const size_t outputWidth = joinState.GetOutputWidth(); MKQL_ENSURE(width == outputWidth, "The given width doesn't equal to the result type size"); - while (!blockState.HasBlocks()) { - if (!Iterator_.IsInvalid()) { - // Process the remaining items from the iterator. - while (blockState.IsNotFull() && Iterator_.Next(Current_)) { - blockState.MakeRow(Current_); - } - } - if (blockState.IsNotFull() && blockState.NextRow()) { - const auto key = MakeKeysTuple(blockState); - // Lookup the item in the right dict. If the lookup succeeds, - // reset the iterator and proceed the execution from the - // beginning of the outer loop. Otherwise, the iterator is - // already invalidated (i.e. finished), so the execution will - // process the next tuple from the left stream. - if (key && (List_ = Dict_.Lookup(key))) { - Iterator_ = List_.GetListIterator(); + std::vector<NYql::NUdf::TBlockItem> leftKeyColumns(LeftKeyColumns_.size()); + std::vector<ui64> leftKeyColumnHashes(LeftKeyColumns_.size()); + std::vector<NYql::NUdf::TBlockItem> rightRow(RightIOMap_.size()); + + while (!joinState.HasBlocks()) { + while (joinState.IsNotFull() && LookupBatchCurrent_ < LookupBatchSize_) { + auto& iter = LookupBatchIterators_[LookupBatchCurrent_]; + if constexpr (WithoutRight) { + if (bool(iter.IsEmpty()) != RightRequired) { + joinState.CopyRow(); + } + + joinState.NextRow(); + LookupBatchCurrent_++; + continue; } else if constexpr (!RightRequired) { - blockState.MakeRow(NUdf::TUnboxedValue()); + if (iter.IsEmpty()) { + joinState.MakeRow(std::vector<NYql::NUdf::TBlockItem>()); + joinState.NextRow(); + LookupBatchCurrent_++; + continue; + } } + + while (joinState.IsNotFull() && !iter.IsEmpty()) { + auto key = iter.Next(); + indexState.GetRow(*key, RightIOMap_, rightRow); + joinState.MakeRow(rightRow); + + if constexpr (RightAny) { + break; + } + } + + if (RightAny || iter.IsEmpty()) { + joinState.NextRow(); + LookupBatchCurrent_++; + } + } + + if (joinState.IsNotFull() && joinState.RemainingRowsCount() > 0) { + LookupBatchSize_ = std::min(PrefetchBatchSize, static_cast<ui32>(joinState.RemainingRowsCount())); + indexState.BatchLookup(LookupBatchSize_, LookupBatchIterators_, [&](size_t i) { + MakeLeftKeys(leftKeyColumns, leftKeyColumnHashes, i); + ui64 keyHash = CalculateTupleHash(leftKeyColumnHashes); + return std::make_pair(std::ref(leftKeyColumns), keyHash); + }); + + LookupBatchCurrent_ = 0; continue; } - if (blockState.IsNotFull() && !blockState.IsFinished()) { - switch (Stream_.WideFetch(inputFields, inputWidth)) { + + if (joinState.IsNotFull() && !joinState.IsFinished()) { + switch (LeftStream_.WideFetch(inputFields, inputWidth)) { case NUdf::EFetchStatus::Yield: return NUdf::EFetchStatus::Yield; case NUdf::EFetchStatus::Ok: - blockState.Reset(); + joinState.Reset(); continue; case NUdf::EFetchStatus::Finish: - blockState.Finish(); + joinState.Finish(); break; } // Leave the loop, if no values left in the stream. - Y_DEBUG_ABORT_UNLESS(blockState.IsFinished()); + Y_DEBUG_ABORT_UNLESS(joinState.IsFinished()); } - if (blockState.IsEmpty()) { + if (joinState.IsEmpty()) { return NUdf::EFetchStatus::Finish; } - blockState.MakeBlocks(HolderFactory_); + joinState.MakeBlocks(HolderFactory_); } - const auto sliceSize = blockState.Slice(); + const auto sliceSize = joinState.Slice(); for (size_t i = 0; i < outputWidth; i++) { - output[i] = blockState.Get(sliceSize, HolderFactory_, i); + output[i] = joinState.Get(sliceSize, HolderFactory_, i); } return NUdf::EFetchStatus::Ok; } - NUdf::TUnboxedValue MakeKeysTuple(const TState& state) const { - // TODO: Handle converters. - if constexpr (!IsTuple) { - return state.GetValue(HolderFactory_, LeftKeyColumns_.front()); - } + void MakeLeftKeys(std::vector<NYql::NUdf::TBlockItem>& items, std::vector<ui64>& hashes, size_t offset) const { + auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - Y_ABORT_IF(KeyItems_ == nullptr); + Y_ENSURE(items.size() == LeftKeyColumns_.size()); + Y_ENSURE(hashes.size() == LeftKeyColumns_.size()); for (size_t i = 0; i < LeftKeyColumns_.size(); i++) { - KeyItems_[i] = state.GetValue(HolderFactory_, LeftKeyColumns_[i]); + std::tie(items[i], hashes[i]) = joinState.GetItemWithHash(LeftKeyColumns_[i], offset); } - return KeyValue_; } - NUdf::TUnboxedValue BlockState_; - NUdf::TUnboxedValue Stream_; - NUdf::TUnboxedValue Dict_; - NUdf::TUnboxedValue KeyValue_; - NUdf::TUnboxedValue* KeyItems_; - - NUdf::TUnboxedValue List_; - NUdf::TUnboxedValue Iterator_; - NUdf::TUnboxedValue Current_; + NUdf::TUnboxedValue JoinState_; + NUdf::TUnboxedValue IndexState_; + NUdf::TUnboxedValue LeftStream_; + const TVector<TType*>& LeftItemTypes_; const TVector<ui32>& LeftKeyColumns_; + + NUdf::TUnboxedValue RightStream_; + const TVector<TType*>& RightItemTypes_; + const TVector<ui32>& RightKeyColumns_; + const TVector<ui32>& RightIOMap_; + bool RightStreamConsumed_ = false; + + std::array<TBlockIndex::TIterator, PrefetchBatchSize> LookupBatchIterators_; + ui32 LookupBatchCurrent_ = 0; + ui32 LookupBatchSize_ = 0; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { - this->DependsOn(Stream_); - this->DependsOn(Dict_); + this->DependsOn(LeftStream_); + this->DependsOn(RightStream_); } - const TVector<TType*> ResultJoinItems_; - const TVector<TType*> LeftStreamItems_; +private: + const TVector<TType*> ResultItemTypes_; + + const TVector<TType*> LeftItemTypes_; const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - IComputationNode* const Stream_; - IComputationNode* const Dict_; + + const TVector<TType*> RightItemTypes_; + const TVector<ui32> RightKeyColumns_; + const TVector<ui32> RightIOMap_; + + IComputationNode* const LeftStream_; + IComputationNode* const RightStream_; + const TContainerCacheOnContext KeyTupleCache_; }; } // namespace IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { - MKQL_ENSURE(callable.GetInputsCount() == 5, "Expected 5 args"); + MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); const auto joinType = callable.GetType()->GetReturnType(); MKQL_ENSURE(joinType->IsStream(), "Expected WideStream as a resulting stream"); @@ -510,16 +896,14 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column"); const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend()); - const auto rightDictNode = callable.GetInput(1); - MKQL_ENSURE(rightDictNode.GetStaticType()->IsDict(), - "Expected Dict as a right join part"); - const auto rightDictType = AS_TYPE(TDictType, rightDictNode)->GetPayloadType(); - const auto isMulti = rightDictType->IsList(); - const auto rightDictItemType = isMulti - ? AS_TYPE(TListType, rightDictType)->GetItemType() - : rightDictType; - MKQL_ENSURE(rightDictItemType->IsVoid() || rightDictItemType->IsTuple(), - "Expected Void or Tuple as a right dict item type"); + const auto rightType = callable.GetInput(1).GetStaticType(); + MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream"); + const auto rightStreamType = AS_TYPE(TStreamType, rightType); + MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), + "Expected Multi as a right stream item type"); + const auto rightStreamComponents = GetWideComponents(rightStreamType); + MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); + const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); const auto joinKindNode = callable.GetInput(2); const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); @@ -527,34 +911,61 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly); - const auto keyColumnsLiteral = callable.GetInput(3); - const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral); + const auto leftKeyColumnsLiteral = callable.GetInput(3); + const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral); TVector<ui32> leftKeyColumns; - leftKeyColumns.reserve(keyColumnsTuple->GetValuesCount()); - for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) { - const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i)); + leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount()); + for (ui32 i = 0; i < leftKeyColumnsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, leftKeyColumnsTuple->GetValue(i)); leftKeyColumns.emplace_back(item->AsValue().Get<ui32>()); } - const bool isTupleKey = leftKeyColumns.size() > 1; + const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); - const auto keyDropsLiteral = callable.GetInput(4); - const auto keyDropsTuple = AS_VALUE(TTupleLiteral, keyDropsLiteral); + const auto leftKeyDropsLiteral = callable.GetInput(4); + const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral); THashSet<ui32> leftKeyDrops; - leftKeyDrops.reserve(keyDropsTuple->GetValuesCount()); - for (ui32 i = 0; i < keyDropsTuple->GetValuesCount(); i++) { - const auto item = AS_VALUE(TDataLiteral, keyDropsTuple->GetValue(i)); + leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount()); + for (ui32 i = 0; i < leftKeyDropsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, leftKeyDropsTuple->GetValue(i)); leftKeyDrops.emplace(item->AsValue().Get<ui32>()); } - const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); for (const auto& drop : leftKeyDrops) { MKQL_ENSURE(leftKeySet.contains(drop), "Only key columns has to be specified in drop column set"); + } + const auto rightKeyColumnsLiteral = callable.GetInput(5); + const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral); + TVector<ui32> rightKeyColumns; + rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount()); + for (ui32 i = 0; i < rightKeyColumnsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, rightKeyColumnsTuple->GetValue(i)); + rightKeyColumns.emplace_back(item->AsValue().Get<ui32>()); + } + const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend()); + + const auto rightKeyDropsLiteral = callable.GetInput(6); + const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral); + THashSet<ui32> rightKeyDrops; + rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount()); + for (ui32 i = 0; i < rightKeyDropsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, rightKeyDropsTuple->GetValue(i)); + rightKeyDrops.emplace(item->AsValue().Get<ui32>()); } - TVector<ui32> leftIOMap; + for (const auto& drop : rightKeyDrops) { + MKQL_ENSURE(rightKeySet.contains(drop), + "Only key columns has to be specified in drop column set"); + } + + MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch"); + + const auto rightAnyNode = callable.GetInput(7); + const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>(); + // XXX: Mind the last wide item, containing block length. + TVector<ui32> leftIOMap; for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { if (leftKeyDrops.contains(i)) { continue; @@ -562,51 +973,70 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo leftIOMap.push_back(i); } - const auto stream = LocateNode(ctx.NodeLocator, callable, 0); - const auto dict = LocateNode(ctx.NodeLocator, callable, 1); - -#define DISPATCH_JOIN(IS_TUPLE) do { \ - switch (joinKind) { \ - case EJoinKind::Inner: \ - if (isMulti) { \ - return new TBlockWideMultiMapJoinWrapper<true, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - } \ - return new TBlockWideMapJoinWrapper<false, true, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - case EJoinKind::Left: \ - if (isMulti) { \ - return new TBlockWideMultiMapJoinWrapper<false, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - } \ - return new TBlockWideMapJoinWrapper<false, false, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - case EJoinKind::LeftSemi: \ - return new TBlockWideMapJoinWrapper<true, true, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - case EJoinKind::LeftOnly: \ - return new TBlockWideMapJoinWrapper<true, false, IS_TUPLE>(ctx.Mutables, \ - std::move(joinItems), std::move(leftStreamItems), \ - std::move(leftKeyColumns), std::move(leftIOMap), stream, dict); \ - default: \ - /* TODO: Display the human-readable join kind name. */ \ - MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" \ - << static_cast<ui32>(joinKind)); \ - } \ -} while(0) - - if (isTupleKey) { - DISPATCH_JOIN(true); + // XXX: Mind the last wide item, containing block length. + TVector<ui32> rightIOMap; + if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) { + for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { + if (rightKeyDrops.contains(i)) { + continue; + } + rightIOMap.push_back(i); + } } else { - DISPATCH_JOIN(false); + MKQL_ENSURE(rightKeyDrops.empty(), "Right key drops are not allowed for semi/only join"); + } + + const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0); + const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1); + +#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \ + return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \ + ctx.Mutables, \ + std::move(joinItems), \ + std::move(leftStreamItems), \ + std::move(leftKeyColumns), \ + std::move(leftIOMap), \ + std::move(rightStreamItems), \ + std::move(rightKeyColumns), \ + std::move(rightIOMap), \ + leftStream, \ + rightStream \ + ) + + switch (joinKind) { + case EJoinKind::Inner: + if (rightAny) { + JOIN_WRAPPER(false, true, true); + } else { + JOIN_WRAPPER(false, true, false); + } + case EJoinKind::Left: + if (rightAny) { + JOIN_WRAPPER(false, false, true); + } else { + JOIN_WRAPPER(false, false, false); + } + case EJoinKind::LeftSemi: + MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join"); + if (rightAny) { + JOIN_WRAPPER(true, true, true); + } else { + JOIN_WRAPPER(true, true, false); + } + case EJoinKind::LeftOnly: + MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join"); + if (rightAny) { + JOIN_WRAPPER(true, false, true); + } else { + JOIN_WRAPPER(true, false, false); + } + default: + /* TODO: Display the human-readable join kind name. */ + MKQL_ENSURE(false, "BlockMapJoinCore doesn't support join type #" + << static_cast<ui32>(joinKind)); } -#undef DISPATCH_JOIN +#undef JOIN_WRAPPER } } // namespace NMiniKQL diff --git a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h index 1909b2e714..f5c40a6944 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h +++ b/yql/essentials/minikql/comp_nodes/mkql_rh_hash.h @@ -31,7 +31,7 @@ struct TRobinHoodBatchRequestItem { void ConstructKey(const TKey& key) { new (KeyStorage) TKey(key); } - + // intermediate data ui64 Hash; char* InitialIterator; @@ -114,6 +114,14 @@ public: } } + // returns iterator or nullptr if key is not present + Y_FORCE_INLINE char* Lookup(TKey key) { + auto hash = HashLocal(key); + auto ptr = MakeIterator(hash, Data, CapacityShift); + auto ret = LookupImpl(key, hash, Data, DataEnd, ptr); + return ret; + } + template <typename TSink> Y_NO_INLINE void BatchInsert(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { while (2 * (Size + batchRequest.size()) >= Capacity) { @@ -136,6 +144,22 @@ public: } } + template <typename TSink> + Y_NO_INLINE void BatchLookup(std::span<TRobinHoodBatchRequestItem<TKey>> batchRequest, TSink&& sink) { + for (size_t i = 0; i < batchRequest.size(); ++i) { + auto& r = batchRequest[i]; + r.Hash = HashLocal(r.GetKey()); + r.InitialIterator = MakeIterator(r.Hash, Data, CapacityShift); + NYql::PrefetchForRead(r.InitialIterator); + } + + for (size_t i = 0; i < batchRequest.size(); ++i) { + auto& r = batchRequest[i]; + auto iter = LookupImpl(r.GetKey(), r.Hash, Data, DataEnd, r.InitialIterator); + sink(i, iter); + } + } + ui64 GetCapacity() const { return Capacity; } @@ -215,7 +239,7 @@ private: }; Y_FORCE_INLINE char* MakeIterator(const ui64 hash, char* data, ui64 capacityShift) { - // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ + // https://probablydance.com/2018/06/16/fibonacci-hashing-the-optimization-that-the-world-forgot-or-a-better-alternative-to-integer-modulo/ ui64 bucket = ((SelfHash ^ hash) * 11400714819323198485llu) >> capacityShift; char* ptr = data + AsDeriv().GetCellSize() * bucket; return ptr; @@ -283,6 +307,29 @@ private: } } + Y_FORCE_INLINE char* LookupImpl(TKey key, const ui64 hash, char* data, char* dataEnd, char* ptr) { + i32 currDistance = 0; + for (;;) { + auto& pslPtr = GetPSL(ptr); + if (pslPtr.Distance < 0 || currDistance > pslPtr.Distance) { + return nullptr; + } + + if constexpr (CacheHash) { + if (pslPtr.Hash == hash && EqualLocal(GetKey(ptr), key)) { + return ptr; + } + } else { + if (EqualLocal(GetKey(ptr), key)) { + return ptr; + } + } + + ++currDistance; + AdvancePointer(ptr, data, dataEnd); + } + } + Y_NO_INLINE void Grow() { ui64 growFactor; if (Capacity < 100'000) { diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp index 1a7546eeed..f1cb1ea1aa 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut.cpp @@ -1,1985 +1,995 @@ +#include "mkql_block_map_join_ut_utils.h" #include "mkql_computation_node_ut.h" -#include <arrow/array/builder_binary.h> -#include <arrow/array/builder_primitive.h> -#include <arrow/compute/kernel.h> -#include <yql/essentials/minikql/computation/mkql_block_builder.h> -#include <yql/essentials/minikql/computation/mkql_block_impl.h> -#include <yql/essentials/minikql/computation/mkql_block_reader.h> -#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> #include <yql/essentials/minikql/mkql_node_cast.h> -#include <yql/essentials/public/udf/arrow/udf_arrow_helpers.h> namespace NKikimr { namespace NMiniKQL { namespace { -const TRuntimeNode MakeSet(TProgramBuilder& pgmBuilder, - const TVector<const TRuntimeNode>& keys -) { - const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front(); - - return pgmBuilder.ToHashedDict(keysList, false, - [&](TRuntimeNode item) { - return item; - }, [&](TRuntimeNode) { - return pgmBuilder.NewVoid(); - }); -} - -const TRuntimeNode MakeDict(TProgramBuilder& pgmBuilder, - const TVector<const TRuntimeNode>& keys, - const TVector<const TRuntimeNode>& payloads -) { - const auto keysList = keys.size() > 1 ? pgmBuilder.Zip(keys) : keys.front(); - // TODO: Process containers properly. Now just use Zip to pack - // the data type in a tuple. - TVector<const TRuntimeNode> wrappedPayloads; - std::transform(payloads.cbegin(), payloads.cend(), std::back_inserter(wrappedPayloads), - [&](const auto payload) { - return pgmBuilder.Zip({payload}); - }); - TVector<const TRuntimeNode> pairsChunks; - std::transform(wrappedPayloads.cbegin(), wrappedPayloads.cend(), std::back_inserter(pairsChunks), - [&](const auto payload) { - return pgmBuilder.Zip({keysList, payload}); - }); - const auto pairsList = pgmBuilder.Extend(pairsChunks); - - return pgmBuilder.ToHashedDict(pairsList, payloads.size() > 1, - [&](TRuntimeNode item) { - return pgmBuilder.Nth(item, 0); - }, [&](TRuntimeNode item) { - return pgmBuilder.Nth(item, 1); - }); -} - -// XXX: Copy-pasted from program builder sources. Adjusted on demand. -const std::vector<TType*> ValidateBlockStreamType(const TType* streamType) { - const auto wideComponents = GetWideComponents(AS_TYPE(TStreamType, streamType)); - Y_ENSURE(wideComponents.size() > 0, "Expected at least one column"); - std::vector<TType*> items; - items.reserve(wideComponents.size()); - // XXX: Declare these variables outside the loop body to use for the last - // item (i.e. block length column) in the assertions below. - bool isScalar; - TType* itemType; - for (const auto& wideComponent : wideComponents) { - auto blockType = AS_TYPE(TBlockType, wideComponent); - isScalar = blockType->GetShape() == TBlockType::EShape::Scalar; - itemType = blockType->GetItemType(); - items.push_back(blockType); - } - - Y_ENSURE(isScalar, "Last column should be scalar"); - Y_ENSURE(AS_TYPE(TDataType, itemType)->GetSchemeType() == NUdf::TDataType<ui64>::Id, "Expected Uint64"); - return items; -} - -bool IsOptionalOrNull(const TType* type) { - return type->IsOptional() || type->IsNull() || type->IsPg(); -} - -const TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, - const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, - TRuntimeNode& leftArg, TType* leftTuple, const TRuntimeNode& dictNode -) { - // 1. Make left argument node. - const auto tupleType = AS_TYPE(TTupleType, leftTuple); - const auto listTupleType = pgmBuilder.NewListType(leftTuple); - leftArg = pgmBuilder.Arg(listTupleType); - - // 2. Make left wide stream node. - const auto leftWideStream = pgmBuilder.FromFlow(pgmBuilder.ExpandMap(pgmBuilder.ToFlow(leftArg), +// List<Tuple<...>> -> Stream<Multi<...>> +TRuntimeNode ToWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode list) { + auto wideFlow = pgmBuilder.ExpandMap(pgmBuilder.ToFlow(list), [&](TRuntimeNode tupleNode) -> TRuntimeNode::TList { + TTupleType* tupleType = AS_TYPE(TTupleType, tupleNode.GetStaticType()); TRuntimeNode::TList wide; wide.reserve(tupleType->GetElementsCount()); for (size_t i = 0; i < tupleType->GetElementsCount(); i++) { wide.emplace_back(pgmBuilder.Nth(tupleNode, i)); } return wide; - })); - - // 3. Calculate the resulting join type. - const auto leftStreamItems = ValidateBlockStreamType(leftWideStream.GetStaticType()); - const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); - TVector<TType*> returnJoinItems; - for (size_t i = 0; i < leftStreamItems.size(); i++) { - if (leftKeyDropsSet.contains(i)) { - continue; - } - returnJoinItems.push_back(leftStreamItems[i]); - } - - const auto payloadType = AS_TYPE(TDictType, dictNode.GetStaticType())->GetPayloadType(); - const auto payloadItemType = payloadType->IsList() - ? AS_TYPE(TListType, payloadType)->GetItemType() - : payloadType; - if (joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left) { - // XXX: This is the contract ensured by the expression compiler and - // optimizers to ease the processing of the dict payload in wide context. - Y_ENSURE(payloadItemType->IsTuple(), "Dict payload has to be a Tuple"); - const auto payloadItems = AS_TYPE(TTupleType, payloadItemType)->GetElements(); - TVector<TType*> dictBlockItems; - dictBlockItems.reserve(payloadItems.size()); - for (const auto& payloadItem : payloadItems) { - MKQL_ENSURE(!payloadItem->IsBlock(), "Dict payload item has to be non-block"); - const auto itemType = joinKind == EJoinKind::Inner ? payloadItem - : IsOptionalOrNull(payloadItem) ? payloadItem - : pgmBuilder.NewOptionalType(payloadItem); - dictBlockItems.emplace_back(pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many)); } - // Block length column has to be the last column in wide block stream item, - // so all contents of the dict payload should be appended to the resulting - // wide type before the block size column. - const auto blockLenPos = std::prev(returnJoinItems.end()); - returnJoinItems.insert(blockLenPos, dictBlockItems.cbegin(), dictBlockItems.cend()); - } else { - // XXX: This is the contract ensured by the expression compiler and - // optimizers for join types that don't require the right (i.e. dict) part. - Y_ENSURE(payloadItemType->IsVoid(), "Dict payload has to be Void"); - } - TType* returnJoinType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(returnJoinItems)); + ); - // 4. Build BlockMapJoinCore node. - const auto joinNode = pgmBuilder.BlockMapJoinCore(leftWideStream, dictNode, joinKind, - leftKeyColumns, leftKeyDrops, - returnJoinType); - - // 5. Build the root node with list of tuples. - const auto joinItems = GetWideComponents(AS_TYPE(TStreamType, joinNode.GetStaticType())); - const auto resultType = AS_TYPE(TTupleType, pgmBuilder.NewTupleType(joinItems)); + return pgmBuilder.FromFlow(wideFlow); +} - const auto rootNode = pgmBuilder.Collect(pgmBuilder.FromFlow(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(joinNode), +// Stream<Multi<...>> -> List<Tuple<...>> +TRuntimeNode FromWideStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) { + return pgmBuilder.Collect(pgmBuilder.NarrowMap(pgmBuilder.ToFlow(stream), [&](TRuntimeNode::TList items) -> TRuntimeNode { TVector<TRuntimeNode> tupleElements; - tupleElements.reserve(resultType->GetElementsCount()); - for (size_t i = 0; i < resultType->GetElementsCount(); i++) { + tupleElements.reserve(items.size()); + for (size_t i = 0; i < items.size(); i++) { tupleElements.emplace_back(items[i]); } return pgmBuilder.NewTuple(tupleElements); - }))); - - return rootNode; + }) + ); } -NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, - const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values +TRuntimeNode BuildBlockJoin(TProgramBuilder& pgmBuilder, EJoinKind joinKind, + TRuntimeNode leftList, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, + TRuntimeNode rightList, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny ) { - const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL, - [](size_t max, const TType* type) { - return std::max(max, CalcMaxBlockItemSize(type)); - })); - TVector<std::unique_ptr<IArrayBuilder>> builders; - std::transform(types.cbegin(), types.cend(), std::back_inserter(builders), - [&](const auto& type) { - return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, - maxLength, &ctx.Builder->GetPgBuilder()); - }); - - const auto& holderFactory = ctx.HolderFactory; - const size_t width = types.size(); - const size_t total = values.GetListLength(); - NUdf::TUnboxedValue iterator = values.GetListIterator(); - NUdf::TUnboxedValue current; - size_t converted = 0; - TDefaultListRepresentation listValues; - while (converted < total) { - for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) { - for (size_t j = 0; j < builders.size(); j++) { - const NUdf::TUnboxedValuePod& item = current.GetElement(j); - builders[j]->Add(item); - } - } - NUdf::TUnboxedValue* items = nullptr; - const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items); - for (size_t i = 0; i < width; i++) { - items[i] = holderFactory.CreateArrowBlock(builders[i]->Build(converted >= total)); - } - items[width] = MakeBlockCount(holderFactory, blockSize); - listValues = listValues.Append(std::move(tuple)); - } - return holderFactory.CreateDirectListHolder(std::move(listValues)); -} + const auto leftStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, leftList)); + const auto rightStream = ThrottleStream(pgmBuilder, ToWideStream(pgmBuilder, rightList)); -NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, - const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values -) { - TVector<std::unique_ptr<IBlockReader>> readers; - TVector<std::unique_ptr<IBlockItemConverter>> converters; - for (const auto& type : types) { - const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType(); - readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); - converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, - ctx.Builder->GetPgBuilder())); + const auto leftStreamItems = ValidateBlockStreamType(leftStream.GetStaticType()); + const auto rightStreamItems = ValidateBlockStreamType(rightStream.GetStaticType()); + + TVector<TType*> joinReturnItems; + + const THashSet<ui32> leftKeyDropsSet(leftKeyDrops.cbegin(), leftKeyDrops.cend()); + for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { // Excluding block size + if (leftKeyDropsSet.contains(i)) { + continue; + } + joinReturnItems.push_back(pgmBuilder.NewBlockType(leftStreamItems[i], TBlockType::EShape::Many)); } - const auto& holderFactory = ctx.HolderFactory; - const size_t width = types.size() - 1; - TDefaultListRepresentation listValues; - NUdf::TUnboxedValue iterator = values.GetListIterator(); - NUdf::TUnboxedValue current; - while (iterator.Next(current)) { - const auto blockLengthValue = current.GetElement(width); - const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum(); - Y_ENSURE(blockLengthDatum.is_scalar()); - const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value; - for (size_t i = 0; i < blockLength; i++) { - NUdf::TUnboxedValue* items = nullptr; - const auto tuple = holderFactory.CreateDirectArrayHolder(width, items); - for (size_t j = 0; j < width; j++) { - const auto arrayValue = current.GetElement(j); - const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum(); - UNIT_ASSERT(arrayDatum.is_array()); - const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i); - items[j] = converters[j]->MakeValue(blockItem, holderFactory); + if (joinKind != EJoinKind::LeftSemi && joinKind != EJoinKind::LeftOnly) { + const THashSet<ui32> rightKeyDropsSet(rightKeyDrops.cbegin(), rightKeyDrops.cend()); + for (size_t i = 0; i < rightStreamItems.size() - 1; i++) { // Excluding block size + if (rightKeyDropsSet.contains(i)) { + continue; } - listValues = listValues.Append(std::move(tuple)); + + joinReturnItems.push_back(pgmBuilder.NewBlockType( + joinKind == EJoinKind::Inner ? rightStreamItems[i] + : IsOptionalOrNull(rightStreamItems[i]) ? rightStreamItems[i] + : pgmBuilder.NewOptionalType(rightStreamItems[i]), + TBlockType::EShape::Many + )); } } - return holderFactory.CreateDirectListHolder(std::move(listValues)); -} -NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, - const TType* leftType, const NUdf::TUnboxedValue& leftListValue, - const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, - const TRuntimeNode& rightNode, EJoinKind joinKind, size_t blockSize -) { - TProgramBuilder& pb = *setup.PgmBuilder; - - // 1. Prepare block type for the input produced by the left node. - Y_ENSURE(leftType->IsList(), "Left node has to be list"); - const auto leftListType = AS_TYPE(TListType, leftType)->GetItemType(); - Y_ENSURE(leftListType->IsTuple(), "List item has to be tuple"); - const auto leftItems = AS_TYPE(TTupleType, leftListType)->GetElements(); - const auto ui64Type = pb.NewDataType(NUdf::TDataType<ui64>::Id); - const auto blockLenType = pb.NewBlockType(ui64Type, TBlockType::EShape::Scalar); - TVector<TType*> leftBlockItems; - std::transform(leftItems.cbegin(), leftItems.cend(), std::back_inserter(leftBlockItems), - [&](const auto& itemType) { - return pb.NewBlockType(itemType, TBlockType::EShape::Many); - }); - // XXX: Mind the last block length column. - leftBlockItems.push_back(blockLenType); - const auto leftBlockType = pb.NewTupleType(leftBlockItems); - - // 2. Build AST with BlockMapJoinCore. - TRuntimeNode leftArg; - const auto joinNode = BuildBlockJoin(pb, joinKind, leftKeyColumns, leftKeyDrops, - leftArg, leftBlockType, rightNode); - - // 3. Prepare non-block type for the result of the join node. - const auto joinBlockType = joinNode.GetStaticType(); - Y_ENSURE(joinBlockType->IsList(), "Join result has to be list"); - const auto joinListType = AS_TYPE(TListType, joinBlockType)->GetItemType(); - Y_ENSURE(joinListType->IsTuple(), "List item has to be tuple"); - const auto joinBlockItems = AS_TYPE(TTupleType, joinListType)->GetElements(); - TVector<TType*> joinItems; - // XXX: Mind the last block length column. - std::transform(joinBlockItems.cbegin(), std::prev(joinBlockItems.cend()), std::back_inserter(joinItems), - [](const auto& blockItemType) { - const auto& blockType = AS_TYPE(TBlockType, blockItemType); - Y_ENSURE(blockType->GetShape() == TBlockType::EShape::Many); - return blockType->GetItemType(); - }); - - // 4. Build computation graph with BlockMapJoinCore node as a root. - // Pass the values from the "left node" as the input for the - // BlockMapJoinCore graph. - const auto graph = setup.BuildGraph(joinNode, {leftArg.GetNode()}); - const auto& leftBlocks = graph->GetEntryPoint(0, true); - auto& ctx = graph->GetContext(); - leftBlocks->SetValue(ctx, ToBlocks(ctx, blockSize, leftItems, leftListValue)); - const auto joinValues = FromBlocks(ctx, joinBlockItems, graph->GetValue()); - return joinValues; -} + joinReturnItems.push_back(pgmBuilder.NewBlockType(pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id), TBlockType::EShape::Scalar)); -TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) { - NUdf::TUnboxedValue current; - NUdf::TUnboxedValue iterator = list.GetListIterator(); - TVector<NUdf::TUnboxedValue> items; - while (iterator.Next(current)) { - items.push_back(current); - } - return items; -} + TType* joinReturnType = pgmBuilder.NewStreamType(pgmBuilder.NewMultiType(joinReturnItems)); + auto joinNode = pgmBuilder.BlockMapJoinCore( + leftStream, + rightStream, + joinKind, + leftKeyColumns, + leftKeyDrops, + rightKeyColumns, + rightKeyDrops, + rightAny, + joinReturnType + ); -void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected, - const NUdf::TUnboxedValue& got -) { - const auto itemType = AS_TYPE(TListType, type)->GetItemType(); - const NUdf::ICompare::TPtr compare = MakeCompareImpl(itemType); - const NUdf::IEquate::TPtr equate = MakeEquateImpl(itemType); - // XXX: Stub both keyTypes and isTuple arguments, since - // ICompare/IEquate are used. - TKeyTypes keyTypesStub; - bool isTupleStub = false; - const TValueLess valueLess(keyTypesStub, isTupleStub, compare.Get()); - const TValueEqual valueEqual(keyTypesStub, isTupleStub, equate.Get()); - - auto expectedItems = ConvertListToVector(expected); - auto gotItems = ConvertListToVector(got); - UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size()); - Sort(expectedItems, valueLess); - Sort(gotItems, valueLess); - for (size_t i = 0; i < expectedItems.size(); i++) { - UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i])); - } + return FromWideStream(pgmBuilder, DethrottleStream(pgmBuilder, joinNode)); } -void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, - const TType* expectedType, const NUdf::TUnboxedValue& expected, - const TRuntimeNode& rightNode, const TType* leftType, - const NUdf::TUnboxedValue& leftListValue, const TVector<ui32>& leftKeyColumns, - const TVector<ui32>& leftKeyDrops = {} +NUdf::TUnboxedValue DoTestBlockJoin(TSetup<false>& setup, + TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, const TVector<ui32>& leftKeyDrops, + TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, const TVector<ui32>& rightKeyDrops, bool rightAny, + EJoinKind joinKind, size_t blockSize ) { - const size_t testSize = leftListValue.GetListLength(); - for (size_t blockSize = 8; blockSize <= testSize; blockSize <<= 1) { - const auto got = DoTestBlockJoin(setup, leftType, leftListValue, - leftKeyColumns, leftKeyDrops, - rightNode, joinKind, blockSize); - CompareResults(expectedType, expected, got); - } -} + TProgramBuilder& pb = *setup.PgmBuilder; -// -// Auxiliary routines to build list nodes from the given vectors. -// - -struct TTypeMapperBase { - TProgramBuilder& Pb; - TType* ItemType; - auto GetType() { return ItemType; } -}; - -template <typename Type> -struct TTypeMapper: TTypeMapperBase { - TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::TDataType<Type>::Id) } {} - auto GetValue(const Type& value) { - return Pb.NewDataLiteral<Type>(value); - } -}; + Y_ENSURE(leftType->IsList(), "Left node has to be list"); + const auto leftItemType = AS_TYPE(TListType, leftType)->GetItemType(); + Y_ENSURE(leftItemType->IsTuple(), "List item has to be tuple"); + TType* leftBlockType = MakeBlockTupleType(pb, leftItemType); -template <> -struct TTypeMapper<TString>: TTypeMapperBase { - TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::EDataSlot::String)} {} - auto GetValue(const TString& value) { - return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value); - } -}; - -template <typename TNested> -class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> { - using TBase = TTypeMapper<TNested>; -public: - TTypeMapper(TProgramBuilder& pb): TBase(pb) {} - auto GetType() { return TBase::Pb.NewOptionalType(TBase::GetType()); } - auto GetValue(const std::optional<TNested>& value) { - if (value == std::nullopt) { - return TBase::Pb.NewEmptyOptional(GetType()); - } else { - return TBase::Pb.NewOptional(TBase::GetValue(*value)); - } - } -}; + Y_ENSURE(rightType->IsList(), "Right node has to be list"); + const auto rightItemType = AS_TYPE(TListType, rightType)->GetItemType(); + Y_ENSURE(rightItemType->IsTuple(), "Right item has to be tuple"); + TType* rightBlockType = MakeBlockTupleType(pb, rightItemType); -template<typename Type> -const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb, - const TVector<Type>& vector -) { - TTypeMapper<Type> mapper(pb); + TRuntimeNode leftList = pb.Arg(pb.NewListType(leftBlockType)); + TRuntimeNode rightList = pb.Arg(pb.NewListType(rightBlockType)); + const auto joinNode = BuildBlockJoin(pb, joinKind, leftList, leftKeyColumns, leftKeyDrops, rightList, rightKeyColumns, rightKeyDrops, rightAny); - TRuntimeNode::TList listItems; - std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems), - [&](const auto value) { - return mapper.GetValue(value); - }); + const auto joinType = joinNode.GetStaticType(); + Y_ENSURE(joinType->IsList(), "Join result has to be list"); + const auto joinItemType = AS_TYPE(TListType, joinType)->GetItemType(); + Y_ENSURE(joinItemType->IsTuple(), "List item has to be tuple"); - return {pb.NewList(mapper.GetType(), listItems)}; -} + const auto graph = setup.BuildGraph(joinNode, {leftList.GetNode(), rightList.GetNode()}); -template<typename Type, typename... Tail> -const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb, - const TVector<Type>& vector, Tail... vectors -) { - const auto frontList = BuildListNodes(pb, vector); - const auto tailLists = BuildListNodes(pb, std::forward<Tail>(vectors)...); - TVector<const TRuntimeNode> lists; - lists.reserve(tailLists.size() + 1); - lists.push_back(frontList.front());; - for (const auto& list : tailLists) { - lists.push_back(list); - } - return lists; + auto& ctx = graph->GetContext(); + graph->GetEntryPoint(0, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, leftItemType)->GetElements(), std::move(leftListValue))); + graph->GetEntryPoint(1, true)->SetValue(ctx, ToBlocks(ctx, blockSize, AS_TYPE(TTupleType, rightItemType)->GetElements(), std::move(rightListValue))); + return FromBlocks(ctx, AS_TYPE(TTupleType, joinItemType)->GetElements(), graph->GetValue()); } -template<typename... TVectors> -const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples( - TSetup<false>& setup, TVectors... vectors +void RunTestBlockJoin(TSetup<false>& setup, EJoinKind joinKind, + TType* expectedType, const NUdf::TUnboxedValue& expected, + TType* leftType, NUdf::TUnboxedValue&& leftListValue, const TVector<ui32>& leftKeyColumns, + TType* rightType, NUdf::TUnboxedValue&& rightListValue, const TVector<ui32>& rightKeyColumns, + const TVector<ui32>& leftKeyDrops = {}, const TVector<ui32>& rightKeyDrops = {}, bool rightAny = false ) { - TProgramBuilder& pb = *setup.PgmBuilder; - const auto lists = BuildListNodes(pb, std::forward<TVectors>(vectors)...); - const auto tuplesNode = pb.Zip(lists); - const auto tuplesNodeType = tuplesNode.GetStaticType(); - const auto tuples = setup.BuildGraph(tuplesNode)->GetValue(); - return std::make_pair(tuplesNodeType, tuples); -} - -TVector<TString> GenerateValues(size_t level) { - constexpr size_t alphaSize = 'Z' - 'A' + 1; - if (level == 1) { - TVector<TString> alphabet(alphaSize); - std::iota(alphabet.begin(), alphabet.end(), 'A'); - return alphabet; - } - const auto subValues = GenerateValues(level - 1); - TVector<TString> values; - values.reserve(alphaSize * subValues.size()); - for (char ch = 'A'; ch <= 'Z'; ch++) { - for (const auto& tail : subValues) { - values.emplace_back(ch + tail); - } - } - return values; -} - -TSet<ui64> GenerateFibonacci(size_t count) { - TSet<ui64> fibSet; - ui64 a = 0, b = 1; - fibSet.insert(a); - while (count--) { - a = std::exchange(b, a + b); - fibSet.insert(b); + const size_t testSize = leftListValue.GetListLength(); + for (size_t blockSize = 1; blockSize <= testSize; blockSize <<= 1) { + const auto got = DoTestBlockJoin(setup, + leftType, std::move(leftListValue), leftKeyColumns, leftKeyDrops, + rightType, std::move(rightListValue), rightKeyColumns, rightKeyDrops, rightAny, + joinKind, blockSize + ); + CompareResults(expectedType, expected, got); } - return fibSet; } } // namespace -Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinBasicTest) { - +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestBasic) { constexpr size_t testSize = 1 << 14; constexpr size_t valueSize = 3; static const TVector<TString> threeLetterValues = GenerateValues(valueSize); - static const TSet<ui64> fibonacci = GenerateFibonacci(21); + static const TSet<ui64> fibonacci = GenerateFibonacci(testSize); + static const TString hugeString(128, '1'); + + Y_UNIT_TEST(TestInnerJoin) { + TSetup<false> setup(GetNodeFactory()); - Y_UNIT_TEST(TestInnerOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. + + // 2. Make input for the "right" stream. const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key); }); + // 3. Make "expected" data. TMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMap.find(keyInit[i]); + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& found = rightMap.find(leftKeyInit[i]); if (found != rightMap.cend()) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(found->second); + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMapNode, leftType, leftList, {0}); - } - Y_UNIT_TEST(TestInnerMultiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), - [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), - [](const auto key) { return std::to_string(key * 1001); }); - // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } - Y_UNIT_TEST(TestLeftOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), - [](const auto key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<ui64, TString> rightMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - const auto& found = rightMap.find(keyInit[i]); - if (found != rightMap.cend()) { - rightExpected.push_back(found->second); - } else { - rightExpected.push_back(std::nullopt); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMapNode, leftType, leftList, {0}); - } + Y_UNIT_TEST(TestInnerJoinMulti) { + TSetup<false> setup(GetNodeFactory()); - Y_UNIT_TEST(TestLeftMultiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), + + // Add rows with the same keys + std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit)); + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key * 1001); }); + // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; + TMultiMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; + rightMap.insert({rightKeyInit[i], rightValueInit[i]}); } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } else { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(std::nullopt); + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]); + for (auto it = begin; it != end; it++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(it->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); - } - Y_UNIT_TEST(TestLeftSemiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (rightSet.contains(keyInit[i])) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, - rightSetNode, leftType, leftList, {0}); + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } - Y_UNIT_TEST(TestLeftOnlyOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestInnerJoinRightAny) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!rightSet.contains(keyInit[i])) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, - rightSetNode, leftType, leftList, {0}); - } -} // Y_UNIT_TEST_SUITE - -Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinNullKeysTest) { - - constexpr size_t testSize = 1 << 14; - constexpr size_t valueSize = 3; - static const TVector<TString> threeLetterValues = GenerateValues(valueSize); - static const TSet<ui64> fibonacci = GenerateFibonacci(21); + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(key); }); - Y_UNIT_TEST(TestInnerOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), + // Add rows with the same keys + std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit)); + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key); }); + // 3. Make "expected" data. TMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!keyInit[i]) { - continue; - } - const auto& found = rightMap.find(*keyInit[i]); + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + auto found = rightMap.find(leftKeyInit[i]); if (found != rightMap.cend()) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(found->second); + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMapNode, leftType, leftList, {0}); - } - Y_UNIT_TEST(TestInnerMultiOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), - [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), - [](const auto key) { return std::to_string(key * 1001); }); - // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!keyInit[i]) { - continue; - } - const auto& found = rightMultiMap.find(*keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0}, true + ); } - Y_UNIT_TEST(TestLeftOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftJoin) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key); }); + // 3. Make "expected" data. TMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - const auto& found = keyInit[i] ? rightMap.find(*keyInit[i]) : rightMap.cend(); + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<std::optional<TString>> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + const auto& found = rightMap.find(leftKeyInit[i]); if (found != rightMap.cend()) { - rightExpected.push_back(found->second); + expectedRightValue.push_back(found->second); } else { - rightExpected.push_back(std::nullopt); + expectedRightValue.push_back(std::nullopt); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMapNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } - Y_UNIT_TEST(TestLeftMultiOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftJoinMulti) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), + + // Add rows with the same keys + std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit)); + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), [](const auto key) { return std::to_string(key * 1001); }); + // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; + TMultiMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = keyInit[i] ? rightMultiMap.find(*keyInit[i]) : rightMultiMap.cend(); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); + rightMap.insert({rightKeyInit[i], rightValueInit[i]}); + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<std::optional<TString>> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]); + if (begin != end) { + for (auto it = begin; it != end; it++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(it->second); } } else { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(std::nullopt); + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(std::nullopt); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } - Y_UNIT_TEST(TestLeftSemiOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftSemiJoin) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + // Add rows with the same keys + std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit)); + // 3. Make "expected" data. TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (keyInit[i] && rightSet.contains(*keyInit[i])) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + if (rightSet.contains(leftKeyInit[i])) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue); + RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, - rightSetNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } - Y_UNIT_TEST(TestLeftOnlyOnOptionalUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftOnlyJoin) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<std::optional<ui64>> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return *key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[*key]; }); - // 1a. Make some keys NULL - keyInit[0] = std::nullopt; - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + // Add rows with the same keys + std::copy_n(rightKeyInit.begin(), rightKeyInit.size(), std::back_inserter(rightKeyInit)); + // 3. Make "expected" data. TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<std::optional<ui64>> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!(keyInit[i] && rightSet.contains(*keyInit[i]))) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + if (!rightSet.contains(leftKeyInit[i])) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue); + RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, - rightSetNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } -} // Y_UNIT_TEST_SUITE + Y_UNIT_TEST(TestKeyTuple) { + TSetup<false> setup(GetNodeFactory()); -Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMoreTest) { + // 1. Make input for the "left" stream. + TVector<ui64> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); - constexpr size_t testSize = 1 << 14; - constexpr size_t valueSize = 3; - static const TVector<TString> threeLetterValues = GenerateValues(valueSize); - static const TString hugeString(128, '1'); + // 2. Make input for the "right" stream. + TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<ui64> rightKey2Init; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), + [](const auto& key) { return key * 1001; }); + TVector<TString> rightValueInit; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit), + [](const auto& key) { return std::to_string(key); }); - Y_UNIT_TEST(TestInnerOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit({1}); - TVector<TString> rightPayloadInit({hugeString}); // 3. Make "expected" data. - TMap<ui64, TString> rightMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMap.find(keyInit[i]); + TMap<std::tuple<ui64, ui64>, TString> rightMap; + for (size_t i = 0; i < rightKey1Init.size(); i++) { + const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); + rightMap[key] = rightValueInit[i]; + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto key = std::make_tuple(leftKeyInit[i], leftSubkeyInit[i]); + const auto found = rightMap.find(key); if (found != rightMap.cend()) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(found->second); + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMapNode, leftType, leftList, {0}); - } - Y_UNIT_TEST(TestInnerMultiOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit({1}); - TVector<TString> rightPayload1Init({"1"}); - TVector<TString> rightPayload2Init({hugeString}); - // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKey1Init, rightValueInit, rightKey2Init); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); + leftType, std::move(leftList), {0, 1}, + rightType, std::move(rightList), {0, 2}, + {}, {0, 2} + ); } - Y_UNIT_TEST(TestLeftOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestInnerJoinOutputSlicing) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. + TVector<ui64> leftKeyInit(testSize); + std::fill(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return key * 1001; }); + TVector<TString> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[key]; }); + + // 2. Make input for the "right" stream. + // Huge string is used to make less rows fit into one block const TVector<ui64> rightKeyInit({1}); - TVector<TString> rightPayloadInit({hugeString}); + TVector<TString> rightValueInit({hugeString}); + // 3. Make "expected" data. TMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - const auto& found = rightMap.find(keyInit[i]); + rightMap[rightKeyInit[i]] = rightValueInit[i]; + } + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& found = rightMap.find(leftKeyInit[i]); if (found != rightMap.cend()) { - rightExpected.push_back(found->second); - } else { - rightExpected.push_back(std::nullopt); + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMapNode, leftType, leftList, {0}); + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); + + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } - Y_UNIT_TEST(TestLeftMultiOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestInnerJoinHugeIterator) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit({1}); - TVector<TString> rightPayload1Init({"1"}); - TVector<TString> rightPayload2Init({hugeString}); + TVector<ui64> leftKeyInit({1}); + TVector<ui64> leftSubkeyInit({1001}); + TVector<TString> leftValueInit({threeLetterValues[1]}); + + // 2. Make input for the "right" stream. + // Huge string is used to make less rows fit into one block + TVector<ui64> rightKeyInit(1 << 16); + std::fill(rightKeyInit.begin(), rightKeyInit.end(), 1); + TVector<TString> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto& key) { return std::to_string(key); }); + // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; + TMultiMap<ui64, TString> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; + rightMap.insert({rightKeyInit[i], rightValueInit[i]}); } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } else { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(std::nullopt); + TVector<ui64> expectedKey; + TVector<ui64> expectedSubkey; + TVector<TString> expectedValue; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + const auto& [begin, end] = rightMap.equal_range(leftKeyInit[i]); + for (auto it = begin; it != end; it++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightValue.push_back(it->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}); - } - Y_UNIT_TEST(TestLeftSemiOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit({1}); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (rightSet.contains(keyInit[i])) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, - rightSetNode, leftType, leftList, {0}); - } + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightValue); - Y_UNIT_TEST(TestLeftOnlyOn1) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::fill(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - TVector<TString> valueInit; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyInit.push_back(i * 1001); - valueInit.push_back(threeLetterValues[i]); - } - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit({1}); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!rightSet.contains(keyInit[i])) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, - rightSetNode, leftType, leftList, {0}); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0}, + {}, {0} + ); } } // Y_UNIT_TEST_SUITE -Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinDropKeyColumns) { - +Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinTestOptional) { constexpr size_t testSize = 1 << 14; constexpr size_t valueSize = 3; static const TVector<TString> threeLetterValues = GenerateValues(valueSize); - static const TSet<ui64> fibonacci = GenerateFibonacci(21); + static const TSet<ui64> fibonacci = GenerateFibonacci(testSize); + + Y_UNIT_TEST(TestInnerJoin) { + TSetup<false> setup(GetNodeFactory()); - Y_UNIT_TEST(TestInnerOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), - [](const auto key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<ui64, TString> rightMap; + TVector<std::optional<ui64>> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return *key * 1001; }); + TVector<std::optional<TString>> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[*key]; }); + + // 2. Make input for the "right" stream. + TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<std::optional<TString>> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(*key); }); + + // 3. Add some NULLs + leftKeyInit[0] = leftKeyInit[2] = std::nullopt; + rightKeyInit[2] = rightKeyInit[3] = std::nullopt; + + leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt; + rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt; + + // 4. Make "expected" data. + TMap<ui64, std::optional<TString>> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; + if (rightKeyInit[i].has_value()) { + rightMap[*rightKeyInit[i]] = rightValueInit[i]; + } } - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMap.find(keyInit[i]); + TVector<std::optional<ui64>> expectedLeftKey; + TVector<ui64> expectedSubkey; + TVector<std::optional<TString>> expectedValue; + TVector<std::optional<ui64>> expectedRightKey; + TVector<std::optional<TString>> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + if (!leftKeyInit[i]) { + continue; + } + const auto& found = rightMap.find(*leftKeyInit[i]); if (found != rightMap.cend()) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(found->second); + expectedLeftKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey.push_back(found->first); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedLeftKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMapNode, leftType, leftList, {0}, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } - Y_UNIT_TEST(TestInnerMultiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftJoin) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), - [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), - [](const auto key) { return std::to_string(key * 1001); }); - // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; + TVector<std::optional<ui64>> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return *key * 1001; }); + TVector<std::optional<TString>> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[*key]; }); + + // 2. Make input for the "right" stream. + TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + TVector<std::optional<TString>> rightValueInit; + std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(*key); }); + + // 3. Add some NULLs + leftKeyInit[0] = leftKeyInit[2] = std::nullopt; + rightKeyInit[2] = rightKeyInit[3] = std::nullopt; + + leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt; + rightValueInit[2] = rightValueInit[12] = rightValueInit[42] = std::nullopt; + + // 4. Make "expected" data. + TMap<ui64, std::optional<TString>> rightMap; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } + if (rightKeyInit[i].has_value()) { + rightMap[*rightKeyInit[i]] = rightValueInit[i]; } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}, {0}); - } + TVector<std::optional<ui64>> expectedKey; + TVector<ui64> expectedSubkey; + TVector<std::optional<TString>> expectedValue; + TVector<std::optional<ui64>> expectedRightKey; + TVector<std::optional<TString>> expectedRightValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); - Y_UNIT_TEST(TestLeftOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayloadInit; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayloadInit), - [](const auto key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<ui64, TString> rightMap; - for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMap[rightKeyInit[i]] = rightPayloadInit[i]; - } - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - const auto& found = rightMap.find(keyInit[i]); + const auto& found = rightMap.find(*leftKeyInit[i]); if (found != rightMap.cend()) { - rightExpected.push_back(found->second); + expectedRightKey.push_back(found->first); + expectedRightValue.push_back(found->second); } else { - rightExpected.push_back(std::nullopt); + expectedRightKey.push_back(std::nullopt); + expectedRightValue.push_back(std::nullopt); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKeyInit, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue, expectedRightKey, expectedRightValue); + RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMapNode, leftType, leftList, {0}, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } - Y_UNIT_TEST(TestLeftMultiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestLeftSemiJoin) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - TVector<TString> rightPayload1Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload1Init), - [](const auto key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKeyInit.cbegin(), rightKeyInit.cend(), std::back_inserter(rightPayload2Init), - [](const auto key) { return std::to_string(key * 1001); }); - // 3. Make "expected" data. - TMap<ui64, TVector<TString>> rightMultiMap; + TVector<std::optional<ui64>> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return *key * 1001; }); + TVector<std::optional<TString>> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[*key]; }); + + // 2. Make input for the "right" stream. + TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); + + // 3. Add some NULLs + leftKeyInit[0] = leftKeyInit[2] = std::nullopt; + rightKeyInit[2] = rightKeyInit[3] = std::nullopt; + leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt; + + // 4. Make "expected" data. + TSet<ui64> rightSet; for (size_t i = 0; i < rightKeyInit.size(); i++) { - rightMultiMap[rightKeyInit[i]] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto& found = rightMultiMap.find(keyInit[i]); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } else { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(std::nullopt); + if (rightKeyInit[i].has_value()) { + rightSet.insert(*rightKeyInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0}, {0}); - } - - Y_UNIT_TEST(TestLeftSemiOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (rightSet.contains(keyInit[i])) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); + TVector<std::optional<ui64>> expectedKey; + TVector<ui64> expectedSubkey; + TVector<std::optional<TString>> expectedValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + if (!leftKeyInit[i]) { + continue; + } + if (rightSet.contains(*leftKeyInit[i])) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue); + RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, - rightSetNode, leftType, leftList, {0}, {0}); + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } - Y_UNIT_TEST(TestLeftOnlyOnUint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - const TVector<ui64> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - // 3. Make "expected" data. - TSet<ui64> rightSet(rightKeyInit.cbegin(), rightKeyInit.cend()); - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - if (!rightSet.contains(keyInit[i])) { - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKeyInit); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, - rightSetNode, leftType, leftList, {0}, {0}); - } + Y_UNIT_TEST(TestLeftOnlyJoin) { + TSetup<false> setup(GetNodeFactory()); -} // Y_UNIT_TEST_SUITE + // 1. Make input for the "left" stream. + TVector<std::optional<ui64>> leftKeyInit(testSize); + std::iota(leftKeyInit.begin(), leftKeyInit.end(), 1); + TVector<ui64> leftSubkeyInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftSubkeyInit), + [](const auto key) { return *key * 1001; }); + TVector<std::optional<TString>> leftValueInit; + std::transform(leftKeyInit.cbegin(), leftKeyInit.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[*key]; }); -Y_UNIT_TEST_SUITE(TMiniKQLBlockMapJoinMultiKeyBasicTest) { + // 2. Make input for the "right" stream. + TVector<std::optional<ui64>> rightKeyInit(fibonacci.cbegin(), fibonacci.cend()); - constexpr size_t testSize = 1 << 14; - constexpr size_t valueSize = 3; - static const TVector<TString> threeLetterValues = GenerateValues(valueSize); - static const TSet<ui64> fibonacci = GenerateFibonacci(21); + // 3. Add some NULLs + leftKeyInit[0] = leftKeyInit[2] = std::nullopt; + rightKeyInit[2] = rightKeyInit[3] = std::nullopt; + leftValueInit[1] = leftValueInit[11] = leftValueInit[41] = std::nullopt; - Y_UNIT_TEST(TestInnerOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - TVector<TString> rightPayloadInit; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit), - [](const auto& key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<std::tuple<ui64, ui64>, TString> rightMap; - for (size_t i = 0; i < rightKey1Init.size(); i++) { - const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); - rightMap[key] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - const auto found = rightMap.find(key); - if (found != rightMap.cend()) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(found->second); + // 4. Make "expected" data. + TSet<ui64> rightSet; + for (size_t i = 0; i < rightKeyInit.size(); i++) { + if (rightKeyInit[i].has_value()) { + rightSet.insert(*rightKeyInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMapNode, leftType, leftList, {0, 1}); - } - - Y_UNIT_TEST(TestInnerMultiOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - TVector<TString> rightPayload1Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init), - [](const auto& key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init), - [](const auto& key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap; - for (size_t i = 0; i < rightKey1Init.size(); i++) { - const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); - rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<TString> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - const auto found = rightMultiMap.find(key); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } + TVector<std::optional<ui64>> expectedKey; + TVector<ui64> expectedSubkey; + TVector<std::optional<TString>> expectedValue; + for (size_t i = 0; i < leftKeyInit.size(); i++) { + if (!leftKeyInit[i] || !rightSet.contains(*leftKeyInit[i])) { + expectedKey.push_back(leftKeyInit[i]); + expectedSubkey.push_back(leftSubkeyInit[i]); + expectedValue.push_back(leftValueInit[i]); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0, 1}); + + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKeyInit, leftSubkeyInit, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, rightKeyInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedKey, expectedSubkey, expectedValue); + + RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, + leftType, std::move(leftList), {0}, + rightType, std::move(rightList), {0} + ); } - Y_UNIT_TEST(TestLeftOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; + Y_UNIT_TEST(TestKeyTuple) { + TSetup<false> setup(GetNodeFactory()); + // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - TVector<TString> rightPayloadInit; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayloadInit), - [](const auto& key) { return std::to_string(key); }); - // 3. Make "expected" data. + TVector<std::optional<ui64>> leftKey1Init(testSize); + std::iota(leftKey1Init.begin(), leftKey1Init.end(), 1); + TVector<std::optional<ui64>> leftKey2Init(testSize); + std::iota(leftKey2Init.begin(), leftKey2Init.end(), 1); + TVector<TString> leftValueInit; + std::transform(leftKey1Init.cbegin(), leftKey1Init.cend(), std::back_inserter(leftValueInit), + [](const auto key) { return threeLetterValues[*key]; }); + + // 2. Make input for the "right" stream. + TVector<std::optional<ui64>> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<std::optional<ui64>> rightKey2Init(fibonacci.cbegin(), fibonacci.cend()); + TVector<TString> rightValueInit; + std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightValueInit), + [](const auto key) { return std::to_string(*key); }); + + // 3. Add some NULLs + leftKey1Init[0] = leftKey1Init[1] = std::nullopt; + leftKey2Init[1] = std::nullopt; + rightKey1Init[1] = rightKey1Init[2] = std::nullopt; + rightKey2Init[2] = std::nullopt; + + // 4. Make "expected" data. TMap<std::tuple<ui64, ui64>, TString> rightMap; for (size_t i = 0; i < rightKey1Init.size(); i++) { - const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); - rightMap[key] = rightPayloadInit[i]; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - const auto found = rightMap.find(key); + if (rightKey1Init[i].has_value() && rightKey2Init[i].has_value()) { + const auto key = std::make_tuple(*rightKey1Init[i], *rightKey2Init[i]); + rightMap[key] = rightValueInit[i]; + } + } + TVector<std::optional<ui64>> expectedLeftKey1; + TVector<std::optional<ui64>> expectedLeftKey2; + TVector<TString> expectedValue; + TVector<std::optional<ui64>> expectedRightKey1; + TVector<std::optional<ui64>> expectedRightKey2; + TVector<TString> expectedRightValue; + for (size_t i = 0; i < leftKey1Init.size(); i++) { + if (!leftKey1Init[i] || !leftKey2Init[i]) { + continue; + } + const auto key = std::make_tuple(*leftKey1Init[i], *leftKey2Init[i]); + const auto& found = rightMap.find(key); if (found != rightMap.cend()) { - rightExpected.push_back(found->second); - } else { - rightExpected.push_back(std::nullopt); + expectedLeftKey1.push_back(leftKey1Init[i]); + expectedLeftKey2.push_back(leftKey2Init[i]); + expectedValue.push_back(leftValueInit[i]); + expectedRightKey1.push_back(std::get<0>(found->first)); + expectedRightKey2.push_back(std::get<1>(found->first)); + expectedRightValue.push_back(found->second); } } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayloadInit); - const auto rightMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMapNode, leftType, leftList, {0, 1}); - } - Y_UNIT_TEST(TestLeftMultiOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - TVector<TString> rightPayload1Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightPayload1Init), - [](const auto& key) { return std::to_string(key); }); - TVector<TString> rightPayload2Init; - std::transform(rightKey2Init.cbegin(), rightKey2Init.cend(), std::back_inserter(rightPayload2Init), - [](const auto& key) { return std::to_string(key); }); - // 3. Make "expected" data. - TMap<std::tuple<ui64, ui64>, TVector<TString>> rightMultiMap; - for (size_t i = 0; i < rightKey1Init.size(); i++) { - const auto key = std::make_tuple(rightKey1Init[i], rightKey2Init[i]); - rightMultiMap[key] = {rightPayload1Init[i], rightPayload2Init[i]}; - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - TVector<std::optional<TString>> rightExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - const auto found = rightMultiMap.find(key); - if (found != rightMultiMap.cend()) { - for (const auto& right : found->second) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(right); - } - } else { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - rightExpected.push_back(std::nullopt); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected, rightExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightPayloads = BuildListNodes(pgmBuilder, rightPayload1Init, rightPayload2Init); - const auto rightMultiMapNode = MakeDict(pgmBuilder, rightKeys, rightPayloads); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::Left, expectedType, expected, - rightMultiMapNode, leftType, leftList, {0, 1}); - } - - Y_UNIT_TEST(TestLeftSemiOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - // 3. Make "expected" data. - TSet<std::tuple<ui64, ui64>> rightSet; - for (size_t i = 0; i < rightKey1Init.size(); i++) { - rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i])); - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - if (rightSet.contains(key)) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftSemi, expectedType, expected, - rightSetNode, leftType, leftList, {0, 1}); - } + auto [leftType, leftList] = ConvertVectorsToTuples(setup, + leftKey1Init, leftKey2Init, leftValueInit); + auto [rightType, rightList] = ConvertVectorsToTuples(setup, + rightKey1Init, rightKey2Init, rightValueInit); + auto [expectedType, expected] = ConvertVectorsToTuples(setup, + expectedLeftKey1, expectedLeftKey2, expectedValue, expectedRightKey1, expectedRightKey2, expectedRightValue); - Y_UNIT_TEST(TestLeftOnlyOnUint64Uint64) { - TSetup<false> setup; - TProgramBuilder& pgmBuilder = *setup.PgmBuilder; - // 1. Make input for the "left" stream. - TVector<ui64> keyInit(testSize); - std::iota(keyInit.begin(), keyInit.end(), 1); - TVector<ui64> subkeyInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(subkeyInit), - [](const auto key) { return key * 1001; }); - TVector<TString> valueInit; - std::transform(keyInit.cbegin(), keyInit.cend(), std::back_inserter(valueInit), - [](const auto key) { return threeLetterValues[key]; }); - // 2. Make input for the "right" dict. - TVector<ui64> rightKey1Init(fibonacci.cbegin(), fibonacci.cend()); - TVector<ui64> rightKey2Init; - std::transform(rightKey1Init.cbegin(), rightKey1Init.cend(), std::back_inserter(rightKey2Init), - [](const auto& key) { return key * 1001; }); - // 3. Make "expected" data. - TSet<std::tuple<ui64, ui64>> rightSet; - for (size_t i = 0; i < rightKey1Init.size(); i++) { - rightSet.emplace(std::make_tuple(rightKey1Init[i], rightKey2Init[i])); - } - TVector<ui64> keyExpected; - TVector<ui64> subkeyExpected; - TVector<TString> valueExpected; - for (size_t i = 0; i < keyInit.size(); i++) { - const auto key = std::make_tuple(keyInit[i], subkeyInit[i]); - if (!rightSet.contains(key)) { - keyExpected.push_back(keyInit[i]); - subkeyExpected.push_back(subkeyInit[i]); - valueExpected.push_back(valueInit[i]); - } - } - // 4. Convert input and expected TVectors to List<UV>. - const auto [leftType, leftList] = ConvertVectorsToTuples(setup, - keyInit, subkeyInit, valueInit); - const auto [expectedType, expected] = ConvertVectorsToTuples(setup, - keyExpected, subkeyExpected, valueExpected); - // 5. Build "right" computation node. - const auto rightKeys = BuildListNodes(pgmBuilder, rightKey1Init, rightKey2Init); - const auto rightSetNode = MakeSet(pgmBuilder, rightKeys); - // 6. Run tests. - RunTestBlockJoin(setup, EJoinKind::LeftOnly, expectedType, expected, - rightSetNode, leftType, leftList, {0, 1}); + RunTestBlockJoin(setup, EJoinKind::Inner, expectedType, expected, + leftType, std::move(leftList), {0, 1}, + rightType, std::move(rightList), {0, 1} + ); } } // Y_UNIT_TEST_SUITE diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp new file mode 100644 index 0000000000..2a65da92ed --- /dev/null +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.cpp @@ -0,0 +1,331 @@ +#include "mkql_block_map_join_ut_utils.h" + +#include <yql/essentials/minikql/computation/mkql_block_impl.h> +#include <yql/essentials/minikql/computation/mkql_block_reader.h> +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> +#include <yql/essentials/minikql/mkql_node_cast.h> +#include <yql/essentials/public/udf/arrow/args_dechunker.h> +#include <yql/essentials/public/udf/arrow/block_builder.h> + +namespace NKikimr { +namespace NMiniKQL { + +namespace { + +class TWideStreamThrottlerWrapper: public TMutableComputationNode<TWideStreamThrottlerWrapper> { + typedef TMutableComputationNode<TWideStreamThrottlerWrapper> TBaseComputation; +public: + class TStreamValue : public TComputationValue<TStreamValue> { + public: + using TBase = TComputationValue<TStreamValue>; + + TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream) + : TBase(memInfo) + , OrigStream_(std::move(origStream)) + { + } + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + if (Counter_++ % 3) { + return NUdf::EFetchStatus::Yield; + } + + TUnboxedValueVector items(width); + switch (OrigStream_.WideFetch(items.data(), width)) { + case NUdf::EFetchStatus::Yield: + return NUdf::EFetchStatus::Yield; + case NUdf::EFetchStatus::Ok: + for (size_t i = 0; i < width; i++) { + output[i] = std::move(items[i]); + } + return NUdf::EFetchStatus::Ok; + case NUdf::EFetchStatus::Finish: + return NUdf::EFetchStatus::Finish; + } + } + + private: + NUdf::TUnboxedValue OrigStream_; + size_t Counter_ = 0; + }; + + TWideStreamThrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream) + : TBaseComputation(mutables) + , OrigStream_(origStream) + { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx)); + } + +private: + void RegisterDependencies() const final { + DependsOn(OrigStream_); + } + +private: + IComputationNode* OrigStream_; +}; + +class TWideStreamDethrottlerWrapper: public TMutableComputationNode<TWideStreamDethrottlerWrapper> { + typedef TMutableComputationNode<TWideStreamDethrottlerWrapper> TBaseComputation; +public: + class TStreamValue : public TComputationValue<TStreamValue> { + public: + using TBase = TComputationValue<TStreamValue>; + + TStreamValue(TMemoryUsageInfo* memInfo, NUdf::TUnboxedValue origStream) + : TBase(memInfo) + , OrigStream_(std::move(origStream)) + { + } + + private: + NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { + TUnboxedValueVector items(width); + for (;;) { + switch (OrigStream_.WideFetch(items.data(), width)) { + case NUdf::EFetchStatus::Yield: + continue; + case NUdf::EFetchStatus::Ok: + for (size_t i = 0; i < width; i++) { + output[i] = std::move(items[i]); + } + return NUdf::EFetchStatus::Ok; + case NUdf::EFetchStatus::Finish: + return NUdf::EFetchStatus::Finish; + } + } + } + + private: + NUdf::TUnboxedValue OrigStream_; + }; + + TWideStreamDethrottlerWrapper(TComputationMutables& mutables, IComputationNode* origStream) + : TBaseComputation(mutables) + , OrigStream_(origStream) + { + } + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TStreamValue>(OrigStream_->GetValue(ctx)); + } + +private: + void RegisterDependencies() const final { + DependsOn(OrigStream_); + } + +private: + IComputationNode* OrigStream_; +}; + +IComputationNode* WrapWideStreamThrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg"); + const auto origStream = LocateNode(ctx.NodeLocator, callable, 0); + return new TWideStreamThrottlerWrapper(ctx.Mutables, origStream); +} + +IComputationNode* WrapWideStreamDethrottler(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg"); + const auto origStream = LocateNode(ctx.NodeLocator, callable, 0); + return new TWideStreamDethrottlerWrapper(ctx.Mutables, origStream); +} + +} + +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType) { + const auto itemTypes = AS_TYPE(TTupleType, tupleType)->GetElements(); + const auto ui64Type = pgmBuilder.NewDataType(NUdf::TDataType<ui64>::Id); + const auto blockLenType = pgmBuilder.NewBlockType(ui64Type, TBlockType::EShape::Scalar); + + TVector<TType*> blockItemTypes; + std::transform(itemTypes.cbegin(), itemTypes.cend(), std::back_inserter(blockItemTypes), + [&](const auto& itemType) { + return pgmBuilder.NewBlockType(itemType, TBlockType::EShape::Many); + }); + // XXX: Mind the last block length column. + blockItemTypes.push_back(blockLenType); + + return pgmBuilder.NewTupleType(blockItemTypes); +} + +NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values +) { + const auto maxLength = CalcBlockLen(std::accumulate(types.cbegin(), types.cend(), 0ULL, + [](size_t max, const TType* type) { + return std::max(max, CalcMaxBlockItemSize(type)); + })); + TVector<std::unique_ptr<NUdf::IArrayBuilder>> builders; + std::transform(types.cbegin(), types.cend(), std::back_inserter(builders), + [&](const auto& type) { + return MakeArrayBuilder(TTypeInfoHelper(), type, ctx.ArrowMemoryPool, + maxLength, &ctx.Builder->GetPgBuilder()); + }); + + const auto& holderFactory = ctx.HolderFactory; + const size_t width = types.size(); + const size_t total = values.GetListLength(); + NUdf::TUnboxedValue iterator = values.GetListIterator(); + NUdf::TUnboxedValue current; + size_t converted = 0; + TDefaultListRepresentation listValues; + while (converted < total) { + for (size_t i = 0; i < blockSize && iterator.Next(current); i++, converted++) { + for (size_t j = 0; j < builders.size(); j++) { + const NUdf::TUnboxedValuePod& item = current.GetElement(j); + builders[j]->Add(item); + } + } + std::vector<arrow::Datum> batch; + batch.reserve(width); + for (size_t i = 0; i < width; i++) { + batch.emplace_back(builders[i]->Build(converted >= total)); + } + + NUdf::TArgsDechunker dechunker(std::move(batch)); + std::vector<arrow::Datum> chunk; + ui64 chunkLen = 0; + while (dechunker.Next(chunk, chunkLen)) { + NUdf::TUnboxedValue* items = nullptr; + const auto tuple = holderFactory.CreateDirectArrayHolder(width + 1, items); + for (size_t i = 0; i < width; i++) { + items[i] = holderFactory.CreateArrowBlock(std::move(chunk[i])); + } + items[width] = MakeBlockCount(holderFactory, chunkLen); + + listValues = listValues.Append(std::move(tuple)); + } + } + return holderFactory.CreateDirectListHolder(std::move(listValues)); +} + +NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values +) { + TVector<std::unique_ptr<IBlockReader>> readers; + TVector<std::unique_ptr<IBlockItemConverter>> converters; + for (const auto& type : types) { + const auto blockItemType = AS_TYPE(TBlockType, type)->GetItemType(); + readers.push_back(MakeBlockReader(TTypeInfoHelper(), blockItemType)); + converters.push_back(MakeBlockItemConverter(TTypeInfoHelper(), blockItemType, + ctx.Builder->GetPgBuilder())); + } + + const auto& holderFactory = ctx.HolderFactory; + const size_t width = types.size() - 1; + TDefaultListRepresentation listValues; + NUdf::TUnboxedValue iterator = values.GetListIterator(); + NUdf::TUnboxedValue current; + while (iterator.Next(current)) { + const auto blockLengthValue = current.GetElement(width); + const auto blockLengthDatum = TArrowBlock::From(blockLengthValue).GetDatum(); + Y_ENSURE(blockLengthDatum.is_scalar()); + const auto blockLength = blockLengthDatum.scalar_as<arrow::UInt64Scalar>().value; + for (size_t i = 0; i < blockLength; i++) { + NUdf::TUnboxedValue* items = nullptr; + const auto tuple = holderFactory.CreateDirectArrayHolder(width, items); + for (size_t j = 0; j < width; j++) { + const auto arrayValue = current.GetElement(j); + const auto arrayDatum = TArrowBlock::From(arrayValue).GetDatum(); + UNIT_ASSERT(arrayDatum.is_array()); + const auto blockItem = readers[j]->GetItem(*arrayDatum.array(), i); + items[j] = converters[j]->MakeValue(blockItem, holderFactory); + } + listValues = listValues.Append(std::move(tuple)); + } + } + return holderFactory.CreateDirectListHolder(std::move(listValues)); +} + +TComputationNodeFactory GetNodeFactory() { + return [](TCallable& callable, const TComputationNodeFactoryContext& ctx) -> IComputationNode* { + if (callable.GetType()->GetName() == "WideStreamThrottler") { + return WrapWideStreamThrottler(callable, ctx); + } else if (callable.GetType()->GetName() == "WideStreamDethrottler") { + return WrapWideStreamDethrottler(callable, ctx); + } + return GetBuiltinFactory()(callable, ctx); + }; +} + +TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) { + TCallableBuilder callableBuilder(pgmBuilder.GetTypeEnvironment(), "WideStreamThrottler", stream.GetStaticType()); + callableBuilder.Add(stream); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream) { + TCallableBuilder callableBuilder(pgmBuilder.GetTypeEnvironment(), "WideStreamDethrottler", stream.GetStaticType()); + callableBuilder.Add(stream); + return TRuntimeNode(callableBuilder.Build(), false); +} + +TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list) { + NUdf::TUnboxedValue current; + NUdf::TUnboxedValue iterator = list.GetListIterator(); + TVector<NUdf::TUnboxedValue> items; + while (iterator.Next(current)) { + items.push_back(current); + } + return items; +} + +void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected, + const NUdf::TUnboxedValue& got +) { + const auto itemType = AS_TYPE(TListType, type)->GetItemType(); + const NUdf::ICompare::TPtr compare = MakeCompareImpl(itemType); + const NUdf::IEquate::TPtr equate = MakeEquateImpl(itemType); + // XXX: Stub both keyTypes and isTuple arguments, since + // ICompare/IEquate are used. + TKeyTypes keyTypesStub; + bool isTupleStub = false; + const TValueLess valueLess(keyTypesStub, isTupleStub, compare.Get()); + const TValueEqual valueEqual(keyTypesStub, isTupleStub, equate.Get()); + + auto expectedItems = ConvertListToVector(expected); + auto gotItems = ConvertListToVector(got); + UNIT_ASSERT_VALUES_EQUAL(expectedItems.size(), gotItems.size()); + Sort(expectedItems, valueLess); + Sort(gotItems, valueLess); + for (size_t i = 0; i < expectedItems.size(); i++) { + UNIT_ASSERT(valueEqual(gotItems[i], expectedItems[i])); + } +} + +TVector<TString> GenerateValues(size_t level) { + constexpr size_t alphaSize = 'Z' - 'A' + 1; + if (level == 1) { + TVector<TString> alphabet(alphaSize); + std::iota(alphabet.begin(), alphabet.end(), 'A'); + return alphabet; + } + const auto subValues = GenerateValues(level - 1); + TVector<TString> values; + values.reserve(alphaSize * subValues.size()); + for (char ch = 'A'; ch <= 'Z'; ch++) { + for (const auto& tail : subValues) { + values.emplace_back(ch + tail); + } + } + return values; +} + +TSet<ui64> GenerateFibonacci(size_t count) { + TSet<ui64> fibSet; + ui64 a = 0, b = 1; + fibSet.insert(a); + while (count--) { + a = std::exchange(b, a + b); + fibSet.insert(b); + } + return fibSet; +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h new file mode 100644 index 0000000000..2c2f32b312 --- /dev/null +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_block_map_join_ut_utils.h @@ -0,0 +1,113 @@ +#include "mkql_computation_node_ut.h" + +#include <yql/essentials/minikql/computation/mkql_computation_node_holders.h> + +namespace NKikimr { +namespace NMiniKQL { + +inline bool IsOptionalOrNull(const TType* type) { + return type->IsOptional() || type->IsNull() || type->IsPg(); +} + +TType* MakeBlockTupleType(TProgramBuilder& pgmBuilder, TType* tupleType); + +NUdf::TUnboxedValuePod ToBlocks(TComputationContext& ctx, size_t blockSize, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); +NUdf::TUnboxedValuePod FromBlocks(TComputationContext& ctx, + const TArrayRef<TType* const> types, const NUdf::TUnboxedValuePod& values); + +TComputationNodeFactory GetNodeFactory(); +TRuntimeNode ThrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream); +TRuntimeNode DethrottleStream(TProgramBuilder& pgmBuilder, TRuntimeNode stream); + +TVector<NUdf::TUnboxedValue> ConvertListToVector(const NUdf::TUnboxedValue& list); +void CompareResults(const TType* type, const NUdf::TUnboxedValue& expected, const NUdf::TUnboxedValue& got); + +TVector<TString> GenerateValues(size_t level); +TSet<ui64> GenerateFibonacci(size_t count); + +// +// Auxiliary routines to build list nodes from the given vectors. +// + +struct TTypeMapperBase { + TProgramBuilder& Pb; + TType* ItemType; + auto GetType() { return ItemType; } +}; + +template <typename Type> +struct TTypeMapper: TTypeMapperBase { + TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::TDataType<Type>::Id) } {} + auto GetValue(const Type& value) { + return Pb.NewDataLiteral<Type>(value); + } +}; + +template <> +struct TTypeMapper<TString>: TTypeMapperBase { + TTypeMapper(TProgramBuilder& pb): TTypeMapperBase {pb, pb.NewDataType(NUdf::EDataSlot::String)} {} + auto GetValue(const TString& value) { + return Pb.NewDataLiteral<NUdf::EDataSlot::String>(value); + } +}; + +template <typename TNested> +class TTypeMapper<std::optional<TNested>>: TTypeMapper<TNested> { + using TBase = TTypeMapper<TNested>; +public: + TTypeMapper(TProgramBuilder& pb): TBase(pb) {} + auto GetType() { return TBase::Pb.NewOptionalType(TBase::GetType()); } + auto GetValue(const std::optional<TNested>& value) { + if (value == std::nullopt) { + return TBase::Pb.NewEmptyOptional(GetType()); + } else { + return TBase::Pb.NewOptional(TBase::GetValue(*value)); + } + } +}; + +template<typename Type> +const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb, + const TVector<Type>& vector +) { + TTypeMapper<Type> mapper(pb); + + TRuntimeNode::TList listItems; + std::transform(vector.cbegin(), vector.cend(), std::back_inserter(listItems), + [&](const auto value) { + return mapper.GetValue(value); + }); + + return {pb.NewList(mapper.GetType(), listItems)}; +} + +template<typename Type, typename... Tail> +const TVector<const TRuntimeNode> BuildListNodes(TProgramBuilder& pb, + const TVector<Type>& vector, Tail... vectors +) { + const auto frontList = BuildListNodes(pb, vector); + const auto tailLists = BuildListNodes(pb, std::forward<Tail>(vectors)...); + TVector<const TRuntimeNode> lists; + lists.reserve(tailLists.size() + 1); + lists.push_back(frontList.front());; + for (const auto& list : tailLists) { + lists.push_back(list); + } + return lists; +} + +template<typename... TVectors> +const std::pair<TType*, NUdf::TUnboxedValue> ConvertVectorsToTuples( + TSetup<false>& setup, TVectors... vectors +) { + TProgramBuilder& pb = *setup.PgmBuilder; + const auto lists = BuildListNodes(pb, std::forward<TVectors>(vectors)...); + const auto tuplesNode = pb.Zip(lists); + const auto tuplesNodeType = tuplesNode.GetStaticType(); + const auto tuples = setup.BuildGraph(tuplesNode)->GetValue(); + return std::make_pair(tuplesNodeType, tuples); +} + +} // namespace NMiniKQL +} // namespace NKikimr diff --git a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp index c5f3c53ed4..bf68e92ea4 100644 --- a/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp +++ b/yql/essentials/minikql/comp_nodes/ut/mkql_rh_hash_ut.cpp @@ -375,6 +375,90 @@ Y_UNIT_TEST_SUITE(TMiniKQLRobinHoodHashTest) { Cerr << "maxDistance2: " << maxDistance2 << "\n"; UNIT_ASSERT(maxDistance2 < 20); } + + Y_UNIT_TEST(Lookup) { + TRobinHoodHashMap<i32> rh(sizeof(i64)); + std::unordered_map<i32, i64> h; + for (ui64 i = 0; i < 10000; ++i) { + auto k = i % 1000; + auto [it, inserted] = h.emplace(k, 0); + bool isNew; + auto iter = rh.Insert(k, isNew); + UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k); + UNIT_ASSERT_VALUES_EQUAL(isNew, inserted); + it->second += i; + if (isNew) { + *(i64*)rh.GetMutablePayload(iter) = i; + rh.CheckGrow(); + } else { + *(i64*)rh.GetMutablePayload(iter) += i; + } + + UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize()); + } + + for (ui64 i = 0; i < 1000; ++i) { + auto iter = rh.Lookup(i); + auto hit = h.find(i); + UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second); + h.erase(hit); + } + + UNIT_ASSERT(h.empty()); + } + + Y_UNIT_TEST(BatchLookup) { + using THashTable = TRobinHoodHashMap<i32>; + THashTable rh(sizeof(i64)); + std::unordered_map<i32, i64> h; + for (ui64 i = 0; i < 10000; ++i) { + auto k = i % 1000; + auto [it, inserted] = h.emplace(k, 0); + bool isNew; + auto iter = rh.Insert(k, isNew); + UNIT_ASSERT_VALUES_EQUAL(rh.GetKey(iter), k); + UNIT_ASSERT_VALUES_EQUAL(isNew, inserted); + it->second += i; + if (isNew) { + *(i64*)rh.GetMutablePayload(iter) = i; + rh.CheckGrow(); + } else { + *(i64*)rh.GetMutablePayload(iter) += i; + } + + UNIT_ASSERT_VALUES_EQUAL(h.size(), rh.GetSize()); + } + + std::array<TRobinHoodBatchRequestItem<i32>, PrefetchBatchSize> batch; + std::array<ui64, PrefetchBatchSize> batchI; + ui32 batchLen = 0; + + auto processBatch = [&]() { + rh.BatchLookup({batch.data(), batchLen}, [&](size_t i, THashTable::iterator iter) { + auto key = batchI[i]; + auto hit = h.find(key); + UNIT_ASSERT_VALUES_EQUAL(*(i64*)rh.GetPayload(iter), hit->second); + h.erase(hit); + }); + }; + + for (ui64 i = 0; i < 1000; ++i) { + if (batchLen == batch.size()) { + processBatch(); + batchLen = 0; + } + + batch[batchLen].ConstructKey(i); + batchI[batchLen] = i; + ++batchLen; + } + + if (batchLen > 0) { + processBatch(); + } + + UNIT_ASSERT(h.empty()); + } } } diff --git a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc index fc600ad3c4..46a1ad7f11 100644 --- a/yql/essentials/minikql/comp_nodes/ut/ya.make.inc +++ b/yql/essentials/minikql/comp_nodes/ut/ya.make.inc @@ -24,6 +24,7 @@ SET(ORIG_SOURCES mkql_block_compress_ut.cpp mkql_block_exists_ut.cpp mkql_block_skiptake_ut.cpp + mkql_block_map_join_ut_utils.cpp mkql_block_map_join_ut.cpp mkql_block_top_sort_ut.cpp mkql_blocks_ut.cpp |