diff options
| author | aidarsamer <[email protected]> | 2023-12-15 11:59:35 +0300 |
|---|---|---|
| committer | aidarsamer <[email protected]> | 2023-12-15 12:56:35 +0300 |
| commit | 98f85629761be979eacf43e22bbed905e3627219 (patch) | |
| tree | 358eece77f87df8fb96a90b9cb426657670f6fef | |
| parent | d1374eadc86bb66d36b0182b13a3b080e2ee4d58 (diff) | |
Refactor OutputChannel reads in task_runner_actor
| -rw-r--r-- | ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp | 103 |
1 files changed, 70 insertions, 33 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp index a8285d95582..afb384be683 100644 --- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp +++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp @@ -33,6 +33,68 @@ TTaskRunnerActorSensors GetSensors(const T& t) { return result; } +struct TOutputChannelReadResult { + bool IsChanged = false; + bool IsFinished = false; + bool HasData = false; + std::list<::NYql::NDqProto::TMetric> Metrics; + TVector<TDqSerializedBatch> DataChunks; +}; + +class TOutputChannelReader { +public: + TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize, bool wasFinished) + : Channel(channel) + , ToPopSize(toPopSize) + , WasFinished(wasFinished) + {} + + TOutputChannelReadResult Read() { + int maxChunks = std::numeric_limits<int>::max(); + bool changed = false; + bool isFinished = false; + i64 remain = ToPopSize; + ui32 dataSize = 0; + bool hasData = true; + TOutputChannelReadResult result; + + if (remain == 0) { + // special case to WorkerActor + remain = 5<<20; + maxChunks = 1; + } + + TVector<TDqSerializedBatch> chunks; + for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { + TDqSerializedBatch data; + const auto lastPop = std::move(Channel->Pop(data)); + + for (auto& metric : lastPop.GetMetric()) { + // *response.AddMetric() = metric; + result.Metrics.push_back(metric); + } + + hasData = lastPop.GetResult(); + dataSize = data.Size(); + isFinished = !hasData && Channel->IsFinished(); + changed = changed || hasData || (isFinished != WasFinished); + + if (hasData) { + result.DataChunks.emplace_back(std::move(data)); + } + } + result.IsFinished = isFinished; + result.IsChanged = changed; + result.HasData = hasData; + return result; + } + +private: + NTaskRunnerProxy::IOutputChannel::TPtr Channel; + i64 ToPopSize; + bool WasFinished; +}; + } // namespace class TTaskRunnerActor @@ -302,39 +364,14 @@ private: try { // auto guard = taskRunner->BindAllocator(); // only for local mode auto channel = taskRunner->GetOutputChannel(channelId); - int maxChunks = std::numeric_limits<int>::max(); - bool changed = false; - bool isFinished = false; - i64 remain = toPop; - ui32 dataSize = 0; - bool hasData = true; - - if (remain == 0) { - // special case to WorkerActor - remain = 5<<20; - maxChunks = 1; - } + TOutputChannelReader reader(channel, toPop, wasFinished); + TOutputChannelReadResult result = reader.Read(); - TVector<TDqSerializedBatch> chunks; NDqProto::TPopResponse response; - for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) { - TDqSerializedBatch data; - const auto lastPop = std::move(channel->Pop(data)); - - for (auto& metric : lastPop.GetMetric()) { - *response.AddMetric() = metric; - } - - hasData = lastPop.GetResult(); - dataSize = data.Size(); - isFinished = !hasData && channel->IsFinished(); - response.SetResult(response.GetResult() || hasData); - changed = changed || hasData || (isFinished != wasFinished); - - if (hasData) { - chunks.emplace_back(std::move(data)); - } + for (auto& metric : result.Metrics) { + *response.AddMetric() = metric; } + response.SetResult(response.GetResult() || result.HasData); actorSystem->Send( new IEventHandle( @@ -342,11 +379,11 @@ private: selfId, new TEvChannelPopFinished( channelId, - std::move(chunks), + std::move(result.DataChunks), Nothing(), Nothing(), - isFinished, - changed, + result.IsFinished, + result.IsChanged, GetSensors(response)), /*flags=*/0, cookie)); |
