aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraozeritsky <aozeritsky@yandex-team.ru>2022-02-08 12:08:38 +0300
committeraozeritsky <aozeritsky@yandex-team.ru>2022-02-08 12:08:38 +0300
commit2475432dcbb88486205a48c162a576361196f8be (patch)
treead4f250a9294bc61e8b73ec544a58be150d9cf14
parentd52cc2d3fa647ac37833ec9952b58acd9ac04ab6 (diff)
downloadydb-2475432dcbb88486205a48c162a576361196f8be.tar.gz
DQ-21: The final preparation for the inheritance
ref:6412de306c2d922a2021060c41c8166281a6e3cf
-rw-r--r--ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h20
1 files changed, 10 insertions, 10 deletions
diff --git a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
index ccf94e6996e..deaadefb42c 100644
--- a/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
+++ b/ydb/library/yql/dq/actors/compute/dq_compute_actor_impl.h
@@ -254,7 +254,7 @@ protected:
ReportStats(now);
}
- void DoExecuteImpl() {
+ virtual void DoExecuteImpl() {
auto sourcesState = GetSourcesState();
PollSourceActors();
@@ -331,7 +331,6 @@ protected:
void CheckRunStatus() {
if (ProcessOutputsState.Inflight != 0) {
- Y_VERIFY(false);
return;
}
@@ -350,7 +349,7 @@ protected:
// and sends us a new batch of data.
bool pollSent = false;
for (auto& [channelId, inputChannel] : InputChannelsMap) {
- pollSent |= Channels->PollChannel(channelId, inputChannel.Channel->GetFreeSpace());
+ pollSent |= Channels->PollChannel(channelId, GetInputChannelFreeSpace(channelId));
}
if (!pollSent) {
if (ProcessOutputsState.DataWasSent) {
@@ -572,7 +571,7 @@ public:
Checkpoints->OnSinkStateSaved(std::move(state), outputIndex, checkpoint);
}
-private:
+protected:
bool ReadyToCheckpoint() const override {
for (auto& [id, channelInfo] : InputChannelsMap) {
if (channelInfo.CheckpointingMode == NDqProto::CHECKPOINTING_MODE_DISABLED) {
@@ -700,6 +699,7 @@ protected:
};
struct TSourceInfo {
+ ui64 Index;
IDqSource::TPtr Source;
IDqSourceActor* SourceActor = nullptr;
NActors::IActor* Actor = nullptr;
@@ -708,7 +708,7 @@ protected:
i64 FreeSpace = 1;
bool PushStarted = false;
- TSourceInfo() : IssuesBuffer(IssuesBufferSize) {}
+ TSourceInfo(ui64 index) : Index(index), IssuesBuffer(IssuesBufferSize) {}
};
struct TOutputChannelInfo {
@@ -936,13 +936,14 @@ protected:
}
}
-private:
ui32 AllowedChannelsOvercommit() const {
const auto& fc = GetDqExecutionSettings().FlowControl;
const ui32 allowedOvercommit = (fc.InFlightBytesOvercommit - 1.f) * MemoryLimits.ChannelBufferSize;
return allowedOvercommit;
}
+private:
+
virtual void DrainOutputChannel(TOutputChannelInfo& outputChannel, const TDqComputeActorChannels::TPeerState& peerState) {
YQL_ENSURE(!outputChannel.Finished || Checkpoints);
@@ -1164,8 +1165,7 @@ protected:
}
}
for (auto& [inputIndex, source] : SourcesMap) {
- if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); }
- Y_VERIFY(source.Source);
+ if (TaskRunner) { source.Source = TaskRunner->GetSource(inputIndex); Y_VERIFY(source.Source);}
Y_VERIFY(SourceActorFactory);
const auto& inputDesc = Task.GetInputs(inputIndex);
const ui64 i = inputIndex; // Crutch for clang
@@ -1215,7 +1215,7 @@ protected:
}
for (auto& [inputIndex, source] : SourcesMap) {
- Y_VERIFY(source.Source);
+ Y_VERIFY(!TaskRunner || source.Source);
if (source.Finished) {
const ui64 indexForLogging = inputIndex; // Crutch for clang
CA_LOG_D("Skip polling source[" << indexForLogging << "]: finished");
@@ -1277,7 +1277,7 @@ private:
const auto& inputDesc = Task.GetInputs(i);
Y_VERIFY(!inputDesc.HasSource() || inputDesc.ChannelsSize() == 0); // HasSource => no channels
if (inputDesc.HasSource()) {
- auto result = SourcesMap.emplace(i, TSourceInfo());
+ auto result = SourcesMap.emplace(i, TSourceInfo(i));
YQL_ENSURE(result.second);
} else {
for (auto& channel : inputDesc.GetChannels()) {