aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorFilitov Mikhail <filitovme@gmail.com>2024-05-23 10:56:15 +0200
committerGitHub <noreply@github.com>2024-05-23 10:56:15 +0200
commitdaae4610f74efb5573b4c4704766637b4318d910 (patch)
treedb2bec227359e6e467eb6ee6fd9aa07fc8d3b60f
parentf702ceca69169c3999b3d2f71aee3b99541d9e80 (diff)
downloadydb-daae4610f74efb5573b4c4704766637b4318d910.tar.gz
Yql 18391 gracejoin spilling support (#4700)
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp398
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp114
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h152
3 files changed, 582 insertions, 82 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
index 35a6ece194..12c1b2080e 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp
@@ -20,7 +20,6 @@
#include <ydb/library/yql/parser/pg_catalog/catalog.h>
#include <chrono>
-#include <format>
#include <limits>
namespace NKikimr {
@@ -556,6 +555,384 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con
}
+class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpillingSupportState> {
+ using TBase = TComputationValue<TGraceJoinSpillingSupportState>;
+ enum class EOperatingMode {
+ InMemory,
+ Spilling,
+ ProcessSpilled
+ };
+public:
+
+ TGraceJoinSpillingSupportState(TMemoryUsageInfo* memInfo,
+ IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight,
+ EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns,
+ const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames,
+ const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory,
+ const bool isSelfJoin)
+ : TBase(memInfo)
+ , FlowLeft(flowLeft)
+ , FlowRight(flowRight)
+ , JoinKind(joinKind)
+ , LeftKeyColumns(leftKeyColumns)
+ , RightKeyColumns(rightKeyColumns)
+ , LeftRenames(leftRenames)
+ , RightRenames(rightRenames)
+ , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both)))
+ , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both)))
+ , JoinedTablePtr(std::make_unique<GraceJoin::TTable>())
+ , JoinCompleted(std::make_unique<bool>(false))
+ , PartialJoinCompleted(std::make_unique<bool>(false))
+ , HaveMoreLeftRows(std::make_unique<bool>(true))
+ , HaveMoreRightRows(std::make_unique<bool>(true))
+ , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() )
+ , IsSelfJoin_(isSelfJoin)
+ , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns))
+ {
+ if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) {
+ LeftPacker->BatchSize = std::numeric_limits<ui64>::max();
+ RightPacker->BatchSize = std::numeric_limits<ui64>::max();
+ }
+ }
+
+ EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
+ while (true) {
+ switch(GetMode()) {
+ case EOperatingMode::InMemory: {
+ auto r = DoCalculateInMemory(ctx, output);
+ if (GetMode() == EOperatingMode::InMemory) {
+ return r;
+ }
+ break;
+ }
+ case EOperatingMode::Spilling: {
+ DoCalculateWithSpilling(ctx);
+ if (GetMode() == EOperatingMode::Spilling) {
+ return EFetchResult::Yield;
+ }
+ break;
+ }
+ case EOperatingMode::ProcessSpilled: {
+ return ProcessSpilledData(ctx, output);
+ }
+
+ }
+ }
+ Y_UNREACHABLE();
+ }
+
+private:
+ EOperatingMode GetMode() const {
+ return Mode;
+ }
+
+ bool HasMemoryForProcessing() const {
+ return !TlsAllocState->IsMemoryYellowZoneEnabled();
+ }
+
+ bool IsSwitchToSpillingModeCondition() const {
+ return false;
+ // TODO: YQL-18033
+ // return !HasMemoryForProcessing();
+ }
+
+
+ void SwitchMode(EOperatingMode mode, TComputationContext& ctx) {
+ switch(mode) {
+ case EOperatingMode::InMemory: {
+ MKQL_ENSURE(false, "Internal logic error");
+ break;
+ }
+ case EOperatingMode::Spilling: {
+ MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error");
+ auto spiller = ctx.SpillerFactory->CreateSpiller();
+ RightPacker->TablePtr->InitializeBucketSpillers(spiller);
+ LeftPacker->TablePtr->InitializeBucketSpillers(spiller);
+ break;
+ }
+ case EOperatingMode::ProcessSpilled: {
+ MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
+ break;
+ }
+
+ }
+ Mode = mode;
+ }
+
+ bool FetchAndPackData(TComputationContext& ctx) {
+ const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data());
+ NKikimr::NMiniKQL::EFetchResult resultRight;
+
+ if (IsSelfJoin_) {
+ resultRight = resultLeft;
+ if (!SelfJoinSameKeys_) {
+ std::copy_n(LeftPacker->TupleHolder.begin(), LeftPacker->TotalColumnsNum, RightPacker->TupleHolder.begin());
+ }
+ } else {
+ resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data());
+ }
+
+ if (resultLeft == EFetchResult::One) {
+ if (LeftPacker->TuplesPacked == 0) {
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ }
+ LeftPacker->Pack();
+ LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data());
+ }
+
+ if (resultRight == EFetchResult::One) {
+ if (RightPacker->TuplesPacked == 0) {
+ RightPacker->StartTime = std::chrono::system_clock::now();
+ }
+
+ if ( !SelfJoinSameKeys_ ) {
+ RightPacker->Pack();
+ RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data());
+ }
+ }
+
+ if (resultLeft == EFetchResult::Finish ) {
+ *HaveMoreLeftRows = false;
+ }
+
+
+ if (resultRight == EFetchResult::Finish ) {
+ *HaveMoreRightRows = false;
+ }
+
+ if ((resultLeft == EFetchResult::Yield && (!*HaveMoreRightRows || resultRight == EFetchResult::Yield)) ||
+ (resultRight == EFetchResult::Yield && !*HaveMoreLeftRows))
+ {
+ return true;
+ }
+ return false;
+ }
+
+ void UnpackJoinedData(NUdf::TUnboxedValue*const* output) {
+ LeftPacker->UnPack();
+ RightPacker->UnPack();
+
+ auto &valsLeft = LeftPacker->TupleHolder;
+ auto &valsRight = RightPacker->TupleHolder;
+
+ for (size_t i = 0; i < LeftRenames.size() / 2; i++) {
+ auto & valPtr = output[LeftRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsLeft[LeftRenames[2 * i]];
+ }
+ }
+
+ for (size_t i = 0; i < RightRenames.size() / 2; i++) {
+ auto & valPtr = output[RightRenames[2 * i + 1]];
+ if ( valPtr ) {
+ *valPtr = valsRight[RightRenames[2 * i]];
+ }
+ }
+ }
+
+ EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) {
+ // Collecting data for join and perform join (batch or full)
+ while (!*JoinCompleted ) {
+
+ if ( *PartialJoinCompleted) {
+ // Returns join results (batch or full)
+ while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
+ UnpackJoinedData(output);
+ return EFetchResult::One;
+ }
+
+ // Resets batch state for batch join
+ if (!*HaveMoreRightRows) {
+ *PartialJoinCompleted = false;
+ LeftPacker->TuplesBatchPacked = 0;
+ LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+ JoinedTablePtr->Clear();
+ JoinedTablePtr->ResetIterator();
+ }
+
+
+ if (!*HaveMoreLeftRows ) {
+ *PartialJoinCompleted = false;
+ RightPacker->TuplesBatchPacked = 0;
+ RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+ JoinedTablePtr->Clear();
+ JoinedTablePtr->ResetIterator();
+ }
+ }
+
+ if (!*HaveMoreRightRows && !*HaveMoreLeftRows) {
+ *JoinCompleted = true;
+ break;
+ }
+
+ bool isYield = FetchAndPackData(ctx);
+ if (ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) {
+ SwitchMode(EOperatingMode::Spilling, ctx);
+ return EFetchResult::Yield;
+ }
+ if (isYield) return EFetchResult::Yield;
+
+ if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) {
+ *PartialJoinCompleted = true;
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
+ JoinedTablePtr->ResetIterator();
+ }
+
+ if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= RightPacker->BatchSize ) {
+ *PartialJoinCompleted = true;
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
+ JoinedTablePtr->ResetIterator();
+
+ }
+
+ if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) {
+ *PartialJoinCompleted = true;
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ RightPacker->StartTime = std::chrono::system_clock::now();
+ if ( SelfJoinSameKeys_ ) {
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
+ } else {
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows);
+ }
+ JoinedTablePtr->ResetIterator();
+ LeftPacker->EndTime = std::chrono::system_clock::now();
+ RightPacker->EndTime = std::chrono::system_clock::now();
+ }
+
+ }
+
+ return EFetchResult::Finish;
+ }
+
+ bool TryToReduceMemoryAndWait() {
+ bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait();
+ bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait();
+
+ return isWaitingLeftForReduce || isWaitingRightForReduce;
+ }
+
+ void UpdateSpilling() {
+ LeftPacker->TablePtr->UpdateSpilling();
+ RightPacker->TablePtr->UpdateSpilling();
+ }
+
+ bool HasRunningAsyncOperation() const {
+ return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation();
+ }
+
+void DoCalculateWithSpilling(TComputationContext& ctx) {
+ UpdateSpilling();
+
+ if (!HasMemoryForProcessing()) {
+ bool isWaitingForReduce = TryToReduceMemoryAndWait();
+ if (isWaitingForReduce) return;
+ }
+
+ while (*HaveMoreLeftRows || *HaveMoreRightRows) {
+ bool isYield = FetchAndPackData(ctx);
+ if (isYield) return;
+ }
+
+ if (!*HaveMoreLeftRows && !*HaveMoreRightRows) {
+ UpdateSpilling();
+ if (HasRunningAsyncOperation()) return;
+ if (!IsSpillingFinalized) {
+ LeftPacker->TablePtr->FinalizeSpilling();
+ RightPacker->TablePtr->FinalizeSpilling();
+ IsSpillingFinalized = true;
+
+ if (HasRunningAsyncOperation()) return;
+ }
+ SwitchMode(EOperatingMode::ProcessSpilled, ctx);
+ return;
+ }
+}
+
+EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) {
+ while (NextBucketToJoin != GraceJoin::NumberOfBuckets) {
+ UpdateSpilling();
+
+ if (HasRunningAsyncOperation()) return EFetchResult::Yield;
+
+ if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
+ LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
+ }
+
+ if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
+ RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin);
+ }
+
+ if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) {
+ if (*PartialJoinCompleted) {
+ while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) {
+ UnpackJoinedData(output);
+
+ return EFetchResult::One;
+
+ }
+
+ LeftPacker->TuplesBatchPacked = 0;
+ LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+
+ RightPacker->TuplesBatchPacked = 0;
+ RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch
+
+ JoinedTablePtr->Clear();
+ JoinedTablePtr->ResetIterator();
+ *PartialJoinCompleted = false;
+
+ NextBucketToJoin++;
+ } else {
+ LeftPacker->TablePtr->ExtractBucket(NextBucketToJoin);
+ RightPacker->TablePtr->ExtractBucket(NextBucketToJoin);
+ *PartialJoinCompleted = true;
+ LeftPacker->StartTime = std::chrono::system_clock::now();
+ RightPacker->StartTime = std::chrono::system_clock::now();
+ if ( SelfJoinSameKeys_ ) {
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
+ } else {
+ JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1);
+ }
+
+ JoinedTablePtr->ResetIterator();
+ LeftPacker->EndTime = std::chrono::system_clock::now();
+ RightPacker->EndTime = std::chrono::system_clock::now();
+ }
+
+ }
+ }
+ return EFetchResult::Finish;
+}
+
+private:
+ EOperatingMode Mode = EOperatingMode::InMemory;
+
+ IComputationWideFlowNode* const FlowLeft;
+ IComputationWideFlowNode* const FlowRight;
+
+ const EJoinKind JoinKind;
+ const std::vector<ui32> LeftKeyColumns;
+ const std::vector<ui32> RightKeyColumns;
+ const std::vector<ui32> LeftRenames;
+ const std::vector<ui32> RightRenames;
+ const std::vector<TType *> LeftColumnsTypes;
+ const std::vector<TType *> RightColumnsTypes;
+ const std::unique_ptr<TGraceJoinPacker> LeftPacker;
+ const std::unique_ptr<TGraceJoinPacker> RightPacker;
+ const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr;
+ const std::unique_ptr<bool> JoinCompleted;
+ const std::unique_ptr<bool> PartialJoinCompleted;
+ const std::unique_ptr<bool> HaveMoreLeftRows;
+ const std::unique_ptr<bool> HaveMoreRightRows;
+ const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple;
+ const bool IsSelfJoin_;
+ const bool SelfJoinSameKeys_;
+
+ bool IsSpillingFinalized = false;
+
+ ui32 NextBucketToJoin = 0;
+};
+
class TGraceJoinState : public TComputationValue<TGraceJoinState> {
using TBase = TComputationValue<TGraceJoinState>;
public:
@@ -639,10 +1016,18 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const {
if (!state.HasValue()) {
- MakeState(ctx, state);
+ if (!ctx.ExecuteLLVM) {
+ MakeSpillingSupportState(ctx, state);
+ } else {
+ MakeState(ctx, state);
+ }
+ }
+
+ if (ctx.ExecuteLLVM) {
+ return static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output);
}
- return static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output);
+ return static_cast<TGraceJoinSpillingSupportState*>(state.AsBoxed().Get())->FetchValues(ctx, output);
}
#ifndef MKQL_DISABLE_CODEGEN
ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const {
@@ -735,6 +1120,13 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr
ctx.HolderFactory, IsSelfJoin_);
}
+ void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const {
+ state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>(
+ FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns,
+ LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes,
+ ctx.HolderFactory, IsSelfJoin_);
+ }
+
IComputationWideFlowNode *const FlowLeft;
IComputationWideFlowNode *const FlowRight;
const EJoinKind JoinKind;
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
index 46a902dc8f..0f1b2ed44a 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp
@@ -262,10 +262,7 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){
// Joins two tables and returns join result in joined table. Tuples of joined table could be received by
// joined table iterator
-void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) {
-
-
-
+void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) {
if ( hasMoreLeftTuples )
LeftTableBatch_ = true;
@@ -298,7 +295,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef
std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds, EMemorySubPool::Temporary>> joinResults;
- for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
+ for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) {
joinResults.clear();
TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket];
@@ -1111,24 +1108,97 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) {
void TTable::Clear() {
- for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
- TTableBucket & tb = TableBuckets[bucket];
- tb.KeyIntVals.clear();
- tb.DataIntVals.clear();
- tb.StringsOffsets.clear();
- tb.StringsValues.clear();
- tb.InterfaceValues.clear();
- tb.InterfaceOffsets.clear();
- tb.JoinIds.clear();
- tb.RightIds.clear();
-
- TTableBucketStats & tbs = TableBucketsStats[bucket];
- tbs.TuplesNum = 0;
- tbs.KeyIntValsTotalSize = 0;
- tbs.StringValuesTotalSize = 0;
+ for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) {
+ TTableBucket & tb = TableBuckets[bucket];
+ tb.KeyIntVals.clear();
+ tb.DataIntVals.clear();
+ tb.StringsOffsets.clear();
+ tb.StringsValues.clear();
+ tb.InterfaceValues.clear();
+ tb.InterfaceOffsets.clear();
+ tb.JoinIds.clear();
+ tb.RightIds.clear();
+
+ TTableBucketStats & tbs = TableBucketsStats[bucket];
+ tbs.TuplesNum = 0;
+ tbs.KeyIntValsTotalSize = 0;
+ tbs.StringValuesTotalSize = 0;
+ }
+}
+
+void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) {
+ for (size_t i = 0; i < NumberOfBuckets; ++i) {
+ TableBucketsSpillers.emplace_back(spiller, 5_MB);
+ }
+}
+
+ui64 TTable::GetSizeOfBucket(ui64 bucket) const {
+ return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64)
+ + TableBuckets[bucket].DataIntVals.size() * sizeof(ui64)
+ + TableBuckets[bucket].StringsValues.size()
+ + TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32)
+ + TableBuckets[bucket].InterfaceValues.size()
+ + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32);
+}
+
+bool TTable::TryToReduceMemoryAndWait() {
+ i32 largestBucketIndex = 0;
+ ui64 largestBucketSize = 0;
+ for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
+ if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true;
+
+ ui64 bucketSize = GetSizeOfBucket(bucket);
+ if (bucketSize > largestBucketSize) {
+ largestBucketSize = bucketSize;
+ largestBucketIndex = bucket;
}
+ }
+ if (largestBucketSize) return false;
+ TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex]));
+ TableBuckets[largestBucketIndex] = TTableBucket{};
+
+ return TableBucketsSpillers[largestBucketIndex].HasRunningAsyncIoOperation();
+}
+
+void TTable::UpdateSpilling() {
+ for (ui64 i = 0; i < NumberOfBuckets; ++i) {
+ TableBucketsSpillers[i].Update();
+ }
+}
+
+void TTable::FinalizeSpilling() {
+ MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error");
+
+ for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
+ if (!TableBucketsSpillers[bucket].IsInMemory()) {
+ TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket]));
+ TableBuckets[bucket] = TTableBucket{};
+ }
+ TableBucketsSpillers[bucket].Finalize();
+ }
+}
+
+bool TTable::HasRunningAsyncIoOperation() const {
+ for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) {
+ if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true;
+ }
+ return false;
+}
+
+bool TTable::IsBucketInMemory(ui32 bucket) const {
+ return TableBucketsSpillers[bucket].IsInMemory();
+}
+
+void TTable::StartLoadingBucket(ui32 bucket) {
+ MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error");
+
+ TableBucketsSpillers[bucket].StartBucketRestoration();
+}
+
+void TTable::ExtractBucket(ui64 bucket) {
+ TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket());
}
// Creates new table with key columns and data columns
@@ -1231,6 +1301,10 @@ bool TTableBucketSpiller::HasRunningAsyncIoOperation() const {
|| StateCharAdapter.HasRunningAsyncIoOperation();
}
+bool TTableBucketSpiller::IsInMemory() const {
+ return State == EState::InMemory;
+}
+
void TTableBucketSpiller::StartBucketRestoration() {
MKQL_ENSURE(State == EState::Restoring, "Internal logic error");
MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error");
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
index 87d84b3a5d..23d0d616a6 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h
@@ -9,6 +9,8 @@
namespace NKikimr {
namespace NMiniKQL {
namespace GraceJoin {
+
+class TTableBucketSpiller;
const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32
const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1;
@@ -93,6 +95,67 @@ struct TColTypeInterface {
const THolderFactory& HolderFactory; // To use during unpacking
};
+// Class that spills bucket data.
+// If, after saving, data has accumulated in the bucket again, you can spill it again.
+// After restoring the entire bucket, it will contain all the data saved over different iterations.
+class TTableBucketSpiller {
+public:
+ TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit);
+
+ // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs.
+ void SpillBucket(TTableBucket&& bucket);
+ // Starts bucket restoration after spilling. Restores and unites all the buckets from different iterations. Will pause in case of async operation.
+ void StartBucketRestoration();
+ // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling.
+ TTableBucket&& ExtractBucket();
+
+ // Updates the states of spillers. This update should be called after async operation completion to resume spilling/resoration.
+ void Update();
+ // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
+ void Finalize();
+ // Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
+ bool HasRunningAsyncIoOperation() const;
+
+ bool IsInMemory() const;
+
+private:
+ void ProcessBucketSpilling();
+ template <class T>
+ void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
+ void ProcessBucketRestoration();
+
+private:
+ enum class EState {
+ Spilling,
+ Restoring,
+ InMemory
+ };
+
+ enum class ENextVectorToProcess {
+ KeyAndVals,
+ DataIntVals,
+ StringsValues,
+ StringsOffsets,
+ InterfaceValues,
+ InterfaceOffsets,
+
+ None
+ };
+
+ TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter;
+ TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter;
+ TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter;
+
+ EState State = EState::InMemory;
+ ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None;
+
+ ui64 SpilledBucketsCount = 0;
+
+ bool IsFinalizing = false;
+
+ TTableBucket CurrentBucket;
+};
+
// Class which represents single table data stored in buckets
class TTable {
@@ -126,6 +189,8 @@ class TTable {
// Statistics for buckets. Total number of tuples inside a single bucket and offsets.
std::vector<TTableBucketStats> TableBucketsStats;
+ std::vector<TTableBucketSpiller> TableBucketsSpillers;
+
// Temporary vector for tuples manipulation;
std::vector<ui64> TempTuple;
@@ -194,83 +259,52 @@ public:
// Joins two tables and stores join result in table data. Tuples of joined table could be received by
// joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table
// hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false
- void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false );
-
+ void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets);
// Returns next jointed tuple data. Returs true if there are more tuples
bool NextJoinedData(TupleData& td1, TupleData& td2);
- // Clears table content
- void Clear();
+ // Creates buckets that support spilling.
+ void InitializeBucketSpillers(ISpiller::TPtr spiller);
- // Creates new table with key columns and data columns
- TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
- ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
- ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
- ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
-
- ~TTable();
+ // Calculates approximate size of a bucket. Used for spilling to determine the largest bucket.
+ ui64 GetSizeOfBucket(ui64 bucket) const;
-};
+ // This functions wind the largest bucket and spills it to the disk.
+ bool TryToReduceMemoryAndWait();
-// Class that spills bucket data.
-// If, after saving, data has accumulated in the bucket again, you can spill it again.
-// After restoring the entire bucket, it will contain all the data saved over different iterations.
-class TTableBucketSpiller {
-public:
- TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit);
+ // Update state of spilling. Must be called during each DoCalculate.
+ void UpdateSpilling();
- // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs.
- void SpillBucket(TTableBucket&& bucket);
- // Starts bucket restoration after spilling. Restores and unites all the buckets from different iterations. Will pause in case of async operation.
- void StartBucketRestoration();
- // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling.
- TTableBucket&& ExtractBucket();
+ // Flushes all the spillers.
+ void FinalizeSpilling();
- // Updates the states of spillers. This update should be called after async operation completion to resume spilling/resoration.
- void Update();
- // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling.
- void Finalize();
- // Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true.
+ // Checks if there any async operation running. If return value is true it's safe to return Yield.
bool HasRunningAsyncIoOperation() const;
-private:
- void ProcessBucketSpilling();
- template <class T>
- void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const;
- void ProcessBucketRestoration();
+ // Checks if bucket fully loaded to memory and may be joined.
+ bool IsBucketInMemory(ui32 bucket) const;
-private:
- enum class EState {
- Spilling,
- Restoring,
- InMemory
- };
-
- enum class ENextVectorToProcess {
- KeyAndVals,
- DataIntVals,
- StringsValues,
- StringsOffsets,
- InterfaceValues,
- InterfaceOffsets,
+ // Starts loading spilled bucket to memory.
+ void StartLoadingBucket(ui32 bucket);
- None
- };
+ // Extracts loaded bucket from spilling.
+ void ExtractBucket(ui64 bucket);
- TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter;
- TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter;
- TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter;
+ // Clears table content
+ void Clear();
- EState State = EState::InMemory;
- ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None;
+ // Creates new table with key columns and data columns
+ TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0,
+ ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0,
+ ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0,
+ ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false);
+
+ ~TTable();
- ui64 SpilledBucketsCount = 0;
+};
- bool IsFinalizing = false;
- TTableBucket CurrentBucket;
-};