diff options
author | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-19 14:54:14 +0300 |
---|---|---|
committer | galaxycrab <UgnineSirdis@ydb.tech> | 2023-06-19 14:54:14 +0300 |
commit | aa2aea0e4c3a16ff23b55f07cefae544948fb144 (patch) | |
tree | 69fbf804e16fc9a0afcd55d8228813cfe5391b87 | |
parent | c165426e54ead1cafbeddc8b2e0eb56755bd402a (diff) | |
download | ydb-aa2aea0e4c3a16ff23b55f07cefae544948fb144.tar.gz |
Don't show run script actor id in operation id for script operations
18 files changed, 228 insertions, 69 deletions
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp index 3563c371f4..e67901e92a 100644 --- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp +++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp @@ -3,7 +3,10 @@ #include <ydb/core/base/appdata.h> #include <ydb/core/base/kikimr_issue.h> #include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/grpc_services/rpc_request_base.h> #include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/common/kqp_script_executions.h> +#include <ydb/core/kqp/common/simple/services.h> #include <ydb/public/api/protos/draft/ydb_query.pb.h> #include <library/cpp/actors/core/actor_bootstrapped.h> @@ -22,30 +25,25 @@ using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::Fet constexpr i64 MAX_ROWS_LIMIT = 1000; -class TFetchScriptResultsRPC : public TActorBootstrapped<TFetchScriptResultsRPC> { +class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false> { public: + using TRpcRequestActorBase = TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false>; + static constexpr NKikimrServices::TActivity::EType ActorActivityType() { return NKikimrServices::TActivity::GRPC_REQ; } TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request) - : Request_(request) + : TRpcRequestActorBase(request) {} void Bootstrap() { - const auto* req = Request_->GetProtoRequest(); + const auto* req = GetProtoRequest(); if (!req) { Reply(Ydb::StatusIds::INTERNAL_ERROR, "Internal error"); return; } - const TString& executionId = req->execution_id(); - NActors::TActorId runScriptActor; - if (!NKqp::ScriptExecutionIdToActorId(executionId, runScriptActor)) { - Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect execution id"); - return; - } - if (req->rows_limit() <= 0) { Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit"); return; @@ -61,27 +59,44 @@ public: return; } - auto ev = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>(); - ev->Record.SetRowsOffset(req->rows_offset()); - ev->Record.SetRowsLimit(req->rows_limit()); - - ui64 flags = IEventHandle::FlagTrackDelivery; - if (runScriptActor.NodeId() != SelfId().NodeId()) { - flags |= IEventHandle::FlagSubscribeOnSession; - SubscribedOnSession = runScriptActor.NodeId(); + if (!GetExecutionIdFromRequest()) { + return; } - Send(runScriptActor, std::move(ev), flags); + + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvKqp::TEvGetRunScriptActorRequest(DatabaseName, ExecutionId)); Become(&TFetchScriptResultsRPC::StateFunc); } private: STRICT_STFUNC(StateFunc, + hFunc(NKqp::TEvKqp::TEvGetRunScriptActorResponse, Handle); hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle); hFunc(NActors::TEvents::TEvUndelivered, Handle); hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); ) + void Handle(NKqp::TEvKqp::TEvGetRunScriptActorResponse::TPtr& ev) { + if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) { + Reply(ev->Get()->Status, ev->Get()->Issues); + return; + } + + const auto* userReq = GetProtoRequest(); + + auto req = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>(); + req->Record.SetRowsOffset(userReq->rows_offset()); + req->Record.SetRowsLimit(userReq->rows_limit()); + + const NActors::TActorId runScriptActor = ev->Get()->RunScriptActorId; + ui64 flags = IEventHandle::FlagTrackDelivery; + if (runScriptActor.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = runScriptActor.NodeId(); + } + Send(runScriptActor, std::move(req), flags); + } + void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) { Ydb::Query::FetchScriptResultsResponse resp; resp.set_status(ev->Get()->Record.GetStatus()); @@ -126,7 +141,7 @@ private: TString serializedResult; Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult); - Request_->SendSerializedResult(std::move(serializedResult), status); + Request->SendSerializedResult(std::move(serializedResult), status); PassAway(); } @@ -142,9 +157,31 @@ private: Reply(status, issues); } + bool GetExecutionIdFromRequest() { + switch (GetProtoRequest()->execution_case()) { + case Ydb::Query::FetchScriptResultsRequest::kExecutionId: + ExecutionId = GetProtoRequest()->execution_id(); + break; + case Ydb::Query::FetchScriptResultsRequest::kOperationId: + { + TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id()); + if (!executionId) { + Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid operation id"); + return false; + } + ExecutionId = *executionId; + break; + } + case Ydb::Query::FetchScriptResultsRequest::EXECUTION_NOT_SET: + Reply(Ydb::StatusIds::BAD_REQUEST, "No execution id"); + return false; + } + return true; + } + private: - std::unique_ptr<TEvFetchScriptResultsRequest> Request_; TMaybe<ui32> SubscribedOnSession; + TString ExecutionId; }; } // namespace diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index 6a98198cb6..5bc4d06158 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -36,6 +36,31 @@ struct TEvKqp { struct TEvCancelQueryRequest : public TEventPB<TEvCancelQueryRequest, NKikimrKqp::TEvCancelQueryRequest, TKqpEvents::EvCancelQueryRequest> {}; + struct TEvGetRunScriptActorRequest : public TEventLocal<TEvGetRunScriptActorRequest, TKqpEvents::EvGetRunScriptActorRequest> { + TEvGetRunScriptActorRequest(const TString& database, const TString& executionId) + : Database(database) + , ExecutionId(executionId) + {} + + const TString Database; + const TString ExecutionId; + }; + + struct TEvGetRunScriptActorResponse : public TEventLocal<TEvGetRunScriptActorResponse, TKqpEvents::EvGetRunScriptActorResponse> { + TEvGetRunScriptActorResponse(const NActors::TActorId& actorId) + : RunScriptActorId(actorId) + {} + + TEvGetRunScriptActorResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + {} + + const Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS; + const NYql::TIssues Issues; + const NActors::TActorId RunScriptActorId; + }; + using TEvCompileRequest = NPrivateEvents::TEvCompileRequest; using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest; diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp index 298a2a7551..026fa6efb7 100644 --- a/ydb/core/kqp/common/kqp.cpp +++ b/ydb/core/kqp/common/kqp.cpp @@ -6,11 +6,11 @@ namespace NKikimr::NKqp { -TString ActorIdToScriptExecutionId(const NActors::TActorId& actorId) { +TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId) { return TStringBuilder() << "[" << actorId.NodeId() << ":" << actorId.LocalId() << ":" << actorId.Hint() << ":" << actorId.PoolID() << "]"; } -bool ScriptExecutionIdToActorId(const TString& executionId, TActorId& actorId) { +bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId) { if (executionId.Size() < 5 || executionId[0] != '[' || executionId[executionId.Size() - 1] != ']') return false; diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h index bcff94c9ee..173d527953 100644 --- a/ydb/core/kqp/common/kqp.h +++ b/ydb/core/kqp/common/kqp.h @@ -35,8 +35,8 @@ namespace NKikimr::NKqp { void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to); -TString ActorIdToScriptExecutionId(const NActors::TActorId& actorId); -bool ScriptExecutionIdToActorId(const TString& executionId, TActorId& actorId); +TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId); +bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId); template<typename TFrom, typename TTo> inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) { diff --git a/ydb/core/kqp/common/kqp_script_executions.cpp b/ydb/core/kqp/common/kqp_script_executions.cpp index f231dae077..97ab891956 100644 --- a/ydb/core/kqp/common/kqp_script_executions.cpp +++ b/ydb/core/kqp/common/kqp_script_executions.cpp @@ -7,17 +7,21 @@ namespace NKikimr::NKqp { TString ScriptExecutionOperationFromExecutionId(const TString& executionId) { Ydb::TOperationId operationId; operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION); - NOperationId::AddOptionalValue(operationId, "actor_id", executionId); + NOperationId::AddOptionalValue(operationId, "id", executionId); return NOperationId::ProtoToString(operationId); } -TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) { +TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId) { NOperationId::TOperationId operation(operationId); - return ScriptExecutionFromOperation(operation); + return ScriptExecutionIdFromOperation(operation); } -TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) { - const auto& values = operationId.GetValue("actor_id"); +TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId) { + if (operationId.GetKind() != Ydb::TOperationId::SCRIPT_EXECUTION) { + return Nothing(); + } + + const auto& values = operationId.GetValue("id"); if (values.empty() || !values[0]) { return Nothing(); } diff --git a/ydb/core/kqp/common/kqp_script_executions.h b/ydb/core/kqp/common/kqp_script_executions.h index d920d28b59..3f8491a6e5 100644 --- a/ydb/core/kqp/common/kqp_script_executions.h +++ b/ydb/core/kqp/common/kqp_script_executions.h @@ -8,7 +8,7 @@ namespace NKikimr::NKqp { TString ScriptExecutionOperationFromExecutionId(const TString& executionId); -TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId); -TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId); +TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId); +TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index a8faace1ea..0720e21c03 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -39,6 +39,8 @@ struct TKqpEvents { EvCancelScriptExecutionResponse, EvCancelQueryRequest, EvCancelQueryResponse, + EvGetRunScriptActorRequest, // TODO: remove when there will be fetch script result through database + EvGetRunScriptActorResponse, // TODO: remove when there will be fetch script result through database }; static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution); diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 7c81c9d5f9..07b7585750 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1165,6 +1165,7 @@ public: hFunc(NKqp::TEvGetScriptExecutionOperation, Handle); hFunc(NKqp::TEvListScriptExecutionOperations, Handle); hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle); + hFunc(TEvKqp::TEvGetRunScriptActorRequest, Handle); default: Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); @@ -1374,6 +1375,10 @@ private: Register(CreateCancelScriptExecutionOperationActor(std::move(ev))); } + void Handle(TEvKqp::TEvGetRunScriptActorRequest::TPtr& ev) { + Register(CreateGetRunScriptActorActor(std::move(ev))); + } + private: NYql::NLog::YqlLoggerScope YqlLoggerScope; NKikimrConfig::TLogConfig LogConfig; diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 1dd7a19db9..879f25b0d5 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -26,6 +26,7 @@ #include <library/cpp/protobuf/json/json2proto.h> #include <library/cpp/protobuf/json/proto2json.h> +#include <util/generic/guid.h> #include <util/generic/utility.h> #include <util/random/random.h> @@ -335,6 +336,7 @@ private: { Col("database", NScheme::NTypeIds::Text), Col("execution_id", NScheme::NTypeIds::Text), + Col("run_script_actor_id", NScheme::NTypeIds::Text), Col("operation_status", NScheme::NTypeIds::Int32), Col("execution_status", NScheme::NTypeIds::Int32), Col("execution_mode", NScheme::NTypeIds::Int32), @@ -388,8 +390,9 @@ private: class TCreateScriptOperationQuery : public TQueryBase { public: - TCreateScriptOperationQuery(const TString& executionId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero()) + TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero()) : ExecutionId(executionId) + , RunScriptActorId(runScriptActorId) , Request(req) , LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION) { @@ -424,6 +427,7 @@ public: TString sql = R"( DECLARE $database AS Text; DECLARE $execution_id AS Text; + DECLARE $run_script_actor_id AS Text; DECLARE $execution_status AS Int32; DECLARE $execution_mode AS Int32; DECLARE $query_text AS Text; @@ -431,8 +435,8 @@ public: DECLARE $lease_duration AS Interval; UPSERT INTO `.metadata/script_executions` - (database, execution_id, execution_status, execution_mode, start_ts, query_text, syntax) - VALUES ($database, $execution_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax); + (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax) + VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax); UPSERT INTO `.metadata/script_execution_leases` (database, execution_id, lease_deadline, lease_generation) @@ -447,6 +451,9 @@ public: .AddParam("$execution_id") .Utf8(ExecutionId) .Build() + .AddParam("$run_script_actor_id") + .Utf8(ScriptExecutionRunnerActorIdString(RunScriptActorId)) + .Build() .AddParam("$execution_status") .Int32(Ydb::Query::EXEC_STATUS_STARTING) .Build() @@ -481,6 +488,7 @@ public: private: const TString ExecutionId; + const NActors::TActorId RunScriptActorId; NKikimrKqp::TEvQueryRequest Request; TDuration LeaseDuration; }; @@ -494,10 +502,11 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec void Bootstrap() { Become(&TCreateScriptExecutionActor::StateFunc); + ExecutionId = CreateGuidAsString(); + // Start request - RunScriptActorId = Register(CreateRunScriptActor(Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1)); - TString executionId = ActorIdToScriptExecutionId(RunScriptActorId); - Register(new TCreateScriptOperationQuery(executionId, Event->Get()->Record)); + RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1)); + Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record)); } void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) { @@ -516,6 +525,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec private: TEvKqp::TEvScriptRequest::TPtr Event; + TString ExecutionId; NActors::TActorId RunScriptActorId; }; @@ -680,7 +690,7 @@ public: DECLARE $database AS Text; DECLARE $execution_id AS Text; - SELECT operation_status, execution_status, issues FROM `.metadata/script_executions` + SELECT operation_status, execution_status, issues, run_script_actor_id FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; SELECT lease_deadline FROM `.metadata/script_execution_leases` @@ -713,6 +723,16 @@ public: result.TryNextRow(); + const TMaybe<TString> runScriptActorId = result.ColumnParser("run_script_actor_id").GetOptionalUtf8(); + if (!runScriptActorId) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + if (!NKqp::ScriptExecutionRunnerActorIdFromString(*runScriptActorId, RunScriptActorId)) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); TMaybe<TInstant> leaseDeadline; @@ -759,7 +779,7 @@ public: void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { if (status == Ydb::StatusIds::SUCCESS) { - Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues)), 0, Cookie); + Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId), 0, Cookie); } else { Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie); } @@ -774,6 +794,7 @@ private: TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; TMaybe<NYql::TIssues> OperationIssues; + NActors::TActorId RunScriptActorId; }; class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase { @@ -804,7 +825,7 @@ public: WHERE database = $database AND execution_id = $execution_id; )"; - TMaybe<TString> maybeExecutionId = ScriptExecutionFromOperation(Request->Get()->OperationId); + TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId); Y_ENSURE(maybeExecutionId, "No execution id specified"); ExecutionId = *maybeExecutionId; @@ -841,7 +862,7 @@ public: Ydb::Query::ExecuteScriptMetadata metadata; - metadata.set_execution_id(*ScriptExecutionFromOperation(Request->Get()->OperationId)); + metadata.set_execution_id(*ScriptExecutionIdFromOperation(Request->Get()->OperationId)); const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32(); if (executionStatus) { @@ -1175,16 +1196,12 @@ public: {} void Bootstrap() { - const TMaybe<TString> executionId = NKqp::ScriptExecutionFromOperation(Request->Get()->OperationId); + const TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId); if (!executionId) { return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id"); } ExecutionId = *executionId; - if (!NKqp::ScriptExecutionIdToActorId(ExecutionId, RunScriptActor)) { - return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id"); - } - Become(&TCancelScriptExecutionOperationActor::StateFunc); Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); } @@ -1198,6 +1215,7 @@ public: void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + RunScriptActor = ev->Get()->RunScriptActorId; if (ev->Get()->OperationStatus) { Reply(Ydb::StatusIds::PRECONDITION_FAILED); // Already finished. } else { @@ -1266,6 +1284,36 @@ private: bool CancelSent = false; }; +// TODO: remove when there will be fetch script result through database +class TGetRunScriptActorActor : public NActors::TActorBootstrapped<TGetRunScriptActorActor> { +public: + TGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev) + : Request(std::move(ev)) + {} + + void Bootstrap() { + Register(new TCheckLeaseStatusActor(Request->Get()->Database, Request->Get()->ExecutionId)); + Become(&TGetRunScriptActorActor::StateFunc); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvLeaseCheckResult, Handle); + ) + + void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { + if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) { + Send(Request->Sender, new TEvKqp::TEvGetRunScriptActorResponse(ev->Get()->RunScriptActorId)); + } else { + Send(Request->Sender, new TEvKqp::TEvGetRunScriptActorResponse(ev->Get()->Status, std::move(ev->Get()->Issues))); + } + PassAway(); + } + +private: + TEvKqp::TEvGetRunScriptActorRequest::TPtr Request; +}; + } // anonymous namespace NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) { @@ -1299,10 +1347,15 @@ NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecut return new TCancelScriptExecutionOperationActor(std::move(ev)); } +NActors::IActor* CreateGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev) { + return new TGetRunScriptActorActor(std::move(ev)); +} + + namespace NPrivate { -NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) { - return new TCreateScriptOperationQuery(executionId, record, leaseDuration); +NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) { + return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, leaseDuration); } NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease, ui64 cookie) { @@ -1310,5 +1363,4 @@ NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TStr } } // namespace NPrivate - } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index 11867d5781..94cf8bd93a 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -23,6 +23,7 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev); NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev); NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev); +NActors::IActor* CreateGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev); // Updates status in database. NActors::IActor* CreateScriptExecutionFinisher( diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h index eb16c31876..1846a5454a 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h @@ -50,11 +50,13 @@ struct TEvPrivate { TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus, TMaybe<Ydb::Query::ExecStatus> executionStatus, - TMaybe<NYql::TIssues> operationIssues) + TMaybe<NYql::TIssues> operationIssues, + const NActors::TActorId& runScriptActorId) : Status(Ydb::StatusIds::SUCCESS) , OperationStatus(operationStatus) , ExecutionStatus(executionStatus) , OperationIssues(operationIssues) + , RunScriptActorId(runScriptActorId) {} const Ydb::StatusIds::StatusCode Status; @@ -62,12 +64,13 @@ struct TEvPrivate { const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; const TMaybe<NYql::TIssues> OperationIssues; + const NActors::TActorId RunScriptActorId; }; }; // Writes new script into db. // If lease duration is zero, default one will be taken. -NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero()); +NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero()); // Checks lease of execution, finishes execution if its lease is off, returns current status NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp index d599af9310..9cf871e303 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp @@ -90,7 +90,7 @@ struct TScriptExecutionsYdbSetup { req.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); const ui32 node = 0; TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node); - GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor); + GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor); auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvCreateScriptOperationResponse>(edgeActor); UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 0b2f808a22..bf3a9148cf 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -40,8 +40,9 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> { Finished, }; public: - TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) - : Request(request) + TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) + : ExecutionId(executionId) + , Request(request) , Database(database) , LeaseGeneration(leaseGeneration) {} @@ -300,7 +301,7 @@ private: void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) { RunState = runState; Status = status; - Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues)); + Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues)); if (RunState == ERunState::Cancelling) { Issues.AddIssue("Script execution is cancelled"); ResultSets.clear(); @@ -325,6 +326,7 @@ private: } private: + const TString ExecutionId; const NKikimrKqp::TEvQueryRequest Request; const TString Database; const ui64 LeaseGeneration; @@ -340,8 +342,8 @@ private: } // namespace -NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) { - return new TRunScriptActor(request, database, leaseGeneration); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) { + return new TRunScriptActor(executionId, request, database, leaseGeneration); } } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h index 438795902d..96b24f4aeb 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h @@ -9,6 +9,6 @@ namespace NKikimr::NKqp { struct TEvKqpRunScriptActor { }; -NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration); +NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration); } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index b66916f3db..dc91345f06 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -277,13 +277,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42"); - TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync(); - UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); - TResultSetParser resultSet(results.ExtractResultSet()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); - UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); - UNIT_ASSERT(resultSet.TryNextRow()); - UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); + auto checkFetch = [&](const auto& executionOrOperation) { + TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1); + UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1); + UNIT_ASSERT(resultSet.TryNextRow()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); + }; + + checkFetch(scriptExecutionOperation.Metadata().ExecutionId); + checkFetch(scriptExecutionOperation); } Y_UNIT_TEST(ListScriptExecutions) { diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto index ad2eae0386..ad69e1500a 100644 --- a/ydb/public/api/protos/draft/ydb_query.proto +++ b/ydb/public/api/protos/draft/ydb_query.proto @@ -259,7 +259,10 @@ message ExecuteScriptMetadata { } message FetchScriptResultsRequest { - string execution_id = 1 [(Ydb.length).le = 1024]; + oneof execution { + string execution_id = 1 [(Ydb.length).le = 1024]; + string operation_id = 5 [(Ydb.length).le = 1024]; + } oneof fetch { string fetch_token = 2 [(Ydb.length).le = 1024]; diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp index 9158e3d41f..462756dc6b 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp @@ -4,6 +4,7 @@ #include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h> #undef INCLUDE_YDB_INTERNAL_H +#include <ydb/public/lib/operation_id/operation_id.h> #include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h> #include <ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h> @@ -70,9 +71,19 @@ public: } TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) { - using namespace Ydb::Query; - auto request = MakeRequest<FetchScriptResultsRequest>(); + auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>(); request.set_execution_id(executionId); + return FetchScriptResultsImpl(std::move(request), settings); + } + + TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, const TFetchScriptResultsSettings& settings) { + auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>(); + request.set_operation_id(NKikimr::NOperationId::ProtoToString(scriptExecutionOperation.Id())); + return FetchScriptResultsImpl(std::move(request), settings); + } + + TAsyncFetchScriptResultsResult FetchScriptResultsImpl(Ydb::Query::FetchScriptResultsRequest&& request, const TFetchScriptResultsSettings& settings) { + using namespace Ydb::Query; if (settings.FetchToken_) { request.set_fetch_token(settings.FetchToken_); } @@ -166,4 +177,10 @@ TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& e return Impl_->FetchScriptResults(executionId, settings); } +TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, + const TFetchScriptResultsSettings& settings) +{ + return Impl_->FetchScriptResults(scriptExecutionOperation, settings); +} + } // namespace NYdb::NQuery diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h index 955a484183..ae27967ba4 100644 --- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h +++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h @@ -41,6 +41,9 @@ public: TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); + TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, + const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings()); + private: class TImpl; std::shared_ptr<TImpl> Impl_; |