diff options
author | mfilitov <mfilitov@yandex-team.com> | 2025-02-21 18:53:51 +0300 |
---|---|---|
committer | mfilitov <mfilitov@yandex-team.com> | 2025-02-21 19:32:10 +0300 |
commit | 55a54de1a92c64f8fdddfdcf8ac9a85d4177a301 (patch) | |
tree | 03497ba477b3d77fd062bf45235fa118ba365ac9 | |
parent | c1c05c837b20f16588de56e1ee15e862d37a23b3 (diff) | |
download | ydb-55a54de1a92c64f8fdddfdcf8ac9a85d4177a301.tar.gz |
Fix early object deletion in wide combiner
Description:
<https://nda.ya.ru/t/_8nms0JC7C4StC>
commit_hash:6eb7f4b01d1203ca1f66bf7c86cd78f3cd5a2691
-rw-r--r-- | yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp | 6 |
1 files changed, 4 insertions, 2 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp index e14f3c5082..4e5f467696 100644 --- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp +++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp @@ -504,7 +504,6 @@ public: // while restoration we process buckets one by one starting from the first in a queue bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt(); Throat = SpilledBuckets.front().InMemoryProcessingState->Throat; - BufferForUsedInputItems.resize(0); return isNew ? ETasteResult::Init : ETasteResult::Update; } @@ -837,7 +836,7 @@ private: //process spilled data if (!bucket.SpilledData->Empty()) { RecoverState = false; - BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); + std::fill(BufferForUsedInputItems.begin(), BufferForUsedInputItems.end(), NUdf::TUnboxedValuePod()); AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems); if (AsyncReadOperation) { return EUpdateResult::Yield; @@ -886,6 +885,9 @@ private: YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled"; MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error"); MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error"); + MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error"); + + BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount()); std::sort(SpilledBuckets.begin(), SpilledBuckets.end(), [](const TSpilledBucket& lhs, const TSpilledBucket& rhs) { bool lhs_in_memory = lhs.BucketState == TSpilledBucket::EBucketState::InMemory; |