diff options
author | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-01 21:18:18 +0300 |
---|---|---|
committer | Vasily Gerasimov <UgnineSirdis@gmail.com> | 2022-07-01 21:18:18 +0300 |
commit | 606f19201f1b5371e5066492e179af66c002befc (patch) | |
tree | 5109424783a2934c99c20595d798413e541ec74c | |
parent | 1ec57b72c9b73737b49ededffee0f25ad5ebe395 (diff) | |
download | ydb-606f19201f1b5371e5066492e179af66c002befc.tar.gz |
YQ-1206 Fix race when sending output channel data just after the channel has disconnected
Fix race when sending output channel data
ref:38f4aff5156ab3fd815e87b29c96762c5fa658b1
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp | 85 | ||||
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 10 |
2 files changed, 70 insertions, 25 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp index b1d3ee82230..958d5457250 100644 --- a/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp +++ b/ydb/library/yql/dq/actors/compute/dq_async_compute_actor.cpp @@ -277,6 +277,8 @@ private: } void DoExecuteImpl() override { + TrySendAsyncChannelsData(); + PollAsyncInput(); if (ProcessSourcesState.Inflight == 0) { auto req = GetCheckpointRequest(); @@ -345,6 +347,8 @@ private: } void OnRunFinished(NTaskRunnerActor::TEvTaskRunFinished::TPtr& ev, const NActors::TActorContext& ) { + TrySendAsyncChannelsData(); // send from previous cycle + MkqlMemoryLimit = ev->Get()->MkqlMemoryLimit; ProfileStats = std::move(ev->Get()->ProfileStats); auto sourcesState = GetSourcesState(); @@ -407,59 +411,92 @@ private: } void OnPopFinished(NTaskRunnerActor::TEvChannelPopFinished::TPtr& ev, const NActors::TActorContext&) { - ProcessOutputsState.LastPopReturnedNoData = (ev->Get()->Data.size() == 0); if (ev->Get()->Stats) { TaskRunnerStats = std::move(ev->Get()->Stats); } CA_LOG_D("OnPopFinished, stats: " << *TaskRunnerStats.Get()); - auto channelId = ev->Get()->ChannelId; - auto finished = ev->Get()->Finished; - auto dataWasSent = ev->Get()->Changed; - auto it = OutputChannelsMap.find(channelId); + auto it = OutputChannelsMap.find(ev->Get()->ChannelId); Y_VERIFY(it != OutputChannelsMap.end()); - TOutputChannelInfo& outputChannel = it->second; - outputChannel.Finished = finished; - if (finished) { - FinishedOutputChannels.insert(channelId); + Y_VERIFY(!outputChannel.AsyncData); // have finished previous cycle + outputChannel.AsyncData.ConstructInPlace(); + outputChannel.AsyncData->Data = std::move(ev->Get()->Data); + outputChannel.AsyncData->Checkpoint = std::move(ev->Get()->Checkpoint); + outputChannel.AsyncData->Finished = ev->Get()->Finished; + outputChannel.AsyncData->Changed = ev->Get()->Changed; + + if 
(TrySendAsyncChannelData(outputChannel)) { + CheckRunStatus(); + } + } + + bool TrySendAsyncChannelData(TOutputChannelInfo& outputChannel) { + if (!outputChannel.AsyncData) { + return false; + } + + if (!Channels->CanSendChannelData(outputChannel.ChannelId)) { // When channel will be connected, they will call resume execution. + return false; + } + + auto& asyncData = *outputChannel.AsyncData; + + outputChannel.Finished = asyncData.Finished; + if (asyncData.Finished) { + FinishedOutputChannels.insert(outputChannel.ChannelId); } outputChannel.PopStarted = false; - ProcessOutputsState.Inflight --; + ProcessOutputsState.Inflight--; ProcessOutputsState.HasDataToSend |= !outputChannel.Finished; + ProcessOutputsState.LastPopReturnedNoData = asyncData.Data.empty(); - if (ev->Get()->Checkpoint.Defined()) { + if (asyncData.Checkpoint.Defined()) { ResumeInputs(); } - for (ui32 i = 0; i < ev->Get()->Data.size(); i++) { - auto& chunk = ev->Get()->Data[i]; + for (ui32 i = 0; i < asyncData.Data.size(); i++) { + auto& chunk = asyncData.Data[i]; NDqProto::TChannelData channelData; - channelData.SetChannelId(channelId); + channelData.SetChannelId(outputChannel.ChannelId); // set finished only for last chunk - channelData.SetFinished(finished && i==ev->Get()->Data.size()-1); - if (i==ev->Get()->Data.size()-1 && ev->Get()->Checkpoint.Defined()) { - channelData.MutableCheckpoint()->Swap(&*ev->Get()->Checkpoint); + const bool lastChunk = i == asyncData.Data.size() - 1; + channelData.SetFinished(asyncData.Finished && lastChunk); + if (lastChunk && asyncData.Checkpoint.Defined()) { + channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } channelData.MutableData()->Swap(&chunk); Channels->SendChannelData(std::move(channelData)); } - if (ev->Get()->Data.empty() && dataWasSent) { + if (asyncData.Data.empty() && asyncData.Changed) { NDqProto::TChannelData channelData; - channelData.SetChannelId(channelId); - channelData.SetFinished(finished); - if (ev->Get()->Checkpoint.Defined()) 
{ - channelData.MutableCheckpoint()->Swap(&*ev->Get()->Checkpoint); + channelData.SetChannelId(outputChannel.ChannelId); + channelData.SetFinished(asyncData.Finished); + if (asyncData.Checkpoint.Defined()) { + channelData.MutableCheckpoint()->Swap(&*asyncData.Checkpoint); } Channels->SendChannelData(std::move(channelData)); } - ProcessOutputsState.DataWasSent |= dataWasSent; + ProcessOutputsState.DataWasSent |= asyncData.Changed; ProcessOutputsState.AllOutputsFinished = FinishedOutputChannels.size() == OutputChannelsMap.size() && FinishedSinks.size() == SinksMap.size(); - CheckRunStatus(); + outputChannel.AsyncData = Nothing(); + + return true; + } + + bool TrySendAsyncChannelsData() { + bool result = false; + for (auto& [channelId, outputChannel] : OutputChannelsMap) { + result |= TrySendAsyncChannelData(outputChannel); + } + if (result) { + CheckRunStatus(); + } + return result; } void OnContinueRun(NTaskRunnerActor::TEvContinueRun::TPtr& ev, const NActors::TActorContext& ) { diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index 690ae2f6c95..66e9458c336 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -382,7 +382,7 @@ protected: // c) last run returned NO data (=> guaranteed, that peer's free space is not less than before this run) // // n.b. 
if c) is not satisfied we will also call ContinueExecute on branch - // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent" + // "status != ERunStatus::Finished -> !pollSent -> ProcessOutputsState.DataWasSent" // but idk what is the logic behind this ContinueExecute(); return; @@ -839,6 +839,14 @@ protected: ui64 NoDstActorId = 0; }; THolder<TStats> Stats; + + struct TAsyncData { // Is used in case of async compute actor + TVector<NDqProto::TData> Data; + TMaybe<NDqProto::TCheckpoint> Checkpoint; + bool Finished = false; + bool Changed = false; + }; + TMaybe<TAsyncData> AsyncData; }; struct TAsyncOutputInfoBase { |