aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authora-romanov <a-romanov@yandex-team.ru>2022-04-07 00:14:02 +0300
committera-romanov <a-romanov@yandex-team.ru>2022-04-07 00:14:02 +0300
commit8fab6620ff546037e93ecd3cc90b42f8869007d0 (patch)
tree82e5fad16cdcf1a5baf024e36005a6698cb422fe
parent2069aaad506bdf89c42201d590a1a8168093297c (diff)
downloadydb-8fab6620ff546037e93ecd3cc90b42f8869007d0.tar.gz
Fix PreserveStream fetch after finish.
ref:0346676be9d27a839db73ed2a3860d147649bb2f
-rw-r--r--ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp16
1 files changed, 11 insertions, 5 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp
index 49cc897f0e2..082dea0b660 100644
--- a/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp
+++ b/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp
@@ -319,12 +319,16 @@ public:
private:
NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) override {
- NUdf::TUnboxedValue item;
- if (State != EPreserveState::Feed) {
- Buffer.PopFront();
- --Outpace;
+ switch (State) {
+ case EPreserveState::Done:
+ return NUdf::EFetchStatus::Finish;
+ case EPreserveState::Feed:
+ break;
+ default:
+ Buffer.PopFront();
+ --Outpace;
}
- while (State != EPreserveState::Emit && Outpace <= OutpaceGoal) {
+ for (NUdf::TUnboxedValue item; State != EPreserveState::Emit && Outpace <= OutpaceGoal;) {
switch (Stream.Fetch(item)) {
case NUdf::EFetchStatus::Yield:
return NUdf::EFetchStatus::Yield;
@@ -344,6 +348,7 @@ private:
}
if (!Outpace) {
Buffer.Clean();
+ State = EPreserveState::Done;
return NUdf::EFetchStatus::Finish;
}
value = Buffer.Get(FrontIndex);
@@ -354,6 +359,7 @@ private:
Feed,
GoOn,
Emit,
+ Done
};
const NUdf::TUnboxedValue Stream;
const NUdf::TUnboxedValue Queue;