aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-06-23 17:46:07 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-06-23 17:46:07 +0300
commitaf207c571a5855b41675286f62e08a465e379041 (patch)
tree14115bf0b12d0719c6add7b324b9ef4f8edf9799
parenta3fb7cdc57dc655107ebfb8e4b5bcce882b60951 (diff)
downloadydb-af207c571a5855b41675286f62e08a465e379041.tar.gz
ForgetOperation для операций script execution
-rw-r--r--ydb/core/grpc_services/rpc_forget_operation.cpp33
-rw-r--r--ydb/core/kqp/common/events/script_executions.h22
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_proxy_service.cpp5
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp119
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h1
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp114
7 files changed, 294 insertions, 2 deletions
diff --git a/ydb/core/grpc_services/rpc_forget_operation.cpp b/ydb/core/grpc_services/rpc_forget_operation.cpp
index 7cdd87c10c..c41c0f56a9 100644
--- a/ydb/core/grpc_services/rpc_forget_operation.cpp
+++ b/ydb/core/grpc_services/rpc_forget_operation.cpp
@@ -3,6 +3,9 @@
#include "rpc_operation_request_base.h"
#include <ydb/core/grpc_services/base/base.h>
+#include <ydb/core/kqp/common/events/script_executions.h>
+#include <ydb/core/kqp/common/kqp.h>
+#include <ydb/core/kqp/common/simple/services.h>
#include <ydb/core/tx/schemeshard/schemeshard_build_index.h>
#include <ydb/core/tx/schemeshard/schemeshard_export.h>
#include <ydb/core/tx/schemeshard/schemeshard_import.h>
@@ -31,6 +34,8 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
return "[ForgetImport]";
case TOperationId::BUILD_INDEX:
return "[ForgetIndexBuild]";
+ case TOperationId::SCRIPT_EXECUTION:
+ return "[ForgetScriptExecution]";
default:
return "[Untagged]";
}
@@ -49,6 +54,13 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
}
}
+ bool NeedAllocateTxId() const {
+ const Ydb::TOperationId::EKind kind = OperationId.GetKind();
+ return kind == TOperationId::EXPORT
+ || kind == TOperationId::IMPORT
+ || kind == TOperationId::BUILD_INDEX;
+ }
+
void Handle(TEvExport::TEvForgetExportResponse::TPtr& ev) {
const auto& record = ev->Get()->Record.GetResponse();
@@ -76,6 +88,18 @@ class TForgetOperationRPC: public TRpcOperationRequestActor<TForgetOperationRPC,
Reply(record.GetStatus(), record.GetIssues());
}
+ void Handle(NKqp::TEvForgetScriptExecutionOperationResponce::TPtr& ev) {
+ google::protobuf::RepeatedPtrField<Ydb::Issue::IssueMessage> issuesProto;
+ NYql::IssuesToMessage(ev->Get()->Issues, &issuesProto);
+ LOG_D("Handle NKqp::TEvForgetScriptExecutionOperationResponce responce"
+ << ": status# " << ev->Get()->Status);
+ Reply(ev->Get()->Status, issuesProto);
+ }
+
+ void SendForgetScriptExecutionOperation() {
+ Send(NKqp::MakeKqpProxyID(SelfId().NodeId()), new NKqp::TEvForgetScriptExecutionOperation(DatabaseName, OperationId));
+ }
+
public:
using TRpcOperationRequestActor::TRpcOperationRequestActor;
@@ -94,11 +118,15 @@ public:
}
break;
+ case TOperationId::SCRIPT_EXECUTION:
+ SendForgetScriptExecutionOperation();
+ break;
default:
return Reply(StatusIds::UNSUPPORTED, TIssuesIds::DEFAULT_ERROR, "Unknown operation kind");
}
-
- AllocateTxId();
+ if (NeedAllocateTxId()) {
+ AllocateTxId();
+ }
} catch (const yexception&) {
return Reply(StatusIds::BAD_REQUEST, TIssuesIds::DEFAULT_ERROR, "Invalid operation id");
}
@@ -111,6 +139,7 @@ public:
hFunc(TEvExport::TEvForgetExportResponse, Handle);
hFunc(TEvImport::TEvForgetImportResponse, Handle);
hFunc(TEvIndexBuilder::TEvForgetResponse, Handle);
+ hFunc(NKqp::TEvForgetScriptExecutionOperationResponce, Handle);
default:
return StateBase(ev);
}
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index f2bcba5ee6..6b3903ab3b 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -13,6 +13,28 @@
namespace NKikimr::NKqp {
+struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
+ explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
+ : Database(database)
+ , OperationId(id)
+ {
+ }
+
+ TString Database;
+ NOperationId::TOperationId OperationId;
+};
+
+struct TEvForgetScriptExecutionOperationResponce : public NActors::TEventLocal<TEvForgetScriptExecutionOperationResponce, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperationResponce> {
+ TEvForgetScriptExecutionOperationResponce(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(issues)
+ {
+ }
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScriptExecutionOperation, TKqpScriptExecutionEvents::EvGetScriptExecutionOperation> {
explicit TEvGetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 2f800dcb00..21d543fdd8 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -138,6 +138,8 @@ struct TKqpScriptExecutionEvents {
EvCancelScriptExecutionOperation,
EvCancelScriptExecutionOperationResponse,
EvScriptExecutionFinished,
+ EvForgetScriptExecutionOperation,
+ EvForgetScriptExecutionOperationResponce
};
};
diff --git a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
index 07b7585750..43325c40bb 100644
--- a/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_proxy_service.cpp
@@ -1162,6 +1162,7 @@ public:
hFunc(TEvKqp::TEvCreateSessionResponse, ForwardEvent);
hFunc(TEvPrivate::TEvCloseIdleSessions, Handle);
hFunc(TEvPrivate::TEvScriptExecutionsTablesCreationFinished, Handle);
+ hFunc(NKqp::TEvForgetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvGetScriptExecutionOperation, Handle);
hFunc(NKqp::TEvListScriptExecutionOperations, Handle);
hFunc(NKqp::TEvCancelScriptExecutionOperation, Handle);
@@ -1363,6 +1364,10 @@ private:
ScriptExecutionsCreationStatus = EScriptExecutionsCreationStatus::Finished;
}
+ void Handle(NKqp::TEvForgetScriptExecutionOperation::TPtr& ev) {
+ Register(CreateForgetScriptExecutionOperationActor(std::move(ev)));
+ }
+
void Handle(NKqp::TEvGetScriptExecutionOperation::TPtr& ev) {
Register(CreateGetScriptExecutionOperationActor(std::move(ev)));
}
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index a2405dff56..b05e3fb812 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -897,6 +897,121 @@ private:
NActors::TActorId RunScriptActorId;
};
+class TForgetScriptExecutionOperationActor : public TQueryBase {
+public:
+ explicit TForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev)
+ : Request(std::move(ev))
+ {
+ }
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ SELECT
+ operation_status,
+ execution_status
+ FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT
+ lease_deadline
+ FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ TMaybe<TString> maybeExecutionId = ScriptExecutionIdFromOperation(Request->Get()->OperationId);
+ Y_ENSURE(maybeExecutionId, "No execution id specified");
+ ExecutionId = *maybeExecutionId;
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Request->Get()->Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ SetQueryResultHandler(&TForgetScriptExecutionOperationActor::OnGetInfo);
+ }
+
+ void OnGetInfo() {
+ if (ResultSets.size() != 2) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::NOT_FOUND, "No such execution");
+ return;
+ }
+
+ result.TryNextRow();
+
+ const TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
+
+ TStringBuilder sql;
+ sql << R"(
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ DELETE
+ FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TResultSetParser deadlineResult(ResultSets[1]);
+ if (deadlineResult.RowsCount() != 0) {
+ deadlineResult.TryNextRow();
+ TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
+ if (!leaseDeadline) {
+ // existing row with empty lease???
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
+ return;
+ }
+ if (*leaseDeadline >= TInstant::Now()) {
+ if (!operationStatus) {
+ Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Operation is still running");
+ return;
+ }
+ }
+ sql << R"(
+ DELETE
+ FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ )";
+ }
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Request->Get()->Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
+ SetQueryResultHandler(&TForgetScriptExecutionOperationActor::OnForgetOperation);
+ }
+
+ void OnForgetOperation() {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ Send(Request->Sender, new TEvForgetScriptExecutionOperationResponce(status, std::move(issues)));
+ }
+
+private:
+ TEvForgetScriptExecutionOperation::TPtr Request;
+ TString ExecutionId;
+ NYql::TIssues Issues;
+};
+
class TGetScriptExecutionOperationActor : public TScriptExecutionFinisherBase {
public:
explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
@@ -1435,6 +1550,10 @@ NActors::IActor* CreateScriptExecutionFinisher(
return new TScriptExecutionFinisher(executionId, database, leaseGeneration, operationStatus, execStatus, std::move(issues));
}
+NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) {
+ return new TForgetScriptExecutionOperationActor(std::move(ev));
+}
+
NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) {
return new TGetScriptExecutionOperationActor(std::move(ev));
}
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index ed42dc2390..89dcc41743 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -20,6 +20,7 @@ NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev);
// Operation API impl.
+NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev);
NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev);
NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev);
NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev);
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index dc91345f06..ee2d91884b 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -332,6 +332,120 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
UNIT_ASSERT_EQUAL(ops, listedOps);
}
+ Y_UNIT_TEST(ForgetScriptExecution) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ NYdb::NOperation::TOperationClient client(kikimr.GetDriver());
+ auto list = client.List<NYdb::NQuery::TScriptExecutionOperation>(42).ExtractValueSync();
+ UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString());
+ UNIT_ASSERT(list.GetList().empty());
+
+ constexpr ui64 ScriptExecutionsCount = 100;
+ std::set<TString> ops;
+ for (ui64 i = 0; i < ScriptExecutionsCount; ++i) {
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT 42
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+ ops.emplace(scriptExecutionOperation.Metadata().ExecutionId);
+ }
+ UNIT_ASSERT_VALUES_EQUAL(ops.size(), ScriptExecutionsCount);
+
+ std::set<TString> listedOps;
+ ui64 listed = 0;
+ std::set<TString> rememberedOps = ops;
+ bool forgetNextOperation = true;
+ list = client.List<NYdb::NQuery::TScriptExecutionOperation>(ScriptExecutionsCount).ExtractValueSync();
+ UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString());
+ UNIT_ASSERT(list.GetList().size() == ScriptExecutionsCount);
+ for (const auto& op : list.GetList()) {
+ ++listed;
+ UNIT_ASSERT_C(listedOps.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId);
+ if (forgetNextOperation) {
+ auto status = client.Forget(op.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::SUCCESS || status.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED ||
+ status.GetStatus() == NYdb::EStatus::ABORTED, status.GetIssues().ToString());
+ if (status.GetStatus() == NYdb::EStatus::SUCCESS) {
+ rememberedOps.erase(op.Metadata().ExecutionId);
+ }
+ }
+ forgetNextOperation = !forgetNextOperation;
+ }
+ UNIT_ASSERT_VALUES_EQUAL(listed, ScriptExecutionsCount);
+ UNIT_ASSERT_EQUAL(ops, listedOps);
+
+ std::set<TString> listedOpsAfterForget;
+ list = client.List<NYdb::NQuery::TScriptExecutionOperation>(ScriptExecutionsCount, {}).ExtractValueSync();
+ UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString());
+ for (const auto& op : list.GetList()) {
+ UNIT_ASSERT_C(listedOpsAfterForget.emplace(op.Metadata().ExecutionId).second, op.Metadata().ExecutionId);
+ }
+
+ UNIT_ASSERT_EQUAL(rememberedOps, listedOpsAfterForget);
+ }
+
+ Y_UNIT_TEST(ForgetScriptExecutionOnLongQuery) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+ // TODO: really long query
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver());
+ TStatus forgetStatus = {EStatus::STATUS_UNDEFINED, NYql::TIssues()};
+ while (forgetStatus.GetStatus() != NYdb::EStatus::SUCCESS) {
+ forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::PRECONDITION_FAILED ||
+ forgetStatus.GetStatus() == NYdb::EStatus::ABORTED, forgetStatus.GetIssues().ToString());
+ }
+ UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS, forgetStatus.GetIssues().ToString());
+ forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString());
+
+ }
+
+ Y_UNIT_TEST(ForgetScriptExecutionRace) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT 42
+ )").ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+ UNIT_ASSERT(scriptExecutionOperation.Metadata().ExecutionId);
+
+ NYdb::NOperation::TOperationClient opClient(kikimr.GetDriver());
+ WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+
+ std::vector<NYdb::TAsyncStatus> forgetFutures(3);
+ for (auto& f : forgetFutures) {
+ f = opClient.Forget(scriptExecutionOperation.Id());
+ }
+
+ i32 successCount = 0;
+ for (auto& f : forgetFutures) {
+ auto forgetStatus = f.ExtractValueSync();
+ UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS || forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND ||
+ forgetStatus.GetStatus() == NYdb::EStatus::ABORTED, forgetStatus.GetIssues().ToString());
+ if (forgetStatus.GetStatus() == NYdb::EStatus::SUCCESS) {
+ ++successCount;
+ }
+ }
+
+ UNIT_ASSERT(successCount == 1);
+
+ auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
+ auto forgetStatus = opClient.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(op.Status().GetStatus() == NYdb::EStatus::NOT_FOUND, op.Status().GetStatus());
+ UNIT_ASSERT_C(forgetStatus.GetStatus() == NYdb::EStatus::NOT_FOUND, forgetStatus.GetIssues().ToString());
+ }
+
Y_UNIT_TEST(CancelScriptExecution) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();