aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraakulaga <aakulaga@ydb.tech>2023-03-09 10:54:49 +0300
committeraakulaga <aakulaga@ydb.tech>2023-03-09 10:54:49 +0300
commit984d827f9e03fdc9c473e5f26b8397155ae4fc87 (patch)
tree4ab7c52790eb36ae36b028a1d8a7d5f8442657c6
parentf5abdffddc6f9145b06713a85f105139314cda91 (diff)
downloadydb-984d827f9e03fdc9c473e5f26b8397155ae4fc87.tar.gz
Stream mode for GraceJoin
Stream mode for GraceJoin
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp127
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp5
2 files changed, 99 insertions, 33 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index fa4520730db..4388da96073 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -25,6 +25,7 @@ namespace NMiniKQL {
namespace {
+const ui32 PartialJoinBatchSize = 100000; // Number of tuples for one join batch
struct TColumnDataPackInfo {
ui32 ColumnIdx = 0; // Column index in tuple
@@ -44,6 +45,7 @@ struct TColumnDataPackInfo {
struct TGraceJoinPacker {
ui64 NullsBitmapSize = 0; // Number of ui64 values for nulls bitmap
ui64 TuplesPacked = 0; // Total number of packed tuples
+ ui64 TuplesBatchPacked = 0; // Number of tuples packed during current join batch
ui64 TuplesUnpacked = 0; // Total number of unpacked tuples
std::chrono::time_point<std::chrono::system_clock> StartTime; // Start time of execution
std::chrono::time_point<std::chrono::system_clock> EndTime; // End time of execution
@@ -181,6 +183,7 @@ TColumnDataPackInfo GetPackInfo(TType* type) {
void TGraceJoinPacker::Pack() {
TuplesPacked++;
+ TuplesBatchPacked++;
for (ui64 i = 0; i < NullsBitmapSize; ++i) {
TupleIntVals[i] = 0; // Clearing nulls bit array. Bit 1 means particular column contains null value
}
@@ -543,6 +546,9 @@ public:
, RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory))
, JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
, JoinCompleted(std::make_unique<bool>(false))
+ , PartialJoinCompleted(std::make_unique<bool>(false))
+ , HaveMoreLeftRows(std::make_unique<bool>(true))
+ , HaveMoreRightRows(std::make_unique<bool>(true))
, JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
, FlowLeft(flowLeft)
, FlowRight(flowRight)
@@ -566,6 +572,9 @@ private:
const std::unique_ptr<TGraceJoinPacker> RightPacker;
const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr;
const std::unique_ptr<bool> JoinCompleted;
+ const std::unique_ptr<bool> PartialJoinCompleted;
+ const std::unique_ptr<bool> HaveMoreLeftRows;
+ const std::unique_ptr<bool> HaveMoreRightRows;
const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
};
@@ -708,8 +717,67 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
+ // Collecting data for join and perform join (batch or full)
+ while (!*JoinCompleted ) {
+
+ if ( *PartialJoinCompleted) {
+
+ // Returns join results (batch or full)
+ JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2);
+ while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
+
+ LeftPacker->UnPack();
+ RightPacker->UnPack();
+
+ auto &valsLeft = LeftPacker->TupleHolder;
+ auto &valsRight = RightPacker->TupleHolder;
+
+
+ for (ui32 i = 0; i < LeftRenames.size() / 2; i++)
+ {
+ auto & valPtr = output[LeftRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsLeft[LeftRenames[2 * i]];
+ }
+ }
+
+ for (ui32 i = 0; i < RightRenames.size() / 2; i++)
+ {
+ auto & valPtr = output[RightRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsRight[RightRenames[2 * i]];
+ }
+ }
+
+ return EFetchResult::One;
+
+ }
+
+ // Resets batch state for batch join
+ if (!*HaveMoreRightRows) {
+ *PartialJoinCompleted = false;
+ LeftPacker->TuplesBatchPacked = 0;
+ LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+ JoinedTablePtr->Clear();
+ JoinedTablePtr->ResetIterator();
+ }
+
+
+ if (!*HaveMoreLeftRows ) {
+ *PartialJoinCompleted = false;
+ RightPacker->TuplesBatchPacked = 0;
+ RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+ JoinedTablePtr->Clear();
+ JoinedTablePtr->ResetIterator();
+ }
+
+ }
+
+ if (!*HaveMoreRightRows && !*HaveMoreLeftRows) {
+ *JoinCompleted = true;
+ break;
+ }
- while (!*JoinCompleted) {
const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
const NKikimr::NMiniKQL::EFetchResult resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
@@ -735,48 +803,43 @@ EFetchResult TGraceJoinState::FetchValues(TComputationContext& ctx, NUdf::TUnbox
return EFetchResult::Yield;
}
- if (resultRight == EFetchResult::Finish && resultLeft == EFetchResult::Finish && !*JoinCompleted) {
- *JoinCompleted = true;
- LeftPacker->StartTime = std::chrono::system_clock::now();
- RightPacker->StartTime = std::chrono::system_clock::now();
- JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
- JoinedTablePtr->ResetIterator();
- LeftPacker->EndTime = std::chrono::system_clock::now();
- RightPacker->EndTime = std::chrono::system_clock::now();
-
- }
- }
+ if (resultLeft == EFetchResult::Finish ) {
+ *HaveMoreLeftRows = false;
+ }
- JoinedTuple->resize((LeftRenames.size() + RightRenames.size()) / 2);
- while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
- LeftPacker->UnPack();
- RightPacker->UnPack();
+ if (resultRight == EFetchResult::Finish ) {
+ *HaveMoreRightRows = false;
+ }
- auto &valsLeft = LeftPacker->TupleHolder;
- auto &valsRight = RightPacker->TupleHolder;
+ if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= PartialJoinBatchSize ) {
+ *PartialJoinCompleted = true;
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->ResetIterator();
+
+ }
- for (ui32 i = 0; i < LeftRenames.size() / 2; i++)
- {
- auto & valPtr = output[LeftRenames[2 * i + 1]];
- if ( valPtr ) {
- *valPtr = valsLeft[LeftRenames[2 * i]];
- }
+ if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= PartialJoinBatchSize ) {
+ *PartialJoinCompleted = true;
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->ResetIterator();
+
}
- for (ui32 i = 0; i < RightRenames.size() / 2; i++)
- {
- auto & valPtr = output[RightRenames[2 * i + 1]];
- if ( valPtr ) {
- *valPtr = valsRight[RightRenames[2 * i]];
- }
+ if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) {
+ *PartialJoinCompleted = true;
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ RightPacker->StartTime = std::chrono::system_clock::now();
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind);
+ JoinedTablePtr->ResetIterator();
+ LeftPacker->EndTime = std::chrono::system_clock::now();
+ RightPacker->EndTime = std::chrono::system_clock::now();
+
}
- return EFetchResult::One;
}
-// Cout << "Tuples unpacked: " << LeftPacker->TuplesUnpacked << Endl;
return EFetchResult::Finish;
}
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 7daf3877936..086c2655962 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -853,9 +853,12 @@ void TTable::Clear() {
tb.DataIntVals.clear();
tb.StringsOffsets.clear();
tb.StringsValues.clear();
+ tb.InterfaceValues.clear();
+ tb.InterfaceOffsets.clear();
tb.JoinIds.clear();
+ tb.RightIds.clear();
}
-
+
}