diff options
author | whcrc <whcrc@yandex-team.ru> | 2022-06-30 20:06:27 +0300 |
---|---|---|
committer | whcrc <whcrc@yandex-team.ru> | 2022-06-30 20:06:27 +0300 |
commit | 53d4c76b348320ead25bfe9aa4efdc92d6fbb762 (patch) | |
tree | e2c1ead71c956186027cc658196226fe6dd4bf30 | |
parent | 88efbd698f3668cacafb6b9aec4a6eaa66345a8d (diff) | |
download | ydb-53d4c76b348320ead25bfe9aa4efdc92d6fbb762.tar.gz |
YQL-15035: async ca, continue on yield without data
ref:f8aa7f95d89b63a27b05ff1ec0eb336277620278
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | 1 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 14 |
2 files changed, 15 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index 6ba070e1cf..b1d3ee8223 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -407,6 +407,7 @@ private: } void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) { + ProcessOutputsState.LastPopReturnedNoData = (ev->Get()->Data.size() == 0); if (ev->Get()->Stats) { TaskRunnerStats = std::move(ev->Get()->Stats); } diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 565641b2f5..690ae2f6c9 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -375,6 +375,19 @@ protected: status = ERunStatus::Finished; } + if (InputChannelsMap.empty() && SourcesMap.empty() && status == ERunStatus::PendingInput && ProcessOutputsState.LastPopReturnedNoData) { + // fix for situation when: + // a) stage receives data by itself (e.g. it has YtRead inside) + // b) last run finished with YIELD status + // c) last run returned NO data (=> guaranteed, that peer's free space is not less than before this run) + // + // n.b. if c) is not satisfied we will also call ContinueExecute on branch + // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent" + // but idk what is the logic behind this + ContinueExecute(); + return; + } + if (status != ERunStatus::Finished) { // If the incoming channel's buffer was full at the moment when last ChannelDataAck event had been sent, // there will be no attempts to send a new piece of data from the other side of this channel. @@ -1697,6 +1710,7 @@ protected: bool DataWasSent = false; bool AllOutputsFinished = true; ERunStatus LastRunStatus = ERunStatus::PendingInput; + bool LastPopReturnedNoData = false; }; TProcessOutputsState ProcessOutputsState; |