aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorandrewproni <andrewproni@yandex-team.com>2023-07-11 19:58:05 +0300
committerandrewproni <andrewproni@yandex-team.com>2023-07-11 19:58:05 +0300
commit4ea04c90b14d468bcdae5f98f2e1692f65285129 (patch)
tree3a1f03103e4ca67a4e07ee39a31fe6d701606c47
parentd37dee05b1b5e7f062a73c35e04503aa62d372d5 (diff)
downloadydb-4ea04c90b14d468bcdae5f98f2e1692f65285129.tar.gz
script/execution/operation/results TTL
-rw-r--r--ydb/core/grpc_services/query/rpc_execute_script.cpp15
-rw-r--r--ydb/core/kqp/common/events/events.h2
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp223
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_impl.h3
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp6
-rw-r--r--ydb/core/kqp/ut/service/kqp_query_service_ut.cpp136
-rw-r--r--ydb/core/protos/kqp.proto7
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/client.cpp2
-rw-r--r--ydb/public/sdk/cpp/client/ydb_query/query.h1
9 files changed, 347 insertions, 48 deletions
diff --git a/ydb/core/grpc_services/query/rpc_execute_script.cpp b/ydb/core/grpc_services/query/rpc_execute_script.cpp
index 4f5c899395..091d70b9e9 100644
--- a/ydb/core/grpc_services/query/rpc_execute_script.cpp
+++ b/ydb/core/grpc_services/query/rpc_execute_script.cpp
@@ -75,6 +75,9 @@ std::tuple<Ydb::StatusIds::StatusCode, NYql::TIssues> FillKqpRequest(
kqpRequest.MutableRequest()->MutableTxControl()->mutable_begin_tx()->mutable_serializable_read_write();
kqpRequest.MutableRequest()->MutableTxControl()->set_commit_tx(true);
+ kqpRequest.MutableRequest()->SetCancelAfterMs(GetDuration(req.operation_params().cancel_after()).MilliSeconds());
+ kqpRequest.MutableRequest()->SetTimeoutMs(GetDuration(req.operation_params().operation_timeout()).MilliSeconds());
+
NYql::TIssues issues;
if (!FillQueryContent(req, kqpRequest, issues)) {
return {Ydb::StatusIds::BAD_REQUEST, std::move(issues)};
@@ -96,10 +99,6 @@ public:
void Bootstrap() {
NYql::TIssues issues;
const auto& request = *Request_->GetProtoRequest();
- if (request.operation_params().has_forget_after() && request.operation_params().operation_mode() != Ydb::Operations::OperationParams::SYNC) {
- issues.AddIssue("forget_after is not supported for this type of operation");
- return Reply(Ydb::StatusIds::UNSUPPORTED, issues);
- }
if (request.operation_params().operation_mode() == Ydb::Operations::OperationParams::SYNC) {
issues.AddIssue("ExecuteScript must be asyncronous operation");
@@ -149,6 +148,14 @@ private:
ev->Record.SetTraceId(traceId.GetRef());
}
+ if (req->operation_params().has_forget_after()) {
+ ev->ForgetAfter = GetDuration(req->operation_params().forget_after());
+ }
+
+ if (req->has_results_ttl()) {
+ ev->ResultsTtl = GetDuration(req->results_ttl());
+ }
+
std::tie(status, issues) = FillKqpRequest(*req, ev->Record);
if (status != Ydb::StatusIds::SUCCESS) {
return nullptr;
diff --git a/ydb/core/kqp/common/events/events.h b/ydb/core/kqp/common/events/events.h
index a605372921..26cf8f50b3 100644
--- a/ydb/core/kqp/common/events/events.h
+++ b/ydb/core/kqp/common/events/events.h
@@ -93,6 +93,8 @@ struct TEvKqp {
TEvScriptRequest() = default;
mutable NKikimrKqp::TEvQueryRequest Record;
+ TDuration ForgetAfter;
+ TDuration ResultsTtl;
};
struct TEvScriptResponse : public TEventLocal<TEvScriptResponse, TKqpEvents::EvScriptResponse> {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index e27b4bbe77..e33bb540b0 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -47,6 +47,10 @@ namespace {
#define KQP_PROXY_LOG_C(stream) LOG_CRIT_S(*TlsActivationContext, NKikimrServices::KQP_PROXY, stream)
constexpr TDuration LEASE_DURATION = TDuration::Seconds(30);
+constexpr TDuration DEADLINE_OFFSET = TDuration::Minutes(20);
+constexpr TDuration BRO_RUN_INTERVAL = TDuration::Minutes(60);
+constexpr TDuration DEFAULT_OPERATION_TTL = TDuration::Days(1);
+constexpr TDuration DEFAULT_RESULTS_TTL = TDuration::Days(1);
TString SerializeIssues(const NYql::TIssues& issues) {
NYql::TIssue root;
@@ -82,10 +86,12 @@ public:
class TTableCreator : public NActors::TActorBootstrapped<TTableCreator> {
public:
- TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns)
+ TTableCreator(TVector<TString> pathComponents, TVector<NKikimrSchemeOp::TColumnDescription> columns, TVector<TString> keyColumns,
+ TMaybe<NKikimrSchemeOp::TTTLSettings> ttlSettings = Nothing())
: PathComponents(std::move(pathComponents))
, Columns(std::move(columns))
, KeyColumns(std::move(keyColumns))
+ , TtlSettings(std::move(ttlSettings))
{
Y_VERIFY(!PathComponents.empty());
Y_VERIFY(!Columns.empty());
@@ -147,6 +153,9 @@ public:
for (const NKikimrSchemeOp::TColumnDescription& col : Columns) {
*tableDesc.AddColumns() = col;
}
+ if (TtlSettings) {
+ tableDesc.MutableTTLSettings()->CopyFrom(*TtlSettings);
+ }
Send(MakeTxProxyID(), std::move(request));
}
@@ -299,6 +308,7 @@ private:
const TVector<TString> PathComponents;
const TVector<NKikimrSchemeOp::TColumnDescription> Columns;
const TVector<TString> KeyColumns;
+ const TMaybe<NKikimrSchemeOp::TTTLSettings> TtlSettings;
NActors::TActorId Owner;
NActors::TActorId SchemePipeActorId;
};
@@ -334,6 +344,14 @@ private:
return Col(columnName, NScheme::TypeName(columnType));
}
+ static NKikimrSchemeOp::TTTLSettings TtlCol(const TString& columnName) {
+ NKikimrSchemeOp::TTTLSettings settings;
+ settings.MutableEnabled()->SetExpireAfterSeconds(DEADLINE_OFFSET.Seconds());
+ settings.MutableEnabled()->SetColumnName(columnName);
+ settings.MutableEnabled()->MutableSysSettings()->SetRunInterval(BRO_RUN_INTERVAL.MicroSeconds());
+ return settings;
+ }
+
void RunCreateScriptExecutions() {
TablesCreating++;
Register(
@@ -353,13 +371,14 @@ private:
Col("ast", NScheme::NTypeIds::Text),
Col("issues", NScheme::NTypeIds::JsonDocument),
Col("plan", NScheme::NTypeIds::JsonDocument),
- Col("store_deadline", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
Col("meta", NScheme::NTypeIds::JsonDocument),
Col("parameters", NScheme::NTypeIds::String), // TODO: store aparameters separately to support bigger storage.
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
+ Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
},
- { "database", "execution_id" }
+ { "database", "execution_id" },
+ TtlCol("expire_at")
)
);
}
@@ -393,7 +412,8 @@ private:
Col("expire_at", NScheme::NTypeIds::Timestamp),
Col("result_set", NScheme::NTypeIds::String),
},
- { "database", "execution_id", "result_set_id", "row_id" }
+ { "database", "execution_id", "result_set_id", "row_id" },
+ TtlCol("expire_at")
)
);
}
@@ -443,10 +463,12 @@ Ydb::Query::ExecMode GetExecModeFromAction(NKikimrKqp::EQueryAction action) {
class TCreateScriptOperationQuery : public TQueryBase {
public:
- TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration leaseDuration = TDuration::Zero())
+ TCreateScriptOperationQuery(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& req, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero())
: ExecutionId(executionId)
, RunScriptActorId(runScriptActorId)
, Request(req)
+ , OperationTtl(operationTtl)
+ , ResultsTtl(resultsTtl)
, LeaseDuration(leaseDuration ? leaseDuration : LEASE_DURATION)
{
}
@@ -461,17 +483,22 @@ public:
DECLARE $execution_mode AS Int32;
DECLARE $query_text AS Text;
DECLARE $syntax AS Int32;
+ DECLARE $meta AS JsonDocument;
DECLARE $lease_duration AS Interval;
UPSERT INTO `.metadata/script_executions`
- (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax)
- VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax);
+ (database, execution_id, run_script_actor_id, execution_status, execution_mode, start_ts, query_text, syntax, meta)
+ VALUES ($database, $execution_id, $run_script_actor_id, $execution_status, $execution_mode, CurrentUtcTimestamp(), $query_text, $syntax, $meta);
UPSERT INTO `.metadata/script_execution_leases`
(database, execution_id, lease_deadline, lease_generation)
VALUES ($database, $execution_id, CurrentUtcTimestamp() + $lease_duration, 1);
)";
+ NKikimrKqp::TScriptExecutionOperationMeta meta;
+ SetDuration(OperationTtl, *meta.MutableOperationTtl());
+ SetDuration(ResultsTtl, *meta.MutableResultsTtl());
+
NYdb::TParamsBuilder params;
params
.AddParam("$database")
@@ -495,6 +522,9 @@ public:
.AddParam("$syntax")
.Int32(Request.GetRequest().GetSyntax())
.Build()
+ .AddParam("$meta")
+ .JsonDocument(NProtobufJson::Proto2Json(meta, NProtobufJson::TProto2JsonConfig()))
+ .Build()
.AddParam("$lease_duration")
.Interval(static_cast<i64>(LeaseDuration.MicroSeconds()))
.Build();
@@ -519,7 +549,9 @@ private:
const TString ExecutionId;
const NActors::TActorId RunScriptActorId;
NKikimrKqp::TEvQueryRequest Request;
- TDuration LeaseDuration;
+ const TDuration OperationTtl;
+ const TDuration ResultsTtl;
+ const TDuration LeaseDuration;
};
struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExecutionActor> {
@@ -533,10 +565,12 @@ struct TCreateScriptExecutionActor : public TActorBootstrapped<TCreateScriptExec
Become(&TCreateScriptExecutionActor::StateFunc);
ExecutionId = CreateGuidAsString();
-
+ auto operationTtl = Event->Get()->ForgetAfter ? Event->Get()->ForgetAfter : DEFAULT_OPERATION_TTL;
+ auto resultsTtl = Event->Get()->ResultsTtl ? Event->Get()->ResultsTtl : DEFAULT_RESULTS_TTL;
+ resultsTtl = Min(operationTtl, resultsTtl);
// Start request
RunScriptActorId = Register(CreateRunScriptActor(ExecutionId, Event->Get()->Record, Event->Get()->Record.GetRequest().GetDatabase(), 1, LeaseDuration));
- Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, LeaseDuration));
+ Register(new TCreateScriptOperationQuery(ExecutionId, RunScriptActorId, Event->Get()->Record, operationTtl, resultsTtl, LeaseDuration));
}
void Handle(TEvPrivate::TEvCreateScriptOperationResponse::TPtr& ev) {
@@ -739,7 +773,7 @@ public:
using TQueryBase::TQueryBase;
void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus,
- const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(),
+ TDuration operationTtl, TDuration resultsTtl, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(),
TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) {
TString sql = R"(
-- TScriptExecutionFinisherBase::FinishScriptExecution
@@ -751,6 +785,8 @@ public:
DECLARE $plan AS JsonDocument;
DECLARE $stats AS JsonDocument;
DECLARE $ast AS Text;
+ DECLARE $operation_ttl AS Interval;
+ DECLARE $results_ttl AS Interval;
UPDATE `.metadata/script_executions`
SET
@@ -760,11 +796,17 @@ public:
plan = $plan,
end_ts = CurrentUtcTimestamp(),
stats = $stats,
- ast = $ast
+ ast = $ast,
+ expire_at = CurrentUtcTimestamp() + $operation_ttl
WHERE database = $database AND execution_id = $execution_id;
DELETE FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
+
+ UPDATE `.metadata/result_sets`
+ SET
+ expire_at = CurrentUtcTimestamp() + $results_ttl
+ where database = $database AND execution_id = $execution_id;
)";
TString serializedStats = "{}";
@@ -801,13 +843,21 @@ public:
.Build()
.AddParam("$ast")
.Utf8(queryAst.GetOrElse(""))
+ .Build()
+ .AddParam("$operation_ttl")
+ .Interval(static_cast<i64>(operationTtl.MicroSeconds()))
+ .Build()
+ .AddParam("$results_ttl")
+ .Interval(static_cast<i64>(resultsTtl.MicroSeconds()))
.Build();
RunDataQuery(sql, &params, txControl);
}
- void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx()) {
- FinishScriptExecution(database, executionId, operationStatus, execStatus, IssuesFromMessage(message), txControl);
+ void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus,
+ TDuration operationTtl, TDuration resultsTtl, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx(),
+ TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) {
+ FinishScriptExecution(database, executionId, operationStatus, execStatus, operationTtl, resultsTtl, IssuesFromMessage(message), txControl, kqpStats, queryPlan, queryAst);
}
static NYql::TIssues IssuesFromMessage(const TString& message) {
@@ -821,6 +871,16 @@ public:
}
};
+TMaybe<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) {
+ NKikimrKqp::TScriptExecutionOperationMeta meta;
+ try {
+ NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig());
+ return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl()));
+ } catch (NJson::TJsonException &e) {
+ return Nothing();
+ }
+}
+
class TScriptExecutionFinisher : public TScriptExecutionFinisherBase {
public:
TScriptExecutionFinisher(
@@ -854,6 +914,9 @@ public:
SELECT lease_generation FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT meta FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
)";
NYdb::TParamsBuilder params;
@@ -870,7 +933,7 @@ public:
void OnQueryResult() override {
if (!FinishWasRun) {
- if (ResultSets.size() != 1) {
+ if (ResultSets.size() != 2) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
}
@@ -893,7 +956,24 @@ public:
return;
}
- FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst));
+ NYdb::TResultSetParser metaResult(ResultSets[1]);
+ metaResult.TryNextRow();
+
+ const auto serializedMeta = metaResult.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation");
+ return;
+ }
+
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+
+ const auto [operationTtl, resultsTtl] = *ttl;
+ FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, operationTtl, resultsTtl,
+ Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst));
FinishWasRun = true;
} else {
Finish();
@@ -934,8 +1014,9 @@ public:
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
- SELECT operation_status, execution_status, issues, run_script_actor_id FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
+ SELECT operation_status, execution_status, issues, run_script_actor_id, meta FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id AND
+ (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
SELECT lease_deadline FROM `.metadata/script_execution_leases`
WHERE database = $database AND execution_id = $execution_id;
@@ -961,7 +1042,7 @@ public:
}
NYdb::TResultSetParser result(ResultSets[0]);
if (result.RowsCount() == 0) {
- Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
+ Finish(Ydb::StatusIds::NOT_FOUND, "No such execution");
return;
}
@@ -992,7 +1073,18 @@ public:
if (operationStatus) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
} else if (*leaseDeadline < RunStartTime) {
- FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED);
+ auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation");
+ return;
+ }
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+ const auto [operationTtl, resultsTtl] = *ttl;
+ FinishScriptExecution(Database, ExecutionId, StatusOnExpiredLease, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl);
SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution);
} else {
// OperationStatus is Nothing(): currently running
@@ -1058,7 +1150,8 @@ public:
operation_status,
execution_status
FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
+ WHERE database = $database AND execution_id = $execution_id AND
+ (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
SELECT
lease_deadline
@@ -1106,6 +1199,11 @@ public:
DELETE
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
+
+ DELETE
+ FROM `.metadata/result_sets`
+ WHERE database = $database AND execution_id = $execution_id;
+
)";
NYdb::TResultSetParser deadlineResult(ResultSets[1]);
@@ -1167,7 +1265,7 @@ public:
, StartActorTime(TInstant::Now())
{}
- bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult) {
+ bool CheakLeaseDeadline(NYdb::TResultSetParser& deadlineResult, TDuration operationTtl, TDuration resultsTtl) {
deadlineResult.TryNextRow();
TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
@@ -1180,7 +1278,7 @@ public:
LeaseExpired = true;
Ready = true;
- FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, Issues);
+ FinishScriptExecution(Request->Get()->Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
SetQueryResultHandler(&TGetScriptExecutionOperationBase::OnFinishOperation);
return false;
}
@@ -1226,7 +1324,7 @@ public:
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
- SELECT execution_status
+ SELECT execution_status, meta
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id;
@@ -1266,10 +1364,23 @@ public:
Metadata.set_exec_status(static_cast<Ydb::Query::ExecStatus>(*executionStatus));
}
+ auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation");
+ return;
+ }
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+
+ const auto [operationTtl, resultsTtl] = *ttl;
+
NYdb::TResultSetParser deadlineResult(ResultSets[1]);
if (deadlineResult.RowsCount() == 0) {
Ready = true;
- } else if (!CheakLeaseDeadline(deadlineResult)) {
+ } else if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) {
return;
}
@@ -1308,9 +1419,11 @@ public:
plan,
issues,
stats,
- ast
+ ast,
+ meta
FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
+ WHERE database = $database AND execution_id = $execution_id AND
+ (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
SELECT
lease_deadline
@@ -1420,9 +1533,23 @@ public:
return;
}
- if (!CheakLeaseDeadline(deadlineResult)) {
+ auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation");
+ return;
+ }
+
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
return;
}
+
+ const auto [operationTtl, resultsTtl] = *ttl;
+ if (!CheakLeaseDeadline(deadlineResult, operationTtl, resultsTtl)) {
+ return;
+ }
+
}
CommitTransaction();
@@ -1484,7 +1611,7 @@ public:
execution_mode,
issues
FROM `.metadata/script_executions`
- WHERE database = $database
+ WHERE database = $database AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL)
)";
if (PageToken) {
sql << R"(
@@ -1994,10 +2121,11 @@ public:
DECLARE $offset AS Int64;
DECLARE $limit AS Uint64;
- SELECT result_set_metas, operation_status, issues
+ SELECT result_set_metas, operation_status, issues, end_ts, meta
FROM `.metadata/script_executions`
- WHERE database = $database
- AND execution_id = $execution_id;
+ WHERE database = $database
+ AND execution_id = $execution_id
+ AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
SELECT row_id, result_set
FROM `.metadata/result_sets`
@@ -2040,7 +2168,7 @@ public:
NYdb::TResultSetParser result(ResultSets[0]);
if (!result.TryNextRow()) {
- Finish(Ydb::StatusIds::BAD_REQUEST, "Script execution not found");
+ Finish(Ydb::StatusIds::NOT_FOUND, "Script execution not found");
return;
}
@@ -2050,6 +2178,29 @@ public:
return;
}
+ const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation");
+ return;
+ }
+
+ const auto endTs = result.ColumnParser("end_ts").GetOptionalTimestamp();
+ if (!endTs) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation end timestamp");
+ return;
+ }
+
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+ const auto [_, resultsTtl] = *ttl;
+ if ((*endTs + resultsTtl) < TInstant::Now()){
+ Finish(Ydb::StatusIds::NOT_FOUND, "Results are expired");
+ return;
+ }
+
Ydb::StatusIds::StatusCode operationStatusCode = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
if (operationStatusCode != Ydb::StatusIds::SUCCESS) {
const TMaybe<TString> issuesSerialized = result.ColumnParser("issues").GetOptionalJsonDocument();
@@ -2078,7 +2229,7 @@ public:
Finish(Ydb::StatusIds::BAD_REQUEST, "Result set index is invalid");
return;
}
-
+
Ydb::Query::ResultSetMeta meta;
NProtobufJson::Json2Proto(*metaValue, meta);
@@ -2217,8 +2368,8 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re
namespace NPrivate {
-NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration) {
- return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, leaseDuration);
+NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) {
+ return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, operationTtl, resultsTtl, leaseDuration);
}
NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease, ui64 cookie) {
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
index 1846a5454a..d5efc733a2 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
@@ -70,7 +70,8 @@ struct TEvPrivate {
// Writes new script into db.
// If lease duration is zero, default one will be taken.
-NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration leaseDuration = TDuration::Zero());
+NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record,
+ TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero());
// Checks lease of execution, finishes execution if its lease is off, returns current status
NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode statusOnExpiredLease = Ydb::StatusIds::ABORTED, ui64 cookie = 0);
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
index cb997c85fc..1adc650798 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
@@ -17,6 +17,8 @@ using namespace NSchemeShard;
namespace {
constexpr TDuration TestLeaseDuration = TDuration::Seconds(1);
+constexpr TDuration TestOperationTtl = TDuration::Minutes(1);
+constexpr TDuration TestResultsTtl = TDuration::Minutes(1);
const TString TestDatabase = "test_db";
struct TScriptExecutionsYdbSetup {
@@ -82,7 +84,7 @@ struct TScriptExecutionsYdbSetup {
}
// Creates query in db. Returns execution id
- TString CreateQueryInDb(const TString& query = "SELECT 42", TDuration leaseDuration = TestLeaseDuration) {
+ TString CreateQueryInDb(const TString& query = "SELECT 42", TDuration leaseDuration = TestLeaseDuration, TDuration operationTtl = TestOperationTtl, TDuration resultsTtl = TestResultsTtl) {
TString executionId = CreateGuidAsString();
NKikimrKqp::TEvQueryRequest req;
req.MutableRequest()->SetDatabase(TestDatabase);
@@ -90,7 +92,7 @@ struct TScriptExecutionsYdbSetup {
req.MutableRequest()->SetAction(NKikimrKqp::QUERY_ACTION_EXECUTE);
const ui32 node = 0;
TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
- GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor);
+ GetRuntime()->Register(NPrivate::CreateCreateScriptOperationQueryActor(executionId, NActors::TActorId(), req, operationTtl, resultsTtl, leaseDuration), 0, 0, TMailboxType::Simple, 0, edgeActor);
auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvCreateScriptOperationResponse>(edgeActor);
UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS);
diff --git a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
index 550ca5c8c8..ca7714f20f 100644
--- a/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
+++ b/ydb/core/kqp/ut/service/kqp_query_service_ut.cpp
@@ -779,7 +779,7 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
auto op = opClient.Get<NYdb::NQuery::TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString());
UNIT_ASSERT_C(op.Ready(), op.Status().GetIssues().ToString());
- UNIT_ASSERT(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled);
+ UNIT_ASSERT_C(op.Metadata().ExecStatus == EExecStatus::Completed || op.Metadata().ExecStatus == EExecStatus::Canceled, op.Status().GetIssues().ToString());
UNIT_ASSERT_EQUAL(op.Metadata().ExecutionId, scriptExecutionOperation.Metadata().ExecutionId);
UNIT_ASSERT(op.Status().GetStatus() == NYdb::EStatus::SUCCESS || op.Status().GetStatus() == NYdb::EStatus::CANCELLED);
@@ -826,14 +826,142 @@ Y_UNIT_TEST_SUITE(KqpQueryService) {
}
}
- Y_UNIT_TEST(ExecuteScriptFailsWithForgetAfter) {
+ NYdb::NQuery::TScriptExecutionOperation WaitScriptExecutionFail(const NYdb::TOperation::TOperationId& operationId, const NYdb::TDriver& ydbDriver) {
+ NYdb::NOperation::TOperationClient client(ydbDriver);
+ NThreading::TFuture<NYdb::NQuery::TScriptExecutionOperation> op;
+ do {
+ if (!op.Initialized()) {
+ Sleep(TDuration::MilliSeconds(10));
+ }
+ op = client.Get<NYdb::NQuery::TScriptExecutionOperation>(operationId);
+ } while (op.GetValueSync().Status().IsSuccess());
+ return op.GetValueSync();
+ }
+
+ void ExpectExecStatus(EExecStatus status, const TScriptExecutionOperation op, const NYdb::TDriver& ydbDriver) {
+ auto readyOp = WaitScriptExecutionOperation(op.Id(), ydbDriver);
+ UNIT_ASSERT_C(readyOp.Status().IsSuccess(), readyOp.Status().GetIssues().ToString());
+ UNIT_ASSERT_C(readyOp.Ready(), readyOp.Status().GetIssues().ToString());
+ UNIT_ASSERT(readyOp.Metadata().ExecStatus == status);
+ UNIT_ASSERT_EQUAL(readyOp.Metadata().ExecutionId, op.Metadata().ExecutionId);
+ }
+
+ void ExecuteScriptWithSettings(const TExecuteScriptSettings& settings, EExecStatus status, TString query = "SELECT 1;") {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto op = db.ExecuteScript(query, settings).ExtractValueSync();
+ ExpectExecStatus(status, op, kikimr.GetDriver());
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithCancelAfter) {
+ TString query = R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )";
+ auto settings = TExecuteScriptSettings().CancelAfter(TDuration::MilliSeconds(1));
+ ExecuteScriptWithSettings(settings, EExecStatus::Canceled, query);
+
+ settings = TExecuteScriptSettings().CancelAfter(TDuration::Seconds(10));
+ ExecuteScriptWithSettings(settings, EExecStatus::Completed);
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithTimeout) {
+ TString query = R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )";
+ auto settings = TExecuteScriptSettings().OperationTimeout(TDuration::MilliSeconds(1));
+ ExecuteScriptWithSettings(settings, EExecStatus::Failed, query);
+
+ settings = TExecuteScriptSettings().OperationTimeout(TDuration::Seconds(100));
+ ExecuteScriptWithSettings(settings, EExecStatus::Completed);
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithCancelAfterAndTimeout) {
+ TString query = R"(
+ SELECT * FROM EightShard WHERE Text = "Value2" AND Data = 1 ORDER BY Key;
+ SELECT * FROM TwoShard WHERE Key < 10 ORDER BY Key;
+ )";
+ auto settings = TExecuteScriptSettings().CancelAfterWithTimeout(TDuration::MilliSeconds(1), TDuration::Seconds(100));
+ ExecuteScriptWithSettings(settings, EExecStatus::Canceled, query);
+
+ settings = TExecuteScriptSettings().CancelAfterWithTimeout(TDuration::Seconds(100), TDuration::MilliSeconds(1));
+ ExecuteScriptWithSettings(settings, EExecStatus::Failed, query);
+ }
+
+ void CheckScriptOperationExpires(const TExecuteScriptSettings &settings) {
auto kikimr = DefaultKikimrRunner();
auto db = kikimr.GetQueryClient();
auto scriptExecutionOperation = db.ExecuteScript(R"(
SELECT 42
- )", NYdb::NQuery::TExecuteScriptSettings().ForgetAfter(TDuration::Days(1))).ExtractValueSync();
- UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::UNSUPPORTED, scriptExecutionOperation.Status().GetIssues().ToString());
+ )", settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+
+
+ auto readyOp = WaitScriptExecutionFail(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ UNIT_ASSERT_C(readyOp.Status().GetStatus() == EStatus::NOT_FOUND, readyOp.Status().GetStatus() << ":" << readyOp.Status().GetIssues().ToString());
+
+ NOperation::TOperationClient client(kikimr.GetDriver());
+
+ auto list = client.List<NYdb::NQuery::TScriptExecutionOperation>(100).ExtractValueSync();
+ UNIT_ASSERT_C(list.IsSuccess(), list.GetIssues().ToString());
+ UNIT_ASSERT(list.GetList().size() == 0);
+
+ auto status = client.Forget(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::NOT_FOUND, status.GetIssues().ToString());
+
+ status = client.Cancel(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(status.GetStatus() == NYdb::EStatus::NOT_FOUND, status.GetIssues().ToString());
+
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.GetStatus() == NYdb::EStatus::NOT_FOUND, results.GetIssues().ToString());
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithForgetAfter) {
+ CheckScriptOperationExpires(TExecuteScriptSettings().ForgetAfter(TDuration::MilliSeconds(50)));
+ }
+
+ void CheckScriptResultsExpire(const TExecuteScriptSettings& settings) {
+ auto kikimr = DefaultKikimrRunner();
+ auto db = kikimr.GetQueryClient();
+
+ auto scriptExecutionOperation = db.ExecuteScript(R"(
+ SELECT 42
+ )", settings).ExtractValueSync();
+ UNIT_ASSERT_VALUES_EQUAL_C(scriptExecutionOperation.Status().GetStatus(), EStatus::SUCCESS, scriptExecutionOperation.Status().GetIssues().ToString());
+
+ WaitScriptExecutionOperation(scriptExecutionOperation.Id(), kikimr.GetDriver());
+ TFetchScriptResultsResult results = db.FetchScriptResults(scriptExecutionOperation, 0).ExtractValueSync();
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ TResultSetParser resultSet(results.ExtractResultSet());
+ UNIT_ASSERT_C(resultSet.TryNextRow(), results.GetIssues().ToString());
+ UNIT_ASSERT_VALUES_EQUAL(resultSet.ColumnParser(0).GetInt32(), 42);
+
+ while (results.GetStatus() != NYdb::EStatus::NOT_FOUND) {
+ UNIT_ASSERT_C(results.IsSuccess(), results.GetIssues().ToString());
+ results = db.FetchScriptResults(scriptExecutionOperation.Metadata().ExecutionId, 0).ExtractValueSync();
+ Sleep(TDuration::MilliSeconds(10));
+ }
+
+ UNIT_ASSERT_C(results.GetStatus() == NYdb::EStatus::NOT_FOUND, results.GetIssues().ToString());
+ NOperation::TOperationClient client(kikimr.GetDriver());
+ auto op = client.Get<TScriptExecutionOperation>(scriptExecutionOperation.Id()).ExtractValueSync();
+ UNIT_ASSERT_C(op.Status().IsSuccess(), op.Status().GetIssues().ToString());
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithResultsTtl) {
+ CheckScriptResultsExpire(TExecuteScriptSettings().ResultsTtl(TDuration::MilliSeconds(2000)));
+ }
+
+ Y_UNIT_TEST(ExecuteScriptWithResultsTtlAndForgetAfter) {
+
+ auto settings = TExecuteScriptSettings().ResultsTtl(TDuration::MilliSeconds(2000)).ForgetAfter(TDuration::Seconds(30));
+ CheckScriptResultsExpire(settings);
+
+ settings = TExecuteScriptSettings().ResultsTtl(TDuration::Minutes(1)).ForgetAfter(TDuration::MilliSeconds(500));
+ CheckScriptOperationExpires(settings);
}
TScriptExecutionOperation CreateScriptExecutionOperation(size_t numberOfRows, NYdb::NQuery::TQueryClient& db, const NYdb::TDriver& ydbDriver) {
diff --git a/ydb/core/protos/kqp.proto b/ydb/core/protos/kqp.proto
index dc2d84b1dc..19b6cbe08a 100644
--- a/ydb/core/protos/kqp.proto
+++ b/ydb/core/protos/kqp.proto
@@ -2,6 +2,7 @@ package NKikimrKqp;
option java_package = "ru.yandex.kikimr.proto";
option cc_enable_arenas = true;
+import "google/protobuf/duration.proto";
import "library/cpp/actors/protos/actors.proto";
import "ydb/core/protos/type_info.proto";
import "ydb/core/protos/kqp_physical.proto";
@@ -665,3 +666,9 @@ message TEvCancelScriptExecutionResponse {
optional Ydb.StatusIds.StatusCode Status = 1;
repeated Ydb.Issue.IssueMessage Issues = 2;
}
+
+// stored in column "meta" of .metadata/script_executions table
+message TScriptExecutionOperationMeta {
+ optional google.protobuf.Duration OperationTtl = 1;
+ optional google.protobuf.Duration ResultsTtl = 2;
+} \ No newline at end of file
diff --git a/ydb/public/sdk/cpp/client/ydb_query/client.cpp b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
index e502b69fe1..40f2d211b1 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/client.cpp
+++ b/ydb/public/sdk/cpp/client/ydb_query/client.cpp
@@ -41,7 +41,7 @@ public:
request.set_stats_mode(settings.StatsMode_);
request.mutable_script_content()->set_syntax(settings.Syntax_);
request.mutable_script_content()->set_text(script);
-
+ SetDuration(settings.ResultsTtl_, *request.mutable_results_ttl());
auto promise = NThreading::NewPromise<TScriptExecutionOperation>();
auto responseCb = [promise]
diff --git a/ydb/public/sdk/cpp/client/ydb_query/query.h b/ydb/public/sdk/cpp/client/ydb_query/query.h
index 23a5d37504..f58459afdc 100644
--- a/ydb/public/sdk/cpp/client/ydb_query/query.h
+++ b/ydb/public/sdk/cpp/client/ydb_query/query.h
@@ -130,6 +130,7 @@ struct TExecuteScriptSettings : public TOperationRequestSettings<TExecuteScriptS
FLUENT_SETTING_DEFAULT(Ydb::Query::Syntax, Syntax, Ydb::Query::SYNTAX_YQL_V1);
FLUENT_SETTING_DEFAULT(Ydb::Query::ExecMode, ExecMode, Ydb::Query::EXEC_MODE_EXECUTE);
FLUENT_SETTING_DEFAULT(Ydb::Query::StatsMode, StatsMode, Ydb::Query::STATS_MODE_NONE);
+ FLUENT_SETTING(TDuration, ResultsTtl);
};
class TVersionedScriptId {