aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authormfilitov <mfilitov@yandex-team.com>2025-02-21 18:53:51 +0300
committermfilitov <mfilitov@yandex-team.com>2025-02-21 19:32:10 +0300
commit55a54de1a92c64f8fdddfdcf8ac9a85d4177a301 (patch)
tree03497ba477b3d77fd062bf45235fa118ba365ac9
parentc1c05c837b20f16588de56e1ee15e862d37a23b3 (diff)
downloadydb-55a54de1a92c64f8fdddfdcf8ac9a85d4177a301.tar.gz
Fix early object deletion in wide combiner
Description: <https://nda.ya.ru/t/_8nms0JC7C4StC> commit_hash:6eb7f4b01d1203ca1f66bf7c86cd78f3cd5a2691
-rw-r--r--yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp6
1 files changed, 4 insertions, 2 deletions
diff --git a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
index e14f3c5082..4e5f467696 100644
--- a/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
+++ b/yql/essentials/minikql/comp_nodes/mkql_wide_combine.cpp
@@ -504,7 +504,6 @@ public:
// while restoration we process buckets one by one starting from the first in a queue
bool isNew = SpilledBuckets.front().InMemoryProcessingState->TasteIt();
Throat = SpilledBuckets.front().InMemoryProcessingState->Throat;
- BufferForUsedInputItems.resize(0);
return isNew ? ETasteResult::Init : ETasteResult::Update;
}
@@ -837,7 +836,7 @@ private:
//process spilled data
if (!bucket.SpilledData->Empty()) {
RecoverState = false;
- BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
+ std::fill(BufferForUsedInputItems.begin(), BufferForUsedInputItems.end(), NUdf::TUnboxedValuePod());
AsyncReadOperation = bucket.SpilledData->ExtractWideItem(BufferForUsedInputItems);
if (AsyncReadOperation) {
return EUpdateResult::Yield;
@@ -886,6 +885,9 @@ private:
YQL_LOG(INFO) << "switching Memory mode to ProcessSpilled";
MKQL_ENSURE(EOperatingMode::Spilling == Mode, "Internal logic error");
MKQL_ENSURE(SpilledBuckets.size() == SpilledBucketCount, "Internal logic error");
+ MKQL_ENSURE(BufferForUsedInputItems.empty(), "Internal logic error");
+
+ BufferForUsedInputItems.resize(UsedInputItemType->GetElementsCount());
std::sort(SpilledBuckets.begin(), SpilledBuckets.end(), [](const TSpilledBucket& lhs, const TSpilledBucket& rhs) {
bool lhs_in_memory = lhs.BucketState == TSpilledBucket::EBucketState::InMemory;