aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-06-30 20:06:27 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-06-30 20:06:27 +0300
commit53d4c76b348320ead25bfe9aa4efdc92d6fbb762 (patch)
treee2c1ead71c956186027cc658196226fe6dd4bf30
parent88efbd698f3668cacafb6b9aec4a6eaa66345a8d (diff)
downloadydb-53d4c76b348320ead25bfe9aa4efdc92d6fbb762.tar.gz
YQL-15035: async ca, continue on yield without data
ref:f8aa7f95d89b63a27b05ff1ec0eb336277620278
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp1
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h14
2 files changed, 15 insertions, 0 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 6ba070e1cf..b1d3ee8223 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -407,6 +407,7 @@ private:
}
void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) {
+ ProcessOutputsState.LastPopReturnedNoData = (ev->Get()->Data.size() == 0);
if (ev->Get()->Stats) {
TaskRunnerStats = std::move(ev->Get()->Stats);
}
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 565641b2f5..690ae2f6c9 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -375,6 +375,19 @@ protected:
status = ERunStatus::Finished;
}
+ if (InputChannelsMap.empty() && SourcesMap.empty() && status == ERunStatus::PendingInput && ProcessOutputsState.LastPopReturnedNoData) {
+ // fix for situation when:
+ // a) stage receives data by itself (e.g. it has YtRead inside)
+ // b) last run finished with YIELD status
+ // c) last run returned NO data (=> guaranteed, that peer's free space is not less than before this run)
+ //
+ // n.b. if c) is not satisfied we will also call ContinueExecute on branch
+ // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent"
+ // but idk what is the logic behind this
+ ContinueExecute();
+ return;
+ }
+
if (status != ERunStatus::Finished) {
// If the incoming channel's buffer was full at the moment when last ChannelDataAck event had been sent,
// there will be no attempts to send a new piece of data from the other side of this channel.
@@ -1697,6 +1710,7 @@ protected:
bool DataWasSent = false;
bool AllOutputsFinished = true;
ERunStatus LastRunStatus = ERunStatus::PendingInput;
+ bool LastPopReturnedNoData = false;
};
TProcessOutputsState ProcessOutputsState;