diff options
author | andrewproni <andrewproni@yandex-team.com> | 2023-07-11 19:58:05 +0300 |
---|---|---|
committer | andrewproni <andrewproni@yandex-team.com> | 2023-07-11 19:58:05 +0300 |
commit | 4ea04c90b14d468bcdae5f98f2e1692f65285129 (patch) | |
tree | 3a1f03103e4ca67a4e07ee39a31fe6d701606c47 | |
parent | d37dee05b1b5e7f062a73c35e04503aa62d372d5 (diff) | |
download | ydb-4ea04c90b14d468bcdae5f98f2e1692f65285129.tar.gz |
script/execution/operation/results TTL
-rw-r--r-- | ydb/core/grpc_services/query/rpc_execute_script.cpp | 15 | ||||
-rw-r--r-- | ydb/core/kqp/common/events/events.h | 2 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions.cpp | 223 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions_impl.h | 3 | ||||
-rw-r--r-- | ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp | 6 | ||||
-rw-r--r-- | ydb/core/kqp/ut/service/kqp_query_service_ut.cpp | 136 | ||||
-rw-r--r-- | ydb/core/protos/kqp.proto | 7 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/client.cpp | 2 | ||||
-rw-r--r-- | ydb/public/sdk/cpp/client/ydb_query/query.h | 1 |
9 files changed, 347 insertions, 48 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp index 4f5c899395..091d70b9e9 100644 --- a/ydb/core/grpc_services/query/rpc_execute_script.cpp +++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp @@ -75,6 +75,9 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest( kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write(); kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true); + kqpRequest.MutableRequest()->SetCancelAfterMs(GetDuration(req.operation_params().cancel_after()).MilliSeconds()); + kqpRequest.MutableRequest()->SetTimeoutMs(GetDuration(req.operation_params().operation_timeout()).MilliSeconds()); + NYql::TIssues issues; if (!FillQueryContent(req, kqpRequest, issues)) { return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)}; @@ -96,10 +99,6 @@ public: void Bootstrap() { NYql::TIssues issues; const auto& request = *Request_->GetProtoRequest(); - if (request.operation_params().has_forget_after() && request.operation_params().operation_mode() != Ydb::Operations::OperationParams::SYNC) { - issues.AddIssue("forget_after is not supported for this type of operation"); - return Reply(Ydb::StatusIds::UNSUPPORTED, issues); - } if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) { issues.AddIssue("ExecuteScript must be asyncronous operation"); @@ -149,6 +148,14 @@ private: ev->Record.SetTraceId(traceId.GetRef()); } + if (req->operation_params().has_forget_after()) { + ev->ForgetAfter = GetDuration(req->operation_params().forget_after()); + } + + if (req->has_results_ttl()) { + ev->ResultsTtl = GetDuration(req->results_ttl()); + } + std::tie(status, issues) = FillKqpRequest(*req, ev->Record); if (status != Ydb::StatusIds::SUCCESS) { return nullptr; diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h index a605372921..26cf8f50b3 100644 --- a/ydb/core/kqp/common/events/events.h +++ b/ydb/core/kqp/common/events/events.h @@ -93,6 +93,8 @@ struct TEvKqp { TEvScriptRequest() = default; mutable NKikimrKqp::TEvQueryRequest Record; + TDuration ForgetAfter; + TDuration ResultsTtl; }; struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index e27b4bbe77..e33bb540b0 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -47,6 +47,10 @@ namespace { #define KQP_PROXY_LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream) constexpr TDuration LEASE_DURATION = TDuration::Seconds(30); +constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20); +constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60); +constexpr TDuration DEFAULT_OPERATION_TTL = TDuration::Days(1); +constexpr TDuration DEFAULT_RESULTS_TTL = TDuration::Days(1); TString SerializeIssues(const NYql::TIssues& issues) { NYql::TIssue root; @@ -82,10 +86,12 @@ public: class TTableCreator : public NActors::TActorBootstrapped<TTableCreator> { public: - TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns) + TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns, + TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing()) : PathComponents(std::move(pathComponents)) , Columns(std::move(columns)) , KeyColumns(std::move(keyColumns)) + , TtlSettings(std::move(ttlSettings)) { Y_VERIFY(!PathComponents.empty()); Y_VERIFY(!Columns.empty()); @@ -147,6 +153,9 @@ public: for (const NKikimrSchemeOp::TColumnDescription& col : Columns) { *tableDesc.AddColumns() = col; } + if (TtlSettings) { + tableDesc.MutableTTLSettings()->CopyFrom(*TtlSettings); + } Send(MakeTxProxyID(), std::move(request)); } @@ -299,6 +308,7 @@ private: const TVector<TString> PathComponents; const TVector<NKikimrSchemeOp::TColumnDescription> Columns; const TVector<TString> KeyColumns; + const TMaybe<NKikimrSchemeOp::TTTLSettings> TtlSettings; NActors::TActorId Owner; NActors::TActorId SchemePipeActorId; }; @@ -334,6 +344,14 @@ private: return Col(columnName, NScheme::TypeName(columnType)); } + static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) { + NKikimrSchemeOp::TTTLSettings settings; + settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds()); + settings.MutableEnabled()->SetColumnName(columnName); + settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds()); + return settings; + } + void RunCreateScriptExecutions() { TablesCreating++; Register( @@ -353,13 +371,14 @@ private: Col("ast", NScheme::NTypeIds::Text), Col("issues", NScheme::NTypeIds::JsonDocument), Col("plan", NScheme::NTypeIds::JsonDocument), - Col("store_deadline", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. Col("meta", NScheme::NTypeIds::JsonDocument), Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage. Col("result_set_metas", NScheme::NTypeIds::JsonDocument), Col("stats", NScheme::NTypeIds::JsonDocument), + Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. }, - { "database", "execution_id" } + { "database", "execution_id" }, + TtlCol("expire_at") ) ); } @@ -393,7 +412,8 @@ private: Col("expire_at", NScheme::NTypeIds::Timestamp), Col("result_set", NScheme::NTypeIds::String), }, - { "database", "execution_id", "result_set_id", "row_id" } + { "database", "execution_id", "result_set_id", "row_id" }, + TtlCol("expire_at") ) ); } @@ -443,10 +463,12 @@ Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) { class TCreateScriptOperationQuery : public TQueryBase { public: - TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero()) + TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero()) : ExecutionId(executionId) , RunScriptActorId(runScriptActorId) , Request(req) + , OperationTtl(operationTtl) + , ResultsTtl(resultsTtl) , LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION) { } @@ -461,17 +483,22 @@ public: DECLARE $execution_mode AS Int32; DECLARE $query_text AS Text; DECLARE $syntax AS Int32; + DECLARE $meta AS JsonDocument; DECLARE $lease_duration AS Interval; UPSERT INTO `.metadata/script_executions` - (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax) - VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax); + (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta) + VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta); UPSERT INTO `.metadata/script_execution_leases` (database, execution_id, lease_deadline, lease_generation) VALUES ($database, $execution_id, CurrentUtcTimestamp() + $lease_duration, 1); )"; + NKikimrKqp::TScriptExecutionOperationMeta meta; + SetDuration(OperationTtl, *meta.MutableOperationTtl()); + SetDuration(ResultsTtl, *meta.MutableResultsTtl()); + NYdb::TParamsBuilder params; params .AddParam("$database") @@ -495,6 +522,9 @@ public: .AddParam("$syntax") .Int32(Request.GetRequest().GetSyntax()) .Build() + .AddParam("$meta") + .JsonDocument(NProtobufJson::Proto2Json(meta, NProtobufJson::TProto2JsonConfig())) + .Build() .AddParam("$lease_duration") .Interval(static_cast<i64>(LeaseDuration.MicroSeconds())) .Build(); @@ -519,7 +549,9 @@ private: const TString ExecutionId; const NActors::TActorId RunScriptActorId; NKikimrKqp::TEvQueryRequest Request; - TDuration LeaseDuration; + const TDuration OperationTtl; + const TDuration ResultsTtl; + const TDuration LeaseDuration; }; struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> { @@ -533,10 +565,12 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec Become(&TCreateScriptExecutionActor::StateFunc); ExecutionId = CreateGuidAsString(); - + auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : DEFAULT_OPERATION_TTL; + auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : DEFAULT_RESULTS_TTL; + resultsTtl = Min(operationTtl, resultsTtl); // Start request RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration)); - Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, LeaseDuration)); + Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration)); } void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) { @@ -739,7 +773,7 @@ public: using TQueryBase::TQueryBase; void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, - const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(), + TDuration operationTtl, TDuration resultsTtl, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(), TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) { TString sql = R"( -- TScriptExecutionFinisherBase::FinishScriptExecution @@ -751,6 +785,8 @@ public: DECLARE $plan AS JsonDocument; DECLARE $stats AS JsonDocument; DECLARE $ast AS Text; + DECLARE $operation_ttl AS Interval; + DECLARE $results_ttl AS Interval; UPDATE `.metadata/script_executions` SET @@ -760,11 +796,17 @@ public: plan = $plan, end_ts = CurrentUtcTimestamp(), stats = $stats, - ast = $ast + ast = $ast, + expire_at = CurrentUtcTimestamp() + $operation_ttl WHERE database = $database AND execution_id = $execution_id; DELETE FROM `.metadata/script_execution_leases` WHERE database = $database AND execution_id = $execution_id; + + UPDATE `.metadata/result_sets` + SET + expire_at = CurrentUtcTimestamp() + $results_ttl + where database = $database AND execution_id = $execution_id; )"; TString serializedStats = "{}"; @@ -801,13 +843,21 @@ public: .Build() .AddParam("$ast") .Utf8(queryAst.GetOrElse("")) + .Build() + .AddParam("$operation_ttl") + .Interval(static_cast<i64>(operationTtl.MicroSeconds())) + .Build() + .AddParam("$results_ttl") + .Interval(static_cast<i64>(resultsTtl.MicroSeconds())) .Build(); RunDataQuery(sql, ¶ms, txControl); } - void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) { - FinishScriptExecution(database, executionId, operationStatus, execStatus, IssuesFromMessage(message), txControl); + void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, + TDuration operationTtl, TDuration resultsTtl, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx(), + TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) { + FinishScriptExecution(database, executionId, operationStatus, execStatus, operationTtl, resultsTtl, IssuesFromMessage(message), txControl, kqpStats, queryPlan, queryAst); } static NYql::TIssues IssuesFromMessage(const TString& message) { @@ -821,6 +871,16 @@ public: } }; +TMaybe<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) { + NKikimrKqp::TScriptExecutionOperationMeta meta; + try { + NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig()); + return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl())); + } catch (NJson::TJsonException &e) { + return Nothing(); + } +} + class TScriptExecutionFinisher : public TScriptExecutionFinisherBase { public: TScriptExecutionFinisher( @@ -854,6 +914,9 @@ public: SELECT lease_generation FROM `.metadata/script_execution_leases` WHERE database = $database AND execution_id = $execution_id; + + SELECT meta FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; )"; NYdb::TParamsBuilder params; @@ -870,7 +933,7 @@ public: void OnQueryResult() override { if (!FinishWasRun) { - if (ResultSets.size() != 1) { + if (ResultSets.size() != 2) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); return; } @@ -893,7 +956,24 @@ public: return; } - FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst)); + NYdb::TResultSetParser metaResult(ResultSets[1]); + metaResult.TryNextRow(); + + const auto serializedMeta = metaResult.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation"); + return; + } + + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + + const auto [operationTtl, resultsTtl] = *ttl; + FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, operationTtl, resultsTtl, + Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst)); FinishWasRun = true; } else { Finish(); @@ -934,8 +1014,9 @@ public: DECLARE $database AS Text; DECLARE $execution_id AS Text; - SELECT operation_status, execution_status, issues, run_script_actor_id FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; + SELECT operation_status, execution_status, issues, run_script_actor_id, meta FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id AND + (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); SELECT lease_deadline FROM `.metadata/script_execution_leases` WHERE database = $database AND execution_id = $execution_id; @@ -961,7 +1042,7 @@ public: } NYdb::TResultSetParser result(ResultSets[0]); if (result.RowsCount() == 0) { - Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); + Finish(Ydb::StatusIds::NOT_FOUND, "No such execution"); return; } @@ -992,7 +1073,18 @@ public: if (operationStatus) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); } else if (*leaseDeadline < RunStartTime) { - FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED); + auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation"); + return; + } + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + const auto [operationTtl, resultsTtl] = *ttl; + FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl); SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution); } else { // OperationStatus is Nothing(): currently running @@ -1058,7 +1150,8 @@ public: operation_status, execution_status FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; + WHERE database = $database AND execution_id = $execution_id AND + (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); SELECT lease_deadline @@ -1106,6 +1199,11 @@ public: DELETE FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; + + DELETE + FROM `.metadata/result_sets` + WHERE database = $database AND execution_id = $execution_id; + )"; NYdb::TResultSetParser deadlineResult(ResultSets[1]); @@ -1167,7 +1265,7 @@ public: , StartActorTime(TInstant::Now()) {} - bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult) { + bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult, TDuration operationTtl, TDuration resultsTtl) { deadlineResult.TryNextRow(); TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); @@ -1180,7 +1278,7 @@ public: LeaseExpired = true; Ready = true; - FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, Issues); + FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); SetQueryResultHandler(&TGetScriptExecutionOperationBase::OnFinishOperation); return false; } @@ -1226,7 +1324,7 @@ public: DECLARE $database AS Text; DECLARE $execution_id AS Text; - SELECT execution_status + SELECT execution_status, meta FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id; @@ -1266,10 +1364,23 @@ public: Metadata.set_exec_status(static_cast<Ydb::Query::ExecStatus>(*executionStatus)); } + auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); + return; + } + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + + const auto [operationTtl, resultsTtl] = *ttl; + NYdb::TResultSetParser deadlineResult(ResultSets[1]); if (deadlineResult.RowsCount() == 0) { Ready = true; - } else if (!CheakLeaseDeadline(deadlineResult)) { + } else if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) { return; } @@ -1308,9 +1419,11 @@ public: plan, issues, stats, - ast + ast, + meta FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; + WHERE database = $database AND execution_id = $execution_id AND + (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); SELECT lease_deadline @@ -1420,9 +1533,23 @@ public: return; } - if (!CheakLeaseDeadline(deadlineResult)) { + auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); + return; + } + + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); return; } + + const auto [operationTtl, resultsTtl] = *ttl; + if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) { + return; + } + } CommitTransaction(); @@ -1484,7 +1611,7 @@ public: execution_mode, issues FROM `.metadata/script_executions` - WHERE database = $database + WHERE database = $database AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL) )"; if (PageToken) { sql << R"( @@ -1994,10 +2121,11 @@ public: DECLARE $offset AS Int64; DECLARE $limit AS Uint64; - SELECT result_set_metas, operation_status, issues + SELECT result_set_metas, operation_status, issues, end_ts, meta FROM `.metadata/script_executions` - WHERE database = $database - AND execution_id = $execution_id; + WHERE database = $database + AND execution_id = $execution_id + AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); SELECT row_id, result_set FROM `.metadata/result_sets` @@ -2040,7 +2168,7 @@ public: NYdb::TResultSetParser result(ResultSets[0]); if (!result.TryNextRow()) { - Finish(Ydb::StatusIds::BAD_REQUEST, "Script execution not found"); + Finish(Ydb::StatusIds::NOT_FOUND, "Script execution not found"); return; } @@ -2050,6 +2178,29 @@ public: return; } + const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); + return; + } + + const auto endTs = result.ColumnParser("end_ts").GetOptionalTimestamp(); + if (!endTs) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation end timestamp"); + return; + } + + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + const auto [_, resultsTtl] = *ttl; + if ((*endTs + resultsTtl) < TInstant::Now()){ + Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired"); + return; + } + Ydb::StatusIds::StatusCode operationStatusCode = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); if (operationStatusCode != Ydb::StatusIds::SUCCESS) { const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument(); @@ -2078,7 +2229,7 @@ public: Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid"); return; } - + Ydb::Query::ResultSetMeta meta; NProtobufJson::Json2Proto(*metaValue, meta); @@ -2217,8 +2368,8 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re namespace NPrivate { -NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) { - return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, leaseDuration); +NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) { + return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, operationTtl, resultsTtl, leaseDuration); } NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease, ui64 cookie) { diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h index 1846a5454a..d5efc733a2 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h @@ -70,7 +70,8 @@ struct TEvPrivate { // Writes new script into db. // If lease duration is zero, default one will be taken. -NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero()); +NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, + TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero()); // Checks lease of execution, finishes execution if its lease is off, returns current status NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0); diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp index cb997c85fc..1adc650798 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp @@ -17,6 +17,8 @@ using namespace NSchemeShard; namespace { constexpr TDuration TestLeaseDuration = TDuration::Seconds(1); +constexpr TDuration TestOperationTtl = TDuration::Minutes(1); +constexpr TDuration TestResultsTtl = TDuration::Minutes(1); const TString TestDatabase = "test_db"; struct TScriptExecutionsYdbSetup { @@ -82,7 +84,7 @@ struct TScriptExecutionsYdbSetup { } // Creates query in db. Returns execution id - TString CreateQueryInDb(const TString& query = "SELECT 42", TDuration leaseDuration = TestLeaseDuration) { + TString CreateQueryInDb(const TString& query = "SELECT 42", TDuration leaseDuration = TestLeaseDuration, TDuration operationTtl = TestOperationTtl, TDuration resultsTtl = TestResultsTtl) { TString executionId = CreateGuidAsString(); NKikimrKqp::TEvQueryRequest req; req.MutableRequest()->SetDatabase(TestDatabase); @@ -90,7 +92,7 @@ struct TScriptExecutionsYdbSetup { req.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE); const ui32 node = 0; TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node); - GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor); + GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, operationTtl, resultsTtl, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor); auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvCreateScriptOperationResponse>(edgeActor); UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp index 550ca5c8c8..ca7714f20f 100644 --- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp +++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp @@ -779,7 +779,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString()); UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString()); - UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled); + UNIT_ASSERT_C(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled, op.Status().GetIssues().ToString()); UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId); UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED); @@ -826,14 +826,142 @@ Y_UNIT_TEST_SUITE(KqpQueryService) { } } - Y_UNIT_TEST(ExecuteScriptFailsWithForgetAfter) { + NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionFail(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) { + NYdb::NOperation::TOperationClient client(ydbDriver); + NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op; + do { + if (!op.Initialized()) { + Sleep(TDuration::MilliSeconds(10)); + } + op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId); + } while (op.GetValueSync().Status().IsSuccess()); + return op.GetValueSync(); + } + + void ExpectExecStatus(EExecStatus status, const TScriptExecutionOperation op, const NYdb::TDriver& ydbDriver) { + auto readyOp = WaitScriptExecutionOperation(op.Id(), ydbDriver); + UNIT_ASSERT_C(readyOp.Status().IsSuccess(), readyOp.Status().GetIssues().ToString()); + UNIT_ASSERT_C(readyOp.Ready(), readyOp.Status().GetIssues().ToString()); + UNIT_ASSERT(readyOp.Metadata().ExecStatus == status); + UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, op.Metadata().ExecutionId); + } + + void ExecuteScriptWithSettings(const TExecuteScriptSettings& settings, EExecStatus status, TString query = "SELECT 1;") { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto op = db.ExecuteScript(query, settings).ExtractValueSync(); + ExpectExecStatus(status, op, kikimr.GetDriver()); + } + + Y_UNIT_TEST(ExecuteScriptWithCancelAfter) { + TString query = R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )"; + auto settings = TExecuteScriptSettings().CancelAfter(TDuration::MilliSeconds(1)); + ExecuteScriptWithSettings(settings, EExecStatus::Canceled, query); + + settings = TExecuteScriptSettings().CancelAfter(TDuration::Seconds(10)); + ExecuteScriptWithSettings(settings, EExecStatus::Completed); + } + + Y_UNIT_TEST(ExecuteScriptWithTimeout) { + TString query = R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )"; + auto settings = TExecuteScriptSettings().OperationTimeout(TDuration::MilliSeconds(1)); + ExecuteScriptWithSettings(settings, EExecStatus::Failed, query); + + settings = TExecuteScriptSettings().OperationTimeout(TDuration::Seconds(100)); + ExecuteScriptWithSettings(settings, EExecStatus::Completed); + } + + Y_UNIT_TEST(ExecuteScriptWithCancelAfterAndTimeout) { + TString query = R"( + SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key; + SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key; + )"; + auto settings = TExecuteScriptSettings().CancelAfterWithTimeout(TDuration::MilliSeconds(1), TDuration::Seconds(100)); + ExecuteScriptWithSettings(settings, EExecStatus::Canceled, query); + + settings = TExecuteScriptSettings().CancelAfterWithTimeout(TDuration::Seconds(100), TDuration::MilliSeconds(1)); + ExecuteScriptWithSettings(settings, EExecStatus::Failed, query); + } + + void CheckScriptOperationExpires(const TExecuteScriptSettings &settings) { auto kikimr = DefaultKikimrRunner(); auto db = kikimr.GetQueryClient(); auto scriptExecutionOperation = db.ExecuteScript(R"( SELECT 42 - )", NYdb::NQuery::TExecuteScriptSettings().ForgetAfter(TDuration::Days(1))).ExtractValueSync(); - UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::UNSUPPORTED, scriptExecutionOperation.Status().GetIssues().ToString()); + )", settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + + auto readyOp = WaitScriptExecutionFail(scriptExecutionOperation.Id(), kikimr.GetDriver()); + UNIT_ASSERT_C(readyOp.Status().GetStatus() == EStatus::NOT_FOUND, readyOp.Status().GetStatus() << ":" << readyOp.Status().GetIssues().ToString()); + + NOperation::TOperationClient client(kikimr.GetDriver()); + + auto list = client.List<NYdb::NQuery::TScriptExecutionOperation>(100).ExtractValueSync(); + UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString()); + UNIT_ASSERT(list.GetList().size() == 0); + + auto status = client.Forget(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::NOT_FOUND, status.GetIssues().ToString()); + + status = client.Cancel(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::NOT_FOUND, status.GetIssues().ToString()); + + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync(); + UNIT_ASSERT_C(results.GetStatus() == NYdb::EStatus::NOT_FOUND, results.GetIssues().ToString()); + } + + Y_UNIT_TEST(ExecuteScriptWithForgetAfter) { + CheckScriptOperationExpires(TExecuteScriptSettings().ForgetAfter(TDuration::MilliSeconds(50))); + } + + void CheckScriptResultsExpire(const TExecuteScriptSettings& settings) { + auto kikimr = DefaultKikimrRunner(); + auto db = kikimr.GetQueryClient(); + + auto scriptExecutionOperation = db.ExecuteScript(R"( + SELECT 42 + )", settings).ExtractValueSync(); + UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString()); + + WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver()); + TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync(); + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + TResultSetParser resultSet(results.ExtractResultSet()); + UNIT_ASSERT_C(resultSet.TryNextRow(), results.GetIssues().ToString()); + UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42); + + while (results.GetStatus() != NYdb::EStatus::NOT_FOUND) { + UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString()); + results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync(); + Sleep(TDuration::MilliSeconds(10)); + } + + UNIT_ASSERT_C(results.GetStatus() == NYdb::EStatus::NOT_FOUND, results.GetIssues().ToString()); + NOperation::TOperationClient client(kikimr.GetDriver()); + auto op = client.Get<TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync(); + UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString()); + } + + Y_UNIT_TEST(ExecuteScriptWithResultsTtl) { + CheckScriptResultsExpire(TExecuteScriptSettings().ResultsTtl(TDuration::MilliSeconds(2000))); + } + + Y_UNIT_TEST(ExecuteScriptWithResultsTtlAndForgetAfter) { + + auto settings = TExecuteScriptSettings().ResultsTtl(TDuration::MilliSeconds(2000)).ForgetAfter(TDuration::Seconds(30)); + CheckScriptResultsExpire(settings); + + settings = TExecuteScriptSettings().ResultsTtl(TDuration::Minutes(1)).ForgetAfter(TDuration::MilliSeconds(500)); + CheckScriptOperationExpires(settings); } TScriptExecutionOperation CreateScriptExecutionOperation(size_t numberOfRows, NYdb::NQuery::TQueryClient& db, const NYdb::TDriver& ydbDriver) { diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto index dc2d84b1dc..19b6cbe08a 100644 --- a/ydb/core/protos/kqp.proto +++ b/ydb/core/protos/kqp.proto @@ -2,6 +2,7 @@ package NKikimrKqp; option java_package = "ru.yandex.kikimr.proto"; option cc_enable_arenas = true; +import "google/protobuf/duration.proto"; import "library/cpp/actors/protos/actors.proto"; import "ydb/core/protos/type_info.proto"; import "ydb/core/protos/kqp_physical.proto"; @@ -665,3 +666,9 @@ message TEvCancelScriptExecutionResponse { optional Ydb.StatusIds.StatusCode Status = 1; repeated Ydb.Issue.IssueMessage Issues = 2; } + +// stored in column "meta" of .metadata/script_executions table +message TScriptExecutionOperationMeta { + optional google.protobuf.Duration OperationTtl = 1; + optional google.protobuf.Duration ResultsTtl = 2; +}
\ No newline at end of file diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp index e502b69fe1..40f2d211b1 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp +++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp @@ -41,7 +41,7 @@ public: request.set_stats_mode(settings.StatsMode_); request.mutable_script_content()->set_syntax(settings.Syntax_); request.mutable_script_content()->set_text(script); - + SetDuration(settings.ResultsTtl_, *request.mutable_results_ttl()); auto promise = NThreading::NewPromise<TScriptExecutionOperation>(); auto responseCb = [promise] diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.h b/ydb/public/sdk/cpp/client/ydb_query/query.h index 23a5d37504..f58459afdc 100644 --- a/ydb/public/sdk/cpp/client/ydb_query/query.h +++ b/ydb/public/sdk/cpp/client/ydb_query/query.h @@ -130,6 +130,7 @@ struct TExecuteScriptSettings : public TOperationRequestSettings<TExecuteScriptS FLUENT_SETTING_DEFAULT(Ydb::Query::Syntax, Syntax, Ydb::Query::SYNTAX_YQL_V1); FLUENT_SETTING_DEFAULT(Ydb::Query::ExecMode, ExecMode, Ydb::Query::EXEC_MODE_EXECUTE); FLUENT_SETTING_DEFAULT(Ydb::Query::StatsMode, StatsMode, Ydb::Query::STATS_MODE_NONE); + FLUENT_SETTING(TDuration, ResultsTtl); }; class TVersionedScriptId { |