diff options
author | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
---|---|---|
committer | Alexander Smirnov <alex@ydb.tech> | 2025-02-18 13:35:23 +0000 |
commit | d1f5b91da822b27faad83d50ecfdd2830a1be93e (patch) | |
tree | 78df3bf535cb8a5451afa402c51cb3f8d11b4d06 /yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | |
parent | 22bc9b81495143d67a93bf58c936c5d5a65c8e8e (diff) | |
parent | a2f16dc9eb108ecf11938c7c4275d701a3635bb7 (diff) | |
download | ydb-d1f5b91da822b27faad83d50ecfdd2830a1be93e.tar.gz |
Merge pull request #14716 from ydb-platform/merge-libs-250218-0050
Diffstat (limited to 'yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp')
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp | 515 |
1 files changed, 327 insertions, 188 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp index 37217aec85..17c549202c 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_block_map_join.cpp @@ -239,10 +239,8 @@ private: TVector<NYql::NUdf::IBlockItemHasher::TPtr> Hashers_; }; -template <typename TDerived> -class TBlockStorageBase : public TComputationValue<TDerived> { - using TSelf = TBlockStorageBase<TDerived>; - using TBase = TComputationValue<TDerived>; +class TBlockStorage : public TComputationValue<TBlockStorage> { + using TBase = TComputationValue<TBlockStorage>; public: struct TBlock { @@ -268,7 +266,7 @@ public: }; class TRowIterator { - friend class TBlockStorageBase; + friend class TBlockStorage; public: TRowIterator() = default; @@ -304,7 +302,7 @@ public: } private: - TRowIterator(const TSelf* blockStorage) + TRowIterator(const TBlockStorage* blockStorage) : BlockStorage_(blockStorage) {} @@ -312,19 +310,21 @@ public: size_t CurrentBlockOffset_ = 0; size_t CurrentItemOffset_ = 0; - const TSelf* BlockStorage_ = nullptr; + const TBlockStorage* BlockStorage_ = nullptr; }; - TBlockStorageBase( + TBlockStorage( TMemoryUsageInfo* memInfo, const TVector<TType*>& itemTypes, NUdf::TUnboxedValue stream, + TStringBuf resourceTag, arrow::MemoryPool* pool ) : TBase(memInfo) , InputsDescr_(ToValueDescr(itemTypes)) - , Stream_(stream) + , Stream_(std::move(stream)) , Inputs_(itemTypes.size()) + , ResourceTag_(std::move(resourceTag)) { TBlockTypeHelper helper; for (size_t i = 0; i < itemTypes.size(); i++) { @@ -341,11 +341,14 @@ public: case NUdf::EFetchStatus::Yield: return NUdf::EFetchStatus::Yield; case NUdf::EFetchStatus::Finish: + IsFinished_ = true; return NUdf::EFetchStatus::Finish; case NUdf::EFetchStatus::Ok: break; } + Y_ENSURE(!IsFinished_, "Got data on finished stream"); + std::vector<arrow::Datum> blockColumns; for (size_t i = 0; i < Inputs_.size() - 1; i++) { auto& datum = TArrowBlock::From(Inputs_[i]).GetDatum(); @@ -384,19 +387,15 @@ public: return TRowIterator(this); } + size_t GetRowCount() const { + return RowCount_; + } + TBlockItem GetItem(TRowEntry entry, ui32 columnIdx) const { Y_ENSURE(columnIdx < Inputs_.size() - 1); return GetItemFromBlock(GetBlock(entry.BlockOffset), columnIdx, entry.ItemOffset); } - void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { - Y_ENSURE(row.size() == ioMap.size()); - for (size_t i = 0; i < row.size(); i++) { - row[i] = GetItem(entry, ioMap[i]); - } - } - -protected: TBlockItem GetItemFromBlock(const TBlock& block, ui32 columnIdx, size_t offset) const { Y_ENSURE(offset < block.Size); const auto& datum = block.Columns[columnIdx]; @@ -408,6 +407,34 @@ protected: } } + void GetRow(TRowEntry entry, const TVector<ui32>& ioMap, std::vector<NYql::NUdf::TBlockItem>& row) const { + Y_ENSURE(row.size() == ioMap.size()); + for (size_t i = 0; i < row.size(); i++) { + row[i] = GetItem(entry, ioMap[i]); + } + } + + const TVector<NUdf::IBlockItemComparator::TPtr>& GetItemComparators() const { + return Comparators_; + } + + const TVector<NUdf::IBlockItemHasher::TPtr>& GetItemHashers() const { + return Hashers_; + } + + bool IsFinished() const { + return IsFinished_; + } + +private: + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + protected: const std::vector<arrow::ValueDescr> InputsDescr_; @@ -418,27 +445,60 @@ protected: std::vector<TBlock> Data_; size_t RowCount_ = 0; + bool IsFinished_ = false; NUdf::TUnboxedValue Stream_; TUnboxedValueVector Inputs_; + + const TStringBuf ResourceTag_; }; -class TBlockStorage: public TBlockStorageBase<TBlockStorage> { -private: - using TBase = TBlockStorageBase<TBlockStorage>; +class TBlockStorageWrapper : public TMutableComputationNode<TBlockStorageWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockStorageWrapper>; + public: - using TBase::TBase; + TBlockStorageWrapper( + TComputationMutables& mutables, + TVector<TType*>&& itemTypes, + IComputationNode* stream, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , ItemTypes_(std::move(itemTypes)) + , Stream_(stream) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockStorage>( + ItemTypes_, + std::move(Stream_->GetValue(ctx)), + ResourceTag_, + &ctx.ArrowMemoryPool + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(Stream_); + } + +private: + const TVector<TType*> ItemTypes_; + IComputationNode* const Stream_; + + const TString ResourceTag_; }; -class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { - using TBase = TBlockStorageBase<TIndexedBlockStorage>; +class TBlockIndex : public TComputationValue<TBlockIndex> { + using TBase = TComputationValue<TBlockIndex>; struct TIndexNode { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; TIndexNode* Next; TIndexNode() = delete; - TIndexNode(TRowEntry entry, TIndexNode* next = nullptr) + TIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* next = nullptr) : Entry(entry) , Next(next) {} @@ -450,7 +510,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { : Raw(0) {} - TIndexMapValue(TRowEntry entry) { + TIndexMapValue(TBlockStorage::TRowEntry entry) { TIndexEntryUnion un; un.Entry = entry; @@ -471,7 +531,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { return EntryList; } - TRowEntry GetEntry() const { + TBlockStorage::TRowEntry GetEntry() const { Y_ENSURE(IsInplace()); TIndexEntryUnion un; @@ -481,7 +541,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { private: union TIndexEntryUnion { - TRowEntry Entry; + TBlockStorage::TRowEntry Entry; ui64 Raw; }; @@ -504,7 +564,7 @@ class TIndexedBlockStorage : public TBlockStorageBase<TIndexedBlockStorage> { public: class TIterator { - friend class TIndexedBlockStorage; + friend class TBlockIndex; enum class EIteratorType { EMPTY, @@ -547,7 +607,7 @@ public: return *this; } - TMaybe<TRowEntry> Next() { + TMaybe<TBlockStorage::TRowEntry> Next() { Y_ENSURE(IsValid()); switch (Type_) { @@ -560,7 +620,7 @@ public: } EntryConsumed_ = true; - return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TRowEntry>(Entry_) : Nothing(); + return BlockIndex_->IsKeyEquals(Entry_, ItemsToLookup_) ? TMaybe<TBlockStorage::TRowEntry>(Entry_) : Nothing(); case EIteratorType::LIST: for (; Node_ != nullptr; Node_ = Node_->Next) { @@ -597,12 +657,12 @@ public: } private: - TIterator(const TIndexedBlockStorage* blockIndex) + TIterator(const TBlockIndex* blockIndex) : Type_(EIteratorType::EMPTY) , BlockIndex_(blockIndex) {} - TIterator(const TIndexedBlockStorage* blockIndex, TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TBlockStorage::TRowEntry entry, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::INPLACE) , BlockIndex_(blockIndex) , Entry_(entry) @@ -610,7 +670,7 @@ public: , ItemsToLookup_(std::move(itemsToLookup)) {} - TIterator(const TIndexedBlockStorage* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) + TIterator(const TBlockIndex* blockIndex, TIndexNode* node, std::vector<NYql::NUdf::TBlockItem> itemsToLookup) : Type_(EIteratorType::LIST) , BlockIndex_(blockIndex) , Node_(node) @@ -619,12 +679,12 @@ public: private: EIteratorType Type_; - const TIndexedBlockStorage* BlockIndex_ = nullptr; + const TBlockIndex* BlockIndex_ = nullptr; union { TIndexNode* Node_; struct { - TRowEntry Entry_; + TBlockStorage::TRowEntry Entry_; bool EntryConsumed_; }; }; @@ -633,32 +693,35 @@ public: }; public: - TIndexedBlockStorage( + TBlockIndex( TMemoryUsageInfo* memInfo, - const TVector<TType*>& itemTypes, const TVector<ui32>& keyColumns, - NUdf::TUnboxedValue stream, + NUdf::TUnboxedValue blockStorage, bool any, - arrow::MemoryPool* pool + TStringBuf resourceTag ) - : TBase(memInfo, itemTypes, stream, pool) + : TBase(memInfo) , KeyColumns_(keyColumns) + , BlockStorage_(std::move(blockStorage)) , Any_(any) + , ResourceTag_(std::move(resourceTag)) {} - NUdf::EFetchStatus FetchStream() { - Y_ENSURE(!Index_, "Data fetch shouldn't be done after the index has been built"); - return TBase::FetchStream(); - } - void BuildIndex() { - Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(RowCount_)); - for (size_t blockOffset = 0; blockOffset < Data_.size(); blockOffset++) { - const auto& block = GetBlock(blockOffset); + if (Index_) { + return; + } + + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(blockStorage.IsFinished(), "Index build should be done after all data has been read"); + + Index_ = std::make_unique<TIndexMap>(CalculateRHHashTableCapacity(blockStorage.GetRowCount())); + for (size_t blockOffset = 0; blockOffset < blockStorage.GetBlockCount(); blockOffset++) { + const auto& block = blockStorage.GetBlock(blockOffset); auto blockSize = block.Size; std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> insertBatch; - std::array<TRowEntry, PrefetchBatchSize> insertBatchEntries; + std::array<TBlockStorage::TRowEntry, PrefetchBatchSize> insertBatchEntries; std::array<std::vector<NYql::NUdf::TBlockItem>, PrefetchBatchSize> insertBatchKeys; ui32 insertBatchLen = 0; @@ -692,7 +755,7 @@ public: continue; } - insertBatchEntries[insertBatchLen] = TRowEntry(blockOffset, itemOffset); + insertBatchEntries[insertBatchLen] = TBlockStorage::TRowEntry(blockOffset, itemOffset); insertBatch[insertBatchLen].ConstructKey(keyHash); insertBatchLen++; @@ -709,7 +772,7 @@ public: } template<typename TGetKey> - void BatchLookup(size_t batchSize, std::array<TIndexedBlockStorage::TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { + void BatchLookup(size_t batchSize, std::array<TIterator, PrefetchBatchSize>& iterators, TGetKey&& getKey) { Y_ENSURE(batchSize <= PrefetchBatchSize); std::array<TRobinHoodBatchRequestItem<ui64>, PrefetchBatchSize> lookupBatch; @@ -737,11 +800,13 @@ public: }); } - bool IsKeyEquals(TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + bool IsKeyEquals(TBlockStorage::TRowEntry entry, const std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + Y_ENSURE(keyItems.size() == KeyColumns_.size()); for (size_t i = 0; i < KeyColumns_.size(); i++) { - auto indexItem = GetItem(entry, KeyColumns_[i]); - if (Comparators_[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { + auto indexItem = blockStorage.GetItem(entry, KeyColumns_[i]); + if (blockStorage.GetItemComparators()[KeyColumns_[i]]->Equals(indexItem, keyItems[i])) { return true; } } @@ -749,25 +814,31 @@ public: return false; } + const NUdf::TUnboxedValue& GetBlockStorage() const { + return BlockStorage_; + } + private: - ui64 GetKey(const TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + ui64 GetKey(const TBlockStorage::TBlock& block, size_t offset, std::vector<NYql::NUdf::TBlockItem>& keyItems) const { + auto& blockStorage = *static_cast<TBlockStorage*>(BlockStorage_.GetResource()); + ui64 keyHash = 0; keyItems.clear(); for (ui32 keyColumn : KeyColumns_) { - auto item = GetItemFromBlock(block, keyColumn, offset); + auto item = blockStorage.GetItemFromBlock(block, keyColumn, offset); if (!item) { keyItems.clear(); return 0; } - keyHash = CombineHashes(keyHash, Hashers_[keyColumn]->Hash(item)); + keyHash = CombineHashes(keyHash, blockStorage.GetItemHashers()[keyColumn]->Hash(item)); keyItems.push_back(std::move(item)); } return keyHash; } - TIndexNode* InsertIndexNode(TRowEntry entry, TIndexNode* currentHead = nullptr) { + TIndexNode* InsertIndexNode(TBlockStorage::TRowEntry entry, TIndexNode* currentHead = nullptr) { return &IndexNodes_.emplace_back(entry, currentHead); } @@ -787,21 +858,72 @@ private: } } + NUdf::TStringRef GetResourceTag() const override { + return NUdf::TStringRef(ResourceTag_); + } + + void* GetResource() override { + return this; + } + private: const TVector<ui32>& KeyColumns_; + NUdf::TUnboxedValue BlockStorage_; std::unique_ptr<TIndexMap> Index_; std::deque<TIndexNode> IndexNodes_; const bool Any_; + const TStringBuf ResourceTag_; +}; + +class TBlockMapJoinIndexWrapper : public TMutableComputationNode<TBlockMapJoinIndexWrapper> { + using TBaseComputation = TMutableComputationNode<TBlockMapJoinIndexWrapper>; + +public: + TBlockMapJoinIndexWrapper( + TComputationMutables& mutables, + TVector<ui32>&& keyColumns, + IComputationNode* blockStorage, + bool any, + const TStringBuf& resourceTag + ) + : TBaseComputation(mutables, EValueRepresentation::Boxed) + , KeyColumns_(std::move(keyColumns)) + , BlockStorage_(blockStorage) + , Any_(any) + , ResourceTag_(resourceTag) + {} + + NUdf::TUnboxedValuePod DoCalculate(TComputationContext& ctx) const { + return ctx.HolderFactory.Create<TBlockIndex>( + KeyColumns_, + std::move(BlockStorage_->GetValue(ctx)), + Any_, + ResourceTag_ + ); + } + +private: + void RegisterDependencies() const final { + DependsOn(BlockStorage_); + } + +private: + const TVector<ui32> KeyColumns_; + IComputationNode* const BlockStorage_; + const bool Any_; + const TString ResourceTag_; }; -template <bool WithoutRight, bool RightRequired, bool RightAny> -class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>> +template <bool WithoutRight, bool RightRequired> +class TBlockMapJoinCoreWraper : public TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>> { -using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired, RightAny>>; -using TJoinState = TBlockJoinState<RightRequired>; -using TIndexState = TIndexedBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockMapJoinCoreWraper<WithoutRight, RightRequired>>; + using TJoinState = TBlockJoinState<RightRequired>; + using TStorageState = TBlockStorage; + using TIndexState = TBlockIndex; + public: TBlockMapJoinCoreWraper( TComputationMutables& mutables, @@ -809,22 +931,18 @@ public: const TVector<TType*>&& leftItemTypes, const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockIndex ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) - , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , KeyTupleCache_(mutables) {} @@ -835,60 +953,49 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TIndexState>( - RightItemTypes_, - RightKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightAny, - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), - std::move(LeftStream_->GetValue(ctx)), LeftKeyColumns_, - std::move(RightStream_->GetValue(ctx)), - RightKeyColumns_, - RightIOMap_ + RightIOMap_, + std::move(LeftStream_->GetValue(ctx)), + std::move(RightBlockIndex_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& indexState, - NUdf::TUnboxedValue&& leftStream, const TVector<ui32>& leftKeyColumns, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightKeyColumns, - const TVector<ui32>& rightIOMap + const TVector<ui32>& rightIOMap, + NUdf::TUnboxedValue&& leftStream, + NUdf::TUnboxedValue&& rightBlockIndex ) : TBase(memInfo) , JoinState_(joinState) - , IndexState_(indexState) - , LeftStream_(leftStream) , LeftKeyColumns_(leftKeyColumns) - , RightStream_(rightStream) - , RightKeyColumns_(rightKeyColumns) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockIndex_(rightBlockIndex) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& indexState = *static_cast<TIndexState*>(IndexState_.AsBoxed().Get()); + auto& indexState = *static_cast<TIndexState*>(RightBlockIndex_.GetResource()); + auto& storageState = *static_cast<TStorageState*>(indexState.GetBlockStorage().GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; while (fetchStatus != NUdf::EFetchStatus::Finish) { - fetchStatus = indexState.FetchStream(); + fetchStatus = storageState.FetchStream(); if (fetchStatus == NUdf::EFetchStatus::Yield) { return NUdf::EFetchStatus::Yield; } @@ -931,7 +1038,7 @@ private: while (joinState.IsNotFull() && !iter.IsEmpty()) { auto key = iter.Next(); - indexState.GetRow(*key, RightIOMap_, rightRow); + storageState.GetRow(*key, RightIOMap_, rightRow); joinState.MakeRow(rightRow); } @@ -993,13 +1100,9 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue IndexState_; - NUdf::TUnboxedValue LeftStream_; const TVector<ui32>& LeftKeyColumns_; - NUdf::TUnboxedValue RightStream_; - const TVector<ui32>& RightKeyColumns_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; @@ -1007,12 +1110,15 @@ private: ui32 LookupBatchCurrent_ = 0; ui32 LookupBatchSize_ = 0; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockIndex_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockIndex_); } private: @@ -1022,44 +1128,37 @@ private: const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockIndex_; const TContainerCacheOnContext KeyTupleCache_; }; class TBlockCrossJoinCoreWraper : public TMutableComputationNode<TBlockCrossJoinCoreWraper> { -using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; -using TJoinState = TBlockJoinState<true>; -using TStorageState = TBlockStorage; + using TBaseComputation = TMutableComputationNode<TBlockCrossJoinCoreWraper>; + using TJoinState = TBlockJoinState<true>; + using TStorageState = TBlockStorage; + public: TBlockCrossJoinCoreWraper( TComputationMutables& mutables, const TVector<TType*>&& resultItemTypes, const TVector<TType*>&& leftItemTypes, - const TVector<ui32>&& leftKeyColumns, const TVector<ui32>&& leftIOMap, - const TVector<TType*>&& rightItemTypes, - const TVector<ui32>&& rightKeyColumns, const TVector<ui32>&& rightIOMap, IComputationNode* leftStream, - IComputationNode* rightStream + IComputationNode* rightBlockStorage ) : TBaseComputation(mutables, EValueRepresentation::Boxed) , ResultItemTypes_(std::move(resultItemTypes)) , LeftItemTypes_(std::move(leftItemTypes)) - , LeftKeyColumns_(std::move(leftKeyColumns)) , LeftIOMap_(std::move(leftIOMap)) - , RightItemTypes_(std::move(rightItemTypes)) - , RightKeyColumns_(std::move(rightKeyColumns)) , RightIOMap_(std::move(rightIOMap)) , LeftStream_(std::move(leftStream)) - , RightStream_(std::move(rightStream)) + , RightBlockStorage_(std::move(rightBlockStorage)) , KeyTupleCache_(mutables) {} @@ -1070,47 +1169,40 @@ public: LeftIOMap_, ResultItemTypes_ ); - const auto indexState = ctx.HolderFactory.Create<TStorageState>( - RightItemTypes_, - std::move(RightStream_->GetValue(ctx)), - &ctx.ArrowMemoryPool - ); return ctx.HolderFactory.Create<TStreamValue>(ctx.HolderFactory, std::move(joinState), - std::move(indexState), + RightIOMap_, std::move(LeftStream_->GetValue(ctx)), - std::move(RightStream_->GetValue(ctx)), - RightIOMap_ + std::move(RightBlockStorage_->GetValue(ctx)) ); } private: class TStreamValue : public TComputationValue<TStreamValue> { - using TBase = TComputationValue<TStreamValue>; + using TBase = TComputationValue<TStreamValue>; + public: TStreamValue( TMemoryUsageInfo* memInfo, const THolderFactory& holderFactory, NUdf::TUnboxedValue&& joinState, - NUdf::TUnboxedValue&& storageState, + const TVector<ui32>& rightIOMap, NUdf::TUnboxedValue&& leftStream, - NUdf::TUnboxedValue&& rightStream, - const TVector<ui32>& rightIOMap + NUdf::TUnboxedValue&& rightBlockStorage ) : TBase(memInfo) , JoinState_(joinState) - , StorageState_(storageState) - , LeftStream_(leftStream) - , RightStream_(rightStream) , RightIOMap_(rightIOMap) + , LeftStream_(leftStream) + , RightBlockStorage_(rightBlockStorage) , HolderFactory_(holderFactory) {} private: NUdf::EFetchStatus WideFetch(NUdf::TUnboxedValue* output, ui32 width) { auto& joinState = *static_cast<TJoinState*>(JoinState_.AsBoxed().Get()); - auto& storageState = *static_cast<TStorageState*>(StorageState_.AsBoxed().Get()); + auto& storageState = *static_cast<TStorageState*>(RightBlockStorage_.GetResource()); if (!RightStreamConsumed_) { auto fetchStatus = NUdf::EFetchStatus::Ok; @@ -1176,43 +1268,109 @@ private: } NUdf::TUnboxedValue JoinState_; - NUdf::TUnboxedValue StorageState_; - - NUdf::TUnboxedValue LeftStream_; - NUdf::TUnboxedValue RightStream_; const TVector<ui32>& RightIOMap_; bool RightStreamConsumed_ = false; TStorageState::TRowIterator RightRowIterator_; + NUdf::TUnboxedValue LeftStream_; + NUdf::TUnboxedValue RightBlockStorage_; + const THolderFactory& HolderFactory_; }; void RegisterDependencies() const final { this->DependsOn(LeftStream_); - this->DependsOn(RightStream_); + this->DependsOn(RightBlockStorage_); } private: const TVector<TType*> ResultItemTypes_; const TVector<TType*> LeftItemTypes_; - const TVector<ui32> LeftKeyColumns_; const TVector<ui32> LeftIOMap_; - const TVector<TType*> RightItemTypes_; - const TVector<ui32> RightKeyColumns_; const TVector<ui32> RightIOMap_; IComputationNode* const LeftStream_; - IComputationNode* const RightStream_; + IComputationNode* const RightBlockStorage_; const TContainerCacheOnContext KeyTupleCache_; }; } // namespace +IComputationNode* WrapBlockStorage(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 1, "Expected 1 arg"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsStream(), "Expected WideStream as an input stream"); + const auto inputStreamType = AS_TYPE(TStreamType, inputType); + MKQL_ENSURE(inputStreamType->GetItemType()->IsMulti(), + "Expected Multi as a left stream item type"); + const auto inputStreamComponents = GetWideComponents(inputStreamType); + MKQL_ENSURE(inputStreamComponents.size() > 0, "Expected at least one column"); + TVector<TType*> inputStreamItems(inputStreamComponents.cbegin(), inputStreamComponents.cend()); + + const auto inputStream = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockStorageWrapper( + ctx.Mutables, + std::move(inputStreamItems), + inputStream, + resultResourceType->GetTag() + ); +} + +IComputationNode* WrapBlockMapJoinIndex(TCallable& callable, const TComputationNodeFactoryContext& ctx) { + MKQL_ENSURE(callable.GetInputsCount() == 4, "Expected 4 args"); + + const auto resultType = callable.GetType()->GetReturnType(); + MKQL_ENSURE(resultType->IsResource(), "Expected Resource as a result type"); + auto resultResourceType = AS_TYPE(TResourceType, resultType); + MKQL_ENSURE(resultResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + + const auto inputType = callable.GetInput(0).GetStaticType(); + MKQL_ENSURE(inputType->IsResource(), "Expected Resource as an input type"); + auto inputResourceType = AS_TYPE(TResourceType, inputType); + MKQL_ENSURE(inputResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + + auto origInputItemType = AS_VALUE(TTypeType, callable.GetInput(1)); + MKQL_ENSURE(origInputItemType->IsMulti(), "Expected Multi as an input item type"); + const auto streamComponents = AS_TYPE(TMultiType, origInputItemType)->GetElements(); + MKQL_ENSURE(streamComponents.size() > 0, "Expected at least one column"); + + const auto keyColumnsLiteral = callable.GetInput(2); + const auto keyColumnsTuple = AS_VALUE(TTupleLiteral, keyColumnsLiteral); + TVector<ui32> keyColumns; + keyColumns.reserve(keyColumnsTuple->GetValuesCount()); + for (ui32 i = 0; i < keyColumnsTuple->GetValuesCount(); i++) { + const auto item = AS_VALUE(TDataLiteral, keyColumnsTuple->GetValue(i)); + keyColumns.emplace_back(item->AsValue().Get<ui32>()); + } + + for (ui32 keyColumn : keyColumns) { + MKQL_ENSURE(keyColumn < streamComponents.size() - 1, "Key column out of range"); + } + + const auto anyNode = callable.GetInput(3); + const auto any = AS_VALUE(TDataLiteral, anyNode)->AsValue().Get<bool>(); + + const auto blockStorage = LocateNode(ctx.NodeLocator, callable, 0); + return new TBlockMapJoinIndexWrapper( + ctx.Mutables, + std::move(keyColumns), + blockStorage, + any, + resultResourceType->GetTag() + ); +} + IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNodeFactoryContext& ctx) { MKQL_ENSURE(callable.GetInputsCount() == 8, "Expected 8 args"); @@ -1234,22 +1392,28 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo MKQL_ENSURE(leftStreamComponents.size() > 0, "Expected at least one column"); const TVector<TType*> leftStreamItems(leftStreamComponents.cbegin(), leftStreamComponents.cend()); - const auto rightType = callable.GetInput(1).GetStaticType(); - MKQL_ENSURE(rightType->IsStream(), "Expected WideStream as a right stream"); - const auto rightStreamType = AS_TYPE(TStreamType, rightType); - MKQL_ENSURE(rightStreamType->GetItemType()->IsMulti(), - "Expected Multi as a right stream item type"); - const auto rightStreamComponents = GetWideComponents(rightStreamType); - MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); - const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); - - const auto joinKindNode = callable.GetInput(2); + const auto joinKindNode = callable.GetInput(3); const auto rawKind = AS_VALUE(TDataLiteral, joinKindNode)->AsValue().Get<ui32>(); const auto joinKind = GetJoinKind(rawKind); Y_ENSURE(joinKind == EJoinKind::Inner || joinKind == EJoinKind::Left || joinKind == EJoinKind::LeftSemi || joinKind == EJoinKind::LeftOnly || joinKind == EJoinKind::Cross); - const auto leftKeyColumnsLiteral = callable.GetInput(3); + const auto rightBlockStorageType = callable.GetInput(1).GetStaticType(); + MKQL_ENSURE(rightBlockStorageType->IsResource(), "Expected Resource as a right type"); + auto rightBlockStorageResourceType = AS_TYPE(TResourceType, rightBlockStorageType); + if (joinKind != EJoinKind::Cross) { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockMapJoinIndexResourcePrefix), "Expected block map join index resource"); + } else { + MKQL_ENSURE(rightBlockStorageResourceType->GetTag().StartsWith(BlockStorageResourcePrefix), "Expected block storage resource"); + } + + auto origRightItemType = AS_VALUE(TTypeType, callable.GetInput(2)); + MKQL_ENSURE(origRightItemType->IsMulti(), "Expected Multi as a right stream item type"); + const auto rightStreamComponents = AS_TYPE(TMultiType, origRightItemType)->GetElements(); + MKQL_ENSURE(rightStreamComponents.size() > 0, "Expected at least one column"); + const TVector<TType*> rightStreamItems(rightStreamComponents.cbegin(), rightStreamComponents.cend()); + + const auto leftKeyColumnsLiteral = callable.GetInput(4); const auto leftKeyColumnsTuple = AS_VALUE(TTupleLiteral, leftKeyColumnsLiteral); TVector<ui32> leftKeyColumns; leftKeyColumns.reserve(leftKeyColumnsTuple->GetValuesCount()); @@ -1259,7 +1423,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> leftKeySet(leftKeyColumns.cbegin(), leftKeyColumns.cend()); - const auto leftKeyDropsLiteral = callable.GetInput(4); + const auto leftKeyDropsLiteral = callable.GetInput(5); const auto leftKeyDropsTuple = AS_VALUE(TTupleLiteral, leftKeyDropsLiteral); THashSet<ui32> leftKeyDrops; leftKeyDrops.reserve(leftKeyDropsTuple->GetValuesCount()); @@ -1273,7 +1437,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo "Only key columns has to be specified in drop column set"); } - const auto rightKeyColumnsLiteral = callable.GetInput(5); + const auto rightKeyColumnsLiteral = callable.GetInput(6); const auto rightKeyColumnsTuple = AS_VALUE(TTupleLiteral, rightKeyColumnsLiteral); TVector<ui32> rightKeyColumns; rightKeyColumns.reserve(rightKeyColumnsTuple->GetValuesCount()); @@ -1283,7 +1447,7 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const THashSet<ui32> rightKeySet(rightKeyColumns.cbegin(), rightKeyColumns.cend()); - const auto rightKeyDropsLiteral = callable.GetInput(6); + const auto rightKeyDropsLiteral = callable.GetInput(7); const auto rightKeyDropsTuple = AS_VALUE(TTupleLiteral, rightKeyDropsLiteral); THashSet<ui32> rightKeyDrops; rightKeyDrops.reserve(rightKeyDropsTuple->GetValuesCount()); @@ -1303,9 +1467,6 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } MKQL_ENSURE(leftKeyColumns.size() == rightKeyColumns.size(), "Key columns mismatch"); - const auto rightAnyNode = callable.GetInput(7); - const auto rightAny = AS_VALUE(TDataLiteral, rightAnyNode)->AsValue().Get<bool>(); - // XXX: Mind the last wide item, containing block length. TVector<ui32> leftIOMap; for (size_t i = 0; i < leftStreamItems.size() - 1; i++) { @@ -1329,62 +1490,40 @@ IComputationNode* WrapBlockMapJoinCore(TCallable& callable, const TComputationNo } const auto leftStream = LocateNode(ctx.NodeLocator, callable, 0); - const auto rightStream = LocateNode(ctx.NodeLocator, callable, 1); + const auto rightBlockStorage = LocateNode(ctx.NodeLocator, callable, 1); -#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY) \ - return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED, RIGHT_ANY>( \ +#define JOIN_WRAPPER(WITHOUT_RIGHT, RIGHT_REQUIRED) \ + return new TBlockMapJoinCoreWraper<WITHOUT_RIGHT, RIGHT_REQUIRED>( \ ctx.Mutables, \ std::move(joinItems), \ std::move(leftStreamItems), \ std::move(leftKeyColumns), \ std::move(leftIOMap), \ - std::move(rightStreamItems), \ - std::move(rightKeyColumns), \ std::move(rightIOMap), \ leftStream, \ - rightStream \ + rightBlockStorage \ ) switch (joinKind) { case EJoinKind::Inner: - if (rightAny) { - JOIN_WRAPPER(false, true, true); - } else { - JOIN_WRAPPER(false, true, false); - } + JOIN_WRAPPER(false, true); case EJoinKind::Left: - if (rightAny) { - JOIN_WRAPPER(false, false, true); - } else { - JOIN_WRAPPER(false, false, false); - } + JOIN_WRAPPER(false, false); case EJoinKind::LeftSemi: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left semi join"); - if (rightAny) { - JOIN_WRAPPER(true, true, true); - } else { - JOIN_WRAPPER(true, true, false); - } + JOIN_WRAPPER(true, true); case EJoinKind::LeftOnly: MKQL_ENSURE(rightIOMap.empty(), "Can't access right table on left only join"); - if (rightAny) { - JOIN_WRAPPER(true, false, true); - } else { - JOIN_WRAPPER(true, false, false); - } + JOIN_WRAPPER(true, false); case EJoinKind::Cross: - MKQL_ENSURE(!rightAny, "rightAny can't be used with cross join"); return new TBlockCrossJoinCoreWraper( ctx.Mutables, std::move(joinItems), std::move(leftStreamItems), - std::move(leftKeyColumns), std::move(leftIOMap), - std::move(rightStreamItems), - std::move(rightKeyColumns), std::move(rightIOMap), leftStream, - rightStream + rightBlockStorage ); default: /* TODO: Display the human-readable join kind name. */ |