diff options
author | a-romanov <a-romanov@yandex-team.ru> | 2022-04-07 00:14:02 +0300 |
---|---|---|
committer | a-romanov <a-romanov@yandex-team.ru> | 2022-04-07 00:14:02 +0300 |
commit | 8fab6620ff546037e93ecd3cc90b42f8869007d0 (patch) | |
tree | 82e5fad16cdcf1a5baf024e36005a6698cb422fe | |
parent | 2069aaad506bdf89c42201d590a1a8168093297c (diff) | |
download | ydb-8fab6620ff546037e93ecd3cc90b42f8869007d0.tar.gz |
Fix PreserveStream fetch after finish.
ref:0346676be9d27a839db73ed2a3860d147649bb2f
-rw-r--r-- | ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp | 16 |
1 files changed, 11 insertions, 5 deletions
diff --git a/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp b/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp index 49cc897f0e2..082dea0b660 100644 --- a/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp +++ b/ydb/library/yql/minikql/comp_nodes/mkql_queue.cpp @@ -319,12 +319,16 @@ public: private: NUdf::EFetchStatus Fetch(NUdf::TUnboxedValue& value) override { - NUdf::TUnboxedValue item; - if (State != EPreserveState::Feed) { - Buffer.PopFront(); - --Outpace; + switch (State) { + case EPreserveState::Done: + return NUdf::EFetchStatus::Finish; + case EPreserveState::Feed: + break; + default: + Buffer.PopFront(); + --Outpace; } - while (State != EPreserveState::Emit && Outpace <= OutpaceGoal) { + for (NUdf::TUnboxedValue item; State != EPreserveState::Emit && Outpace <= OutpaceGoal;) { switch (Stream.Fetch(item)) { case NUdf::EFetchStatus::Yield: return NUdf::EFetchStatus::Yield; @@ -344,6 +348,7 @@ private: } if (!Outpace) { Buffer.Clean(); + State = EPreserveState::Done; return NUdf::EFetchStatus::Finish; } value = Buffer.Get(FrontIndex); @@ -354,6 +359,7 @@ private: Feed, GoOn, Emit, + Done }; const NUdf::TUnboxedValue Stream; const NUdf::TUnboxedValue Queue; |