aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhor911 <hor911@ydb.tech>2022-09-14 14:21:03 +0300
committerhor911 <hor911@ydb.tech>2022-09-14 14:21:03 +0300
commit211d3af96ecbc839df06d53a1868a66bd6741323 (patch)
tree063e34df1f343d07a343a91d7cd99b82b552b03f
parentd9f598eabeb666bcb34ea08f36d39503dc39a6de (diff)
downloadydb-211d3af96ecbc839df06d53a1868a66bd6741323.tar.gz
Support parallel execution of several precomputes
-rw-r--r--ydb/core/yq/libs/actors/run_actor.cpp72
1 files changed, 46 insertions, 26 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp
index d5b43a8cd7..0546662c69 100644
--- a/ydb/core/yq/libs/actors/run_actor.cpp
+++ b/ydb/core/yq/libs/actors/run_actor.cpp
@@ -268,6 +268,13 @@ private:
bool Compiled = false;
};
+struct TEvaluationGraphInfo {
+ NActors::TActorId ExecuterId;
+ NActors::TActorId ControlId;
+ NActors::TActorId ResultId;
+ NThreading::TPromise<NYql::IDqGateway::TResult> Result;
+};
+
class TRunActor : public NActors::TActorBootstrapped<TRunActor> {
public:
explicit TRunActor(
@@ -410,6 +417,13 @@ private:
Send(RateLimiterResourceCreatorId, new NActors::TEvents::TEvPoison());
}
+ if (!EvalInfos.empty()) {
+ for (auto& pair : EvalInfos) {
+ auto& info = pair.second;
+ Send(info.ControlId, new NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, YandexQuery::QueryMeta::ComputeStatus_Name(FinalQueryStatus)));
+ }
+ }
+
if (ControlId) {
LOG_D("Cancel running query");
Send(ControlId, new NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, YandexQuery::QueryMeta::ComputeStatus_Name(FinalQueryStatus)));
@@ -709,10 +723,9 @@ private:
LOG_D("Graph (" << (ev->Get()->IsEvaluation ? "evaluation" : "execution") << ") with tasks: " << ev->Get()->GraphParams.TasksSize());
if (ev->Get()->IsEvaluation) {
- Y_ASSERT(!EvaluationInProgress);
- EvaluationInProgress = true;
- EvaluationResult = ev->Get()->Result;
- RunEvalDqGraph(ev->Get()->GraphParams);
+ auto info = RunEvalDqGraph(ev->Get()->GraphParams);
+ info.Result = ev->Get()->Result;
+ EvalInfos.emplace(info.ExecuterId, info);
} else {
DqGraphParams.push_back(ev->Get()->GraphParams);
@@ -975,8 +988,8 @@ private:
}
void Handle(NYql::NDqs::TEvQueryResponse::TPtr& ev) {
-
- if (EvaluationInProgress) {
+ auto it = EvalInfos.find(ev->Sender);
+ if (it != EvalInfos.end()) {
IDqGateway::TResult QueryResult;
@@ -1003,9 +1016,9 @@ private:
QueryResult.AddIssues(issues);
QueryResult.Truncated = result.GetTruncated();
QueryResult.RowsCount = result.GetRowsCount();
- EvaluationResult.SetValue(QueryResult);
+ it->second.Result.SetValue(QueryResult);
+ EvalInfos.erase(it);
- EvaluationInProgress = false;
return;
}
@@ -1184,7 +1197,7 @@ private:
RunNextDqGraph();
}
- void RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) {
+ TEvaluationGraphInfo RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) {
LOG_D("RunEvalDqGraph");
@@ -1193,9 +1206,10 @@ private:
dqConfiguration->FreezeDefaults();
dqConfiguration->FallbackPolicy = "never";
- ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
+ TEvaluationGraphInfo info;
+
+ info.ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false));
- NActors::TActorId resultId;
if (dqGraphParams.GetResultType()) {
TVector<TString> columns;
for (const auto& column : dqGraphParams.GetColumns()) {
@@ -1204,17 +1218,17 @@ private:
NActors::TActorId empty = {};
THashMap<TString, TString> emptySecureParams; // NOT USED in RR
- resultId = NActors::TActivationContext::Register(
+ info.ResultId = NActors::TActivationContext::Register(
MakeResultReceiver(
- columns, ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams,
+ columns, info.ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams,
dqGraphParams.GetResultType(), empty, false).Release());
} else {
LOG_D("ResultReceiver was NOT CREATED since ResultType is empty");
- resultId = ExecuterId;
+ info.ResultId = info.ExecuterId;
}
- ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
+ info.ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release());
Yql::DqsProto::ExecuteGraphRequest request;
request.SetSourceId(dqGraphParams.GetSourceId());
@@ -1224,8 +1238,9 @@ private:
*request.MutableSecureParams() = dqGraphParams.GetSecureParams();
*request.MutableColumns() = dqGraphParams.GetColumns();
NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram());
- NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId)));
- LOG_D("Evaluation Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId);
+ NActors::TActivationContext::Send(new IEventHandle(info.ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId, TActorId{})));
+ LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId);
+ return info;
}
void RunNextDqGraph() {
@@ -1752,13 +1767,19 @@ private:
html << "<th>Checkpoint Coord</th>";
html << "</tr></thead><tbody>";
html << "<tr>";
- html << "<td>";
- if (EvaluationInProgress) html << "EVAL";
- else html << DqGraphIndex << " of " << DqGraphParams.size();
- html << "</td>";
- html << "<td>" << ExecuterId << "</td>";
- html << "<td>" << ControlId << "</td>";
- html << "<td>" << CheckpointCoordinatorId << "</td>";
+ for (auto& pair : EvalInfos) {
+ html << "<td>" << "Evaluation" << "</td>";
+ auto& info = pair.second;
+ html << "<td> Executer" << info.ExecuterId << "</td>";
+ html << "<td> Control" << info.ControlId << "</td>";
+ html << "<td> Result" << info.ResultId << "</td>";
+ }
+ if (!DqGraphParams.empty()) {
+ html << "<td>" << DqGraphIndex << " of " << DqGraphParams.size() << "</td>";
+ html << "<td> Executer" << ExecuterId << "</td>";
+ html << "<td> Control" << ControlId << "</td>";
+ html << "<td> Coordinator" << CheckpointCoordinatorId << "</td>";
+ }
html << "</tr>";
html << "</tbody></table>";
}
@@ -1919,8 +1940,7 @@ private:
TActorId Pinger;
TInstant CreatedAt;
YandexQuery::QueryAction Action = YandexQuery::QueryAction::QUERY_ACTION_UNSPECIFIED;
- bool EvaluationInProgress = false;
- NThreading::TPromise<NYql::IDqGateway::TResult> EvaluationResult;
+ TMap<NActors::TActorId, TEvaluationGraphInfo> EvalInfos;
std::vector<NYq::NProto::TGraphParams> DqGraphParams;
std::vector<i32> DqGrapResultIndices;
i32 DqGraphIndex = 0;