diff options
author | hor911 <hor911@ydb.tech> | 2022-10-26 15:01:14 +0300 |
---|---|---|
committer | hor911 <hor911@ydb.tech> | 2022-10-26 15:01:14 +0300 |
commit | 4d1349733c09d2ca8746497197467e0ea7f3378c (patch) | |
tree | 0ba37092b599b5dbef37c2ee0a6884de0c59786a | |
parent | 71b018f828ad9f35f0b391747dd7eaa200793d1d (diff) | |
download | ydb-4d1349733c09d2ca8746497197467e0ea7f3378c.tar.gz |
Instant RUNNING Status
10 files changed, 125 insertions, 167 deletions
diff --git a/ydb/core/yq/libs/actors/pending_fetcher.cpp b/ydb/core/yq/libs/actors/pending_fetcher.cpp index 72c57df0d8..ce89ac9ea4 100644 --- a/ydb/core/yq/libs/actors/pending_fetcher.cpp +++ b/ydb/core/yq/libs/actors/pending_fetcher.cpp @@ -395,7 +395,8 @@ private: NProtoInterop::CastFromProto(task.execution_limit()), NProtoInterop::CastFromProto(task.request_started_at()), task.restart_count(), - task.job_id().value() + task.job_id().value(), + task.resources() ); auto runActorId = Register(CreateRunActor(SelfId(), queryCounters, std::move(params))); diff --git a/ydb/core/yq/libs/actors/run_actor.cpp b/ydb/core/yq/libs/actors/run_actor.cpp index cd501e3b18..67aadaf04d 100644 --- a/ydb/core/yq/libs/actors/run_actor.cpp +++ b/ydb/core/yq/libs/actors/run_actor.cpp @@ -284,7 +284,6 @@ public: , EnableCheckpointCoordinator(Params.QueryType == YandexQuery::QueryContent::STREAMING && Params.CheckpointCoordinatorConfig.GetEnabled()) , MaxTasksPerOperation(Params.CommonConfig.GetMaxTasksPerOperation() ? Params.CommonConfig.GetMaxTasksPerOperation() : 40) , Compressor(Params.CommonConfig.GetQueryArtifactsCompressionMethod(), Params.CommonConfig.GetQueryArtifactsCompressionMinSize()) - , RateLimiterResourceWasCreated(CalcRateLimiterResourceWasCreated()) { QueryCounters.SetUptimePublicAndServiceCounter(0); } @@ -321,7 +320,9 @@ public: Become(&TRunActor::StateFuncWrapper<&TRunActor::StateFunc>); try { - Run(); + if (!TimeLimitExceeded()) { + Run(); + } } catch (const std::exception&) { FailOnException(); } @@ -435,6 +436,10 @@ private: NActors::TActorBootstrapped<TRunActor>::PassAway(); } + void AbortByExecutionTimeout() { + Abort("Execution time limit exceeded", YandexQuery::QueryMeta::ABORTED_BY_SYSTEM); + } + bool TimeLimitExceeded() { if (Params.ExecutionTtl != TDuration::Zero()) { auto currentTime = TInstant::Now(); @@ -442,7 +447,7 @@ private: auto deadline = startedAt + Params.ExecutionTtl; if (currentTime >= deadline) { - Abort("Execution time limit exceeded", YandexQuery::QueryMeta::ABORTED_BY_SYSTEM); + AbortByExecutionTimeout(); return true; } else { Schedule(deadline, new NActors::TEvents::TEvWakeup(RunActorWakeupTag::ExecutionTimeout)); @@ -451,15 +456,70 @@ private: return false; } - void Run() { - if (!Params.DqGraphs.empty() && Params.Status != YandexQuery::QueryMeta::STARTING) { - FillDqGraphParams(); + void ProcessQuery() { + // should be called in case of some event, so sync state first + Send(Pinger, new TEvents::TEvForwardPingRequest(QueryStateUpdateRequest)); + + // must be in-sync, TODO: dedup + Params.Resources = QueryStateUpdateRequest.resources(); + + if (QueryStateUpdateRequest.resources().rate_limiter() == Fq::Private::TaskResources::PREPARE) { + if (!RateLimiterResourceCreatorId) { + LOG_D("Start rate limiter resource creator"); + RateLimiterResourceCreatorId = Register(CreateRateLimiterResourceCreator(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName)); + } + return; + } + + if (QueryStateUpdateRequest.resources().compilation() == Fq::Private::TaskResources::PREPARE) { + if (!ProgramRunnerId) { + RunProgram(); + } + return; + } + + if (QueryStateUpdateRequest.resources().read_rules() == Fq::Private::TaskResources::PREPARE) { + if (!ReadRulesCreatorId) { + ReadRulesCreatorId = Register( + ::NYq::MakeReadRuleCreatorActor( + SelfId(), + Params.QueryId, + Params.YqSharedResources->UserSpaceYdbDriver, + std::move(TopicsForConsumersCreation), + std::move(CredentialsForConsumersCreation) + ) + ); + } + return; } - if (TimeLimitExceeded()) { + if (DqGraphParams.empty()) { + QueryStateUpdateRequest.set_resign_query(false); + const bool isOk = Issues.Size() == 0; + Finish(GetFinishStatus(isOk)); return; } + if (AbortOnExceedingDqGraphsLimits()) { + return; + } + + for (const auto& m : Params.ResultSetMetas) { + *QueryStateUpdateRequest.add_result_set_meta() = m; + } + + DqGraphIndex = Params.DqGraphIndex; + UpdateResultIndices(); + RunNextDqGraph(); + } + + void Run() { + *QueryStateUpdateRequest.mutable_resources() = Params.Resources; + + if (!Params.DqGraphs.empty() && QueryStateUpdateRequest.resources().compilation() == Fq::Private::TaskResources::READY) { + FillDqGraphParams(); + } + switch (Params.Status) { case YandexQuery::QueryMeta::ABORTING_BY_USER: case YandexQuery::QueryMeta::ABORTING_BY_SYSTEM: @@ -470,16 +530,16 @@ private: break; case YandexQuery::QueryMeta::STARTING: HandleConnections(); - if (Params.RateLimiterConfig.GetEnabled()) { - if (StartRateLimiterResourceCreatorIfNeeded() || !RateLimiterResourceWasCreated) { - return; - } - } - RunProgram(); - break; + QueryStateUpdateRequest.mutable_resources()->set_rate_limiter( + Params.RateLimiterConfig.GetEnabled() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); + QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::PREPARE); + // know nothing about read rules yet + Params.Status = YandexQuery::QueryMeta::RUNNING; // ??? + QueryStateUpdateRequest.set_status(YandexQuery::QueryMeta::RUNNING); + // DO NOT break here case YandexQuery::QueryMeta::RESUMING: case YandexQuery::QueryMeta::RUNNING: - ReRunQuery(); + ProcessQuery(); break; default: Abort("Fail to start query from unexpected status " + YandexQuery::QueryMeta::ComputeStatus_Name(Params.Status), YandexQuery::QueryMeta::FAILED); @@ -647,7 +707,7 @@ private: auto tag = (RunActorWakeupTag) ev->Get()->Tag; switch (tag) { case RunActorWakeupTag::ExecutionTimeout: { - Abort("Execution timeout", YandexQuery::QueryMeta::ABORTED_BY_SYSTEM); + AbortByExecutionTimeout(); break; } default: { @@ -664,19 +724,10 @@ private: } if (ev->Cookie == SaveQueryInfoCookie) { - if (TopicsForConsumersCreation.size()) { - ReadRulesCreatorId = Register( - ::NYq::MakeReadRuleCreatorActor( - SelfId(), - Params.QueryId, - Params.YqSharedResources->UserSpaceYdbDriver, - std::move(TopicsForConsumersCreation), - std::move(CredentialsForConsumersCreation) - ) - ); - } else { - RunDqGraphs(); - } + QueryStateUpdateRequest.mutable_resources()->set_compilation(Fq::Private::TaskResources::READY); + QueryStateUpdateRequest.mutable_resources()->set_read_rules( + TopicsForConsumersCreation.size() ? Fq::Private::TaskResources::PREPARE : Fq::Private::TaskResources::NOT_NEEDED); + ProcessQuery(); } else if (ev->Cookie == SetLoadFromCheckpointModeCookie) { Send(CheckpointCoordinatorId, new TEvCheckpointCoordinator::TEvRunGraph()); } @@ -719,10 +770,10 @@ private: void Handle(TEvents::TEvGraphParams::TPtr& ev) { LOG_D("Graph (" << (ev->Get()->IsEvaluation ? "evaluation" : "execution") << ") with tasks: " << ev->Get()->GraphParams.TasksSize()); - if (RateLimiterPath) { + if (Params.Resources.rate_limiter_path()) { const TString rateLimiterResource = GetRateLimiterResourcePath(Params.CloudId, Params.Scope.ParseFolder(), Params.QueryId); for (auto& task : *ev->Get()->GraphParams.MutableTasks()) { - task.SetRateLimiter(RateLimiterPath); + task.SetRateLimiter(Params.Resources.rate_limiter_path()); task.SetRateLimiterResource(rateLimiterResource); } } @@ -776,6 +827,7 @@ private: i32 UpdateResultIndices() { i32 count = 0; + DqGrapResultIndices.clear(); for (const auto& graphParams : DqGraphParams) { DqGrapResultIndices.push_back(graphParams.GetResultType() ? count++ : -1); } @@ -1072,7 +1124,8 @@ private: LOG_D(Issues.ToOneLineString()); Finish(YandexQuery::QueryMeta::FAILED); } else { - RunDqGraphs(); + QueryStateUpdateRequest.mutable_resources()->set_read_rules(Fq::Private::TaskResources::READY); + ProcessQuery(); } } @@ -1138,9 +1191,10 @@ private: LOG_D(Issues.ToOneLineString()); Finish(YandexQuery::QueryMeta::FAILED); } else { - RateLimiterResourceWasCreated = true; - RateLimiterPath = ev->Get()->Result.rate_limiter(); - RunProgram(); + Params.Resources.set_rate_limiter_path(ev->Get()->Result.rate_limiter()); + QueryStateUpdateRequest.mutable_resources()->set_rate_limiter_path(ev->Get()->Result.rate_limiter()); + QueryStateUpdateRequest.mutable_resources()->set_rate_limiter(Fq::Private::TaskResources::READY); + ProcessQuery(); } } @@ -1159,15 +1213,6 @@ private: ContinueFinish(); } - bool StartRateLimiterResourceCreatorIfNeeded() { - if (!RateLimiterResourceWasCreated && !RateLimiterResourceCreatorId && Params.RateLimiterConfig.GetEnabled()) { - LOG_D("Start rate limiter resource creator"); - RateLimiterResourceCreatorId = Register(CreateRateLimiterResourceCreator(SelfId(), Params.Owner, Params.QueryId, Params.Scope, Params.TenantName)); - return true; - } - return false; - } - bool StartRateLimiterResourceDeleterIfCan() { if (!RateLimiterResourceDeleterId && !RateLimiterResourceCreatorId && FinalizingStatusIsWritten && QueryResponseArrived && Params.RateLimiterConfig.GetEnabled()) { LOG_D("Start rate limiter resource deleter"); @@ -1177,24 +1222,6 @@ private: return false; } - void RunDqGraphs() { - if (DqGraphParams.empty()) { - QueryStateUpdateRequest.set_resign_query(false); - const bool isOk = Issues.Size() == 0; - Finish(GetFinishStatus(isOk)); - return; - } - - { - Params.Status = YandexQuery::QueryMeta::RUNNING; - Fq::Private::PingTaskRequest request; - request.set_status(YandexQuery::QueryMeta::RUNNING); - Send(Pinger, new TEvents::TEvForwardPingRequest(request), 0, UpdateQueryInfoCookie); - } - - RunNextDqGraph(); - } - TEvaluationGraphInfo RunEvalDqGraph(NYq::NProto::TGraphParams& dqGraphParams) { LOG_D("RunEvalDqGraph"); @@ -1512,86 +1539,6 @@ private: } } - void ReRunQuery() { - if (AbortOnExceedingDqGraphsLimits()) { - return; - } - for (const auto& m : Params.ResultSetMetas) { - *QueryStateUpdateRequest.add_result_set_meta() = m; - } - DqGraphIndex = Params.DqGraphIndex; - UpdateResultIndices(); - RunNextDqGraph(); - } - - bool RunProgram( - const NKikimr::NMiniKQL::IFunctionRegistry* functionRegistry, - ui64 nextUniqueId, - TVector<TDataProviderInitializer> dataProvidersInit, - NYql::IModuleResolver::TPtr& moduleResolver, - NYql::TGatewaysConfig gatewaysConfig, - const TString& sql, - const TString& sessionId, - NSQLTranslation::TTranslationSettings sqlSettings, - YandexQuery::ExecuteMode executeMode - ) { - TProgramFactory progFactory(false, functionRegistry, nextUniqueId, dataProvidersInit, "yq"); - progFactory.SetModules(moduleResolver); - progFactory.SetUdfResolver(NYql::NCommon::CreateSimpleUdfResolver(functionRegistry, nullptr)); - progFactory.SetGatewaysConfig(&gatewaysConfig); - - Program = progFactory.Create("-stdin-", sql, sessionId); - Program->EnableResultPosition(); - - // parse phase - { - if (!Program->ParseSql(sqlSettings)) { - Issues.AddIssues(Program->Issues()); - return false; - - } - - if (executeMode == YandexQuery::ExecuteMode::PARSE) { - return true; - } - } - - // compile phase - { - if (!Program->Compile("")) { - Issues.AddIssues(Program->Issues()); - return false; - } - - if (executeMode == YandexQuery::ExecuteMode::COMPILE) { - return true; - } - } - - // next phases can be async: optimize, validate, run - TProgram::TFutureStatus futureStatus; - switch (executeMode) { - case YandexQuery::ExecuteMode::EXPLAIN: - futureStatus = Program->OptimizeAsync(""); - break; - case YandexQuery::ExecuteMode::VALIDATE: - futureStatus = Program->ValidateAsync(""); - break; - case YandexQuery::ExecuteMode::RUN: - futureStatus = Program->RunAsync(""); - break; - default: - Issues.AddIssue(TStringBuilder() << "Unexpected execute mode " << static_cast<int>(Params.ExecuteMode)); - return false; - } - - futureStatus.Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), selfId = SelfId()](const TProgram::TFutureStatus& f) { - actorSystem->Send(selfId, new TEvents::TEvAsyncContinue(f)); - }); - - return true; - } - void RunProgram() { LOG_D("Compiling query ..."); NYql::TGatewaysConfig gatewaysConfig; @@ -1676,19 +1623,6 @@ private: return; } -/* - return RunProgram( - Params.FunctionRegistry, - Params.NextUniqueId, - dataProvidersInit, - Params.ModuleResolver, - gatewaysConfig, - Params.Sql, - SessionId, - sqlSettings, - Params.ExecuteMode - ); -*/ ProgramRunnerId = Register(new TProgramRunnerActor( SelfId(), Params.FunctionRegistry, @@ -1917,13 +1851,6 @@ private: << " }"); } - bool CalcRateLimiterResourceWasCreated() const { - if (Params.Status == YandexQuery::QueryMeta::STARTING) { - return false; - } - return true; - } - private: TActorId FetcherId; TActorId ProgramRunnerId; @@ -1961,11 +1888,9 @@ private: NActors::TActorId ReadRulesCreatorId; // Rate limiter resource creation - bool RateLimiterResourceWasCreated = false; bool RateLimiterResourceWasDeleted = false; NActors::TActorId RateLimiterResourceCreatorId; NActors::TActorId RateLimiterResourceDeleterId; - TString RateLimiterPath; // Finish bool Finishing = false; diff --git a/ydb/core/yq/libs/actors/run_actor_params.cpp b/ydb/core/yq/libs/actors/run_actor_params.cpp index 154133637f..3239f8d5b4 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.cpp +++ b/ydb/core/yq/libs/actors/run_actor_params.cpp @@ -53,7 +53,8 @@ TRunActorParams::TRunActorParams( TDuration executionTtl, TInstant requestStartedAt, ui32 restartCount, - const TString& jobId + const TString& jobId, + const Fq::Private::TaskResources& resources ) : YqSharedResources(yqSharedResources) , CredentialsProviderFactory(credentialsProviderFactory) @@ -104,6 +105,7 @@ TRunActorParams::TRunActorParams( , RequestStartedAt(requestStartedAt) , RestartCount(restartCount) , JobId(jobId) + , Resources(resources) { } diff --git a/ydb/core/yq/libs/actors/run_actor_params.h b/ydb/core/yq/libs/actors/run_actor_params.h index a4da0189bb..716c131e98 100644 --- a/ydb/core/yq/libs/actors/run_actor_params.h +++ b/ydb/core/yq/libs/actors/run_actor_params.h @@ -68,7 +68,8 @@ struct TRunActorParams { // TODO2 : Change name TDuration executionTtl, TInstant requestStartedAt, ui32 restartCount, - const TString& jobId + const TString& jobId, + const Fq::Private::TaskResources& resources ); TRunActorParams(const TRunActorParams& params) = default; @@ -126,6 +127,7 @@ struct TRunActorParams { // TODO2 : Change name TInstant RequestStartedAt; ui32 RestartCount; const TString JobId; + Fq::Private::TaskResources Resources; }; } /* NYq */ diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp index bde301450a..e5860b8c72 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_get.cpp @@ -412,6 +412,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvGetTaskRequ *newTask->mutable_result_set_meta() = task.Query.result_set_meta(); newTask->set_scope(task.Scope); + *newTask->mutable_resources() = task.Internal.resources(); } return result; diff --git a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp index 6edd062a50..bc4544042e 100644 --- a/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp +++ b/ydb/core/yq/libs/control_plane_storage/internal/task_ping.cpp @@ -279,6 +279,10 @@ std::tuple<TString, TParams, const std::function<std::pair<TString, NYdb::TParam internal.set_dq_graph_index(request.dq_graph_index()); } + if (request.has_resources()) { + *internal.mutable_resources() = request.resources(); + } + if (job.ByteSizeLong() > maxRequestSize) { ythrow TControlPlaneStorageException(TIssuesIds::BAD_REQUEST) << "Job proto exceeded the size limit: " << job.ByteSizeLong() << " of " << maxRequestSize << " " << TSizeFormatPrinter(job).ToString(); } diff --git a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto index a451d761f9..1783710ad0 100644 --- a/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto +++ b/ydb/core/yq/libs/control_plane_storage/proto/yq_internal.proto @@ -37,6 +37,7 @@ message QueryInternal { Fq.Private.CompressedData ast_compressed = 18; Fq.Private.CompressedData plan_compressed = 19; repeated Fq.Private.CompressedData dq_graph_compressed = 20; + Fq.Private.TaskResources resources = 21; } message JobInternal { diff --git a/ydb/core/yq/libs/control_plane_storage/util.cpp b/ydb/core/yq/libs/control_plane_storage/util.cpp index f22e17b672..068625c490 100644 --- a/ydb/core/yq/libs/control_plane_storage/util.cpp +++ b/ydb/core/yq/libs/control_plane_storage/util.cpp @@ -163,6 +163,7 @@ bool DoesPingTaskUpdateQueriesTable(const Fq::Private::PingTaskRequest& request) || request.dq_graph_index() || request.state_load_mode() || request.has_disposition() + || request.has_resources() ; } diff --git a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp index 25c921e44d..8a175b4f55 100644 --- a/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp +++ b/ydb/core/yq/libs/control_plane_storage/ydb_control_plane_storage_queries.cpp @@ -892,6 +892,7 @@ void TYdbControlPlaneStorageActor::Handle(TEvControlPlaneStorage::TEvModifyQuery internal.clear_binding(); internal.clear_connection(); + internal.clear_resources(); // TODO: move to run actor priority selection TSet<TString> disabledConnections; diff --git a/ydb/core/yq/libs/protos/fq_private.proto b/ydb/core/yq/libs/protos/fq_private.proto index 0e4c075024..022c1fa619 100644 --- a/ydb/core/yq/libs/protos/fq_private.proto +++ b/ydb/core/yq/libs/protos/fq_private.proto @@ -22,6 +22,23 @@ message CompressedData { bytes data = 2; } +message TaskResources { + + enum ResourceState { + UNSPECIFIED = 0; // on start + NOT_NEEDED = 1; // is not configured to use - skip it + PREPARE = 2; // resource creating in progress (retriable) + READY = 3; // created and ready to go + CLEANUP = 4; // is being destroyin (cleanup) + } + + ResourceState rate_limiter = 1; + ResourceState compilation = 2; + ResourceState read_rules = 3; + + string rate_limiter_path = 10; +} + message GetTaskRequest { string tenant = 1; string owner_id = 2; // guid, should be refreshed on node restart @@ -89,6 +106,8 @@ message GetTaskResult { google.protobuf.Timestamp request_started_at = 30; repeated CompressedData dq_graph_compressed = 31; uint32 restart_count = 32; + + TaskResources resources = 33; } repeated Task tasks = 1; } @@ -127,6 +146,7 @@ message PingTaskRequest { google.protobuf.Timestamp finished_at = 102; google.protobuf.Timestamp deadline = 103; string tenant = 104; + TaskResources resources = 33; } message PingTaskResult { |