diff options
author | aozeritsky <aozeritsky@yandex-team.ru> | 2022-02-08 12:08:38 +0300 |
---|---|---|
committer | aozeritsky <aozeritsky@yandex-team.ru> | 2022-02-08 12:08:38 +0300 |
commit | 2475432dcbb88486205a48c162a576361196f8be (patch) | |
tree | ad4f250a9294bc61e8b73ec544a58be150d9cf14 | |
parent | d52cc2d3fa647ac37833ec9952b58acd9ac04ab6 (diff) | |
download | ydb-2475432dcbb88486205a48c162a576361196f8be.tar.gz |
DQ-21: The final preparation for the inheritance
ref:6412de306c2d922a2021060c41c8166281a6e3cf
-rw-r--r-- | ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h | 20 |
1 files changed, 10 insertions, 10 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h index ccf94e6996e..deaadefb42c 100644 --- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h +++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h @@ -254,7 +254,7 @@ protected: ReportStats(now); } - void DoExecuteImpl() { + virtual void DoExecuteImpl() { auto sourcesState = GetSourcesState(); PollSourceActors(); @@ -331,7 +331,6 @@ protected: void CheckRunStatus() { if (ProcessOutputsState.Inflight != 0) { - Y_VERIFY(false); return; } @@ -350,7 +349,7 @@ protected: // and sends us a new batch of data. bool pollSent = false; for (auto& [channelId, inputChannel] : InputChannelsMap) { - pollSent |= Channels->PollChannel(channelId, inputChannel.Channel->GetFreeSpace()); + pollSent |= Channels->PollChannel(channelId, GetInputChannelFreeSpace(channelId)); } if (!pollSent) { if (ProcessOutputsState.DataWasSent) { @@ -572,7 +571,7 @@ public: Checkpoints->OnSinkStateSaved(std::move(state), outputIndex, checkpoint); } -private: +protected: bool ReadyToCheckpoint() const override { for (auto& [id, channelInfo] : InputChannelsMap) { if (channelInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) { @@ -700,6 +699,7 @@ protected: }; struct TSourceInfo { + ui64 Index; IDqSource::TPtr Source; IDqSourceActor* SourceActor = nullptr; NActors::IActor* Actor = nullptr; @@ -708,7 +708,7 @@ protected: i64 FreeSpace = 1; bool PushStarted = false; - TSourceInfo() : IssuesBuffer(IssuesBufferSize) {} + TSourceInfo(ui64 index) : Index(index), IssuesBuffer(IssuesBufferSize) {} }; struct TOutputChannelInfo { @@ -936,13 +936,14 @@ protected: } } -private: ui32 AllowedChannelsOvercommit() const { const auto& fc = GetDqExecutionSettings().FlowControl; const ui32 allowedOvercommit = (fc.InFlightBytesOvercommit - 1.f) * MemoryLimits.ChannelBufferSize; return allowedOvercommit; } +private: + virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) { YQL_ENSURE(!outputChannel.Finished || Checkpoints); @@ -1164,8 +1165,7 @@ protected: } } for (auto& [inputIndex, source] : SourcesMap) { - if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); } - Y_VERIFY(source.Source); + if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Source);} Y_VERIFY(SourceActorFactory); const auto& inputDesc = Task.GetInputs(inputIndex); const ui64 i = inputIndex; // Crutch for clang @@ -1215,7 +1215,7 @@ protected: } for (auto& [inputIndex, source] : SourcesMap) { - Y_VERIFY(source.Source); + Y_VERIFY(!TaskRunner || source.Source); if (source.Finished) { const ui64 indexForLogging = inputIndex; // Crutch for clang CA_LOG_D("Skip polling source[" << indexForLogging << "]: finished"); @@ -1277,7 +1277,7 @@ private: const auto& inputDesc = Task.GetInputs(i); Y_VERIFY(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels if (inputDesc.HasSource()) { - auto result = SourcesMap.emplace(i, TSourceInfo()); + auto result = SourcesMap.emplace(i, TSourceInfo(i)); YQL_ENSURE(result.second); } else { for (auto& channel : inputDesc.GetChannels()) { |