diff options
author | hor911 <hor911@ydb.tech> | 2022-09-14 14:21:03 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-09-14 14:21:03 +0300 |
commit | 211d3af96ecbc839df06d53a1868a66bd6741323 (patch) | |
tree | 063e34df1f343d07a343a91d7cd99b82b552b03f | |
parent | d9f598eabeb666bcb34ea08f36d39503dc39a6de (diff) | |
download | ydb-211d3af96ecbc839df06d53a1868a66bd6741323.tar.gz |
Support parallel execution of several precomputes
-rw-r--r-- | ydb/core/yq/libs/actors/run_actor.cpp | 72 |
1 files changed, 46 insertions, 26 deletions
diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index d5b43a8cd7..0546662c69 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -268,6 +268,13 @@ private: bool Compiled = false; }; +struct TEvaluationGraphInfo { + NActors::TActorId ExecuterId; + NActors::TActorId ControlId; + NActors::TActorId ResultId; + NThreading::TPromise<NYql::IDqGateway::TResult> Result; +}; + class TRunActor : public NActors::TActorBootstrapped<TRunActor> { public: explicit TRunActor( @@ -410,6 +417,13 @@ private: Send(RateLimiterResourceCreatorId, new NActors::TEvents::TEvPoison()); } + if (!EvalInfos.empty()) { + for (auto& pair : EvalInfos) { + auto& info = pair.second; + Send(info.ControlId, new NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, YandexQuery::QueryMeta::ComputeStatus_Name(FinalQueryStatus))); + } + } + if (ControlId) { LOG_D("Cancel running query"); Send(ControlId, new NDq::TEvDq::TEvAbortExecution(NYql::NDqProto::StatusIds::ABORTED, YandexQuery::QueryMeta::ComputeStatus_Name(FinalQueryStatus))); @@ -709,10 +723,9 @@ private: LOG_D("Graph (" << (ev->Get()->IsEvaluation ? "evaluation" : "execution") << ") with tasks: " << ev->Get()->GraphParams.TasksSize()); if (ev->Get()->IsEvaluation) { - Y_ASSERT(!EvaluationInProgress); - EvaluationInProgress = true; - EvaluationResult = ev->Get()->Result; - RunEvalDqGraph(ev->Get()->GraphParams); + auto info = RunEvalDqGraph(ev->Get()->GraphParams); + info.Result = ev->Get()->Result; + EvalInfos.emplace(info.ExecuterId, info); } else { DqGraphParams.push_back(ev->Get()->GraphParams); @@ -975,8 +988,8 @@ private: } void Handle(NYql::NDqs::TEvQueryResponse::TPtr& ev) { - - if (EvaluationInProgress) { + auto it = EvalInfos.find(ev->Sender); + if (it != EvalInfos.end()) { IDqGateway::TResult QueryResult; @@ -1003,9 +1016,9 @@ private: QueryResult.AddIssues(issues); QueryResult.Truncated = result.GetTruncated(); QueryResult.RowsCount = result.GetRowsCount(); - EvaluationResult.SetValue(QueryResult); + it->second.Result.SetValue(QueryResult); + EvalInfos.erase(it); - EvaluationInProgress = false; return; } @@ -1184,7 +1197,7 @@ private: RunNextDqGraph(); } - void RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) { + TEvaluationGraphInfo RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) { LOG_D("RunEvalDqGraph"); @@ -1193,9 +1206,10 @@ private: dqConfiguration->FreezeDefaults(); dqConfiguration->FallbackPolicy = "never"; - ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); + TEvaluationGraphInfo info; + + info.ExecuterId = NActors::TActivationContext::Register(NYql::NDq::MakeDqExecuter(MakeNodesManagerId(), SelfId(), Params.QueryId, "", dqConfiguration, QueryCounters.Counters, TInstant::Now(), false)); - NActors::TActorId resultId; if (dqGraphParams.GetResultType()) { TVector<TString> columns; for (const auto& column : dqGraphParams.GetColumns()) { @@ -1204,17 +1218,17 @@ private: NActors::TActorId empty = {}; THashMap<TString, TString> emptySecureParams; // NOT USED in RR - resultId = NActors::TActivationContext::Register( + info.ResultId = NActors::TActivationContext::Register( MakeResultReceiver( - columns, ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams, + columns, info.ExecuterId, dqGraphParams.GetSession(), dqConfiguration, emptySecureParams, dqGraphParams.GetResultType(), empty, false).Release()); } else { LOG_D("ResultReceiver was NOT CREATED since ResultType is empty"); - resultId = ExecuterId; + info.ResultId = info.ExecuterId; } - ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, ExecuterId, resultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); + info.ControlId = NActors::TActivationContext::Register(NYql::MakeTaskController(SessionId, info.ExecuterId, info.ResultId, dqConfiguration, QueryCounters, TDuration::Seconds(3)).Release()); Yql::DqsProto::ExecuteGraphRequest request; request.SetSourceId(dqGraphParams.GetSourceId()); @@ -1224,8 +1238,9 @@ private: *request.MutableSecureParams() = dqGraphParams.GetSecureParams(); *request.MutableColumns() = dqGraphParams.GetColumns(); NTasksPacker::UnPack(*request.MutableTask(), dqGraphParams.GetTasks(), dqGraphParams.GetStageProgram()); - NActors::TActivationContext::Send(new IEventHandle(ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, ControlId, resultId, CheckpointCoordinatorId))); - LOG_D("Evaluation Executer: " << ExecuterId << ", Controller: " << ControlId << ", ResultIdActor: " << resultId); + NActors::TActivationContext::Send(new IEventHandle(info.ExecuterId, SelfId(), new NYql::NDqs::TEvGraphRequest(request, info.ControlId, info.ResultId, TActorId{}))); + LOG_D("Evaluation Executer: " << info.ExecuterId << ", Controller: " << info.ControlId << ", ResultActor: " << info.ResultId); + return info; } void RunNextDqGraph() { @@ -1752,13 +1767,19 @@ private: html << "<th>Checkpoint Coord</th>"; html << "</tr></thead><tbody>"; html << "<tr>"; - html << "<td>"; - if (EvaluationInProgress) html << "EVAL"; - else html << DqGraphIndex << " of " << DqGraphParams.size(); - html << "</td>"; - html << "<td>" << ExecuterId << "</td>"; - html << "<td>" << ControlId << "</td>"; - html << "<td>" << CheckpointCoordinatorId << "</td>"; + for (auto& pair : EvalInfos) { + html << "<td>" << "Evaluation" << "</td>"; + auto& info = pair.second; + html << "<td> Executer" << info.ExecuterId << "</td>"; + html << "<td> Control" << info.ControlId << "</td>"; + html << "<td> Result" << info.ResultId << "</td>"; + } + if (!DqGraphParams.empty()) { + html << "<td>" << DqGraphIndex << " of " << DqGraphParams.size() << "</td>"; + html << "<td> Executer" << ExecuterId << "</td>"; + html << "<td> Control" << ControlId << "</td>"; + html << "<td> Coordinator" << CheckpointCoordinatorId << "</td>"; + } html << "</tr>"; html << "</tbody></table>"; } @@ -1919,8 +1940,7 @@ private: TActorId Pinger; TInstant CreatedAt; YandexQuery::QueryAction Action = YandexQuery::QueryAction::QUERY_ACTION_UNSPECIFIED; - bool EvaluationInProgress = false; - NThreading::TPromise<NYql::IDqGateway::TResult> EvaluationResult; + TMap<NActors::TActorId, TEvaluationGraphInfo> EvalInfos; std::vector<NYq::NProto::TGraphParams> DqGraphParams; std::vector<i32> DqGrapResultIndices; i32 DqGraphIndex = 0; |