diff options
| author | Filitov Mikhail <[email protected]> | 2024-07-17 16:37:15 +0200 |
|---|---|---|
| committer | GitHub <[email protected]> | 2024-07-17 16:37:15 +0200 |
| commit | fda767cc834942f37334c5bfe5c7d92e48e4c80a (patch) | |
| tree | c8680a7a02a79633ee6845bbe5f0cbb2db636feb | |
| parent | 02f915054b369d554aea8df6d318184a7834f40e (diff) | |
get rid of wide fields in wide combiner. Not used in llvm (#6536)
| -rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp | 95 |
1 files changed, 55 insertions, 40 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp index c28e8dee937..ff20307d634 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_wide_combine.cpp @@ -132,6 +132,24 @@ struct TCombinerNodes { } } + void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue** from, NUdf::TUnboxedValue* to) const { + for (ui32 i = 0U; i < ItemNodes.size(); ++i) { + if (from[i]) { + to[i] = std::move(*(from[i])); + } + } + } + + void ExtractValues(TComputationContext& ctx, NUdf::TUnboxedValue* from, NUdf::TUnboxedValue** to) const { + for (size_t i = 0, j = 0; i != ItemNodes.size(); ++i) { + if (IsInputItemNodeUsed(i)) { + *to[i] = std::move(from[j++]); + } else { + to[i] = nullptr; + } + } + } + void ProcessItem(TComputationContext& ctx, NUdf::TUnboxedValue* keys, NUdf::TUnboxedValue* state) const { if (keys) { std::fill_n(keys, KeyResultNodes.size(), NUdf::TUnboxedValuePod()); @@ -346,16 +364,16 @@ public: enum class ETasteResult: i8 { Init = -1, Update, - Skip + ConsumeRawData, + ExtractRawData }; TSpillingSupportState( - TMemoryUsageInfo* memInfo, size_t wideFieldsIndex, + TMemoryUsageInfo* memInfo, const TMultiType* usedInputItemType, const TMultiType* keyAndStateType, ui32 keyWidth, size_t itemNodesSize, const THashFunc& hash, const TEqualsFunc& equal, bool allowSpilling, TComputationContext& ctx ) : TBase(memInfo) , InMemoryProcessingState(memInfo, keyWidth, keyAndStateType->GetElementsCount() - keyWidth, hash, equal) - , WideFieldsIndex(wideFieldsIndex) , UsedInputItemType(usedInputItemType) , KeyAndStateType(keyAndStateType) , KeyWidth(keyWidth) @@ -380,7 +398,7 @@ public: bool IsProcessingRequired() const { if (InputStatus != EFetchResult::Finish) return true; - return HasDataForProcessing; + return HasRawDataToExtract || HasDataForProcessing; } bool UpdateAndWait() { @@ -424,10 +442,19 @@ public: return isNew ? ETasteResult::Init : ETasteResult::Update; } if (GetMode() == EOperatingMode::ProcessSpilled) { + if (HasRawDataToExtract) { + // Tongue not used here. + Throat = BufferForUsedInputItems.data(); + HasRawDataToExtract = false; + HasDataForProcessing = true; + return ETasteResult::ExtractRawData; + } + HasDataForProcessing = false; // while restoration we process buckets one by one starting from the first in a queue bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt(); Throat = SpilledBuckets.front().InMemoryProcessingState->Throat; Tongue = SpilledBuckets.front().InMemoryProcessingState->Tongue; + BufferForUsedInputItems.resize(0); return isNew ? ETasteResult::Init : ETasteResult::Update; } @@ -445,9 +472,13 @@ public: // Corresponding bucket is spilled, we don't need a key anymore, full input will be spilled BufferForKeyAndState.resize(0); - TryToSpillRawData(bucket, bucketId); + // Prepare space for raw data + MKQL_ENSURE(BufferForUsedInputItems.size() == 0, "Internal logic error"); + BufferForUsedInputItems.resize(ItemNodesSize); + BufferForUsedInputItemsBucketId = bucketId; + Throat = BufferForUsedInputItems.data(); - return ETasteResult::Skip; + return ETasteResult::ConsumeRawData; } NUdf::TUnboxedValuePod* Extract() { @@ -472,25 +503,6 @@ private: BufferForKeyAndState.resize(0); } - // Copies data from WideFields to local and tries to spill it using suitable bucket. - // if the bucket is already busy, then the buffer will wait for the next iteration. - void TryToSpillRawData(TSpilledBucket& bucket, size_t bucketId) { - auto **fields = Ctx.WideFields.data() + WideFieldsIndex; - MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error"); - - for (size_t i = 0; i < ItemNodesSize; ++i) { - if (fields[i]) { - BufferForUsedInputItems.push_back(*fields[i]); - } - } - if (bucket.AsyncWriteOperation.has_value()) { - BufferForUsedInputItemsBucketId = bucketId; - return; - } - bucket.AsyncWriteOperation = bucket.SpilledData->WriteWideItem(BufferForUsedInputItems); - BufferForUsedInputItems.resize(0); - } - bool FlushSpillingBuffersAndWait() { UpdateSpillingBuckets(); @@ -620,8 +632,14 @@ private: } AsyncReadOperation = std::nullopt; } + auto& bucket = SpilledBuckets.front(); if (bucket.BucketState == TSpilledBucket::EBucketState::InMemory) return false; + if (HasDataForProcessing) { + Tongue = bucket.InMemoryProcessingState->Tongue; + Throat = bucket.InMemoryProcessingState->Throat; + return false; + } //recover spilled state while(!bucket.SpilledState->Empty()) { RecoverState = true; @@ -651,17 +669,11 @@ private: if (AsyncReadOperation) { return true; } - auto **fields = Ctx.WideFields.data() + WideFieldsIndex; - for (size_t i = 0, j = 0; i < ItemNodesSize; ++i) { - if (fields[i]) { - fields[i] = &(BufferForUsedInputItems[j++]); - } - } Tongue = bucket.InMemoryProcessingState->Tongue; Throat = bucket.InMemoryProcessingState->Throat; - HasDataForProcessing = true; + HasRawDataToExtract = true; return false; } bucket.BucketState = TSpilledBucket::EBucketState::InMemory; @@ -725,8 +737,9 @@ private: bool HasDataForProcessing = false; + bool HasRawDataToExtract = false; + TState InMemoryProcessingState; - const size_t WideFieldsIndex; const TMultiType* const UsedInputItemType; const TMultiType* const KeyAndStateType; const size_t KeyWidth; @@ -1237,6 +1250,7 @@ public: , AllowSpilling(allowSpilling) {} + // MARK: DoCAlculate EFetchResult DoCalculate(NUdf::TUnboxedValue& state, TComputationContext& ctx, NUdf::TUnboxedValue*const* output) const { if (!state.HasValue()) { MakeState(ctx, state); @@ -1246,14 +1260,12 @@ public: auto **fields = ctx.WideFields.data() + WideFieldsIndex; while (true) { - for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) - fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); - if (ptr->UpdateAndWait()) { return EFetchResult::Yield; } - if (ptr->InputStatus != EFetchResult::Finish) { + for (auto i = 0U; i < Nodes.ItemNodes.size(); ++i) + fields[i] = Nodes.GetUsedInputItemNodePtrOrNull(ctx, i); switch (ptr->InputStatus = Flow->FetchValues(ctx, fields)) { case EFetchResult::One: break; @@ -1274,7 +1286,11 @@ public: case TSpillingSupportState::ETasteResult::Update: Nodes.ProcessItem(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Tongue), static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); break; - case TSpillingSupportState::ETasteResult::Skip: + case TSpillingSupportState::ETasteResult::ConsumeRawData: + Nodes.ExtractValues(ctx, fields, static_cast<NUdf::TUnboxedValue*>(ptr->Throat)); + break; + case TSpillingSupportState::ETasteResult::ExtractRawData: + Nodes.ExtractValues(ctx, static_cast<NUdf::TUnboxedValue*>(ptr->Throat), fields); break; } continue; @@ -1553,8 +1569,7 @@ public: #endif private: void MakeState(TComputationContext& ctx, NUdf::TUnboxedValue& state) const { - state = ctx.HolderFactory.Create<TSpillingSupportState>(WideFieldsIndex, - UsedInputItemType, KeyAndStateType, + state = ctx.HolderFactory.Create<TSpillingSupportState>(UsedInputItemType, KeyAndStateType, Nodes.KeyNodes.size(), Nodes.ItemNodes.size(), #ifdef MKQL_DISABLE_CODEGEN |
