summaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authoraidarsamer <[email protected]>2023-12-15 11:59:35 +0300
committeraidarsamer <[email protected]>2023-12-15 12:56:35 +0300
commit98f85629761be979eacf43e22bbed905e3627219 (patch)
tree358eece77f87df8fb96a90b9cb426657670f6fef
parentd1374eadc86bb66d36b0182b13a3b080e2ee4d58 (diff)
Refactor OutputChannel reads in task_runner_actor
-rw-r--r--ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp103
1 files changed, 70 insertions, 33 deletions
diff --git a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
index a8285d95582..afb384be683 100644
--- a/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
+++ b/ydb/library/yql/providers/dq/task_runner_actor/task_runner_actor.cpp
@@ -33,6 +33,68 @@ TTaskRunnerActorSensors GetSensors(const T& t) {
return result;
}
+struct TOutputChannelReadResult {
+ bool IsChanged = false;
+ bool IsFinished = false;
+ bool HasData = false;
+ std::list<::NYql::NDqProto::TMetric> Metrics;
+ TVector<TDqSerializedBatch> DataChunks;
+};
+
+class TOutputChannelReader {
+public:
+ TOutputChannelReader(NTaskRunnerProxy::IOutputChannel::TPtr channel, i64 toPopSize, bool wasFinished)
+ : Channel(channel)
+ , ToPopSize(toPopSize)
+ , WasFinished(wasFinished)
+ {}
+
+ TOutputChannelReadResult Read() {
+ int maxChunks = std::numeric_limits<int>::max();
+ bool changed = false;
+ bool isFinished = false;
+ i64 remain = ToPopSize;
+ ui32 dataSize = 0;
+ bool hasData = true;
+ TOutputChannelReadResult result;
+
+ if (remain == 0) {
+ // special case to WorkerActor
+ remain = 5<<20;
+ maxChunks = 1;
+ }
+
+ TVector<TDqSerializedBatch> chunks;
+ for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
+ TDqSerializedBatch data;
+ const auto lastPop = std::move(Channel->Pop(data));
+
+ for (auto& metric : lastPop.GetMetric()) {
+ // *response.AddMetric() = metric;
+ result.Metrics.push_back(metric);
+ }
+
+ hasData = lastPop.GetResult();
+ dataSize = data.Size();
+ isFinished = !hasData && Channel->IsFinished();
+ changed = changed || hasData || (isFinished != WasFinished);
+
+ if (hasData) {
+ result.DataChunks.emplace_back(std::move(data));
+ }
+ }
+ result.IsFinished = isFinished;
+ result.IsChanged = changed;
+ result.HasData = hasData;
+ return result;
+ }
+
+private:
+ NTaskRunnerProxy::IOutputChannel::TPtr Channel;
+ i64 ToPopSize;
+ bool WasFinished;
+};
+
} // namespace
class TTaskRunnerActor
@@ -302,39 +364,14 @@ private:
try {
// auto guard = taskRunner->BindAllocator(); // only for local mode
auto channel = taskRunner->GetOutputChannel(channelId);
- int maxChunks = std::numeric_limits<int>::max();
- bool changed = false;
- bool isFinished = false;
- i64 remain = toPop;
- ui32 dataSize = 0;
- bool hasData = true;
-
- if (remain == 0) {
- // special case to WorkerActor
- remain = 5<<20;
- maxChunks = 1;
- }
+ TOutputChannelReader reader(channel, toPop, wasFinished);
+ TOutputChannelReadResult result = reader.Read();
- TVector<TDqSerializedBatch> chunks;
NDqProto::TPopResponse response;
- for (;maxChunks && remain > 0 && !isFinished && hasData; maxChunks--, remain -= dataSize) {
- TDqSerializedBatch data;
- const auto lastPop = std::move(channel->Pop(data));
-
- for (auto& metric : lastPop.GetMetric()) {
- *response.AddMetric() = metric;
- }
-
- hasData = lastPop.GetResult();
- dataSize = data.Size();
- isFinished = !hasData && channel->IsFinished();
- response.SetResult(response.GetResult() || hasData);
- changed = changed || hasData || (isFinished != wasFinished);
-
- if (hasData) {
- chunks.emplace_back(std::move(data));
- }
+ for (auto& metric : result.Metrics) {
+ *response.AddMetric() = metric;
}
+ response.SetResult(response.GetResult() || result.HasData);
actorSystem->Send(
new IEventHandle(
@@ -342,11 +379,11 @@ private:
selfId,
new TEvChannelPopFinished(
channelId,
- std::move(chunks),
+ std::move(result.DataChunks),
Nothing(),
Nothing(),
- isFinished,
- changed,
+ result.IsFinished,
+ result.IsChanged,
GetSensors(response)),
/*flags=*/0,
cookie));