aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
author Vasily Gerasimov <UgnineSirdis@gmail.com> 2022-07-01 21:18:18 +0300
committer Vasily Gerasimov <UgnineSirdis@gmail.com> 2022-07-01 21:18:18 +0300
commit 606f19201f1b5371e5066492e179af66c002befc (patch)
tree 5109424783a2934c99c20595d798413e541ec74c
parent 1ec57b72c9b73737b49ededffee0f25ad5ebe395 (diff)
download ydb-606f19201f1b5371e5066492e179af66c002befc.tar.gz
YQ-1206 Fix a race when sending output channel data right after a disconnect
Fix race sending output channels data ref:38f4aff5156ab3fd815e87b29c96762c5fa658b1
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp 85
-rw-r--r-- ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h 10
2 files changed, 70 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
index b1d3ee82230..958d5457250 100644
--- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
+++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp
@@ -277,6 +277,8 @@ private:
}
void DoExecuteImpl() override {
+ TrySendAsyncChannelsData();
+
PollAsyncInput();
if (ProcessSourcesState.Inflight == 0) {
auto req = GetCheckpointRequest();
@@ -345,6 +347,8 @@ private:
}
void OnRunFinished(NTaskRunnerActor::TEvTaskRunFinished::TPtr& ev, const NActors::TActorContext& ) {
+ TrySendAsyncChannelsData(); // send from previous cycle
+
MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit;
ProfileStats = std::move(ev->Get()->ProfileStats);
auto sourcesState = GetSourcesState();
@@ -407,59 +411,92 @@ private:
}
void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) {
- ProcessOutputsState.LastPopReturnedNoData = (ev->Get()->Data.size() == 0);
if (ev->Get()->Stats) {
TaskRunnerStats = std::move(ev->Get()->Stats);
}
CA_LOG_D("OnPopFinished, stats: " << *TaskRunnerStats.Get());
- auto channelId = ev->Get()->ChannelId;
- auto finished = ev->Get()->Finished;
- auto dataWasSent = ev->Get()->Changed;
- auto it = OutputChannelsMap.find(channelId);
+ auto it = OutputChannelsMap.find(ev->Get()->ChannelId);
Y_VERIFY(it != OutputChannelsMap.end());
-
TOutputChannelInfo& outputChannel = it->second;
- outputChannel.Finished = finished;
- if (finished) {
- FinishedOutputChannels.insert(channelId);
+ Y_VERIFY(!outputChannel.AsyncData); // the result of the previous pop must already have been sent and cleared
+ outputChannel.AsyncData.ConstructInPlace(); // stash the pop result on the channel instead of sending straight from the event
+ outputChannel.AsyncData->Data = std::move(ev->Get()->Data);
+ outputChannel.AsyncData->Checkpoint = std::move(ev->Get()->Checkpoint);
+ outputChannel.AsyncData->Finished = ev->Get()->Finished;
+ outputChannel.AsyncData->Changed = ev->Get()->Changed;
+
+ if (TrySendAsyncChannelData(outputChannel)) { // false: the data stays in AsyncData for a later TrySendAsyncChannelsData() call
+ CheckRunStatus(); // run only after the data has actually been handed to Channels
+ }
+ }
+
+ bool TrySendAsyncChannelData(TOutputChannelInfo& outputChannel) { // Sends the pop result stored in outputChannel.AsyncData, if any. Returns true iff it was sent (AsyncData is then cleared).
+ if (!outputChannel.AsyncData) { // nothing pending for this channel
+ return false;
+ }
+
+ if (!Channels->CanSendChannelData(outputChannel.ChannelId)) { // Cannot send now: keep the data pending. Once the channel is connected, execution is resumed and the send is attempted again.
+ return false;
+ }
+
+ auto& asyncData = *outputChannel.AsyncData;
+
+ outputChannel.Finished = asyncData.Finished; // state updates are deferred to this point, so they happen only when the data is really sent
+ if (asyncData.Finished) {
+ FinishedOutputChannels.insert(outputChannel.ChannelId);
}
outputChannel.PopStarted = false;
- ProcessOutputsState.Inflight --;
+ ProcessOutputsState.Inflight--; // decremented only here, i.e. not until the pop result has been sent
ProcessOutputsState.HasDataToSend |= !outputChannel.Finished;
+ ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty();
- if (ev->Get()->Checkpoint.Defined()) {
+ if (asyncData.Checkpoint.Defined()) {
ResumeInputs();
}
- for (ui32 i = 0; i < ev->Get()->Data.size(); i++) {
- auto& chunk = ev->Get()->Data[i];
+ for (ui32 i = 0; i < asyncData.Data.size(); i++) { // one TChannelData message per popped chunk
+ auto& chunk = asyncData.Data[i];
NDqProto::TChannelData channelData;
- channelData.SetChannelId(channelId);
+ channelData.SetChannelId(outputChannel.ChannelId);
// set finished only for last chunk
- channelData.SetFinished(finished && i==ev->Get()->Data.size()-1);
- if (i==ev->Get()->Data.size()-1 && ev->Get()->Checkpoint.Defined()) {
- channelData.MutableCheckpoint()->Swap(&*ev->Get()->Checkpoint);
+ const bool lastChunk = i == asyncData.Data.size() - 1;
+ channelData.SetFinished(asyncData.Finished && lastChunk);
+ if (lastChunk && asyncData.Checkpoint.Defined()) { // the checkpoint, if any, is attached to the last chunk only
+ channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
channelData.MutableData()->Swap(&chunk);
Channels->SendChannelData(std::move(channelData));
}
- if (ev->Get()->Data.empty() && dataWasSent) {
+ if (asyncData.Data.empty() && asyncData.Changed) { // no chunks, but Changed is set: send one data-less message carrying Finished and the checkpoint
NDqProto::TChannelData channelData;
- channelData.SetChannelId(channelId);
- channelData.SetFinished(finished);
- if (ev->Get()->Checkpoint.Defined()) {
- channelData.MutableCheckpoint()->Swap(&*ev->Get()->Checkpoint);
+ channelData.SetChannelId(outputChannel.ChannelId);
+ channelData.SetFinished(asyncData.Finished);
+ if (asyncData.Checkpoint.Defined()) {
+ channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint);
}
Channels->SendChannelData(std::move(channelData));
}
- ProcessOutputsState.DataWasSent |= dataWasSent;
+ ProcessOutputsState.DataWasSent |= asyncData.Changed;
ProcessOutputsState.AllOutputsFinished =
FinishedOutputChannels.size() == OutputChannelsMap.size() &&
FinishedSinks.size() == SinksMap.size();
- CheckRunStatus();
+ outputChannel.AsyncData = Nothing(); // sent: clear the slot so the next OnPopFinished for this channel passes its Y_VERIFY
+
+ return true; // the caller is responsible for calling CheckRunStatus()
+ }
+
+ bool TrySendAsyncChannelsData() { // Tries to send pending AsyncData for every output channel. Returns true iff at least one channel's data was sent.
+ bool result = false;
+ for (auto& [channelId, outputChannel] : OutputChannelsMap) {
+ result |= TrySendAsyncChannelData(outputChannel); // non-short-circuit |=, so every channel is attempted
+ }
+ if (result) {
+ CheckRunStatus(); // called once after the loop, not once per channel
+ }
+ return result;
}
void OnContinueRun(NTaskRunnerActor::TEvContinueRun::TPtr& ev, const NActors::TActorContext& ) {
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index 690ae2f6c95..66e9458c336 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -382,7 +382,7 @@ protected:
// c) last run returned NO data (=> guaranteed, that peer's free space is not less than before this run)
//
// n.b. if c) is not satisfied we will also call ContinueExecute on branch
- // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent"
+ // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent"
// but idk what is the logic behind this
ContinueExecute();
return;
@@ -839,6 +839,14 @@ protected:
ui64 NoDstActorId = 0;
};
THolder<TStats> Stats;
+
+ struct TAsyncData { // Used by the async compute actor: the result of a finished channel pop, held until it can be sent
+ TVector<NDqProto::TData> Data; // popped chunks; each one is sent as a separate channel data message
+ TMaybe<NDqProto::TCheckpoint> Checkpoint; // if defined, attached to the last message sent
+ bool Finished = false; // Finished flag reported by the pop; set on the last message only
+ bool Changed = false; // Changed flag reported by the pop; when Data is empty it triggers a data-less message
+ };
+ TMaybe<TAsyncData> AsyncData; // empty when no pop result is waiting to be sent
};
};
struct TAsyncOutputInfoBase {