diff options
author | Filitov Mikhail <filitovme@gmail.com> | 2024-05-23 10:56:15 +0200 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-05-23 10:56:15 +0200 |
commit | daae4610f74efb5573b4c4704766637b4318d910 (patch) | |
tree | db2bec227359e6e467eb6ee6fd9aa07fc8d3b60f | |
parent | f702ceca69169c3999b3d2f71aee3b99541d9e80 (diff) | |
download | ydb-daae4610f74efb5573b4c4704766637b4318d910.tar.gz |
Yql 18391 gracejoin spilling support (#4700)
3 files changed, 582 insertions, 82 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp index 35a6ece194..12c1b2080e 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join.cpp @@ -20,7 +20,6 @@ #include <ydb/library/yql/parser/pg_catalog/catalog.h> #include <chrono> -#include <format> #include <limits> namespace NKikimr { @@ -556,6 +555,384 @@ TGraceJoinPacker::TGraceJoinPacker(const std::vector<TType *> & columnTypes, con } +class TGraceJoinSpillingSupportState : public TComputationValue<TGraceJoinSpillingSupportState> { + using TBase = TComputationValue<TGraceJoinSpillingSupportState>; + enum class EOperatingMode { + InMemory, + Spilling, + ProcessSpilled + }; +public: + + TGraceJoinSpillingSupportState(TMemoryUsageInfo* memInfo, + IComputationWideFlowNode* flowLeft, IComputationWideFlowNode* flowRight, + EJoinKind joinKind, EAnyJoinSettings anyJoinSettings, const std::vector<ui32>& leftKeyColumns, const std::vector<ui32>& rightKeyColumns, + const std::vector<ui32>& leftRenames, const std::vector<ui32>& rightRenames, + const std::vector<TType*>& leftColumnsTypes, const std::vector<TType*>& rightColumnsTypes, const THolderFactory & holderFactory, + const bool isSelfJoin) + : TBase(memInfo) + , FlowLeft(flowLeft) + , FlowRight(flowRight) + , JoinKind(joinKind) + , LeftKeyColumns(leftKeyColumns) + , RightKeyColumns(rightKeyColumns) + , LeftRenames(leftRenames) + , RightRenames(rightRenames) + , LeftPacker(std::make_unique<TGraceJoinPacker>(leftColumnsTypes, leftKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Left || anyJoinSettings == EAnyJoinSettings::Both))) + , RightPacker(std::make_unique<TGraceJoinPacker>(rightColumnsTypes, rightKeyColumns, holderFactory, (anyJoinSettings == EAnyJoinSettings::Right || anyJoinSettings == EAnyJoinSettings::Both))) + , JoinedTablePtr(std::make_unique<GraceJoin::TTable>()) + , 
JoinCompleted(std::make_unique<bool>(false)) + , PartialJoinCompleted(std::make_unique<bool>(false)) + , HaveMoreLeftRows(std::make_unique<bool>(true)) + , HaveMoreRightRows(std::make_unique<bool>(true)) + , JoinedTuple(std::make_unique<std::vector<NUdf::TUnboxedValue*>>() ) + , IsSelfJoin_(isSelfJoin) + , SelfJoinSameKeys_(isSelfJoin && (leftKeyColumns == rightKeyColumns)) + { + if (JoinKind == EJoinKind::Full || JoinKind == EJoinKind::Exclusion || IsSelfJoin_) { + LeftPacker->BatchSize = std::numeric_limits<ui64>::max(); + RightPacker->BatchSize = std::numeric_limits<ui64>::max(); + } + } + + EFetchResult FetchValues(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + while (true) { + switch(GetMode()) { + case EOperatingMode::InMemory: { + auto r = DoCalculateInMemory(ctx, output); + if (GetMode() == EOperatingMode::InMemory) { + return r; + } + break; + } + case EOperatingMode::Spilling: { + DoCalculateWithSpilling(ctx); + if (GetMode() == EOperatingMode::Spilling) { + return EFetchResult::Yield; + } + break; + } + case EOperatingMode::ProcessSpilled: { + return ProcessSpilledData(ctx, output); + } + + } + } + Y_UNREACHABLE(); + } + +private: + EOperatingMode GetMode() const { + return Mode; + } + + bool HasMemoryForProcessing() const { + return !TlsAllocState->IsMemoryYellowZoneEnabled(); + } + + bool IsSwitchToSpillingModeCondition() const { + return false; + // TODO: YQL-18033 + // return !HasMemoryForProcessing(); + } + + + void SwitchMode(EOperatingMode mode, TComputationContext& ctx) { + switch(mode) { + case EOperatingMode::InMemory: { + MKQL_ENSURE(false, "Internal logic error"); + break; + } + case EOperatingMode::Spilling: { + MKQL_ENSURE(EOperatingMode::InMemory == Mode, "Internal logic error"); + auto spiller = ctx.SpillerFactory->CreateSpiller(); + RightPacker->TablePtr->InitializeBucketSpillers(spiller); + LeftPacker->TablePtr->InitializeBucketSpillers(spiller); + break; + } + case EOperatingMode::ProcessSpilled: { + 
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); + break; + } + + } + Mode = mode; + } + + bool FetchAndPackData(TComputationContext& ctx) { + const NKikimr::NMiniKQL::EFetchResult resultLeft = FlowLeft->FetchValues(ctx, LeftPacker->TuplePtrs.data()); + NKikimr::NMiniKQL::EFetchResult resultRight; + + if (IsSelfJoin_) { + resultRight = resultLeft; + if (!SelfJoinSameKeys_) { + std::copy_n(LeftPacker->TupleHolder.begin(), LeftPacker->TotalColumnsNum, RightPacker->TupleHolder.begin()); + } + } else { + resultRight = FlowRight->FetchValues(ctx, RightPacker->TuplePtrs.data()); + } + + if (resultLeft == EFetchResult::One) { + if (LeftPacker->TuplesPacked == 0) { + LeftPacker->StartTime = std::chrono::system_clock::now(); + } + LeftPacker->Pack(); + LeftPacker->TablePtr->AddTuple(LeftPacker->TupleIntVals.data(), LeftPacker->TupleStrings.data(), LeftPacker->TupleStrSizes.data(), LeftPacker->IColumnsHolder.data()); + } + + if (resultRight == EFetchResult::One) { + if (RightPacker->TuplesPacked == 0) { + RightPacker->StartTime = std::chrono::system_clock::now(); + } + + if ( !SelfJoinSameKeys_ ) { + RightPacker->Pack(); + RightPacker->TablePtr->AddTuple(RightPacker->TupleIntVals.data(), RightPacker->TupleStrings.data(), RightPacker->TupleStrSizes.data(), RightPacker->IColumnsHolder.data()); + } + } + + if (resultLeft == EFetchResult::Finish ) { + *HaveMoreLeftRows = false; + } + + + if (resultRight == EFetchResult::Finish ) { + *HaveMoreRightRows = false; + } + + if ((resultLeft == EFetchResult::Yield && (!*HaveMoreRightRows || resultRight == EFetchResult::Yield)) || + (resultRight == EFetchResult::Yield && !*HaveMoreLeftRows)) + { + return true; + } + return false; + } + + void UnpackJoinedData(NUdf::TUnboxedValue*const* output) { + LeftPacker->UnPack(); + RightPacker->UnPack(); + + auto &valsLeft = LeftPacker->TupleHolder; + auto &valsRight = RightPacker->TupleHolder; + + for (size_t i = 0; i < LeftRenames.size() / 2; i++) { + auto & valPtr = 
output[LeftRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsLeft[LeftRenames[2 * i]]; + } + } + + for (size_t i = 0; i < RightRenames.size() / 2; i++) { + auto & valPtr = output[RightRenames[2 * i + 1]]; + if ( valPtr ) { + *valPtr = valsRight[RightRenames[2 * i]]; + } + } + } + + EFetchResult DoCalculateInMemory(TComputationContext& ctx, NUdf::TUnboxedValue*const* output) { + // Collecting data for join and perform join (batch or full) + while (!*JoinCompleted ) { + + if ( *PartialJoinCompleted) { + // Returns join results (batch or full) + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { + UnpackJoinedData(output); + return EFetchResult::One; + } + + // Resets batch state for batch join + if (!*HaveMoreRightRows) { + *PartialJoinCompleted = false; + LeftPacker->TuplesBatchPacked = 0; + LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + JoinedTablePtr->Clear(); + JoinedTablePtr->ResetIterator(); + } + + + if (!*HaveMoreLeftRows ) { + *PartialJoinCompleted = false; + RightPacker->TuplesBatchPacked = 0; + RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + JoinedTablePtr->Clear(); + JoinedTablePtr->ResetIterator(); + } + } + + if (!*HaveMoreRightRows && !*HaveMoreLeftRows) { + *JoinCompleted = true; + break; + } + + bool isYield = FetchAndPackData(ctx); + if (ctx.SpillerFactory && IsSwitchToSpillingModeCondition()) { + SwitchMode(EOperatingMode::Spilling, ctx); + return EFetchResult::Yield; + } + if (isYield) return EFetchResult::Yield; + + if (!*HaveMoreRightRows && !*PartialJoinCompleted && LeftPacker->TuplesBatchPacked >= LeftPacker->BatchSize ) { + *PartialJoinCompleted = true; + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + JoinedTablePtr->ResetIterator(); + } + + if (!*HaveMoreLeftRows && !*PartialJoinCompleted && RightPacker->TuplesBatchPacked >= 
RightPacker->BatchSize ) { + *PartialJoinCompleted = true; + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + JoinedTablePtr->ResetIterator(); + + } + + if (!*HaveMoreRightRows && !*HaveMoreLeftRows && !*PartialJoinCompleted) { + *PartialJoinCompleted = true; + LeftPacker->StartTime = std::chrono::system_clock::now(); + RightPacker->StartTime = std::chrono::system_clock::now(); + if ( SelfJoinSameKeys_ ) { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + } else { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows); + } + JoinedTablePtr->ResetIterator(); + LeftPacker->EndTime = std::chrono::system_clock::now(); + RightPacker->EndTime = std::chrono::system_clock::now(); + } + + } + + return EFetchResult::Finish; + } + + bool TryToReduceMemoryAndWait() { + bool isWaitingLeftForReduce = LeftPacker->TablePtr->TryToReduceMemoryAndWait(); + bool isWaitingRightForReduce = RightPacker->TablePtr->TryToReduceMemoryAndWait(); + + return isWaitingLeftForReduce || isWaitingRightForReduce; + } + + void UpdateSpilling() { + LeftPacker->TablePtr->UpdateSpilling(); + RightPacker->TablePtr->UpdateSpilling(); + } + + bool HasRunningAsyncOperation() const { + return LeftPacker->TablePtr->HasRunningAsyncIoOperation() || RightPacker->TablePtr->HasRunningAsyncIoOperation(); + } + +void DoCalculateWithSpilling(TComputationContext& ctx) { + UpdateSpilling(); + + if (!HasMemoryForProcessing()) { + bool isWaitingForReduce = TryToReduceMemoryAndWait(); + if (isWaitingForReduce) return; + } + + while (*HaveMoreLeftRows || *HaveMoreRightRows) { + bool isYield = FetchAndPackData(ctx); + if (isYield) return; + } + + if (!*HaveMoreLeftRows && !*HaveMoreRightRows) { + UpdateSpilling(); + if (HasRunningAsyncOperation()) return; + if (!IsSpillingFinalized) { + 
LeftPacker->TablePtr->FinalizeSpilling(); + RightPacker->TablePtr->FinalizeSpilling(); + IsSpillingFinalized = true; + + if (HasRunningAsyncOperation()) return; + } + SwitchMode(EOperatingMode::ProcessSpilled, ctx); + return; + } +} + +EFetchResult ProcessSpilledData(TComputationContext&, NUdf::TUnboxedValue*const* output) { + while (NextBucketToJoin != GraceJoin::NumberOfBuckets) { + UpdateSpilling(); + + if (HasRunningAsyncOperation()) return EFetchResult::Yield; + + if (!LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + LeftPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); + } + + if (!RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + RightPacker->TablePtr->StartLoadingBucket(NextBucketToJoin); + } + + if (LeftPacker->TablePtr->IsBucketInMemory(NextBucketToJoin) && RightPacker->TablePtr->IsBucketInMemory(NextBucketToJoin)) { + if (*PartialJoinCompleted) { + while (JoinedTablePtr->NextJoinedData(LeftPacker->JoinTupleData, RightPacker->JoinTupleData)) { + UnpackJoinedData(output); + + return EFetchResult::One; + + } + + LeftPacker->TuplesBatchPacked = 0; + LeftPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + + RightPacker->TuplesBatchPacked = 0; + RightPacker->TablePtr->Clear(); // Clear table content, ready to collect data for next batch + + JoinedTablePtr->Clear(); + JoinedTablePtr->ResetIterator(); + *PartialJoinCompleted = false; + + NextBucketToJoin++; + } else { + LeftPacker->TablePtr->ExtractBucket(NextBucketToJoin); + RightPacker->TablePtr->ExtractBucket(NextBucketToJoin); + *PartialJoinCompleted = true; + LeftPacker->StartTime = std::chrono::system_clock::now(); + RightPacker->StartTime = std::chrono::system_clock::now(); + if ( SelfJoinSameKeys_ ) { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *LeftPacker->TablePtr, JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1); + } else { + JoinedTablePtr->Join(*LeftPacker->TablePtr, *RightPacker->TablePtr, 
JoinKind, *HaveMoreLeftRows, *HaveMoreRightRows, NextBucketToJoin, NextBucketToJoin+1); + } + + JoinedTablePtr->ResetIterator(); + LeftPacker->EndTime = std::chrono::system_clock::now(); + RightPacker->EndTime = std::chrono::system_clock::now(); + } + + } + } + return EFetchResult::Finish; +} + +private: + EOperatingMode Mode = EOperatingMode::InMemory; + + IComputationWideFlowNode* const FlowLeft; + IComputationWideFlowNode* const FlowRight; + + const EJoinKind JoinKind; + const std::vector<ui32> LeftKeyColumns; + const std::vector<ui32> RightKeyColumns; + const std::vector<ui32> LeftRenames; + const std::vector<ui32> RightRenames; + const std::vector<TType *> LeftColumnsTypes; + const std::vector<TType *> RightColumnsTypes; + const std::unique_ptr<TGraceJoinPacker> LeftPacker; + const std::unique_ptr<TGraceJoinPacker> RightPacker; + const std::unique_ptr<GraceJoin::TTable> JoinedTablePtr; + const std::unique_ptr<bool> JoinCompleted; + const std::unique_ptr<bool> PartialJoinCompleted; + const std::unique_ptr<bool> HaveMoreLeftRows; + const std::unique_ptr<bool> HaveMoreRightRows; + const std::unique_ptr<std::vector<NUdf::TUnboxedValue*>> JoinedTuple; + const bool IsSelfJoin_; + const bool SelfJoinSameKeys_; + + bool IsSpillingFinalized = false; + + ui32 NextBucketToJoin = 0; +}; + class TGraceJoinState : public TComputationValue<TGraceJoinState> { using TBase = TComputationValue<TGraceJoinState>; public: @@ -639,10 +1016,18 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { if (!state.HasValue()) { - MakeState(ctx, state); + if (!ctx.ExecuteLLVM) { + MakeSpillingSupportState(ctx, state); + } else { + MakeState(ctx, state); + } + } + + if (ctx.ExecuteLLVM) { + return static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output); } - return 
static_cast<TGraceJoinState*>(state.AsBoxed().Get())->FetchValues(ctx, output); + return static_cast<TGraceJoinSpillingSupportState*>(state.AsBoxed().Get())->FetchValues(ctx, output); } #ifndef MKQL_DISABLE_CODEGEN ICodegeneratorInlineWideNode::TGenerateResult DoGenGetValues(const TCodegenContext& ctx, Value* statePtr, BasicBlock*& block) const { @@ -735,6 +1120,13 @@ class TGraceJoinWrapper : public TStatefulWideFlowCodegeneratorNode<TGraceJoinWr ctx.HolderFactory, IsSelfJoin_); } + void MakeSpillingSupportState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { + state = ctx.HolderFactory.Create<TGraceJoinSpillingSupportState>( + FlowLeft, FlowRight, JoinKind, AnyJoinSettings_, LeftKeyColumns, RightKeyColumns, + LeftRenames, RightRenames, LeftColumnsTypes, RightColumnsTypes, + ctx.HolderFactory, IsSelfJoin_); + } + IComputationWideFlowNode *const FlowLeft; IComputationWideFlowNode *const FlowRight; const EJoinKind JoinKind; diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp index 46a902dc8f..0f1b2ed44a 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.cpp @@ -262,10 +262,7 @@ void ResizeHashTable(KeysHashTable &t, ui64 newSlots){ // Joins two tables and returns join result in joined table. 
Tuples of joined table could be received by // joined table iterator -void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples ) { - - - +void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLeftTuples, bool hasMoreRightTuples, ui32 fromBucket, ui32 toBucket ) { if ( hasMoreLeftTuples ) LeftTableBatch_ = true; @@ -298,7 +295,7 @@ void TTable::Join( TTable & t1, TTable & t2, EJoinKind joinKind, bool hasMoreLef std::vector<JoinTuplesIds, TMKQLAllocator<JoinTuplesIds, EMemorySubPool::Temporary>> joinResults; - for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) { + for (ui64 bucket = fromBucket; bucket < toBucket; bucket++) { joinResults.clear(); TTableBucket * bucket1 = &JoinTable1->TableBuckets[bucket]; @@ -1111,24 +1108,97 @@ bool TTable::NextJoinedData( TupleData & td1, TupleData & td2) { void TTable::Clear() { - for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) { - TTableBucket & tb = TableBuckets[bucket]; - tb.KeyIntVals.clear(); - tb.DataIntVals.clear(); - tb.StringsOffsets.clear(); - tb.StringsValues.clear(); - tb.InterfaceValues.clear(); - tb.InterfaceOffsets.clear(); - tb.JoinIds.clear(); - tb.RightIds.clear(); - - TTableBucketStats & tbs = TableBucketsStats[bucket]; - tbs.TuplesNum = 0; - tbs.KeyIntValsTotalSize = 0; - tbs.StringValuesTotalSize = 0; + for (ui64 bucket = 0; bucket < NumberOfBuckets; bucket++) { + TTableBucket & tb = TableBuckets[bucket]; + tb.KeyIntVals.clear(); + tb.DataIntVals.clear(); + tb.StringsOffsets.clear(); + tb.StringsValues.clear(); + tb.InterfaceValues.clear(); + tb.InterfaceOffsets.clear(); + tb.JoinIds.clear(); + tb.RightIds.clear(); + + TTableBucketStats & tbs = TableBucketsStats[bucket]; + tbs.TuplesNum = 0; + tbs.KeyIntValsTotalSize = 0; + tbs.StringValuesTotalSize = 0; + } +} + +void TTable::InitializeBucketSpillers(ISpiller::TPtr spiller) { + for (size_t i = 0; i < NumberOfBuckets; ++i) { + 
TableBucketsSpillers.emplace_back(spiller, 5_MB); + } +} + +ui64 TTable::GetSizeOfBucket(ui64 bucket) const { + return TableBuckets[bucket].KeyIntVals.size() * sizeof(ui64) + + TableBuckets[bucket].DataIntVals.size() * sizeof(ui64) + + TableBuckets[bucket].StringsValues.size() + + TableBuckets[bucket].StringsOffsets.size() * sizeof(ui32) + + TableBuckets[bucket].InterfaceValues.size() + + TableBuckets[bucket].InterfaceOffsets.size() * sizeof(ui32); +} + +bool TTable::TryToReduceMemoryAndWait() { /* Spills the largest non-empty bucket. Returns true if an async spiller operation is still running afterwards, false otherwise (including when every bucket is empty). */ + ui32 largestBucketIndex = 0; + ui64 largestBucketSize = 0; + for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) { + if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true; /* a spiller is busy: report waiting instead of starting another spill */ + + ui64 bucketSize = GetSizeOfBucket(bucket); + if (bucketSize > largestBucketSize) { + largestBucketSize = bucketSize; + largestBucketIndex = bucket; } + } + if (!largestBucketSize) return false; /* every bucket is empty: nothing to spill */ + TableBucketsSpillers[largestBucketIndex].SpillBucket(std::move(TableBuckets[largestBucketIndex])); + TableBuckets[largestBucketIndex] = TTableBucket{}; + + return TableBucketsSpillers[largestBucketIndex].HasRunningAsyncIoOperation(); +} + +void TTable::UpdateSpilling() { + for (ui64 i = 0; i < NumberOfBuckets; ++i) { + TableBucketsSpillers[i].Update(); + } +} + +void TTable::FinalizeSpilling() { + MKQL_ENSURE(!HasRunningAsyncIoOperation(), "Internal logic error"); + + for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) { + if (!TableBucketsSpillers[bucket].IsInMemory()) { + TableBucketsSpillers[bucket].SpillBucket(std::move(TableBuckets[bucket])); + TableBuckets[bucket] = TTableBucket{}; + } + TableBucketsSpillers[bucket].Finalize(); + } +} + +bool TTable::HasRunningAsyncIoOperation() const { + for (ui32 bucket = 0; bucket < NumberOfBuckets; ++bucket) { + if (TableBucketsSpillers[bucket].HasRunningAsyncIoOperation()) return true; + } + return false; +} + +bool TTable::IsBucketInMemory(ui32 bucket) const { + return TableBucketsSpillers[bucket].IsInMemory(); +} + +void 
TTable::StartLoadingBucket(ui32 bucket) { + MKQL_ENSURE(!TableBucketsSpillers[bucket].IsInMemory(), "Internal logic error"); + + TableBucketsSpillers[bucket].StartBucketRestoration(); +} + +void TTable::ExtractBucket(ui64 bucket) { + TableBuckets[bucket] = std::move(TableBucketsSpillers[bucket].ExtractBucket()); } // Creates new table with key columns and data columns @@ -1231,6 +1301,10 @@ bool TTableBucketSpiller::HasRunningAsyncIoOperation() const { || StateCharAdapter.HasRunningAsyncIoOperation(); } +bool TTableBucketSpiller::IsInMemory() const { + return State == EState::InMemory; +} + void TTableBucketSpiller::StartBucketRestoration() { MKQL_ENSURE(State == EState::Restoring, "Internal logic error"); MKQL_ENSURE(NextVectorToProcess == ENextVectorToProcess::None, "Internal logic error"); diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h index 87d84b3a5d..23d0d616a6 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h +++ b/ydb/library/yql/minikql/comp_nodes/mkql_grace_join_imp.h @@ -9,6 +9,8 @@ namespace NKikimr { namespace NMiniKQL { namespace GraceJoin { + +class TTableBucketSpiller; const ui64 BitsForNumberOfBuckets = 5; // 2^5 = 32 const ui64 BucketsMask = (0x00000001 << BitsForNumberOfBuckets) - 1; @@ -93,6 +95,67 @@ struct TColTypeInterface { const THolderFactory& HolderFactory; // To use during unpacking }; +// Class that spills bucket data. +// If, after saving, data has accumulated in the bucket again, you can spill it again. +// After restoring the entire bucket, it will contain all the data saved over different iterations. +class TTableBucketSpiller { +public: + TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit); + + // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs. + void SpillBucket(TTableBucket&& bucket); + // Starts bucket restoration after spilling. 
Restores and unites all the buckets from different iterations. Will pause in case of async operation. + void StartBucketRestoration(); + // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling. + TTableBucket&& ExtractBucket(); + + // Updates the states of spillers. This update should be called after async operation completion to resume spilling/restoration. + void Update(); + // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling. + void Finalize(); + // Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true. + bool HasRunningAsyncIoOperation() const; + + bool IsInMemory() const; // Returns true when the spiller state is EState::InMemory. + +private: + void ProcessBucketSpilling(); + template <class T> + void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const; + void ProcessBucketRestoration(); + +private: + enum class EState { + Spilling, + Restoring, + InMemory + }; + + enum class ENextVectorToProcess { + KeyAndVals, + DataIntVals, + StringsValues, + StringsOffsets, + InterfaceValues, + InterfaceOffsets, + + None + }; + + TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter; + TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter; + TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter; + + EState State = EState::InMemory; + ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None; + + ui64 SpilledBucketsCount = 0; + + bool IsFinalizing = false; + + TTableBucket CurrentBucket; +}; + // Class which represents single table data stored in buckets class TTable { @@ -126,6 +189,8 @@ class TTable { // Statistics for buckets. Total number of tuples inside a single bucket and offsets. 
std::vector<TTableBucketStats> TableBucketsStats; + std::vector<TTableBucketSpiller> TableBucketsSpillers; + // Temporary vector for tuples manipulation; std::vector<ui64> TempTuple; @@ -194,83 +259,52 @@ public: // Joins two tables and stores join result in table data. Tuples of joined table could be received by // joined table iterator. Life time of t1, t2 should be greater than lifetime of joined table // hasMoreLeftTuples, hasMoreRightTuples is true if join is partial and more rows are coming. For final batch hasMoreLeftTuples = false, hasMoreRightTuples = false - void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false ); - + void Join(TTable& t1, TTable& t2, EJoinKind joinKind = EJoinKind::Inner, bool hasMoreLeftTuples = false, bool hasMoreRightTuples = false, ui32 fromBucket = 0, ui32 toBucket = NumberOfBuckets); // Returns next jointed tuple data. Returs true if there are more tuples bool NextJoinedData(TupleData& td1, TupleData& td2); - // Clears table content - void Clear(); + // Creates buckets that support spilling. + void InitializeBucketSpillers(ISpiller::TPtr spiller); - // Creates new table with key columns and data columns - TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, - ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, - ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, - ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); - - ~TTable(); + // Calculates approximate size of a bucket. Used for spilling to determine the largest bucket. + ui64 GetSizeOfBucket(ui64 bucket) const; -}; + // Finds the largest bucket and spills it to disk. + bool TryToReduceMemoryAndWait(); -// Class that spills bucket data. -// If, after saving, data has accumulated in the bucket again, you can spill it again. 
-// After restoring the entire bucket, it will contain all the data saved over different iterations. -class TTableBucketSpiller { -public: - TTableBucketSpiller(ISpiller::TPtr spiller, size_t sizeLimit); + // Updates the state of spilling. Must be called during each DoCalculate. + void UpdateSpilling(); - // Takes the bucket and immediately starts spilling. Spilling continues until an async operation occurs. - void SpillBucket(TTableBucket&& bucket); - // Starts bucket restoration after spilling. Restores and unites all the buckets from different iterations. Will pause in case of async operation. - void StartBucketRestoration(); - // Extracts bucket restored from spilling. This bucket will contain all the data from different iterations of spilling. - TTableBucket&& ExtractBucket(); + // Flushes all the spillers. + void FinalizeSpilling(); - // Updates the states of spillers. This update should be called after async operation completion to resume spilling/resoration. - void Update(); - // Flushes all the data from inner spillers. Should be called when no more data is expected for spilling. - void Finalize(); - // Checks if spillers are waiting for any running async operation. No calls other than update are allowed when the method returns true. + // Checks if there is any async operation running. If the return value is true, it's safe to return Yield. bool HasRunningAsyncIoOperation() const; -private: - void ProcessBucketSpilling(); - template <class T> - void AppendVector(std::vector<T, TMKQLAllocator<T>>& first, std::vector<T, TMKQLAllocator<T>>&& second) const; - void ProcessBucketRestoration(); + // Checks if the bucket is fully loaded into memory and may be joined. + bool IsBucketInMemory(ui32 bucket) const; -private: - enum class EState { - Spilling, - Restoring, - InMemory - }; - - enum class ENextVectorToProcess { - KeyAndVals, - DataIntVals, - StringsValues, - StringsOffsets, - InterfaceValues, - InterfaceOffsets, + // Starts loading a spilled bucket into memory. 
+ void StartLoadingBucket(ui32 bucket); - None - }; + // Extracts loaded bucket from spilling. + void ExtractBucket(ui64 bucket); - TVectorSpillerAdapter<ui64, TMKQLAllocator<ui64>> StateUi64Adapter; - TVectorSpillerAdapter<ui32, TMKQLAllocator<ui32>> StateUi32Adapter; - TVectorSpillerAdapter<char, TMKQLAllocator<char>> StateCharAdapter; + // Clears table content + void Clear(); - EState State = EState::InMemory; - ENextVectorToProcess NextVectorToProcess = ENextVectorToProcess::None; + // Creates new table with key columns and data columns + TTable(ui64 numberOfKeyIntColumns = 0, ui64 numberOfKeyStringColumns = 0, + ui64 numberOfDataIntColumns = 0, ui64 numberOfDataStringColumns = 0, + ui64 numberOfKeyIColumns = 0, ui64 numberOfDataIColumns = 0, + ui64 nullsBitmapSize = 1, TColTypeInterface * colInterfaces = nullptr, bool isAny = false); + + ~TTable(); - ui64 SpilledBucketsCount = 0; +}; - bool IsFinalizing = false; - TTableBucket CurrentBucket; -}; |