diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-06-23 17:46:07 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-06-23 17:46:07 +0300 |
commit | af207c571a5855b41675286f62e08a465e379041 (patch) | |
tree | 14115bf0b12d0719c6add7b324b9ef4f8edf9799 | |
parent | a3fb7cdc57dc655107ebfb8e4b5bcce882b60951 (diff) | |
download | ydb-af207c571a5855b41675286f62e08a465e379041.tar.gz |
ForgetOperation для операций script execution
-rw-r--r-- | ydb/core/grpc_services/rpc_forget_operation.cpp | 33 | ||||
-rw-r--r-- | ydb/core/kqp/common/events/script_executions.h | 22 | ||||
-rw-r--r-- | ydb/core/kqp/common/simple/kqp_event_ids.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_proxy_service.cpp | 5 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 119 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.h | 1 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 114 |
7 files changed, 294 insertions, 2 deletions
diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp index 7cdd87c10c..c41c0f56a9 100644 --- a/ydb/core/grpc_services/rpc_forget_operation.cpp +++ b/ydb/core/grpc_services/rpc_forget_operation.cpp @@ -3,6 +3,9 @@ #include "rpc_operation_request_base.h" #include <ydb/core/grpc_services/base/base.h> +#include <ydb/core/kqp/common/events/script_executions.h> +#include <ydb/core/kqp/common/kqp.h> +#include <ydb/core/kqp/common/simple/services.h> #include <ydb/core/tx/schemeshard/schemeshard_build_index.h> #include <ydb/core/tx/schemeshard/schemeshard_export.h> #include <ydb/core/tx/schemeshard/schemeshard_import.h> @@ -31,6 +34,8 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, return "[ForgetImport]"; case TOperationId::BUILD_INDEX: return "[ForgetIndexBuild]"; + case TOperationId::SCRIPT_EXECUTION: + return "[ForgetScriptExecution]"; default: return "[Untagged]"; } @@ -49,6 +54,13 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, } } + bool NeedAllocateTxId() const { + const Ydb::TOperationId::EKind kind = OperationId.GetKind(); + return kind == TOperationId::EXPORT + || kind == TOperationId::IMPORT + || kind == TOperationId::BUILD_INDEX; + } + void Handle(TEvExport::TEvForgetExportResponse::TPtr& ev) { const auto& record = ev->Get()->Record.GetResponse(); @@ -76,6 +88,18 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC, Reply(record.GetStatus(), record.GetIssues()); } + void Handle(NKqp::TEvForgetScriptExecutionOperationResponce::TPtr& ev) { + google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto; + NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto); + LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponce responce" + << ": status# " << ev->Get()->Status); + Reply(ev->Get()->Status, issuesProto); + } + + void SendForgetScriptExecutionOperation() { + Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId)); + } + public: using TRpcOperationRequestActor::TRpcOperationRequestActor; @@ -94,11 +118,15 @@ public: } break; + case TOperationId::SCRIPT_EXECUTION: + SendForgetScriptExecutionOperation(); + break; default: return Reply(StatusIds::UNSUPPORTED, TIssuesIds::DEFAULT_ERROR, "Unknown operation kind"); } - - AllocateTxId(); + if (NeedAllocateTxId()) { + AllocateTxId(); + } } catch (const yexception&) { return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Invalid operation id"); } @@ -111,6 +139,7 @@ public: hFunc(TEvExport::TEvForgetExportResponse, Handle); hFunc(TEvImport::TEvForgetImportResponse, Handle); hFunc(TEvIndexBuilder::TEvForgetResponse, Handle); + hFunc(NKqp::TEvForgetScriptExecutionOperationResponce, Handle); default: return StateBase(ev); } diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index f2bcba5ee6..6b3903ab3b 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -13,6 +13,28 @@ namespace NKikimr::NKqp { +struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> { + explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) + : Database(database) + , OperationId(id) + { + } + + TString Database; + NOperationId::TOperationId OperationId; +}; + +struct TEvForgetScriptExecutionOperationResponce : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponce, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponce> { + TEvForgetScriptExecutionOperationResponce(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(issues) + { + } + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> { explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) : Database(database) diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 2f800dcb00..21d543fdd8 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -138,6 +138,8 @@ struct TKqpScriptExecutionEvents { EvCancelScriptExecutionOperation, EvCancelScriptExecutionOperationResponse, EvScriptExecutionFinished, + EvForgetScriptExecutionOperation, + EvForgetScriptExecutionOperationResponce }; }; diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp index 07b7585750..43325c40bb 100644 --- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp +++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp @@ -1162,6 +1162,7 @@ public: hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent); hFunc(TEvPrivate::TEvCloseIdleSessions, Handle); hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle); + hFunc(NKqp::TEvForgetScriptExecutionOperation, Handle); hFunc(NKqp::TEvGetScriptExecutionOperation, Handle); hFunc(NKqp::TEvListScriptExecutionOperations, Handle); hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle); @@ -1363,6 +1364,10 @@ private: ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished; } + void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) { + Register(CreateForgetScriptExecutionOperationActor(std::move(ev))); + } + void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) { Register(CreateGetScriptExecutionOperationActor(std::move(ev))); } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index a2405dff56..b05e3fb812 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -897,6 +897,121 @@ private: NActors::TActorId RunScriptActorId; }; +class TForgetScriptExecutionOperationActor : public TQueryBase { +public: + explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) + : Request(std::move(ev)) + { + } + + void OnRunQuery() override { + TString sql = R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + SELECT + operation_status, + execution_status + FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + + SELECT + lease_deadline + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId); + Y_ENSURE(maybeExecutionId, "No execution id specified"); + ExecutionId = *maybeExecutionId; + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Request->Get()->Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + SetQueryResultHandler(&TForgetScriptExecutionOperationActor::OnGetInfo); + } + + void OnGetInfo() { + if (ResultSets.size() != 2) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::NOT_FOUND, "No such execution"); + return; + } + + result.TryNextRow(); + + const TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); + + TStringBuilder sql; + sql << R"( + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + DELETE + FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TResultSetParser deadlineResult(ResultSets[1]); + if (deadlineResult.RowsCount() != 0) { + deadlineResult.TryNextRow(); + TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); + if (!leaseDeadline) { + // existing row with empty lease??? + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); + return; + } + if (*leaseDeadline >= TInstant::Now()) { + if (!operationStatus) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Operation is still running"); + return; + } + } + sql << R"( + DELETE + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + + )"; + } + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Request->Get()->Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::ContinueAndCommitTx()); + SetQueryResultHandler(&TForgetScriptExecutionOperationActor::OnForgetOperation); + } + + void OnForgetOperation() { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + Send(Request->Sender, new TEvForgetScriptExecutionOperationResponce(status, std::move(issues))); + } + +private: + TEvForgetScriptExecutionOperation::TPtr Request; + TString ExecutionId; + NYql::TIssues Issues; +}; + class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase { public: explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) @@ -1435,6 +1550,10 @@ NActors::IActor* CreateScriptExecutionFinisher( return new TScriptExecutionFinisher(executionId, database, leaseGeneration, operationStatus, execStatus, std::move(issues)); } +NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) { + return new TForgetScriptExecutionOperationActor(std::move(ev)); +} + NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) { return new TGetScriptExecutionOperationActor(std::move(ev)); } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index ed42dc2390..89dcc41743 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -20,6 +20,7 @@ NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev); // Operation API impl. +NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev); NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev); NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev); NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index dc91345f06..ee2d91884b 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -332,6 +332,120 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { UNIT_ASSERT_EQUAL(ops, listedOps); } + Y_UNIT_TEST(ForgetScriptExecution) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + NYdb::NOperation::TOperationClient client(kikimr.GetDriver()); + auto list = client.List<NYdb::NQuery::TScriptExecutionOperation>(42).ExtractValueSync(); + UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString()); + UNIT_ASSERT(list.GetList().empty()); + + constexpr ui64 ScriptExecutionsCount = 100; + std::set<TString> ops; + for (ui64 i = 0; i < ScriptExecutionsCount; ++i) { + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT 42 + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + ops.emplace(scriptExecutionOperation.Metadata().ExecutionId); + } + UNIT_ASSERT_VALUES_EQUAL(ops.size(), ScriptExecutionsCount); + + std::set<TString> listedOps; + ui64 listed = 0; + std::set<TString> rememberedOps = ops; + bool forgetNextOperation = true; + list = client.List<NYdb::NQuery::TScriptExecutionOperation>(ScriptExecutionsCount).ExtractValueSync(); + UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString()); + UNIT_ASSERT(list.GetList().size() == ScriptExecutionsCount); + for (const auto& op : list.GetList()) { + ++listed; + UNIT_ASSERT_C(listedOps.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId); + if (forgetNextOperation) { + auto status = client.Forget(op.Id()).ExtractValueSync(); + UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED || + status.GetStatus() == NYdb::EStatus::ABORTED, status.GetIssues().ToString()); + if (status.GetStatus() == NYdb::EStatus::SUCCESS) { + rememberedOps.erase(op.Metadata().ExecutionId); + } + } + forgetNextOperation = !forgetNextOperation; + } + UNIT_ASSERT_VALUES_EQUAL(listed, ScriptExecutionsCount); + UNIT_ASSERT_EQUAL(ops, listedOps); + + std::set<TString> listedOpsAfterForget; + list = client.List<NYdb::NQuery::TScriptExecutionOperation>(ScriptExecutionsCount, {}).ExtractValueSync(); + UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString()); + for (const auto& op : list.GetList()) { + UNIT_ASSERT_C(listedOpsAfterForget.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId); + } + + UNIT_ASSERT_EQUAL(rememberedOps, listedOpsAfterForget); + } + + Y_UNIT_TEST(ForgetScriptExecutionOnLongQuery) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + // TODO: really long query + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver()); + TStatus forgetStatus = {EStatus::STATUS_UNDEFINED, NYql::TIssues()}; + while (forgetStatus.GetStatus() != NYdb::EStatus::SUCCESS) { + forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED || + forgetStatus.GetStatus() == NYdb::EStatus::ABORTED, forgetStatus.GetIssues().ToString()); + } + UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS, forgetStatus.GetIssues().ToString()); + forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString()); + + } + + Y_UNIT_TEST(ForgetScriptExecutionRace) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT 42 + )").ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId); + + NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver()); + WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + + std::vector<NYdb::TAsyncStatus> forgetFutures(3); + for (auto& f : forgetFutures) { + f = opClient.Forget(scriptExecutionOperation.Id()); + } + + i32 successCount = 0; + for (auto& f : forgetFutures) { + auto forgetStatus = f.ExtractValueSync(); + UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND || + forgetStatus.GetStatus() == NYdb::EStatus::ABORTED, forgetStatus.GetIssues().ToString()); + if (forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS) { + ++successCount; + } + } + + UNIT_ASSERT(successCount == 1); + + auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); + auto forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(op.Status().GetStatus() == NYdb::EStatus::NOT_FOUND, op.Status().GetStatus()); + UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString()); + } + Y_UNIT_TEST(CancelScriptExecution) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); |