aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgalaxycrab <UgnineSirdis@ydb.tech>2023-06-19 14:54:14 +0300
committergalaxycrab <UgnineSirdis@ydb.tech>2023-06-19 14:54:14 +0300
commitaa2aea0e4c3a16ff23b55f07cefae544948fb144 (patch)
tree69fbf804e16fc9a0afcd55d8228813cfe5391b87
parentc165426e54ead1cafbeddc8b2e0eb56755bd402a (diff)
downloadydb-aa2aea0e4c3a16ff23b55f07cefae544948fb144.tar.gz
Don't show run script actor id in operation id for script operations
-rw-r--r--ydb/core/grpc_services/query/rpc_fetch_script_results.cpp79
-rw-r--r--ydb/core/kqp/common/events/events.h25
-rw-r--r--ydb/core/kqp/common/kqp.cpp4
-rw-r--r--ydb/core/kqp/common/kqp.h4
-rw-r--r--ydb/core/kqp/common/kqp_script_executions.cpp14
-rw-r--r--ydb/core/kqp/common/kqp_script_executions.h4
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp5
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp88
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h1
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_impl.h7
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp2
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp12
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.h2
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp19
-rw-r--r--ydb/public/api/protos/draft/ydb_query.proto5
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp21
-rw-r--r--ydb/public/sdk/cpp/client/draft/ydb_query/client.h3
18 files changed, 228 insertions, 69 deletions
diff --git a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
index 3563c371f4..e67901e92a 100644
--- a/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
+++ b/ydb/core/grpc_services/query/rpc_fetch_script_results.cpp
@@ -3,7 +3,10 @@
#include <ydb/core/base/appdata.h>
#include <ydb/core/base/kikimr_issue.h>
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/grpc_services/rpc_request_base.h>
#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/common/kqp_script_executions.h>
+#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/public/api/protos/draft/ydb_query.pb.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
@@ -22,30 +25,25 @@ using TEvFetchScriptResultsRequest = TGrpcRequestNoOperationCall<Ydb::Query::Fet
constexpr i64 MAX_ROWS_LIMIT = 1000;
-class TFetchScriptResultsRPC : public TActorBootstrapped<TFetchScriptResultsRPC> {
+class TFetchScriptResultsRPC : public TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false> {
public:
+ using TRpcRequestActorBase = TRpcRequestActor<TFetchScriptResultsRPC, TEvFetchScriptResultsRequest, false>;
+
static constexpr NKikimrServices::TActivity::EType ActorActivityType() {
return NKikimrServices::TActivity::GRPC_REQ;
}
TFetchScriptResultsRPC(TEvFetchScriptResultsRequest* request)
- : Request_(request)
+ : TRpcRequestActorBase(request)
{}
void Bootstrap() {
- const auto* req = Request_->GetProtoRequest();
+ const auto* req = GetProtoRequest();
if (!req) {
Reply(Ydb::StatusIds::INTERNAL_ERROR, "Internal error");
return;
}
- const TString& executionId = req->execution_id();
- NActors::TActorId runScriptActor;
- if (!NKqp::ScriptExecutionIdToActorId(executionId, runScriptActor)) {
- Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect execution id");
- return;
- }
-
if (req->rows_limit() <= 0) {
Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid rows limit");
return;
@@ -61,27 +59,44 @@ public:
return;
}
- auto ev = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>();
- ev->Record.SetRowsOffset(req->rows_offset());
- ev->Record.SetRowsLimit(req->rows_limit());
-
- ui64 flags = IEventHandle::FlagTrackDelivery;
- if (runScriptActor.NodeId() != SelfId().NodeId()) {
- flags |= IEventHandle::FlagSubscribeOnSession;
- SubscribedOnSession = runScriptActor.NodeId();
+ if (!GetExecutionIdFromRequest()) {
+ return;
}
- Send(runScriptActor, std::move(ev), flags);
+
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvKqp::TEvGetRunScriptActorRequest(DatabaseName, ExecutionId));
Become(&TFetchScriptResultsRPC::StateFunc);
}
private:
STRICT_STFUNC(StateFunc,
+ hFunc(NKqp::TEvKqp::TEvGetRunScriptActorResponse, Handle);
hFunc(NKqp::TEvKqp::TEvFetchScriptResultsResponse, Handle);
hFunc(NActors::TEvents::TEvUndelivered, Handle);
hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
)
+ void Handle(NKqp::TEvKqp::TEvGetRunScriptActorResponse::TPtr& ev) {
+ if (ev->Get()->Status != Ydb::StatusIds::SUCCESS) {
+ Reply(ev->Get()->Status, ev->Get()->Issues);
+ return;
+ }
+
+ const auto* userReq = GetProtoRequest();
+
+ auto req = MakeHolder<NKqp::TEvKqp::TEvFetchScriptResultsRequest>();
+ req->Record.SetRowsOffset(userReq->rows_offset());
+ req->Record.SetRowsLimit(userReq->rows_limit());
+
+ const NActors::TActorId runScriptActor = ev->Get()->RunScriptActorId;
+ ui64 flags = IEventHandle::FlagTrackDelivery;
+ if (runScriptActor.NodeId() != SelfId().NodeId()) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ SubscribedOnSession = runScriptActor.NodeId();
+ }
+ Send(runScriptActor, std::move(req), flags);
+ }
+
void Handle(NKqp::TEvKqp::TEvFetchScriptResultsResponse::TPtr& ev) {
Ydb::Query::FetchScriptResultsResponse resp;
resp.set_status(ev->Get()->Record.GetStatus());
@@ -126,7 +141,7 @@ private:
TString serializedResult;
Y_PROTOBUF_SUPPRESS_NODISCARD result.SerializeToString(&serializedResult);
- Request_->SendSerializedResult(std::move(serializedResult), status);
+ Request->SendSerializedResult(std::move(serializedResult), status);
PassAway();
}
@@ -142,9 +157,31 @@ private:
Reply(status, issues);
}
+ bool GetExecutionIdFromRequest() {
+ switch (GetProtoRequest()->execution_case()) {
+ case Ydb::Query::FetchScriptResultsRequest::kExecutionId:
+ ExecutionId = GetProtoRequest()->execution_id();
+ break;
+ case Ydb::Query::FetchScriptResultsRequest::kOperationId:
+ {
+ TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(GetProtoRequest()->operation_id());
+ if (!executionId) {
+ Reply(Ydb::StatusIds::BAD_REQUEST, "Invalid operation id");
+ return false;
+ }
+ ExecutionId = *executionId;
+ break;
+ }
+ case Ydb::Query::FetchScriptResultsRequest::EXECUTION_NOT_SET:
+ Reply(Ydb::StatusIds::BAD_REQUEST, "No execution id");
+ return false;
+ }
+ return true;
+ }
+
private:
- std::unique_ptr<TEvFetchScriptResultsRequest> Request_;
TMaybe<ui32> SubscribedOnSession;
+ TString ExecutionId;
};
} // namespace
diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h
index 6a98198cb6..5bc4d06158 100644
--- a/ydb/core/kqp/common/events/events.h
+++ b/ydb/core/kqp/common/events/events.h
@@ -36,6 +36,31 @@ struct TEvKqp {
struct TEvCancelQueryRequest : public TEventPB<TEvCancelQueryRequest,
NKikimrKqp::TEvCancelQueryRequest, TKqpEvents::EvCancelQueryRequest> {};
+ struct TEvGetRunScriptActorRequest : public TEventLocal<TEvGetRunScriptActorRequest, TKqpEvents::EvGetRunScriptActorRequest> {
+ TEvGetRunScriptActorRequest(const TString& database, const TString& executionId)
+ : Database(database)
+ , ExecutionId(executionId)
+ {}
+
+ const TString Database;
+ const TString ExecutionId;
+ };
+
+ struct TEvGetRunScriptActorResponse : public TEventLocal<TEvGetRunScriptActorResponse, TKqpEvents::EvGetRunScriptActorResponse> {
+ TEvGetRunScriptActorResponse(const NActors::TActorId& actorId)
+ : RunScriptActorId(actorId)
+ {}
+
+ TEvGetRunScriptActorResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ {}
+
+ const Ydb::StatusIds::StatusCode Status = Ydb::StatusIds::SUCCESS;
+ const NYql::TIssues Issues;
+ const NActors::TActorId RunScriptActorId;
+ };
+
using TEvCompileRequest = NPrivateEvents::TEvCompileRequest;
using TEvRecompileRequest = NPrivateEvents::TEvRecompileRequest;
diff --git a/ydb/core/kqp/common/kqp.cpp b/ydb/core/kqp/common/kqp.cpp
index 298a2a7551..026fa6efb7 100644
--- a/ydb/core/kqp/common/kqp.cpp
+++ b/ydb/core/kqp/common/kqp.cpp
@@ -6,11 +6,11 @@
namespace NKikimr::NKqp {
-TString ActorIdToScriptExecutionId(const NActors::TActorId& actorId) {
+TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId) {
return TStringBuilder() << "[" << actorId.NodeId() << ":" << actorId.LocalId() << ":" << actorId.Hint() << ":" << actorId.PoolID() << "]";
}
-bool ScriptExecutionIdToActorId(const TString& executionId, TActorId& actorId) {
+bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId) {
if (executionId.Size() < 5 || executionId[0] != '[' || executionId[executionId.Size() - 1] != ']')
return false;
diff --git a/ydb/core/kqp/common/kqp.h b/ydb/core/kqp/common/kqp.h
index bcff94c9ee..173d527953 100644
--- a/ydb/core/kqp/common/kqp.h
+++ b/ydb/core/kqp/common/kqp.h
@@ -35,8 +35,8 @@ namespace NKikimr::NKqp {
void ConvertKqpQueryResultToDbResult(const NKikimrMiniKQL::TResult& from, Ydb::ResultSet* to);
-TString ActorIdToScriptExecutionId(const NActors::TActorId& actorId);
-bool ScriptExecutionIdToActorId(const TString& executionId, TActorId& actorId);
+TString ScriptExecutionRunnerActorIdString(const NActors::TActorId& actorId);
+bool ScriptExecutionRunnerActorIdFromString(const TString& executionId, TActorId& actorId);
template<typename TFrom, typename TTo>
inline void ConvertKqpQueryResultsToDbResult(const TFrom& from, TTo* to) {
diff --git a/ydb/core/kqp/common/kqp_script_executions.cpp b/ydb/core/kqp/common/kqp_script_executions.cpp
index f231dae077..97ab891956 100644
--- a/ydb/core/kqp/common/kqp_script_executions.cpp
+++ b/ydb/core/kqp/common/kqp_script_executions.cpp
@@ -7,17 +7,21 @@ namespace NKikimr::NKqp {
TString ScriptExecutionOperationFromExecutionId(const TString& executionId) {
Ydb::TOperationId operationId;
operationId.SetKind(Ydb::TOperationId::SCRIPT_EXECUTION);
- NOperationId::AddOptionalValue(operationId, "actor_id", executionId);
+ NOperationId::AddOptionalValue(operationId, "id", executionId);
return NOperationId::ProtoToString(operationId);
}
-TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId) {
+TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId) {
NOperationId::TOperationId operation(operationId);
- return ScriptExecutionFromOperation(operation);
+ return ScriptExecutionIdFromOperation(operation);
}
-TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId) {
- const auto& values = operationId.GetValue("actor_id");
+TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId) {
+ if (operationId.GetKind() != Ydb::TOperationId::SCRIPT_EXECUTION) {
+ return Nothing();
+ }
+
+ const auto& values = operationId.GetValue("id");
if (values.empty() || !values[0]) {
return Nothing();
}
diff --git a/ydb/core/kqp/common/kqp_script_executions.h b/ydb/core/kqp/common/kqp_script_executions.h
index d920d28b59..3f8491a6e5 100644
--- a/ydb/core/kqp/common/kqp_script_executions.h
+++ b/ydb/core/kqp/common/kqp_script_executions.h
@@ -8,7 +8,7 @@
namespace NKikimr::NKqp {
TString ScriptExecutionOperationFromExecutionId(const TString& executionId);
-TMaybe<TString> ScriptExecutionFromOperation(const TString& operationId);
-TMaybe<TString> ScriptExecutionFromOperation(const NOperationId::TOperationId& operationId);
+TMaybe<TString> ScriptExecutionIdFromOperation(const TString& operationId);
+TMaybe<TString> ScriptExecutionIdFromOperation(const NOperationId::TOperationId& operationId);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index a8faace1ea..0720e21c03 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -39,6 +39,8 @@ struct TKqpEvents {
EvCancelScriptExecutionResponse,
EvCancelQueryRequest,
EvCancelQueryResponse,
+ EvGetRunScriptActorRequest, // TODO: remove when there will be fetch script result through database
+ EvGetRunScriptActorResponse, // TODO: remove when there will be fetch script result through database
};
static_assert (EvCompileInvalidateRequest + 1 == EvAbortExecution);
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 7c81c9d5f9..07b7585750 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -1165,6 +1165,7 @@ public:
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle);
+ hFunc(TEvKqp::TEvGetRunScriptActorRequest, Handle);
default:
Y_FAIL("TKqpProxyService: unexpected event type: %" PRIx32 " event: %s",
ev->GetTypeRewrite(), ev->ToString().data());
@@ -1374,6 +1375,10 @@ private:
Register(CreateCancelScriptExecutionOperationActor(std::move(ev)));
}
+ void Handle(TEvKqp::TEvGetRunScriptActorRequest::TPtr& ev) {
+ Register(CreateGetRunScriptActorActor(std::move(ev)));
+ }
+
private:
NYql::NLog::YqlLoggerScope YqlLoggerScope;
NKikimrConfig::TLogConfig LogConfig;
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 1dd7a19db9..879f25b0d5 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -26,6 +26,7 @@
#include <library/cpp/protobuf/json/json2proto.h>
#include <library/cpp/protobuf/json/proto2json.h>
+#include <util/generic/guid.h>
#include <util/generic/utility.h>
#include <util/random/random.h>
@@ -335,6 +336,7 @@ private:
{
Col("database", NScheme::NTypeIds::Text),
Col("execution_id", NScheme::NTypeIds::Text),
+ Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
@@ -388,8 +390,9 @@ private:
class TCreateScriptOperationQuery : public TQueryBase {
public:
- TCreateScriptOperationQuery(const TString& executionId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero())
+ TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero())
: ExecutionId(executionId)
+ , RunScriptActorId(runScriptActorId)
, Request(req)
, LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION)
{
@@ -424,6 +427,7 @@ public:
TString sql = R"(
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
+ DECLARE $run_script_actor_id AS Text;
DECLARE $execution_status AS Int32;
DECLARE $execution_mode AS Int32;
DECLARE $query_text AS Text;
@@ -431,8 +435,8 @@ public:
DECLARE $lease_duration AS Interval;
UPSERT INTO `.metadata/script_executions`
- (database, execution_id, execution_status, execution_mode, start_ts, query_text, syntax)
- VALUES ($database, $execution_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax);
+ (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax)
+ VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax);
UPSERT INTO `.metadata/script_execution_leases`
(database, execution_id, lease_deadline, lease_generation)
@@ -447,6 +451,9 @@ public:
.AddParam("$execution_id")
.Utf8(ExecutionId)
.Build()
+ .AddParam("$run_script_actor_id")
+ .Utf8(ScriptExecutionRunnerActorIdString(RunScriptActorId))
+ .Build()
.AddParam("$execution_status")
.Int32(Ydb::Query::EXEC_STATUS_STARTING)
.Build()
@@ -481,6 +488,7 @@ public:
private:
const TString ExecutionId;
+ const NActors::TActorId RunScriptActorId;
NKikimrKqp::TEvQueryRequest Request;
TDuration LeaseDuration;
};
@@ -494,10 +502,11 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
void Bootstrap() {
Become(&TCreateScriptExecutionActor::StateFunc);
+ ExecutionId = CreateGuidAsString();
+
// Start request
- RunScriptActorId = Register(CreateRunScriptActor(Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1));
- TString executionId = ActorIdToScriptExecutionId(RunScriptActorId);
- Register(new TCreateScriptOperationQuery(executionId, Event->Get()->Record));
+ RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1));
+ Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record));
}
void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) {
@@ -516,6 +525,7 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
private:
TEvKqp::TEvScriptRequest::TPtr Event;
+ TString ExecutionId;
NActors::TActorId RunScriptActorId;
};
@@ -680,7 +690,7 @@ public:
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
- SELECT operation_status, execution_status, issues FROM `.metadata/script_executions`
+ SELECT operation_status, execution_status, issues, run_script_actor_id FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
SELECT lease_deadline FROM `.metadata/script_execution_leases`
@@ -713,6 +723,16 @@ public:
result.TryNextRow();
+ const TMaybe<TString> runScriptActorId = result.ColumnParser("run_script_actor_id").GetOptionalUtf8();
+ if (!runScriptActorId) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+ if (!NKqp::ScriptExecutionRunnerActorIdFromString(*runScriptActorId, RunScriptActorId)) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+
TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
TMaybe<TInstant> leaseDeadline;
@@ -759,7 +779,7 @@ public:
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
if (status == Ydb::StatusIds::SUCCESS) {
- Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues)), 0, Cookie);
+ Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId), 0, Cookie);
} else {
Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie);
}
@@ -774,6 +794,7 @@ private:
TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
TMaybe<NYql::TIssues> OperationIssues;
+ NActors::TActorId RunScriptActorId;
};
class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase {
@@ -804,7 +825,7 @@ public:
WHERE database = $database AND execution_id = $execution_id;
)";
- TMaybe<TString> maybeExecutionId = ScriptExecutionFromOperation(Request->Get()->OperationId);
+ TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId);
Y_ENSURE(maybeExecutionId, "No execution id specified");
ExecutionId = *maybeExecutionId;
@@ -841,7 +862,7 @@ public:
Ydb::Query::ExecuteScriptMetadata metadata;
- metadata.set_execution_id(*ScriptExecutionFromOperation(Request->Get()->OperationId));
+ metadata.set_execution_id(*ScriptExecutionIdFromOperation(Request->Get()->OperationId));
const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32();
if (executionStatus) {
@@ -1175,16 +1196,12 @@ public:
{}
void Bootstrap() {
- const TMaybe<TString> executionId = NKqp::ScriptExecutionFromOperation(Request->Get()->OperationId);
+ const TMaybe<TString> executionId = NKqp::ScriptExecutionIdFromOperation(Request->Get()->OperationId);
if (!executionId) {
return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
}
ExecutionId = *executionId;
- if (!NKqp::ScriptExecutionIdToActorId(ExecutionId, RunScriptActor)) {
- return Reply(Ydb::StatusIds::BAD_REQUEST, "Incorrect operation id");
- }
-
Become(&TCancelScriptExecutionOperationActor::StateFunc);
Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId));
}
@@ -1198,6 +1215,7 @@ public:
void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
+ RunScriptActor = ev->Get()->RunScriptActorId;
if (ev->Get()->OperationStatus) {
Reply(Ydb::StatusIds::PRECONDITION_FAILED); // Already finished.
} else {
@@ -1266,6 +1284,36 @@ private:
bool CancelSent = false;
};
+// TODO: remove when there will be fetch script result through database
+class TGetRunScriptActorActor : public NActors::TActorBootstrapped<TGetRunScriptActorActor> {
+public:
+ TGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev)
+ : Request(std::move(ev))
+ {}
+
+ void Bootstrap() {
+ Register(new TCheckLeaseStatusActor(Request->Get()->Database, Request->Get()->ExecutionId));
+ Become(&TGetRunScriptActorActor::StateFunc);
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvLeaseCheckResult, Handle);
+ )
+
+ void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
+ if (ev->Get()->Status == Ydb::StatusIds::SUCCESS) {
+ Send(Request->Sender, new TEvKqp::TEvGetRunScriptActorResponse(ev->Get()->RunScriptActorId));
+ } else {
+ Send(Request->Sender, new TEvKqp::TEvGetRunScriptActorResponse(ev->Get()->Status, std::move(ev->Get()->Issues)));
+ }
+ PassAway();
+ }
+
+private:
+ TEvKqp::TEvGetRunScriptActorRequest::TPtr Request;
+};
+
} // anonymous namespace
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev) {
@@ -1299,10 +1347,15 @@ NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecut
return new TCancelScriptExecutionOperationActor(std::move(ev));
}
+NActors::IActor* CreateGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev) {
+ return new TGetRunScriptActorActor(std::move(ev));
+}
+
+
namespace NPrivate {
-NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) {
- return new TCreateScriptOperationQuery(executionId, record, leaseDuration);
+NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) {
+ return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, leaseDuration);
}
NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease, ui64 cookie) {
@@ -1310,5 +1363,4 @@ NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TStr
}
} // namespace NPrivate
-
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index 11867d5781..94cf8bd93a 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -23,6 +23,7 @@ NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPt
NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev);
NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev);
NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev);
+NActors::IActor* CreateGetRunScriptActorActor(TEvKqp::TEvGetRunScriptActorRequest::TPtr ev);
// Updates status in database.
NActors::IActor* CreateScriptExecutionFinisher(
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
index eb16c31876..1846a5454a 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
@@ -50,11 +50,13 @@ struct TEvPrivate {
TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus,
TMaybe<Ydb::Query::ExecStatus> executionStatus,
- TMaybe<NYql::TIssues> operationIssues)
+ TMaybe<NYql::TIssues> operationIssues,
+ const NActors::TActorId& runScriptActorId)
: Status(Ydb::StatusIds::SUCCESS)
, OperationStatus(operationStatus)
, ExecutionStatus(executionStatus)
, OperationIssues(operationIssues)
+ , RunScriptActorId(runScriptActorId)
{}
const Ydb::StatusIds::StatusCode Status;
@@ -62,12 +64,13 @@ struct TEvPrivate {
const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
const TMaybe<NYql::TIssues> OperationIssues;
+ const NActors::TActorId RunScriptActorId;
};
};
// Writes new script into db.
// If lease duration is zero, default one will be taken.
-NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero());
+NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero());
// Checks lease of execution, finishes execution if its lease is off, returns current status
NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0);
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
index d599af9310..9cf871e303 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
@@ -90,7 +90,7 @@ struct TScriptExecutionsYdbSetup {
req.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
const ui32 node = 0;
TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
- GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor);
+ GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor);
auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvCreateScriptOperationResponse>(edgeActor);
UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS);
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 0b2f808a22..bf3a9148cf 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -40,8 +40,9 @@ class TRunScriptActor : public NActors::TActorBootstrapped<TRunScriptActor> {
Finished,
};
public:
- TRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration)
- : Request(request)
+ TRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration)
+ : ExecutionId(executionId)
+ , Request(request)
, Database(database)
, LeaseGeneration(leaseGeneration)
{}
@@ -300,7 +301,7 @@ private:
void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) {
RunState = runState;
Status = status;
- Register(CreateScriptExecutionFinisher(ActorIdToScriptExecutionId(SelfId()), Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues));
+ Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, status, GetExecStatusFromStatusCode(status), Issues));
if (RunState == ERunState::Cancelling) {
Issues.AddIssue("Script execution is cancelled");
ResultSets.clear();
@@ -325,6 +326,7 @@ private:
}
private:
+ const TString ExecutionId;
const NKikimrKqp::TEvQueryRequest Request;
const TString Database;
const ui64 LeaseGeneration;
@@ -340,8 +342,8 @@ private:
} // namespace
-NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) {
- return new TRunScriptActor(request, database, leaseGeneration);
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration) {
+ return new TRunScriptActor(executionId, request, database, leaseGeneration);
}
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
index 438795902d..96b24f4aeb 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.h
@@ -9,6 +9,6 @@ namespace NKikimr::NKqp {
struct TEvKqpRunScriptActor {
};
-NActors::IActor* CreateRunScriptActor(const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration);
+NActors::IActor* CreateRunScriptActor(const TString& executionId, const NKikimrKqp::TEvQueryRequest& request, const TString& database, ui64 leaseGeneration);
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index b66916f3db..dc91345f06 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -277,13 +277,18 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
UNIT_ASSERT_STRING_CONTAINS(readyOp.Metadata().ScriptContent.Text, "SELECT 42");
- TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId).ExtractValueSync();
- UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
- TResultSetParser resultSet(results.ExtractResultSet());
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
- UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
- UNIT_ASSERT(resultSet.TryNextRow());
- UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
+ auto checkFetch = [&](const auto& executionOrOperation) {
+ TFetchScriptResultsResult results = db.FetchScriptResults(executionOrOperation).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnsCount(), 1);
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.RowsCount(), 1);
+ UNIT_ASSERT(resultSet.TryNextRow());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
+ };
+
+ checkFetch(scriptExecutionOperation.Metadata().ExecutionId);
+ checkFetch(scriptExecutionOperation);
}
Y_UNIT_TEST(ListScriptExecutions) {
diff --git a/ydb/public/api/protos/draft/ydb_query.proto b/ydb/public/api/protos/draft/ydb_query.proto
index ad2eae0386..ad69e1500a 100644
--- a/ydb/public/api/protos/draft/ydb_query.proto
+++ b/ydb/public/api/protos/draft/ydb_query.proto
@@ -259,7 +259,10 @@ message ExecuteScriptMetadata {
}
message FetchScriptResultsRequest {
- string execution_id = 1 [(Ydb.length).le = 1024];
+ oneof execution {
+ string execution_id = 1 [(Ydb.length).le = 1024];
+ string operation_id = 5 [(Ydb.length).le = 1024];
+ }
oneof fetch {
string fetch_token = 2 [(Ydb.length).le = 1024];
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
index 9158e3d41f..462756dc6b 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.cpp
@@ -4,6 +4,7 @@
#include <ydb/public/sdk/cpp/client/impl/ydb_internal/make_request/make.h>
#undef INCLUDE_YDB_INTERNAL_H
+#include <ydb/public/lib/operation_id/operation_id.h>
#include <ydb/public/sdk/cpp/client/ydb_common_client/impl/client.h>
#include <ydb/public/sdk/cpp/client/draft/ydb_query/impl/exec_query.h>
@@ -70,9 +71,19 @@ public:
}
TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId, const TFetchScriptResultsSettings& settings) {
- using namespace Ydb::Query;
- auto request = MakeRequest<FetchScriptResultsRequest>();
+ auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>();
request.set_execution_id(executionId);
+ return FetchScriptResultsImpl(std::move(request), settings);
+ }
+
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation, const TFetchScriptResultsSettings& settings) {
+ auto request = MakeRequest<Ydb::Query::FetchScriptResultsRequest>();
+ request.set_operation_id(NKikimr::NOperationId::ProtoToString(scriptExecutionOperation.Id()));
+ return FetchScriptResultsImpl(std::move(request), settings);
+ }
+
+ TAsyncFetchScriptResultsResult FetchScriptResultsImpl(Ydb::Query::FetchScriptResultsRequest&& request, const TFetchScriptResultsSettings& settings) {
+ using namespace Ydb::Query;
if (settings.FetchToken_) {
request.set_fetch_token(settings.FetchToken_);
}
@@ -166,4 +177,10 @@ TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TString& e
return Impl_->FetchScriptResults(executionId, settings);
}
+TAsyncFetchScriptResultsResult TQueryClient::FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation,
+ const TFetchScriptResultsSettings& settings)
+{
+ return Impl_->FetchScriptResults(scriptExecutionOperation, settings);
+}
+
} // namespace NYdb::NQuery
diff --git a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
index 955a484183..ae27967ba4 100644
--- a/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
+++ b/ydb/public/sdk/cpp/client/draft/ydb_query/client.h
@@ -41,6 +41,9 @@ public:
TAsyncFetchScriptResultsResult FetchScriptResults(const TString& executionId,
const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
+ TAsyncFetchScriptResultsResult FetchScriptResults(const TScriptExecutionOperation& scriptExecutionOperation,
+ const TFetchScriptResultsSettings& settings = TFetchScriptResultsSettings());
+
private:
class TImpl;
std::shared_ptr<TImpl> Impl_;