diff options
author | aakulaga <aakulaga@ydb.tech> | 2023-03-09 10:54:49 +0300 |
---|---|---|
committer | aakulaga <aakulaga@ydb.tech> | 2023-03-09 10:54:49 +0300 |
commit | 984d827f9e03fdc9c473e5f26b8397155ae4fc87 (patch) | |
tree | 4ab7c52790eb36ae36b028a1d8a7d5f8442657c6 | |
parent | f5abdffddc6f9145b06713a85f105139314cda91 (diff) | |
download | ydb-984d827f9e03fdc9c473e5f26b8397155ae4fc87.tar.gz |
Stream mode for GraceJoin
Stream mode for GraceJoin
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp | 127 |
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp | 5 |
2 files changed, 99 insertions, 33 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index fa4520730db..4388da96073 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -25,6 +25,7 @@ namespace NMiniKQL { namespace { +const ui32 PartialJoinBatchSize = 100000; // Number of tuples for one join batch struct TColumnDataPackInfo { ui32 ColumnIdx = 0; // Column index in tuple @@ -44,6 +45,7 @@ struct TColumnDataPackInfo { struct TGraceJoinPacker { ui64 NullsBitmapSize = 0; // Number of ui64 values for nulls bitmap ui64 TuplesPacked = 0; // Total number of packed tuples + ui64 TuplesBatchPacked = 0; // Number of tuples packed during current join batch ui64 TuplesUnpacked = 0; // Total number of unpacked tuples std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution @@ -181,6 +183,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) { void TGraceJoinPacker::Pack() { TuplesPacked++; + TuplesBatchPacked++; for (ui64 i = 0; i < NullsBitmapSize; ++i) { TupleIntVals[i] = 0; // Clearing nulls bit array. 
Bit 1 means particular column contains null value } @@ -543,6 +546,9 @@ public: , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory)) , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) , JoinCompleted(std::make_unique<bool>(false)) + , PartialJoinCompleted(std::make_unique<bool>(false)) + , HaveMoreLeftRows(std::make_unique<bool>(true)) + , HaveMoreRightRows(std::make_unique<bool>(true)) , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() ) , FlowLeft(flowLeft) , FlowRight(flowRight) @@ -566,6 +572,9 @@ private: const std::unique_ptr<TGraceJoinPacker> RightPacker; const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr; const std::unique_ptr<bool> JoinCompleted; + const std::unique_ptr<bool> PartialJoinCompleted; + const std::unique_ptr<bool> HaveMoreLeftRows; + const std::unique_ptr<bool> HaveMoreRightRows; const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple; }; @@ -708,8 +717,67 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { + // Collecting data for join and perform join (batch or full) + while (!*JoinCompleted ) { + + if ( *PartialJoinCompleted) { + + // Returns join results (batch or full) + JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2); + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { + + LeftPacker->UnPack(); + RightPacker->UnPack(); + + auto &valsLeft = LeftPacker->TupleHolder; + auto &valsRight = RightPacker->TupleHolder; + + + for (ui32 i = 0; i < LeftRenames.size() / 2; i++) + { + auto & valPtr = output[LeftRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsLeft[LeftRenames[2 * i]]; + } + } + + for (ui32 i = 0; i < RightRenames.size() / 2; i++) + { + auto & valPtr = output[RightRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsRight[RightRenames[2 
* i]]; + } + } + + return EFetchResult::One; + + } + + // Resets batch state for batch join + if (!*HaveMoreRightRows) { + *PartialJoinCompleted = false; + LeftPacker->TuplesBatchPacked = 0; + LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + JoinedTablePtr->Clear(); + JoinedTablePtr->ResetIterator(); + } + + + if (!*HaveMoreLeftRows ) { + *PartialJoinCompleted = false; + RightPacker->TuplesBatchPacked = 0; + RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + JoinedTablePtr->Clear(); + JoinedTablePtr->ResetIterator(); + } + + } + + if (!*HaveMoreRightRows && !*HaveMoreLeftRows) { + *JoinCompleted = true; + break; + } - while (!*JoinCompleted) { const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data()); const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); @@ -735,48 +803,43 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox return EFetchResult::Yield; } - if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) { - *JoinCompleted = true; - LeftPacker->StartTime = std::chrono::system_clock::now(); - RightPacker->StartTime = std::chrono::system_clock::now(); - JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); - JoinedTablePtr->ResetIterator(); - LeftPacker->EndTime = std::chrono::system_clock::now(); - RightPacker->EndTime = std::chrono::system_clock::now(); - - } - } + if (resultLeft == EFetchResult::Finish ) { + *HaveMoreLeftRows = false; + } - JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2); - while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { - LeftPacker->UnPack(); - RightPacker->UnPack(); + if (resultRight == EFetchResult::Finish ) { + *HaveMoreRightRows = false; + } - auto &valsLeft = 
LeftPacker->TupleHolder; - auto &valsRight = RightPacker->TupleHolder; + if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= PartialJoinBatchSize ) { + *PartialJoinCompleted = true; + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->ResetIterator(); + + } - for (ui32 i = 0; i < LeftRenames.size() / 2; i++) - { - auto & valPtr = output[LeftRenames[2 * i + 1]]; - if ( valPtr ) { - *valPtr = valsLeft[LeftRenames[2 * i]]; - } + if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= PartialJoinBatchSize ) { + *PartialJoinCompleted = true; + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->ResetIterator(); + } - for (ui32 i = 0; i < RightRenames.size() / 2; i++) - { - auto & valPtr = output[RightRenames[2 * i + 1]]; - if ( valPtr ) { - *valPtr = valsRight[RightRenames[2 * i]]; - } + if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) { + *PartialJoinCompleted = true; + LeftPacker->StartTime = std::chrono::system_clock::now(); + RightPacker->StartTime = std::chrono::system_clock::now(); + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind); + JoinedTablePtr->ResetIterator(); + LeftPacker->EndTime = std::chrono::system_clock::now(); + RightPacker->EndTime = std::chrono::system_clock::now(); + } - return EFetchResult::One; } -// Cout << "Tuples unpacked: " << LeftPacker->TuplesUnpacked << Endl; return EFetchResult::Finish; } diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 7daf3877936..086c2655962 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -853,9 +853,12 @@ void TTable::Clear() { tb.DataIntVals.clear(); tb.StringsOffsets.clear(); tb.StringsValues.clear(); + tb.InterfaceValues.clear(); + 
tb.InterfaceOffsets.clear(); tb.JoinIds.clear(); + tb.RightIds.clear(); } - + } |