aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorwhcrc <whcrc@yandex-team.ru>2022-02-25 14:02:55 +0300
committerwhcrc <whcrc@yandex-team.ru>2022-02-25 14:02:55 +0300
commitd620560edb76b61bf20a577af99d2780608aa28d (patch)
treede8d277934905d3155b2a3c1b65e6ac576662a2a
parentecbf4c9a9a4d3a5ca9edef27a7f5b9f7b3a659ef (diff)
downloadydb-d620560edb76b61bf20a577af99d2780608aa28d.tar.gz
YQL-14355: fix async CA bug with premature finish on full channel inflight
ref:8e7aac2b14eee6c11beaa0c70bcd5b28f881d719
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp5
-rw-r--r--ydb/library/yql/dq/actors/task_runner/events.h3
2 files changed, 7 insertions, 1 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index 6f5f46abe66..72346aaff11 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -113,7 +113,10 @@ private:
if (Y_UNLIKELY(outputChannel.Stats)) {
outputChannel.Stats->BlockedByCapacity++;
}
- this->Send(this->SelfId(), new NTaskRunnerActor::TEvChannelPopFinished(channelId));
+ CA_LOG_D("Cannot drain channel cause it blocked by capacity, channelId: " << channelId);
+ auto ev = MakeHolder<NTaskRunnerActor::TEvChannelPopFinished>(channelId);
+ Y_VERIFY(!ev->Finished);
+ this->Send(this->SelfId(), std::move(ev)); // try again, ev.Finished == false
return;
}
diff --git a/ydb/library/yql/dq/actors/task_runner/events.h b/ydb/library/yql/dq/actors/task_runner/events.h
index ae5894cddd5..2acd46de5d2 100644
--- a/ydb/library/yql/dq/actors/task_runner/events.h
+++ b/ydb/library/yql/dq/actors/task_runner/events.h
@@ -210,6 +210,9 @@ struct TEvChannelPopFinished
TEvChannelPopFinished() = default;
TEvChannelPopFinished(ui32 channelId)
: ChannelId(channelId)
+ , Data()
+ , Finished(false)
+ , Changed(false)
{ }
TEvChannelPopFinished(ui32 channelId, TVector<NDqProto::TData>&& data, bool finished, bool changed, const TTaskRunnerActorSensors& sensors = {})
: Sensors(sensors)