aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorgrigoriypisar <grigoriypisar@yandex-team.com>2023-11-13 10:42:35 +0300
committergrigoriypisar <grigoriypisar@yandex-team.com>2023-11-13 11:17:32 +0300
commit005735009aadf431092af0ecbb430dafee3f58ea (patch)
tree0544f6e0d758770b3ccbe01a24777a36c4047c7e
parent977f49e54c144e59ce1299caf8e8249d09909e5b (diff)
downloadydb-005735009aadf431092af0ecbb430dafee3f58ea.tar.gz
-commit-and-rollback-for-pending-loads
Created first commit version
-rw-r--r--.mapping.json5
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/driver_lib/run/kikimr_services_initializers.cpp11
-rw-r--r--ydb/core/driver_lib/run/ya.make1
-rw-r--r--ydb/core/kqp/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/common/events/query.h10
-rw-r--r--ydb/core/kqp/common/events/script_executions.h155
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.cpp4
-rw-r--r--ydb/core/kqp/common/kqp_user_request_context.h8
-rw-r--r--ydb/core/kqp/common/simple/kqp_event_ids.h7
-rw-r--r--ydb/core/kqp/common/simple/services.h5
-rw-r--r--ydb/core/kqp/executer_actor/kqp_data_executer.cpp81
-rw-r--r--ydb/core/kqp/executer_actor/kqp_executer_impl.h105
-rw-r--r--ydb/core/kqp/executer_actor/kqp_scan_executer.cpp6
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp103
-rw-r--r--ydb/core/kqp/federated_query/kqp_federated_query_actors.h13
-rw-r--r--ydb/core/kqp/federated_query/ya.make1
-rw-r--r--ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt22
-rw-r--r--ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt23
-rw-r--r--ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt23
-rw-r--r--ydb/core/kqp/finalize_script_service/CMakeLists.txt17
-rw-r--r--ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt22
-rw-r--r--ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp218
-rw-r--r--ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h14
-rw-r--r--ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp143
-rw-r--r--ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h12
-rw-r--r--ydb/core/kqp/finalize_script_service/ya.make14
-rw-r--r--ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h95
-rw-r--r--ydb/core/kqp/gateway/kqp_metadata_loader.cpp27
-rw-r--r--ydb/core/kqp/host/kqp_host.cpp5
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.cpp1181
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions.h18
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_impl.h17
-rw-r--r--ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp2
-rw-r--r--ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp28
-rw-r--r--ydb/core/kqp/session_actor/kqp_query_state.h6
-rw-r--r--ydb/core/kqp/ya.make1
-rw-r--r--ydb/core/protos/config.proto6
-rw-r--r--ydb/core/testlib/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/testlib/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/testlib/test_client.cpp5
-rw-r--r--ydb/core/testlib/ya.make1
-rw-r--r--ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp2
-rw-r--r--ydb/library/yql/providers/common/structured_token/yql_token_builder.h2
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp24
-rw-r--r--ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp2
-rw-r--r--ydb/library/yql/providers/s3/provider/yql_s3_settings.h1
-rw-r--r--ydb/tests/tools/kqprun/kqprun.cpp4
-rw-r--r--ydb/tests/tools/kqprun/src/kqp_runner.cpp8
-rw-r--r--ydb/tests/tools/kqprun/src/kqp_runner.h2
-rw-r--r--ydb/tests/tools/kqprun/src/ydb_setup.cpp13
-rw-r--r--ydb/tests/tools/kqprun/src/ydb_setup.h2
65 files changed, 1851 insertions, 642 deletions
diff --git a/.mapping.json b/.mapping.json
index b1d5ec9df7..2297dce1d8 100644
--- a/.mapping.json
+++ b/.mapping.json
@@ -4624,6 +4624,11 @@
"ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt":"",
"ydb/core/kqp/federated_query/CMakeLists.txt":"",
"ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt":"",
+ "ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt":"",
+ "ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt":"",
+ "ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt":"",
+ "ydb/core/kqp/finalize_script_service/CMakeLists.txt":"",
+ "ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt":"",
"ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt":"",
"ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt":"",
"ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt":"",
diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
index 105a543cf8..8439de3a0c 100644
--- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt
@@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC
ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
index 43ad518bdc..fb358d77d5 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt
@@ -70,6 +70,7 @@ target_link_libraries(run PUBLIC
ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
index 43ad518bdc..fb358d77d5 100644
--- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt
@@ -70,6 +70,7 @@ target_link_libraries(run PUBLIC
ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
index 105a543cf8..8439de3a0c 100644
--- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt
@@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC
ydb-core-kafka_proxy
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
core-kqp-rm_service
ydb-core-load_test
ydb-core-local_pgwire
diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
index bd8fe08b55..13d147c149 100644
--- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
+++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp
@@ -73,6 +73,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/proxy_service/kqp_proxy_service.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
+#include <ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h>
#include <ydb/core/load_test/service_actor.h>
@@ -2109,13 +2110,21 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu
GlobalObjects.AddGlobalObject(std::make_shared<NYql::NLog::YqlLoggerScope>(
new NYql::NLog::TTlsLogBackend(new TNullLogBackend())));
+ auto federatedQuerySetupFactory = NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config);
+
auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(),
Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources),
- NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config)
+ federatedQuerySetupFactory
);
setup->LocalServices.push_back(std::make_pair(
NKqp::MakeKqpProxyID(NodeId),
TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId)));
+
+ // Create finalize script service
+ auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory);
+ setup->LocalServices.push_back(std::make_pair(
+ NKqp::MakeKqpFinalizeScriptServiceId(NodeId),
+ TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId)));
}
}
diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make
index 9ae7e14eac..cf75b16e98 100644
--- a/ydb/core/driver_lib/run/ya.make
+++ b/ydb/core/driver_lib/run/ya.make
@@ -78,6 +78,7 @@ PEERDIR(
ydb/core/kafka_proxy
ydb/core/kqp
ydb/core/kqp/federated_query
+ ydb/core/kqp/finalize_script_service
ydb/core/kqp/rm_service
ydb/core/load_test
ydb/core/local_pgwire
diff --git a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
index 1bad2f5045..54cac3e15d 100644
--- a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
add_subdirectory(federated_query)
+add_subdirectory(finalize_script_service)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
index 5337a48cda..f064e17c70 100644
--- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
add_subdirectory(federated_query)
+add_subdirectory(finalize_script_service)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/CMakeLists.linux-x86_64.txt
index 5337a48cda..f064e17c70 100644
--- a/ydb/core/kqp/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.linux-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
add_subdirectory(federated_query)
+add_subdirectory(finalize_script_service)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/CMakeLists.windows-x86_64.txt
index 1bad2f5045..54cac3e15d 100644
--- a/ydb/core/kqp/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/CMakeLists.windows-x86_64.txt
@@ -13,6 +13,7 @@ add_subdirectory(counters)
add_subdirectory(executer_actor)
add_subdirectory(expr_nodes)
add_subdirectory(federated_query)
+add_subdirectory(finalize_script_service)
add_subdirectory(gateway)
add_subdirectory(host)
add_subdirectory(node_service)
diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h
index 59fbac0e00..d40c9ecea3 100644
--- a/ydb/core/kqp/common/events/query.h
+++ b/ydb/core/kqp/common/events/query.h
@@ -1,6 +1,7 @@
#pragma once
#include <ydb/core/protos/kqp.pb.h>
#include <ydb/core/kqp/common/simple/kqp_event_ids.h>
+#include <ydb/core/kqp/common/kqp_user_request_context.h>
#include <ydb/core/grpc_services/base/base.h>
#include <ydb/core/grpc_services/cancelation/cancelation_event.h>
#include <ydb/core/grpc_services/cancelation/cancelation.h>
@@ -265,6 +266,14 @@ public:
}
}
+ void SetUserRequestContext(TIntrusivePtr<TUserRequestContext> userRequestContext) {
+ UserRequestContext = userRequestContext;
+ }
+
+ TIntrusivePtr<TUserRequestContext> GetUserRequestContext() const {
+ return UserRequestContext;
+ }
+
mutable NKikimrKqp::TEvQueryRequest Record;
private:
@@ -291,6 +300,7 @@ private:
TDuration OperationTimeout;
TDuration CancelAfter;
const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED;
+ TIntrusivePtr<TUserRequestContext> UserRequestContext;
};
struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart,
diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h
index 94ff1de909..893fae46da 100644
--- a/ydb/core/kqp/common/events/script_executions.h
+++ b/ydb/core/kqp/common/events/script_executions.h
@@ -14,6 +14,11 @@
namespace NKikimr::NKqp {
+enum EFinalizationStatus : i32 {
+ FS_COMMIT,
+ FS_ROLLBACK,
+};
+
struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> {
explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id)
: Database(database)
@@ -47,28 +52,44 @@ struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScript
NOperationId::TOperationId OperationId;
};
-struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> {
- TEvGetScriptExecutionOperationResponse(bool ready, bool leaseExpired, TActorId runScriptActorId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata)
+struct TEvGetScriptExecutionOperationQueryResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationQueryResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationQueryResponse> {
+ TEvGetScriptExecutionOperationQueryResponse(bool ready, bool leaseExpired, std::optional<EFinalizationStatus> finalizationStatus, TActorId runScriptActorId,
+ const TString& executionId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, Ydb::Query::ExecuteScriptMetadata metadata)
: Ready(ready)
, LeaseExpired(leaseExpired)
+ , FinalizationStatus(finalizationStatus)
, RunScriptActorId(runScriptActorId)
+ , ExecutionId(executionId)
, Status(status)
, Issues(std::move(issues))
, Metadata(std::move(metadata))
- {
- }
+ {}
+
+ bool Ready;
+ bool LeaseExpired;
+ std::optional<EFinalizationStatus> FinalizationStatus;
+ TActorId RunScriptActorId;
+ TString ExecutionId;
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+ Ydb::Query::ExecuteScriptMetadata Metadata;
+};
+
+struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> {
+ TEvGetScriptExecutionOperationResponse(bool ready, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata)
+ : Ready(ready)
+ , Status(status)
+ , Issues(std::move(issues))
+ , Metadata(std::move(metadata))
+ {}
TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
: Ready(false)
- , LeaseExpired(false)
, Status(status)
, Issues(std::move(issues))
- {
- }
+ {}
bool Ready;
- bool LeaseExpired;
- TActorId RunScriptActorId;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
TMaybe<google::protobuf::Any> Metadata;
@@ -153,12 +174,19 @@ struct TEvCancelScriptExecutionOperationResponse : public NActors::TEventLocal<T
};
struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecutionFinished, TKqpScriptExecutionEvents::EvScriptExecutionFinished> {
- TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
- : Status(status)
+ explicit TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : OperationAlreadyFinalized(false)
+ , Status(status)
, Issues(std::move(issues))
- {
- }
+ {}
+
+ TEvScriptExecutionFinished(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {})
+ : OperationAlreadyFinalized(operationAlreadyFinalized)
+ , Status(status)
+ , Issues(std::move(issues))
+ {}
+ bool OperationAlreadyFinalized;
Ydb::StatusIds::StatusCode Status;
NYql::TIssues Issues;
};
@@ -196,4 +224,105 @@ struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetch
NKikimrKqp::TEvFetchScriptResultsResponse Results;
};
+struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> {
+ TEvSaveScriptExternalEffectRequest(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken)
+ : ExecutionId(executionId)
+ , Database(database)
+ , CustomerSuppliedId(customerSuppliedId)
+ , UserToken(userToken)
+ {}
+
+ TString ExecutionId;
+ TString Database;
+
+ TString CustomerSuppliedId;
+ TString UserToken;
+ std::vector<NKqpProto::TKqpExternalSink> Sinks;
+ std::vector<TString> SecretNames;
+};
+
+struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSaveScriptExternalEffectResponse, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectResponse> {
+ TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ {}
+
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+};
+
+struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeRequest, TKqpScriptExecutionEvents::EvScriptFinalizeRequest> {
+ TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database,
+ Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues = {}, std::optional<NKqpProto::TKqpStatsQuery> queryStats = std::nullopt,
+ std::optional<TString> queryPlan = std::nullopt, std::optional<TString> queryAst = std::nullopt, std::optional<ui64> leaseGeneration = std::nullopt)
+ : FinalizationStatus(finalizationStatus)
+ , ExecutionId(executionId)
+ , Database(database)
+ , OperationStatus(operationStatus)
+ , ExecStatus(execStatus)
+ , Issues(std::move(issues))
+ , QueryStats(std::move(queryStats))
+ , QueryPlan(std::move(queryPlan))
+ , QueryAst(std::move(queryAst))
+ , LeaseGeneration(leaseGeneration)
+ {}
+
+ EFinalizationStatus FinalizationStatus;
+ TString ExecutionId;
+ TString Database;
+ Ydb::StatusIds::StatusCode OperationStatus;
+ Ydb::Query::ExecStatus ExecStatus;
+ NYql::TIssues Issues;
+ std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
+ std::optional<TString> QueryPlan;
+ std::optional<TString> QueryAst;
+ std::optional<ui64> LeaseGeneration;
+ TDuration OperationTtl;
+ TDuration ResultsTtl;
+};
+
+struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalizeResponse, TKqpScriptExecutionEvents::EvScriptFinalizeResponse> {
+ explicit TEvScriptFinalizeResponse(const TString& executionId)
+ : ExecutionId(executionId)
+ {}
+
+ TString ExecutionId;
+};
+
+struct TEvSaveScriptFinalStatusResponse : public NActors::TEventLocal<TEvSaveScriptFinalStatusResponse, TKqpScriptExecutionEvents::EvSaveScriptFinalStatusResponse> {
+ TEvSaveScriptFinalStatusResponse(const TString& customerSuppliedId, const TString& userToken)
+ : CustomerSuppliedId(customerSuppliedId)
+ , UserToken(userToken)
+ {}
+
+ TString CustomerSuppliedId;
+ TString UserToken;
+ std::vector<NKqpProto::TKqpExternalSink> Sinks;
+ std::vector<TString> SecretNames;
+};
+
+struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecretsResponse, TKqpScriptExecutionEvents::EvDescribeSecretsResponse> {
+ struct TDescription {
+ TDescription(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
+ : Status(status)
+ , Issues(std::move(issues))
+ {}
+
+ TDescription(const std::vector<TString>& secretValues)
+ : SecretValues(secretValues)
+ , Status(Ydb::StatusIds::SUCCESS)
+ {}
+
+ std::vector<TString> SecretValues;
+ Ydb::StatusIds::StatusCode Status;
+ NYql::TIssues Issues;
+ };
+
+ TEvDescribeSecretsResponse(const TDescription& description)
+ : Description(description)
+ {}
+
+ TDescription Description;
+};
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/common/kqp_user_request_context.cpp b/ydb/core/kqp/common/kqp_user_request_context.cpp
index 48dcacde95..685dc0ffe1 100644
--- a/ydb/core/kqp/common/kqp_user_request_context.cpp
+++ b/ydb/core/kqp/common/kqp_user_request_context.cpp
@@ -3,12 +3,14 @@
namespace NKikimr::NKqp {
void TUserRequestContext::Out(IOutputStream& o) const {
- o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}";
+ o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << "}";
}
void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) {
resultMap["TraceId"] = ctx.TraceId;
resultMap["Database"] = ctx.Database;
resultMap["SessionId"] = ctx.SessionId;
+ resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId;
+ resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId;
}
}
diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h
index 62bf4402fa..d091f7aa19 100644
--- a/ydb/core/kqp/common/kqp_user_request_context.h
+++ b/ydb/core/kqp/common/kqp_user_request_context.h
@@ -10,6 +10,8 @@ namespace NKikimr::NKqp {
TString TraceId;
TString Database;
TString SessionId;
+ TString CurrentExecutionId;
+ TString CustomerSuppliedId;
TUserRequestContext() = default;
@@ -18,6 +20,12 @@ namespace NKikimr::NKqp {
, Database(database)
, SessionId(sessionId) {}
+ TUserRequestContext(const TString& traceId, const TString& database, const TString& sessionId, const TString& currentExecutionId, const TString& customerSuppliedId)
+ : TraceId(traceId)
+ , Database(database)
+ , SessionId(sessionId)
+ , CurrentExecutionId(currentExecutionId)
+ , CustomerSuppliedId(customerSuppliedId) {}
void Out(IOutputStream& o) const;
};
diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h
index 4aecb291a3..2bbbfe6ee1 100644
--- a/ydb/core/kqp/common/simple/kqp_event_ids.h
+++ b/ydb/core/kqp/common/simple/kqp_event_ids.h
@@ -142,6 +142,13 @@ struct TKqpScriptExecutionEvents {
EvCheckAliveRequest,
EvCheckAliveResponse,
EvFetchScriptResultsQueryResponse,
+ EvSaveScriptExternalEffectRequest,
+ EvSaveScriptExternalEffectResponse,
+ EvScriptFinalizeRequest,
+ EvScriptFinalizeResponse,
+ EvSaveScriptFinalStatusResponse,
+ EvGetScriptExecutionOperationQueryResponse,
+ EvDescribeSecretsResponse,
};
};
diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h
index ebee571eb0..21746985c9 100644
--- a/ydb/core/kqp/common/simple/services.h
+++ b/ydb/core/kqp/common/simple/services.h
@@ -36,4 +36,9 @@ inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId)
return NActors::TActorId(nodeId, TStringBuf(name, 12));
}
+inline NActors::TActorId MakeKqpFinalizeScriptServiceId(ui32 nodeId) {
+ const char name[12] = "kqp_sfinal";
+ return NActors::TActorId(nodeId, TStringBuf(name, 12));
+}
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
index e09aab4a57..ddc2b6e622 100644
--- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp
@@ -313,9 +313,9 @@ public:
hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve);
hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve);
hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve);
+ hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve);
+ hFunc(TEvDescribeSecretsResponse, HandleResolve);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData);
- hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout);
default:
UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite());
}
@@ -363,7 +363,6 @@ private:
hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected);
hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
- IgnoreFunc(NActors::TEvents::TEvWakeup);
default: {
CancelProposal(0);
UnexpectedEvent("PrepareState", ev->GetTypeRewrite());
@@ -953,7 +952,6 @@ private:
hFunc(TEvDqCompute::TEvChannelData, HandleExecute);
hFunc(TEvKqp::TEvAbortExecution, HandleExecute);
IgnoreFunc(TEvInterconnect::TEvNodeConnected);
- IgnoreFunc(NActors::TEvents::TEvWakeup);
default:
UnexpectedEvent("ExecuteState", ev->GetTypeRewrite());
}
@@ -1360,7 +1358,7 @@ private:
return true;
}
- void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
+ void BuildDatashardTasks(TStageInfo& stageInfo) {
THashMap<ui64, ui64> shardTasks; // shardId -> taskId
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -1375,7 +1373,7 @@ private:
task.Meta.ShardId = shardId;
shardTasks.emplace(shardId, task.Id);
- BuildSinks(stage, task, secureParams);
+ BuildSinks(stage, task);
return task;
};
@@ -1588,47 +1586,82 @@ private:
YQL_ENSURE(result.second);
}
+ bool WaitRequired() const {
+ return SecretSnapshotRequired || ResourceSnapshotRequired || SaveScriptExternalEffectRequired;
+ }
+
+ void HandleResolve(TEvDescribeSecretsResponse::TPtr& ev) {
+ YQL_ENSURE(ev->Get()->Description.Status == Ydb::StatusIds::SUCCESS, "failed to get secrets snapshot with issues: " << ev->Get()->Description.Issues.ToOneLineString());
+
+ for (size_t i = 0; i < SecretNames.size(); ++i) {
+ SecureParams.emplace(SecretNames[i], ev->Get()->Description.SecretValues[i]);
+ }
+
+ SecretSnapshotRequired = false;
+ if (!WaitRequired()) {
+ Execute();
+ }
+ }
+
void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) {
if (ev->Get()->Snapshot.empty()) {
LOG_E("Can not find default state storage group for database " << Database);
}
ResourceSnapshot = std::move(ev->Get()->Snapshot);
ResourceSnapshotRequired = false;
- if (!SecretSnapshotRequired) {
+ if (!WaitRequired()) {
+ Execute();
+ }
+ }
+
+ void HandleResolve(TEvSaveScriptExternalEffectResponse::TPtr& ev) {
+ YQL_ENSURE(ev->Get()->Status == Ydb::StatusIds::SUCCESS, "failed to save script external effect with issues: " << ev->Get()->Issues.ToOneLineString());
+
+ SaveScriptExternalEffectRequired = false;
+ if (!WaitRequired()) {
Execute();
}
}
void DoExecute() {
- TVector<TString> secretNames;
+ const auto& requestContext = GetUserRequestContext();
+ auto scriptExternalEffect = std::make_unique<TEvSaveScriptExternalEffectRequest>(
+ requestContext->CurrentExecutionId, requestContext->Database,
+ requestContext->CustomerSuppliedId, UserToken ? UserToken->GetUserSID() : ""
+ );
for (const auto& transaction : Request.Transactions) {
for (const auto& secretName : transaction.Body->GetSecretNames()) {
SecretSnapshotRequired = true;
- secretNames.push_back(secretName);
+ SecretNames.push_back(secretName);
}
for (const auto& stage : transaction.Body->GetStages()) {
if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) {
ResourceSnapshotRequired = true;
HasExternalSources = true;
}
+ if (requestContext->CurrentExecutionId) {
+ for (const auto& sink : stage.GetSinks()) {
+ if (sink.GetTypeCase() == NKqpProto::TKqpSink::kExternalSink) {
+ SaveScriptExternalEffectRequired = true;
+ scriptExternalEffect->Sinks.push_back(sink.GetExternalSink());
+ }
+ }
+ }
}
}
+ scriptExternalEffect->SecretNames = SecretNames;
- if (!SecretSnapshotRequired && !ResourceSnapshotRequired) {
+ if (!WaitRequired()) {
return Execute();
}
if (SecretSnapshotRequired) {
- FetchSecrets(std::move(secretNames));
+ GetSecretsSnapshot();
}
if (ResourceSnapshotRequired) {
GetResourcesSnapshot();
}
- }
-
- void OnSecretsFetched() override {
- SecretSnapshotRequired = false;
- if (!ResourceSnapshotRequired) {
- Execute();
+ if (SaveScriptExternalEffectRequired) {
+ SaveScriptExternalEffect(std::move(scriptExternalEffect));
}
}
@@ -1651,8 +1684,6 @@ private:
size_t readActors = 0;
for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) {
auto& tx = Request.Transactions[txIdx];
- TMap<TString, TString> secureParams = ResolveSecretNames(tx.Body->GetSecretNames());
-
for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) {
auto& stage = tx.Body->GetStages(stageIdx);
auto& stageInfo = TasksGraph.GetStageInfo(TStageId(txIdx, stageIdx));
@@ -1689,14 +1720,14 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- if (auto actors = BuildScanTasksFromSource(stageInfo, secureParams)) {
+ if (auto actors = BuildScanTasksFromSource(stageInfo)) {
readActors += *actors;
} else {
UnknownAffectedShardCount = true;
}
break;
case NKqpProto::TKqpSource::kExternalSource:
- BuildReadTasksFromSource(stageInfo, secureParams, ResourceSnapshot);
+ BuildReadTasksFromSource(stageInfo, ResourceSnapshot);
break;
default:
YQL_ENSURE(false, "unknown source type");
@@ -1704,11 +1735,11 @@ private:
} else if (StreamResult && stageInfo.Meta.IsOlap()) {
BuildScanTasksFromShards(stageInfo);
} else if (stageInfo.Meta.ShardOperations.empty()) {
- BuildComputeTasks(stageInfo, secureParams);
+ BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
- BuildSysViewScanTasks(stageInfo, secureParams);
+ BuildSysViewScanTasks(stageInfo);
} else {
- BuildDatashardTasks(stageInfo, secureParams);
+ BuildDatashardTasks(stageInfo);
}
if (stage.GetIsSinglePartition()) {
@@ -1910,7 +1941,6 @@ private:
switch (ev->GetTypeRewrite()) {
hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle);
hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution);
- IgnoreFunc(NActors::TEvents::TEvWakeup);
default:
UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite());
}
@@ -2368,6 +2398,7 @@ private:
bool HasExternalSources = false;
bool SecretSnapshotRequired = false;
bool ResourceSnapshotRequired = false;
+ bool SaveScriptExternalEffectRequired = false;
TVector<NKikimrKqp::TKqpNodeResources> ResourceSnapshot;
ui64 TxCoordinator = 0;
diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
index b613d0b796..7654c0aa98 100644
--- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h
+++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h
@@ -21,6 +21,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/common/kqp_yql.h>
#include <ydb/core/kqp/common/kqp_user_request_context.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/grpc_services/local_rate_limiter.h>
#include <ydb/services/metadata/secret/fetcher.h>
@@ -386,71 +387,6 @@ protected:
return res;
}
- NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() {
- return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
- }
-
- void FetchSecrets(TVector<TString>&& secretNames) {
- YQL_ENSURE(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service is not active");
- SecretNames = std::move(secretNames);
-
- SubscribedOnSecrets = true;
- this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
- this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
- }
-
- void HandleRefreshSubscriberData(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- if (!SubscribedOnSecrets) {
- return;
- }
-
- Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>();
-
- TString secretValue;
- for (const TString& secretName : SecretNames) {
- auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName);
- if (!Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue)) {
- return;
- }
- }
-
- UnsubscribeFromSecrets();
- OnSecretsFetched();
- }
-
- void HandleSecretsWaitingTimeout(NActors::TEvents::TEvWakeup::TPtr&) {
- if (!SubscribedOnSecrets) {
- return;
- }
-
- YQL_ENSURE(Secrets != nullptr, "secrets snapshot fetching timeout");
- UnsubscribeFromSecrets();
- OnSecretsFetched();
- }
-
- void UnsubscribeFromSecrets() {
- if (SubscribedOnSecrets) {
- SubscribedOnSecrets = false;
- this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
- }
- }
-
- virtual void OnSecretsFetched() {}
-
- TMap<TString, TString> ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames) {
- TMap<TString, TString> secureParams;
- for (const auto& secretName : secretNames) {
- auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName);
-
- TString secretValue;
- YQL_ENSURE(Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue), "secret with name '" << secretName << "' not found");
-
- secureParams[secretName] = secretValue;
- }
-
- return secureParams;
- }
-
protected:
bool CheckExecutionComplete() {
if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) {
@@ -735,7 +671,7 @@ protected:
protected:
- void BuildSysViewScanTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
+ void BuildSysViewScanTasks(TStageInfo& stageInfo) {
Y_DEBUG_ABORT_UNLESS(stageInfo.Meta.IsSysView());
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
@@ -778,13 +714,13 @@ protected:
task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse();
task.Meta.Type = TTaskMeta::TTaskType::Compute;
- BuildSinks(stage, task, secureParams);
+ BuildSinks(stage, task);
LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id);
}
}
- void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task, const TMap<TString, TString>& secureParams) {
+ void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) {
if (stage.SinksSize() > 0) {
YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported");
const auto& sink = stage.GetSinks(0);
@@ -794,10 +730,10 @@ protected:
auto sinkName = extSink.GetSinkName();
if (sinkName) {
- auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
task.Meta.SecureParams.emplace(sinkName, structuredToken);
if (GetUserRequestContext()->TraceId) {
- task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->TraceId);
+ task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->CustomerSuppliedId);
// "fq.restart_count"
}
}
@@ -809,7 +745,7 @@ protected:
}
}
- void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
+ void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) {
const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
YQL_ENSURE(stage.GetSources(0).HasExternalSource());
@@ -830,7 +766,7 @@ protected:
auto sourceName = externalSource.GetSourceName();
TString structuredToken;
if (sourceName) {
- structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson();
+ structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson();
}
TVector<ui64> tasksIds;
@@ -869,11 +805,11 @@ protected:
// finish building
for (auto taskId : tasksIds) {
- BuildSinks(stage, TasksGraph.GetTask(taskId), secureParams);
+ BuildSinks(stage, TasksGraph.GetTask(taskId));
}
}
- TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
+ TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo) {
THashMap<ui64, std::vector<ui64>> nodeTasks;
THashMap<ui64, ui64> assignedShardsCount;
@@ -983,7 +919,7 @@ protected:
settings->SetLockNodeId(self.NodeId());
}
- BuildSinks(stage, task, secureParams);
+ BuildSinks(stage, task);
};
THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv());
@@ -1019,7 +955,7 @@ protected:
}
}
- void BuildComputeTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) {
+ void BuildComputeTasks(TStageInfo& stageInfo) {
auto& stage = stageInfo.Meta.GetStage(stageInfo.Id);
ui32 partitionsCount = 1;
@@ -1077,7 +1013,7 @@ protected:
auto& task = TasksGraph.AddTask(stageInfo);
task.Meta.Type = TTaskMeta::TTaskType::Compute;
task.Meta.ExecuterId = SelfId();
- BuildSinks(stage, task, secureParams);
+ BuildSinks(stage, task);
LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id);
}
}
@@ -1347,6 +1283,10 @@ protected:
});
}
+ void GetSecretsSnapshot() {
+ RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext(), MaximalSecretsSnapshotWaitTime);
+ }
+
void GetResourcesSnapshot() {
GetKqpResourceManager()->RequestClusterResourcesInfo(
[as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) {
@@ -1355,6 +1295,10 @@ protected:
});
}
+ void SaveScriptExternalEffect(std::unique_ptr<TEvSaveScriptExternalEffectRequest> scriptEffects) {
+ this->Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), scriptEffects.release());
+ }
+
protected:
void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) {
for (const auto& task : this->TasksGraph.GetTasks()) {
@@ -1542,8 +1486,6 @@ protected:
protected:
void PassAway() override {
- UnsubscribeFromSecrets();
-
for (auto channelPair: ResultChannelProxies) {
LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId());
@@ -1655,10 +1597,9 @@ protected:
std::unique_ptr<TKqpPlanner> Planner;
const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig ExecuterRetriesConfig;
- std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets;
- TVector<TString> SecretNames;
+ std::vector<TString> SecretNames;
+ std::map<TString, TString> SecureParams;
TDuration MaximalSecretsSnapshotWaitTime;
- bool SubscribedOnSecrets = false;
const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings;
TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot;
diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
index 02208b7041..958c32fbb7 100644
--- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
+++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp
@@ -197,15 +197,15 @@ private:
if (stage.SourcesSize() > 0) {
switch (stage.GetSources(0).GetTypeCase()) {
case NKqpProto::TKqpSource::kReadRangesSource:
- BuildScanTasksFromSource(stageInfo, {});
+ BuildScanTasksFromSource(stageInfo);
break;
default:
YQL_ENSURE(false, "unknown source type");
}
} else if (stageInfo.Meta.ShardOperations.empty()) {
- BuildComputeTasks(stageInfo, {});
+ BuildComputeTasks(stageInfo);
} else if (stageInfo.Meta.IsSysView()) {
- BuildSysViewScanTasks(stageInfo, {});
+ BuildSysViewScanTasks(stageInfo);
} else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) {
HasOlapTable = true;
BuildScanTasksFromShards(stageInfo);
diff --git a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt
index 551388ba4e..4161013b00 100644
--- a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt
@@ -22,5 +22,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC
generic-connector-libcpp
)
target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt
index a662d35a7c..ae88183731 100644
--- a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt
@@ -23,5 +23,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC
generic-connector-libcpp
)
target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt
index a662d35a7c..ae88183731 100644
--- a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt
@@ -23,5 +23,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC
generic-connector-libcpp
)
target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
)
diff --git a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt
index 551388ba4e..4161013b00 100644
--- a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt
@@ -22,5 +22,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC
generic-connector-libcpp
)
target_sources(core-kqp-federated_query PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp
)
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
new file mode 100644
index 0000000000..3c6d883668
--- /dev/null
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp
@@ -0,0 +1,103 @@
+#include "kqp_federated_query_actors.h"
+
+#include <ydb/services/metadata/secret/fetcher.h>
+#include <ydb/services/metadata/secret/snapshot.h>
+
+
+namespace NKikimr::NKqp {
+
+namespace {
+
+class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> {
+ STRICT_STFUNC(StateFunc,
+ hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
+ hFunc(NActors::TEvents::TEvWakeup, Handle);
+ )
+
+ void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
+ auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();
+
+ std::vector<TString> secretValues;
+ secretValues.reserve(SecretIds.size());
+ for (const auto& secretId: SecretIds) {
+ TString secretValue;
+ const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue);
+ if (!isFound) {
+ LastResponse = TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") });
+ return;
+ }
+ secretValues.push_back(secretValue);
+ }
+ Promise.SetValue(TEvDescribeSecretsResponse::TDescription(secretValues));
+
+ UnsubscribeFromSecrets();
+ PassAway();
+ }
+
+ void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
+ Promise.SetValue(LastResponse);
+
+ UnsubscribeFromSecrets();
+ PassAway();
+ }
+
+ NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() {
+ return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
+ }
+
+ void UnsubscribeFromSecrets() {
+ this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
+ }
+
+public:
+ TDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime)
+ : SecretIds(CreateSecretIds(ownerUserId, secretIds))
+ , Promise(promise)
+ , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") })
+ , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
+ {}
+
+ void Bootstrap() {
+ if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
+ Promise.SetValue(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") }));
+ PassAway();
+ return;
+ }
+
+ this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
+ this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
+ Become(&TDescribeSecretsActor::StateFunc);
+ }
+
+private:
+ static std::vector<NMetadata::NSecret::TSecretId> CreateSecretIds(const TString& ownerUserId, const std::vector<TString>& secretIds) {
+ std::vector<NMetadata::NSecret::TSecretId> result;
+ for (const auto& secretId: secretIds) {
+ result.emplace_back(ownerUserId, secretId);
+ }
+ return result;
+ }
+
+private:
+ const std::vector<NMetadata::NSecret::TSecretId> SecretIds;
+ NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Promise;
+ TEvDescribeSecretsResponse::TDescription LastResponse;
+ TDuration MaximalSecretsSnapshotWaitTime;
+};
+
+} // anonymous namespace
+
+IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime) {
+ return new TDescribeSecretsActor(ownerUserId, secretIds, promise, maximalSecretsSnapshotWaitTime);
+}
+
+void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, const TActorContext& actorContext, TDuration maximalSecretsSnapshotWaitTime) {
+ auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>();
+ actorContext.Register(CreateDescribeSecretsActor(ownerUserId, secretIds, promise, maximalSecretsSnapshotWaitTime));
+
+ promise.GetFuture().Subscribe([actorContext, replyActorId](const NThreading::TFuture<TEvDescribeSecretsResponse::TDescription>& result){
+ actorContext.Send(replyActorId, new TEvDescribeSecretsResponse(result.GetValue()));
+ });
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h
new file mode 100644
index 0000000000..6682c63f87
--- /dev/null
+++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h
@@ -0,0 +1,13 @@
+#pragma once
+
+#include <ydb/core/kqp/common/events/script_executions.h>
+
+#include <library/cpp/actors/core/actor.h>
+
+
+namespace NKikimr::NKqp {
+
+NActors::IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime);
+void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, const TActorContext& actorContext, TDuration maximalSecretsSnapshotWaitTime);
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/federated_query/ya.make b/ydb/core/kqp/federated_query/ya.make
index 4af74496ca..41397c2ee5 100644
--- a/ydb/core/kqp/federated_query/ya.make
+++ b/ydb/core/kqp/federated_query/ya.make
@@ -1,6 +1,7 @@
LIBRARY()
SRCS(
+ kqp_federated_query_actors.cpp
kqp_federated_query_helpers.cpp
)
diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt
new file mode 100644
index 0000000000..985367871a
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-finalize_script_service)
+target_compile_options(core-kqp-finalize_script_service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-finalize_script_service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ core-kqp-proxy_service
+)
+target_sources(core-kqp-finalize_script_service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
+)
diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt
new file mode 100644
index 0000000000..fa886df9b9
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-finalize_script_service)
+target_compile_options(core-kqp-finalize_script_service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-finalize_script_service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ core-kqp-proxy_service
+)
+target_sources(core-kqp-finalize_script_service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
+)
diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt
new file mode 100644
index 0000000000..fa886df9b9
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt
@@ -0,0 +1,23 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-finalize_script_service)
+target_compile_options(core-kqp-finalize_script_service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-finalize_script_service PUBLIC
+ contrib-libs-linux-headers
+ contrib-libs-cxxsupp
+ yutil
+ core-kqp-proxy_service
+)
+target_sources(core-kqp-finalize_script_service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
+)
diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.txt
new file mode 100644
index 0000000000..f8b31df0c1
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/CMakeLists.txt
@@ -0,0 +1,17 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-aarch64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64")
+ include(CMakeLists.darwin-x86_64.txt)
+elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA)
+ include(CMakeLists.windows-x86_64.txt)
+elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA)
+ include(CMakeLists.linux-x86_64.txt)
+endif()
diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt
new file mode 100644
index 0000000000..985367871a
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt
@@ -0,0 +1,22 @@
+
+# This file was generated by the build system used internally in the Yandex monorepo.
+# Only simple modifications are allowed (adding source-files to targets, adding simple properties
+# like target_include_directories). These modifications will be ported to original
+# ya.make files by maintainers. Any complex modifications which can't be ported back to the
+# original buildsystem will not be accepted.
+
+
+
+add_library(core-kqp-finalize_script_service)
+target_compile_options(core-kqp-finalize_script_service PRIVATE
+ -DUSE_CURRENT_UDF_ABI_VERSION
+)
+target_link_libraries(core-kqp-finalize_script_service PUBLIC
+ contrib-libs-cxxsupp
+ yutil
+ core-kqp-proxy_service
+)
+target_sources(core-kqp-finalize_script_service PRIVATE
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
+)
diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
new file mode 100644
index 0000000000..03d2a475bb
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp
@@ -0,0 +1,218 @@
+#include "kqp_finalize_script_actor.h"
+
+#include <ydb/core/fq/libs/events/events.h>
+
+#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
+#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
+
+#include <ydb/library/yql/providers/common/provider/yql_provider_names.h>
+#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h>
+#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h>
+#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
+
+
+namespace NKikimr::NKqp {
+
+namespace {
+
+class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> {
+public:
+ TScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
+ const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup)
+ : ReplyActor_(request->Sender)
+ , ExecutionId_(request->Get()->ExecutionId)
+ , Database_(request->Get()->Database)
+ , FinalizationStatus_(request->Get()->FinalizationStatus)
+ , Request_(std::move(request))
+ , FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds()))
+ , MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds()))
+ , FederatedQuerySetup_(federatedQuerySetup)
+ {}
+
+ void Bootstrap() {
+ Register(CreateSaveScriptFinalStatusActor(std::move(Request_)));
+ Become(&TScriptFinalizerActor::FetchState);
+ }
+
+ STRICT_STFUNC(FetchState,
+ hFunc(TEvSaveScriptFinalStatusResponse, Handle);
+ hFunc(TEvScriptExecutionFinished, Handle);
+ )
+
+ void Handle(TEvSaveScriptFinalStatusResponse::TPtr& ev) {
+ Schedule(FinalizationTimeout_, new TEvents::TEvWakeup());
+ Become(&TScriptFinalizerActor::PrepareState);
+
+ CustomerSuppliedId_ = ev->Get()->CustomerSuppliedId;
+ Sinks_ = std::move(ev->Get()->Sinks);
+ UserToken_ = ev->Get()->UserToken;
+ SecretNames_ = std::move(ev->Get()->SecretNames);
+
+ if (Sinks_.empty()) {
+ FinishScriptFinalization();
+ } else if (SecretNames_.empty()) {
+ ComputeScriptExternalEffect();
+ } else {
+ FetchSecrets();
+ }
+ }
+
+private:
+ STRICT_STFUNC(PrepareState,
+ hFunc(TEvents::TEvWakeup, Handle);
+ hFunc(TEvDescribeSecretsResponse, Handle);
+ hFunc(NFq::TEvents::TEvEffectApplicationResult, Handle);
+ )
+
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ FinishScriptFinalization(Ydb::StatusIds::TIMEOUT, "Script finalization timeout");
+ }
+
+ void FetchSecrets() {
+ RegisterDescribeSecretsActor(SelfId(), UserToken_, SecretNames_, ActorContext(), MaximalSecretsSnapshotWaitTime_);
+ }
+
+ void Handle(TEvDescribeSecretsResponse::TPtr& ev) {
+ if (ev->Get()->Description.Status != Ydb::StatusIds::SUCCESS) {
+ FinishScriptFinalization(ev->Get()->Description.Status, std::move(ev->Get()->Description.Issues));
+ return;
+ }
+
+ FillSecureParams(ev->Get()->Description.SecretValues);
+ }
+
+ void FillSecureParams(const std::vector<TString>& secretValues) {
+ std::map<TString, TString> secretsMap;
+ for (size_t i = 0; i < secretValues.size(); ++i) {
+ secretsMap.emplace(SecretNames_[i], secretValues[i]);
+ }
+
+ for (const auto& sink : Sinks_) {
+ auto sinkName = sink.GetSinkName();
+
+ if (sinkName) {
+ const auto& structuredToken = NYql::CreateStructuredTokenParser(sink.GetAuthInfo()).ToBuilder().ReplaceReferences(secretsMap).ToJson();
+ SecureParams_.emplace(sinkName, structuredToken);
+ }
+ }
+
+ ComputeScriptExternalEffect();
+ }
+
+private:
+ static void AddExternalEffectS3(const NKqpProto::TKqpExternalSink& sink, NYql::NDqProto::TExternalEffect& externalEffectS3) {
+ NYql::NS3::TSink sinkSettings;
+ sink.GetSettings().UnpackTo(&sinkSettings);
+
+ NYql::NS3::TEffect sinkEffect;
+ sinkEffect.SetToken(sink.GetSinkName());
+ sinkEffect.MutableCleanup()->SetUrl(sinkSettings.GetUrl());
+ sinkEffect.MutableCleanup()->SetPrefix(sinkSettings.GetPath());
+
+ externalEffectS3.AddEffects()->SetData(sinkEffect.SerializeAsString());
+ }
+
+ void ComputeScriptExternalEffect() {
+ NYql::NDqProto::TExternalEffect externalEffectS3;
+ externalEffectS3.SetProviderName(TString(NYql::S3ProviderName));
+
+ for (const auto& sink : Sinks_) {
+ const TString& sinkType = sink.GetType();
+
+ if (sinkType == "S3Sink") {
+ AddExternalEffectS3(sink, externalEffectS3);
+ } else {
+ FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "unknown effect sink type: " << sinkType);
+ return;
+ }
+ }
+
+ RunS3ApplicatorActor(externalEffectS3);
+ }
+
+ void RunS3ApplicatorActor(const NYql::NDqProto::TExternalEffect& externalEffect) {
+ if (!FederatedQuerySetup_) {
+ FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, "unable to aplicate s3 external effect, invalid federated query setup");
+ return;
+ }
+
+ Register(NYql::NDq::MakeS3ApplicatorActor(
+ SelfId(),
+ FederatedQuerySetup_->HttpGateway,
+ CreateGuidAsString(),
+ CustomerSuppliedId_,
+ std::nullopt,
+ FinalizationStatus_ == EFinalizationStatus::FS_COMMIT,
+ THashMap<TString, TString>(SecureParams_.begin(), SecureParams_.end()),
+ FederatedQuerySetup_->CredentialsFactory,
+ externalEffect
+ ).Release());
+ }
+
+ void Handle(NFq::TEvents::TEvEffectApplicationResult::TPtr& ev) {
+ if (ev->Get()->FatalError) {
+ FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, std::move(ev->Get()->Issues));
+ } else {
+ FinishScriptFinalization();
+ }
+ }
+
+private:
+ STRICT_STFUNC(FinishState,
+ IgnoreFunc(TEvents::TEvWakeup);
+ IgnoreFunc(TEvDescribeSecretsResponse);
+ IgnoreFunc(NFq::TEvents::TEvEffectApplicationResult);
+ hFunc(TEvScriptExecutionFinished, Handle);
+ )
+
+ void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) {
+ Register(CreateScriptFinalizationFinisherActor(ExecutionId_, Database_, status, std::move(issues)));
+ Become(&TScriptFinalizerActor::FinishState);
+ }
+
+ void FinishScriptFinalization(Ydb::StatusIds::StatusCode status, const TString& message) {
+ FinishScriptFinalization(status, { NYql::TIssue(message) });
+ }
+
+ void FinishScriptFinalization() {
+ FinishScriptFinalization(std::nullopt, {});
+ }
+
+ void Handle(TEvScriptExecutionFinished::TPtr& ev) {
+ Send(ReplyActor_, ev->Release());
+ Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_));
+
+ PassAway();
+ }
+
+private:
+ TActorId ReplyActor_;
+ TString ExecutionId_;
+ TString Database_;
+ EFinalizationStatus FinalizationStatus_;
+ TEvScriptFinalizeRequest::TPtr Request_;
+
+ TDuration FinalizationTimeout_;
+ TDuration MaximalSecretsSnapshotWaitTime_;
+ const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup_;
+
+ TString CustomerSuppliedId_;
+ std::vector<NKqpProto::TKqpExternalSink> Sinks_;
+
+ TString UserToken_;
+ std::vector<TString> SecretNames_;
+ std::unordered_map<TString, TString> SecureParams_;
+};
+
+} // anonymous namespace
+
+IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
+ const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) {
+ return new TScriptFinalizerActor(std::move(request), finalizeScriptServiceConfig, metadataProviderConfig, federatedQuerySetup);
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h
new file mode 100644
index 0000000000..b47e553957
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h
@@ -0,0 +1,14 @@
+#pragma once
+
+#include <ydb/core/kqp/common/events/script_executions.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
+
+
+namespace NKikimr::NKqp {
+
+IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request,
+ const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup);
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
new file mode 100644
index 0000000000..cf6c66d5b5
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp
@@ -0,0 +1,143 @@
+#include "kqp_finalize_script_service.h"
+#include "kqp_finalize_script_actor.h"
+
+#include <ydb/core/kqp/proxy_service/kqp_script_executions.h>
+
+#include <ydb/library/yql/providers/s3/proto/sink.pb.h>
+
+
+namespace NKikimr::NKqp {
+
+namespace {
+
+class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptService> {
+public:
+ TKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory)
+ : FinalizeScriptServiceConfig_(finalizeScriptServiceConfig)
+ , MetadataProviderConfig_(metadataProviderConfig)
+ , FederatedQuerySetupFactory_(federatedQuerySetupFactory)
+ {}
+
+ void Bootstrap(const TActorContext &ctx) {
+ FederatedQuerySetup_ = FederatedQuerySetupFactory_->Make(ctx.ActorSystem());
+
+ Become(&TKqpFinalizeScriptService::MainState);
+ }
+
+ void Handle(TEvSaveScriptExternalEffectRequest::TPtr& ev) {
+ ev->Get()->Sinks = FilterExternalSinks(ev->Get()->Sinks);
+
+ if (!ev->Get()->Sinks.empty()) {
+ Register(CreateSaveScriptExternalEffectActor(std::move(ev)));
+ } else {
+ Send(ev->Sender, new TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::SUCCESS, {}));
+ }
+ }
+
+ void Handle(TEvScriptFinalizeRequest::TPtr& ev) {
+ TString executionId = ev->Get()->ExecutionId;
+
+ if (!FinalizationRequestsQueue_.contains(executionId)) {
+ WaitingFinalizationExecutions_.push(executionId);
+ }
+ FinalizationRequestsQueue_[executionId].emplace_back(std::move(ev));
+
+ TryStartFinalizeRequest();
+ }
+
+ STATEFN(MainState) {
+ switch (ev->GetTypeRewrite()) {
+ hFunc(TEvSaveScriptExternalEffectRequest, Handle);
+ hFunc(TEvScriptFinalizeRequest, Handle);
+ hFunc(TEvScriptFinalizeResponse, Handle);
+ default:
+ Y_ABORT("TKqpScriptFinalizeService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data());
+ }
+ }
+
+private:
+ void TryStartFinalizeRequest() {
+ if (FinalizationRequestsInFlight_ >= FinalizeScriptServiceConfig_.GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions_.empty()) {
+ return;
+ }
+
+ TString executionId = WaitingFinalizationExecutions_.front();
+ WaitingFinalizationExecutions_.pop();
+
+ auto& queue = FinalizationRequestsQueue_[executionId];
+ Y_ENSURE(!queue.empty());
+
+ StartFinalizeRequest(std::move(queue.back()));
+ queue.pop_back();
+ }
+
+ void StartFinalizeRequest(TEvScriptFinalizeRequest::TPtr request) {
+ ++FinalizationRequestsInFlight_;
+
+ Register(CreateScriptFinalizerActor(
+ std::move(request),
+ FinalizeScriptServiceConfig_,
+ MetadataProviderConfig_,
+ FederatedQuerySetup_
+ ));
+ }
+
+ void Handle(TEvScriptFinalizeResponse::TPtr& ev) {
+ --FinalizationRequestsInFlight_;
+ TString executionId = ev->Get()->ExecutionId;
+
+ if (!FinalizationRequestsQueue_[executionId].empty()) {
+ WaitingFinalizationExecutions_.push(executionId);
+ } else {
+ FinalizationRequestsQueue_.erase(executionId);
+ }
+ TryStartFinalizeRequest();
+ }
+
+private:
+ bool ValidateExternalSink(const NKqpProto::TKqpExternalSink& sink) {
+ if (sink.GetType() != "S3Sink") {
+ return false;
+ }
+
+ NYql::NS3::TSink sinkSettings;
+ sink.GetSettings().UnpackTo(&sinkSettings);
+
+ return sinkSettings.GetAtomicUploadCommit();
+ }
+
+ std::vector<NKqpProto::TKqpExternalSink> FilterExternalSinks(const std::vector<NKqpProto::TKqpExternalSink>& sinks) {
+ std::vector<NKqpProto::TKqpExternalSink> filteredSinks;
+ filteredSinks.reserve(sinks.size());
+ for (const auto& sink : sinks) {
+ if (ValidateExternalSink(sink)) {
+ filteredSinks.push_back(sink);
+ }
+ }
+
+ return filteredSinks;
+ }
+
+private:
+ NKikimrConfig::TFinalizeScriptServiceConfig FinalizeScriptServiceConfig_;
+ NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig_;
+
+ IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory_;
+ std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup_;
+
+ ui32 FinalizationRequestsInFlight_ = 0;
+ std::queue<TString> WaitingFinalizationExecutions_;
+ std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue_;
+};
+
+} // anonymous namespace
+
+IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) {
+ return new TKqpFinalizeScriptService(finalizeScriptServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory));
+}
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h
new file mode 100644
index 0000000000..53f0c2ea5f
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h
@@ -0,0 +1,12 @@
+#pragma once
+
+#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h>
+
+
+namespace NKikimr::NKqp {
+
+IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig,
+ const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig,
+ IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory);
+
+} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/finalize_script_service/ya.make b/ydb/core/kqp/finalize_script_service/ya.make
new file mode 100644
index 0000000000..935b275fa5
--- /dev/null
+++ b/ydb/core/kqp/finalize_script_service/ya.make
@@ -0,0 +1,14 @@
+LIBRARY()
+
+SRCS(
+ kqp_finalize_script_actor.cpp
+ kqp_finalize_script_service.cpp
+)
+
+PEERDIR(
+ ydb/core/kqp/proxy_service
+)
+
+YQL_LAST_ABI_VERSION()
+
+END()
diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
index 9c9ab9b85a..966f444a73 100644
--- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
+++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h
@@ -3,8 +3,6 @@
#include <ydb/core/base/tablet_pipe.h>
#include <ydb/core/base/appdata.h>
#include <ydb/library/yql/providers/common/gateway/yql_provider_gateway.h>
-#include <ydb/services/metadata/secret/fetcher.h>
-#include <ydb/services/metadata/secret/snapshot.h>
#include <library/cpp/actors/core/actor_bootstrapped.h>
#include <library/cpp/actors/core/hfunc.h>
@@ -113,97 +111,4 @@ private:
TActorId ActorId;
};
-struct TDescribeSecretsResponse {
- TDescribeSecretsResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues)
- : Status(status)
- , Issues(std::move(issues))
- {}
-
- TDescribeSecretsResponse(const TVector<TString>& secretValues)
- : SecretValues(secretValues)
- , Status(Ydb::StatusIds::SUCCESS)
- {}
-
- TVector<TString> SecretValues;
- Ydb::StatusIds::StatusCode Status;
- NYql::TIssues Issues;
-};
-
-class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> {
- STRICT_STFUNC(StateFunc,
- hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle);
- hFunc(NActors::TEvents::TEvWakeup, Handle);
- )
-
- void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) {
- auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>();
-
- TVector<TString> secretValues;
- secretValues.reserve(SecretIds.size());
- for (const auto& secretId: SecretIds) {
- TString secretValue;
- const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue);
- if (!isFound) {
- LastResponse = TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") });
- return;
- }
- secretValues.push_back(secretValue);
- }
- Promise.SetValue(TDescribeSecretsResponse(secretValues));
-
- UnsubscribeFromSecrets();
- PassAway();
- }
-
- void Handle(NActors::TEvents::TEvWakeup::TPtr&) {
- Promise.SetValue(LastResponse);
-
- UnsubscribeFromSecrets();
- PassAway();
- }
-
- NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() {
- return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>();
- }
-
- void UnsubscribeFromSecrets() {
- this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser()));
- }
-
-public:
- TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime)
- : SecretIds(CreateSecretIds(ownerUserId, secretIds))
- , Promise(promise)
- , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") })
- , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime)
- {}
-
- void Bootstrap() {
- if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) {
- Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") }));
- PassAway();
- return;
- }
-
- this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser()));
- this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup());
- Become(&TDescribeSecretsActor::StateFunc);
- }
-
-private:
- static TVector<NMetadata::NSecret::TSecretId> CreateSecretIds(const TString& ownerUserId, const TVector<TString>& secretIds) {
- TVector<NMetadata::NSecret::TSecretId> result;
- for (const auto& secretId: secretIds) {
- result.emplace_back(ownerUserId, secretId);
- }
- return result;
- }
-
-private:
- const TVector<NMetadata::NSecret::TSecretId> SecretIds;
- NThreading::TPromise<TDescribeSecretsResponse> Promise;
- TDescribeSecretsResponse LastResponse;
- TDuration MaximalSecretsSnapshotWaitTime;
-};
-
}
diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
index f996e69885..8d075b9b3c 100644
--- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
+++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp
@@ -2,6 +2,7 @@
#include "actors/kqp_ic_gateway_actors.h"
#include <ydb/core/base/path.h>
+#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h>
#include <ydb/core/statistics/events.h>
#include <ydb/core/statistics/stat_service.h>
@@ -361,7 +362,7 @@ void SetError(TTableMetadataResult& externalDataSourceMetadata, const TString& e
externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::BAD_REQUEST));
}
-void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSourceMetadata, const TDescribeSecretsResponse& objectDescription) {
+void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSourceMetadata, const TEvDescribeSecretsResponse::TDescription& objectDescription) {
if (objectDescription.Status != Ydb::StatusIds::SUCCESS) {
externalDataSourceMetadata.AddIssues(objectDescription.Issues);
externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(objectDescription.Status));
@@ -419,44 +420,44 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour
}
}
-NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) {
+NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) {
const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth();
switch (authDescription.identity_case()) {
case NKikimrSchemeOp::TAuth::kServiceAccount: {
const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName();
- auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime));
+ auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
+ actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
case NKikimrSchemeOp::TAuth::kNone:
- return MakeFuture(TDescribeSecretsResponse({}));
+ return MakeFuture(TEvDescribeSecretsResponse::TDescription({}));
case NKikimrSchemeOp::TAuth::kBasic: {
const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName();
- auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime));
+ auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
+ actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
case NKikimrSchemeOp::TAuth::kMdbBasic: {
const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName();
const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName();
- auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime));
+ auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
+ actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
case NKikimrSchemeOp::TAuth::kAws: {
const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName();
const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName();
- auto promise = NewPromise<TDescribeSecretsResponse>();
- actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime));
+ auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>();
+ actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime));
return promise.GetFuture();
}
case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET:
- return MakeFuture(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") }));
+ return MakeFuture(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") }));
}
}
@@ -733,7 +734,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta
return;
}
LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem)
- .Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeSecretsResponse>& result) mutable
+ .Subscribe([promise, externalDataSourceMetadata](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable
{
UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue());
promise.SetValue(externalDataSourceMetadata);
diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp
index dd7f40f13b..cfac07d4bb 100644
--- a/ydb/core/kqp/host/kqp_host.cpp
+++ b/ydb/core/kqp/host/kqp_host.cpp
@@ -1504,12 +1504,13 @@ private:
*PlanBuilder, sqlVersion, true /* UseDqExplain */);
}
- void InitS3Provider() {
+ void InitS3Provider(EKikimrQueryType queryType) {
auto state = MakeIntrusive<NYql::TS3State>();
state->Types = TypesCtx.Get();
state->FunctionRegistry = FuncRegistry;
state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory;
state->Configuration->WriteThroughDqIntegration = true;
+ state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script;
state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx);
@@ -1565,7 +1566,7 @@ private:
TypesCtx->AddDataSink(providerNames, kikimrDataSink);
if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) {
- InitS3Provider();
+ InitS3Provider(queryType);
InitGenericProvider();
}
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
index 3070b4feab..c8050700bb 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp
@@ -127,6 +127,7 @@ private:
Col("run_script_actor_id", NScheme::NTypeIds::Text),
Col("operation_status", NScheme::NTypeIds::Int32),
Col("execution_status", NScheme::NTypeIds::Int32),
+ Col("finalization_status", NScheme::NTypeIds::Int32),
Col("execution_mode", NScheme::NTypeIds::Int32),
Col("start_ts", NScheme::NTypeIds::Timestamp),
Col("end_ts", NScheme::NTypeIds::Timestamp),
@@ -140,6 +141,10 @@ private:
Col("result_set_metas", NScheme::NTypeIds::JsonDocument),
Col("stats", NScheme::NTypeIds::JsonDocument),
Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline.
+ Col("customer_supplied_id", NScheme::NTypeIds::Text),
+ Col("user_token", NScheme::NTypeIds::Text),
+ Col("script_sinks", NScheme::NTypeIds::JsonDocument),
+ Col("script_secret_names", NScheme::NTypeIds::JsonDocument),
},
{ "database", "execution_id" },
TtlCol("expire_at")
@@ -559,241 +564,119 @@ private:
IRetryPolicy::IRetryState::TPtr RetryState = nullptr;
};
+class TCheckLeaseStatusActorBase : public TActorBootstrapped<TCheckLeaseStatusActorBase> {
+ using TBase = TActorBootstrapped<TCheckLeaseStatusActorBase>;
-class TScriptExecutionFinisherBase : public TQueryBase {
-public:
- using TQueryBase::TQueryBase;
-
- void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus,
- TDuration operationTtl, TDuration resultsTtl, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(),
- TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) {
- TString sql = R"(
- -- TScriptExecutionFinisherBase::FinishScriptExecution
- DECLARE $database AS Text;
- DECLARE $execution_id AS Text;
- DECLARE $operation_status AS Int32;
- DECLARE $execution_status AS Int32;
- DECLARE $issues AS JsonDocument;
- DECLARE $plan AS JsonDocument;
- DECLARE $stats AS JsonDocument;
- DECLARE $ast AS Text;
- DECLARE $operation_ttl AS Interval;
- DECLARE $results_ttl AS Interval;
-
- UPDATE `.metadata/script_executions`
- SET
- operation_status = $operation_status,
- execution_status = $execution_status,
- issues = $issues,
- plan = $plan,
- end_ts = CurrentUtcTimestamp(),
- stats = $stats,
- ast = $ast,
- expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL)
- WHERE database = $database AND execution_id = $execution_id;
-
- DELETE FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
-
- UPDATE `.metadata/result_sets`
- SET
- expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL)
- where database = $database AND execution_id = $execution_id;
- )";
-
- TString serializedStats = "{}";
- if (kqpStats) {
- NJson::TJsonValue statsJson;
- Ydb::TableStats::QueryStats queryStats;
- NGRpcService::FillQueryStats(queryStats, *kqpStats);
- NProtobufJson::Proto2Json(queryStats, statsJson, NProtobufJson::TProto2JsonConfig());
- serializedStats = NJson::WriteJson(statsJson);
- }
-
- NYdb::TParamsBuilder params;
- params
- .AddParam("$database")
- .Utf8(database)
- .Build()
- .AddParam("$execution_id")
- .Utf8(executionId)
- .Build()
- .AddParam("$operation_status")
- .Int32(operationStatus)
- .Build()
- .AddParam("$execution_status")
- .Int32(execStatus)
- .Build()
- .AddParam("$issues")
- .JsonDocument(SerializeIssues(issues))
- .Build()
- .AddParam("$plan")
- .JsonDocument(queryPlan.GetOrElse("{}"))
- .Build()
- .AddParam("$stats")
- .JsonDocument(serializedStats)
- .Build()
- .AddParam("$ast")
- .Utf8(queryAst.GetOrElse(""))
- .Build()
- .AddParam("$operation_ttl")
- .Interval(static_cast<i64>(operationTtl.MicroSeconds()))
- .Build()
- .AddParam("$results_ttl")
- .Interval(static_cast<i64>(resultsTtl.MicroSeconds()))
- .Build();
+ inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60);
- RunDataQuery(sql, &params, txControl);
+public:
+ void Bootstrap() {
+ OnBootstrap();
}
- void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus,
- TDuration operationTtl, TDuration resultsTtl, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx(),
- TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) {
- FinishScriptExecution(database, executionId, operationStatus, execStatus, operationTtl, resultsTtl, IssuesFromMessage(message), txControl, kqpStats, queryPlan, queryAst);
+ Ydb::StatusIds::StatusCode GetOperationStatus() const {
+ return FinalOperationStatus;
}
- static NYql::TIssues IssuesFromMessage(const TString& message) {
- NYql::TIssues issues;
- issues.AddIssue(message);
- return issues;
+ Ydb::Query::ExecStatus GetExecStatus() const {
+ return FinalExecStatus;
}
- static NYql::TIssues LeaseExpiredIssues() {
- return IssuesFromMessage("Lease expired");
+ NYql::TIssues GetIssues() const {
+ return FinalIssues;
}
-};
-TMaybe<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) {
- NKikimrKqp::TScriptExecutionOperationMeta meta;
- try {
- NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig());
- return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl()));
- } catch (NJson::TJsonException &e) {
- return Nothing();
- }
-}
+ void StartScriptFinalization(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database, TMaybe<Ydb::StatusIds::StatusCode> status, TMaybe<Ydb::Query::ExecStatus> execStatus, NYql::TIssues issues) {
+ if (!status || !execStatus) {
+ issues.AddIssue("Finalization is not complete");
+ }
-class TScriptExecutionFinisher : public TScriptExecutionFinisherBase {
-public:
- TScriptExecutionFinisher(
- const TString& executionId,
- const TString& database,
- ui64 leaseGeneration,
- Ydb::StatusIds::StatusCode operationStatus,
- Ydb::Query::ExecStatus execStatus,
- NYql::TIssues issues,
- TMaybe<NKqpProto::TKqpStatsQuery> queryStats = Nothing(),
- TMaybe<TString> queryPlan = Nothing(),
- TMaybe<TString> queryAst = Nothing()
- )
- : Database(database)
- , ExecutionId(executionId)
- , LeaseGeneration(leaseGeneration)
- , OperationStatus(operationStatus)
- , ExecStatus(execStatus)
- , Issues(std::move(issues))
- , QueryStats(std::move(queryStats))
- , QueryPlan(std::move(queryPlan))
- , QueryAst(std::move(queryAst))
- {
+ ScriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(finalizationStatus, executionId, database, status ? *status : Ydb::StatusIds::UNAVAILABLE, execStatus ? *execStatus : Ydb::Query::EXEC_STATUS_ABORTED, std::move(issues));
+ RunScriptFinalizeRequest();
+
+ Become(&TCheckLeaseStatusActorBase::StateFunc);
}
- void OnRunQuery() override {
- TString sql = R"(
- -- TScriptExecutionFinisher::OnRunQuery
- DECLARE $database AS Text;
- DECLARE $execution_id AS Text;
+ void StartLeaseChecking(TActorId runScriptActorId, const TString& executionId, const TString& database) {
+ ScriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(EFinalizationStatus::FS_ROLLBACK, executionId, database, Ydb::StatusIds::UNAVAILABLE, Ydb::Query::EXEC_STATUS_ABORTED, NYql::TIssues{ NYql::TIssue("Lease expired") });
- SELECT lease_generation FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
+ Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup());
- SELECT meta FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id;
- )";
+ ui64 flags = IEventHandle::FlagTrackDelivery;
+ if (runScriptActorId.NodeId() != SelfId().NodeId()) {
+ flags |= IEventHandle::FlagSubscribeOnSession;
+ SubscribedOnSession = runScriptActorId.NodeId();
+ }
+ Send(runScriptActorId, new TEvCheckAliveRequest(), flags);
- NYdb::TParamsBuilder params;
- params
- .AddParam("$database")
- .Utf8(Database)
- .Build()
- .AddParam("$execution_id")
- .Utf8(ExecutionId)
- .Build();
+ Become(&TCheckLeaseStatusActorBase::StateFunc);
+ }
- RunDataQuery(sql, &params, TTxControl::BeginTx());
+ void PassAway() override {
+ if (SubscribedOnSession) {
+ Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe());
+ }
+ TBase::PassAway();
}
- void OnQueryResult() override {
- if (!FinishWasRun) {
- if (ResultSets.size() != 2) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
- return;
- }
- NYdb::TResultSetParser result(ResultSets[0]);
- if (result.RowsCount() == 0) {
- Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution");
- return;
- }
+ virtual void OnBootstrap() = 0;
+ virtual void OnLeaseVerified() = 0;
+ virtual void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) = 0;
- result.TryNextRow();
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvCheckAliveResponse, Handle);
+ hFunc(TEvents::TEvWakeup, Handle);
+ hFunc(NActors::TEvents::TEvUndelivered, Handle);
+ hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
+ hFunc(TEvScriptExecutionFinished, Handle);
+ )
- const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64();
- if (!leaseGenerationInDatabase) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation");
- return;
- }
+ void RunScriptFinalizeRequest() {
+ if (WaitFinishQuery) {
+ return;
+ }
- if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) {
- Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost");
- return;
- }
+ WaitFinishQuery = true;
+ FinalOperationStatus = ScriptFinalizeRequest->OperationStatus;
+ FinalExecStatus = ScriptFinalizeRequest->ExecStatus;
+ FinalIssues = ScriptFinalizeRequest->Issues;
+ Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), ScriptFinalizeRequest.release());
+ }
- NYdb::TResultSetParser metaResult(ResultSets[1]);
- metaResult.TryNextRow();
+ void Handle(TEvCheckAliveResponse::TPtr&) {
+ OnLeaseVerified();
+ }
- const auto serializedMeta = metaResult.ColumnParser("meta").GetOptionalJsonDocument();
- if (!serializedMeta) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation");
- return;
- }
+ void Handle(TEvents::TEvWakeup::TPtr&) {
+ RunScriptFinalizeRequest();
+ }
- const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
- if (!ttl) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
- return;
- }
+ void Handle(NActors::TEvents::TEvUndelivered::TPtr&) {
+ RunScriptFinalizeRequest();
+ }
- const auto [operationTtl, resultsTtl] = *ttl;
- FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, operationTtl, resultsTtl,
- Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst));
- FinishWasRun = true;
- } else {
- Finish();
- }
+ void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
+ RunScriptFinalizeRequest();
}
- void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
- KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << ExecutionId << ". Lease generation: " <<
- LeaseGeneration << ": " << Ydb::StatusIds::StatusCode_Name(status) << ". Issues: " << issues.ToOneLineString() << ". Plan: " << QueryPlan);
- Send(Owner, new TEvScriptExecutionFinished(status, std::move(issues)));
+ void Handle(TEvScriptExecutionFinished::TPtr& ev) {
+ OnScriptExecutionFinished(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues));
}
private:
- const TString Database;
- const TString ExecutionId;
- const ui64 LeaseGeneration;
- const Ydb::StatusIds::StatusCode OperationStatus;
- const Ydb::Query::ExecStatus ExecStatus;
- const NYql::TIssues Issues;
- const TMaybe<NKqpProto::TKqpStatsQuery> QueryStats;
- const TMaybe<TString> QueryPlan;
- const TMaybe<TString> QueryAst;
- bool FinishWasRun = false;
+ std::unique_ptr<TEvScriptFinalizeRequest> ScriptFinalizeRequest;
+ Ydb::StatusIds::StatusCode FinalOperationStatus;
+ Ydb::Query::ExecStatus FinalExecStatus;
+ NYql::TIssues FinalIssues;
+
+ bool WaitFinishQuery = false;
+ std::optional<ui32> SubscribedOnSession;
};
-class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase {
+class TCheckLeaseStatusQueryActor : public TQueryBase {
public:
- TCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie = 0)
+ TCheckLeaseStatusQueryActor(const TString& database, const TString& executionId, ui64 cookie = 0)
: Database(database)
, ExecutionId(executionId)
, Cookie(cookie)
@@ -801,16 +684,23 @@ public:
void OnRunQuery() override {
const TString sql = R"(
- -- TCheckLeaseStatusActor::OnRunQuery
+ -- TCheckLeaseStatusQueryActor::OnRunQuery
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
- SELECT operation_status, execution_status, issues, run_script_actor_id, meta FROM `.metadata/script_executions`
- WHERE database = $database AND execution_id = $execution_id AND
- (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
+ SELECT
+ operation_status,
+ execution_status,
+ finalization_status,
+ issues,
+ run_script_actor_id
+ FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id AND
+ (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
- SELECT lease_deadline FROM `.metadata/script_execution_leases`
- WHERE database = $database AND execution_id = $execution_id;
+ SELECT lease_deadline
+ FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
)";
NYdb::TParamsBuilder params;
@@ -822,11 +712,10 @@ public:
.Utf8(ExecutionId)
.Build();
- RunDataQuery(sql, &params, TTxControl::BeginTx());
- SetQueryResultHandler(&TCheckLeaseStatusActor::OnResult);
+ RunDataQuery(sql, &params);
}
- void OnResult() {
+ void OnQueryResult() override {
if (ResultSets.size() != 2) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
@@ -850,6 +739,12 @@ public:
}
TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
+
+ const TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32();
+ if (finalizationStatus) {
+ FinalizationStatus = static_cast<EFinalizationStatus>(*finalizationStatus);
+ }
+
TMaybe<TInstant> leaseDeadline;
NYdb::TResultSetParser result2(ResultSets[1]);
@@ -864,22 +759,8 @@ public:
if (operationStatus) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
} else if (*leaseDeadline < RunStartTime) {
- auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
- if (!serializedMeta) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation");
- return;
- }
- const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
- if (!ttl) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
- return;
- }
- const auto [operationTtl, resultsTtl] = *ttl;
- FinishScriptExecution(Database, ExecutionId, Ydb::StatusIds::UNAVAILABLE, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl);
- SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution);
- } else {
- // OperationStatus is Nothing(): currently running
- CommitTransaction();
+ LeaseExpired = true;
+ FinalizationStatus = EFinalizationStatus::FS_ROLLBACK;
}
} else if (operationStatus) {
OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
@@ -891,22 +772,16 @@ public:
if (issuesSerialized) {
OperationIssues = DeserializeIssues(*issuesSerialized);
}
- CommitTransaction();
} else {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state");
}
- }
- void OnFinishScriptExecution() {
- OperationStatus = Ydb::StatusIds::UNAVAILABLE;
- ExecutionStatus = Ydb::Query::EXEC_STATUS_ABORTED;
- OperationIssues = LeaseExpiredIssues();
Finish();
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
if (status == Ydb::StatusIds::SUCCESS) {
- Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId), 0, Cookie);
+ Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId, LeaseExpired, FinalizationStatus), 0, Cookie);
} else {
Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie);
}
@@ -919,8 +794,83 @@ private:
const ui64 Cookie;
TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
+ TMaybe<EFinalizationStatus> FinalizationStatus;
TMaybe<NYql::TIssues> OperationIssues;
NActors::TActorId RunScriptActorId;
+ bool LeaseExpired;
+};
+
+class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase {
+public:
+ TCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie = 0)
+ : ReplyActorId(replyActorId)
+ , Database(database)
+ , ExecutionId(executionId)
+ , Cookie(cookie)
+ {}
+
+ void OnBootstrap() override {
+ Register(new TCheckLeaseStatusQueryActor(Database, ExecutionId, Cookie));
+ Become(&TCheckLeaseStatusActor::StateFunc);
+ }
+
+ void OnLeaseVerified() override {
+ Reply();
+ }
+
+ void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (status != Ydb::StatusIds::SUCCESS) {
+ Reply(status, std::move(issues));
+ return;
+ }
+
+ if (alreadyFinalized) {
+ // Final status and issues are unknown, the operation must be repeated
+ Response->Get()->OperationStatus = Nothing();
+ Response->Get()->ExecutionStatus = Nothing();
+ Response->Get()->OperationIssues->Clear();
+ } else {
+ Response->Get()->OperationStatus = GetOperationStatus();
+ Response->Get()->ExecutionStatus = GetExecStatus();
+ Response->Get()->OperationIssues = GetIssues();
+ }
+
+ Reply();
+ }
+
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvPrivate::TEvLeaseCheckResult, Handle);
+ )
+
+ void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) {
+ Response = std::move(ev);
+
+ if (!Response->Get()->FinalizationStatus) {
+ Reply();
+ } else if (Response->Get()->LeaseExpired) {
+ StartLeaseChecking(Response->Get()->RunScriptActorId, ExecutionId, Database);
+ } else {
+ StartScriptFinalization(*Response->Get()->FinalizationStatus, ExecutionId, Database, Response->Get()->OperationStatus, Response->Get()->ExecutionStatus, Response->Get()->Issues);
+ }
+ }
+
+ void Reply() {
+ Send(ReplyActorId, Response->Release());
+ PassAway();
+ }
+
+ void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
+ Send(ReplyActorId, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)));
+ PassAway();
+ }
+
+private:
+ NActors::TActorId ReplyActorId;
+ TString Database;
+ TString ExecutionId;
+ ui64 Cookie;
+ TEvPrivate::TEvLeaseCheckResult::TPtr Response;
};
class TForgetScriptExecutionOperationActor : public TQueryBase {
@@ -1046,18 +996,17 @@ private:
NYql::TIssues Issues;
};
-class TGetScriptExecutionOperationQueryActor : public TScriptExecutionFinisherBase {
+class TGetScriptExecutionOperationQueryActor : public TQueryBase {
public:
- TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId, bool finishIfLeaseExpired)
+ TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId)
: Database(database)
, OperationId(operationId)
- , FinishIfLeaseExpired(finishIfLeaseExpired)
, StartActorTime(TInstant::Now())
{}
void OnRunQuery() override {
TString sql = R"(
- -- TGetScriptExecutionOperationActor::OnRunQuery
+ -- TGetScriptExecutionOperationQueryActor::OnRunQuery
DECLARE $database AS Text;
DECLARE $execution_id AS Text;
@@ -1065,6 +1014,7 @@ public:
run_script_actor_id,
operation_status,
execution_status,
+ finalization_status,
query_text,
syntax,
execution_mode,
@@ -1072,8 +1022,7 @@ public:
plan,
issues,
stats,
- ast,
- meta
+ ast
FROM `.metadata/script_executions`
WHERE database = $database AND execution_id = $execution_id AND
(expire_at > CurrentUtcTimestamp() OR expire_at IS NULL);
@@ -1097,11 +1046,10 @@ public:
.Utf8(ExecutionId)
.Build();
- RunDataQuery(sql, &params, TTxControl::BeginTx());
- SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnGetInfo);
+ RunDataQuery(sql, &params);
}
- void OnGetInfo() {
+ void OnQueryResult() override {
if (ResultSets.size() != 2) {
Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
return;
@@ -1119,6 +1067,11 @@ public:
OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus);
}
+ const TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32();
+ if (finalizationStatus) {
+ FinalizationStatus = static_cast<EFinalizationStatus>(*finalizationStatus);
+ }
+
Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId));
const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32();
@@ -1191,19 +1144,6 @@ public:
return;
}
- auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
- if (!serializedMeta) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation");
- return;
- }
-
- const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
- if (!ttl) {
- Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
- return;
- }
-
- const auto [operationTtl, resultsTtl] = *ttl;
deadlineResult.TryNextRow();
TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp();
@@ -1212,136 +1152,108 @@ public:
return;
}
- LeaseExpired = *leaseDeadline < StartActorTime;
- if (LeaseExpired && FinishIfLeaseExpired) {
- FinishScriptExecution(Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues);
- SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnFinishOperation);
+ if (*leaseDeadline < StartActorTime) {
+ LeaseExpired = true;
+ FinalizationStatus = EFinalizationStatus::FS_ROLLBACK;
}
}
- if (!LeaseExpired || !FinishIfLeaseExpired) {
- CommitTransaction();
- }
- }
-
- void OnFinishOperation() {
- OperationStatus = Ydb::StatusIds::UNAVAILABLE;
- Issues = LeaseExpiredIssues();
- Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED);
-
- Finish(Ydb::StatusIds::SUCCESS, std::move(Issues));
+ Finish();
}
void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
if (OperationStatus) {
- TMaybe<google::protobuf::Any> metadata;
- metadata.ConstructInPlace().PackFrom(Metadata);
- Send(Owner, new TEvGetScriptExecutionOperationResponse(true, LeaseExpired, RunScriptActorId, *OperationStatus, std::move(Issues), std::move(metadata)));
+ Send(Owner, new TEvGetScriptExecutionOperationQueryResponse(true, LeaseExpired, FinalizationStatus, RunScriptActorId, ExecutionId, *OperationStatus, std::move(Issues), std::move(Metadata)));
} else {
- Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing()));
+ Send(Owner, new TEvGetScriptExecutionOperationQueryResponse(false, LeaseExpired, FinalizationStatus, RunScriptActorId, ExecutionId, status, std::move(issues), std::move(Metadata)));
}
}
private:
TString Database;
NOperationId::TOperationId OperationId;
- bool FinishIfLeaseExpired;
TInstant StartActorTime;
TString ExecutionId;
- TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ std::optional<Ydb::StatusIds::StatusCode> OperationStatus;
+ std::optional<EFinalizationStatus> FinalizationStatus;
bool LeaseExpired = false;
TActorId RunScriptActorId;
NYql::TIssues Issues;
Ydb::Query::ExecuteScriptMetadata Metadata;
};
-class TGetScriptExecutionOperationActor : public TActorBootstrapped<TGetScriptExecutionOperationActor> {
- using TBase = TActorBootstrapped<TGetScriptExecutionOperationActor>;
-
- inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60);
-
+class TGetScriptExecutionOperationActor : public TCheckLeaseStatusActorBase {
public:
explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev)
: Request(std::move(ev))
{}
- void Bootstrap() {
- CreateGetScriptExecutionOperationQuery(false);
+ void OnBootstrap() override {
+ Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId));
Become(&TGetScriptExecutionOperationActor::StateFunc);
}
-private:
- STRICT_STFUNC(StateFunc,
- hFunc(TEvGetScriptExecutionOperationResponse, Handle);
- hFunc(TEvCheckAliveResponse, Handle);
- hFunc(TEvents::TEvWakeup, Handle);
- hFunc(NActors::TEvents::TEvUndelivered, Handle);
- hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle);
- )
-
- void CreateGetScriptExecutionOperationQuery(bool finishIfLeaseExpired) {
- Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId, finishIfLeaseExpired));
- }
-
- void CreateFinishScriptExecutionOperationQuery() {
- if (!WaitFinishQuery) {
- WaitFinishQuery = true;
- CreateGetScriptExecutionOperationQuery(true);
- }
+ void OnLeaseVerified() override {
+ Reply();
}
- void Handle(TEvGetScriptExecutionOperationResponse::TPtr& ev) {
- Response = std::move(ev);
-
- if (WaitFinishQuery || !Response->Get()->LeaseExpired) {
- Reply();
+ void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (status != Ydb::StatusIds::SUCCESS) {
+ Reply(status, std::move(issues));
return;
}
- Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup());
-
- NActors::TActorId runScriptActor = Response->Get()->RunScriptActorId;
- ui64 flags = IEventHandle::FlagTrackDelivery;
- if (runScriptActor.NodeId() != SelfId().NodeId()) {
- flags |= IEventHandle::FlagSubscribeOnSession;
- SubscribedOnSession = runScriptActor.NodeId();
+ if (alreadyFinalized) {
+ // Final status and issues are unknown, the operation must be repeated
+ Response->Get()->Ready = false;
+ Response->Get()->Status = Ydb::StatusIds::SUCCESS;
+ Response->Get()->Issues.Clear();
+ } else {
+ Response->Get()->Ready = true;
+ Response->Get()->Status = GetOperationStatus();
+ Response->Get()->Issues = GetIssues();
+ Response->Get()->Metadata.set_exec_status(GetExecStatus());
}
- Send(runScriptActor, new TEvCheckAliveRequest(), flags);
- }
- void Handle(TEvCheckAliveResponse::TPtr&) {
Reply();
}
- void Handle(TEvents::TEvWakeup::TPtr&) {
- CreateFinishScriptExecutionOperationQuery();
- }
+private:
+ STRICT_STFUNC(StateFunc,
+ hFunc(TEvGetScriptExecutionOperationQueryResponse, Handle);
+ )
- void Handle(NActors::TEvents::TEvUndelivered::TPtr&) {
- CreateFinishScriptExecutionOperationQuery();
- }
+ void Handle(TEvGetScriptExecutionOperationQueryResponse::TPtr& ev) {
+ Response = std::move(ev);
- void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) {
- CreateFinishScriptExecutionOperationQuery();
+ if (!Response->Get()->FinalizationStatus) {
+ Reply();
+ } else if (Response->Get()->LeaseExpired) {
+ StartLeaseChecking(Response->Get()->RunScriptActorId, Response->Get()->ExecutionId, Request->Get()->Database);
+ } else {
+ TMaybe<Ydb::Query::ExecStatus> execStatus;
+ if (Response->Get()->Ready) {
+ execStatus = Response->Get()->Metadata.exec_status();
+ }
+ StartScriptFinalization(*Response->Get()->FinalizationStatus, Response->Get()->ExecutionId, Request->Get()->Database, Response->Get()->Status, execStatus, Response->Get()->Issues);
+ }
}
void Reply() {
- Send(Request->Sender, Response->Release().Release());
+ TMaybe<google::protobuf::Any> metadata;
+ metadata.ConstructInPlace().PackFrom(Response->Get()->Metadata);
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Response->Get()->Ready, Response->Get()->Status, std::move(Response->Get()->Issues), std::move(metadata)));
PassAway();
}
- void PassAway() override {
- if (SubscribedOnSession) {
- Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe());
- }
- TBase::PassAway();
+ void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) {
+ Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(status, std::move(issues)));
+ PassAway();
}
private:
TEvGetScriptExecutionOperation::TPtr Request;
- TEvGetScriptExecutionOperationResponse::TPtr Response;
- bool WaitFinishQuery = false;
- TMaybe<ui32> SubscribedOnSession;
+ TEvGetScriptExecutionOperationQueryResponse::TPtr Response;
};
class TListScriptExecutionOperationsQuery : public TQueryBase {
@@ -1535,7 +1447,7 @@ public:
if (!op.ready()) {
Ydb::Query::ExecuteScriptMetadata metadata;
op.metadata().UnpackTo(&metadata);
- Register(new TCheckLeaseStatusActor(Request->Get()->Database, metadata.execution_id(), i));
+ Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, metadata.execution_id(), i));
++OperationsToCheck;
}
}
@@ -1603,7 +1515,7 @@ public:
ExecutionId = *executionId;
Become(&TCancelScriptExecutionOperationActor::StateFunc);
- Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId));
+ Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId));
}
STRICT_STFUNC(StateFunc,
@@ -1648,7 +1560,7 @@ public:
void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) {
if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived.
- Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished.
+ Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId)); // Check if the operation has finished.
} else {
Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination");
}
@@ -1880,6 +1792,16 @@ private:
std::vector<TString> SerializedRows;
};
+std::optional<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) {
+ NKikimrKqp::TScriptExecutionOperationMeta meta;
+ try {
+ NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig());
+ return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl()));
+ } catch (const NJson::TJsonException& e) {
+ return std::nullopt;
+ }
+}
+
class TGetScriptExecutionResultQuery : public TQueryBase {
public:
TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit)
@@ -2122,6 +2044,562 @@ private:
THolder<TEvKqp::TEvFetchScriptResultsResponse> Response;
};
+class TSaveScriptExternalEffectActor : public TQueryBase {
+public:
+ explicit TSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev)
+ : Request(std::move(ev))
+ {}
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ -- TSaveScriptExternalEffectActor::OnRunQuery
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $customer_supplied_id AS Text;
+ DECLARE $user_token AS Text;
+ DECLARE $script_sinks AS JsonDocument;
+ DECLARE $script_secret_names AS JsonDocument;
+
+ UPDATE `.metadata/script_executions`
+ SET
+ customer_supplied_id = $customer_supplied_id,
+ user_token = $user_token,
+ script_sinks = $script_sinks,
+ script_secret_names = $script_secret_names
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Request->Get()->Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(Request->Get()->ExecutionId)
+ .Build()
+ .AddParam("$customer_supplied_id")
+ .Utf8(Request->Get()->CustomerSuppliedId)
+ .Build()
+ .AddParam("$user_token")
+ .Utf8(Request->Get()->UserToken)
+ .Build()
+ .AddParam("$script_sinks")
+ .JsonDocument(SerializeSinks(Request->Get()->Sinks))
+ .Build()
+ .AddParam("$script_secret_names")
+ .JsonDocument(SerializeSecretNames(Request->Get()->SecretNames))
+ .Build();
+
+ RunDataQuery(sql, &params);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ Send(Request->Sender, new TEvSaveScriptExternalEffectResponse(status, std::move(issues)));
+ }
+
+private:
+ static TString SerializeSinks(const std::vector<NKqpProto::TKqpExternalSink>& sinks) {
+ NJson::TJsonValue value;
+ value.SetType(NJson::EJsonValueType::JSON_ARRAY);
+
+ NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe();
+ jsonArray.resize(sinks.size());
+ for (size_t i = 0; i < sinks.size(); ++i) {
+ NProtobufJson::Proto2Json(sinks[i], jsonArray[i], NProtobufJson::TProto2JsonConfig());
+ }
+
+ NJsonWriter::TBuf serializedSinks;
+ serializedSinks.WriteJsonValue(&value);
+
+ return serializedSinks.Str();
+ }
+
+ static TString SerializeSecretNames(const std::vector<TString>& secretNames) {
+ NJson::TJsonValue value;
+ value.SetType(NJson::EJsonValueType::JSON_ARRAY);
+
+ NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe();
+ jsonArray.resize(secretNames.size());
+ for (size_t i = 0; i < secretNames.size(); ++i) {
+ jsonArray[i] = NJson::TJsonValue(secretNames[i]);
+ }
+
+ NJsonWriter::TBuf serializedSecretNames;
+ serializedSecretNames.WriteJsonValue(&value);
+
+ return serializedSecretNames.Str();
+ }
+
+private:
+ TEvSaveScriptExternalEffectRequest::TPtr Request;
+};
+
+class TSaveScriptFinalStatusActor : public TQueryBase {
+public:
+ explicit TSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev)
+ : Request(ev)
+ {}
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ -- TSaveScriptFinalStatusActor::OnRunQuery
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ SELECT
+ operation_status,
+ finalization_status,
+ meta,
+ customer_supplied_id,
+ user_token,
+ script_sinks,
+ script_secret_names
+ FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ SELECT lease_generation
+ FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Request->Get()->Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(Request->Get()->ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ SetQueryResultHandler(&TSaveScriptFinalStatusActor::OnGetInfo);
+ }
+
+ void OnGetInfo() {
+ if (ResultSets.size() != 2) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::NOT_FOUND, "No such execution");
+ return;
+ }
+
+ result.TryNextRow();
+
+ TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32();
+ if (finalizationStatus) {
+ if (Request->Get()->FinalizationStatus != *finalizationStatus) {
+ Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Execution already have different finalization status");
+ return;
+ }
+ ApplicateScriptExternalEffectRequired = true;
+ }
+
+ TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32();
+
+ if (Request->Get()->LeaseGeneration && !operationStatus) {
+ NYdb::TResultSetParser leaseResult(ResultSets[1]);
+ if (leaseResult.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state");
+ return;
+ }
+
+ leaseResult.TryNextRow();
+
+ TMaybe<i64> leaseGenerationInDatabase = leaseResult.ColumnParser("lease_generation").GetOptionalInt64();
+ if (!leaseGenerationInDatabase) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation");
+ return;
+ }
+
+ if (*Request->Get()->LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) {
+ Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost");
+ return;
+ }
+ }
+
+ TMaybe<TString> customerSuppliedId = result.ColumnParser("customer_supplied_id").GetOptionalUtf8();
+ if (customerSuppliedId) {
+ CustomerSuppliedId = *customerSuppliedId;
+ }
+
+ TMaybe<TString> userToken = result.ColumnParser("user_token").GetOptionalUtf8();
+ if (userToken) {
+ UserToken = *userToken;
+ }
+
+ SerializedSinks = result.ColumnParser("script_sinks").GetOptionalJsonDocument();
+ if (SerializedSinks) {
+ NJson::TJsonValue value;
+ if (!NJson::ReadJsonTree(*SerializedSinks, &value) || value.GetType() != NJson::JSON_ARRAY) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Script sinks are corrupted");
+ return;
+ }
+
+ for (auto i = 0; i < value.GetIntegerRobust(); i++) {
+ const NJson::TJsonValue* serializedSink;
+ value.GetValuePointer(i, &serializedSink);
+
+ NKqpProto::TKqpExternalSink sink;
+ NProtobufJson::Json2Proto(*serializedSink, sink);
+ Sinks.push_back(sink);
+ }
+ }
+
+ SerializedSecretNames = result.ColumnParser("script_secret_names").GetOptionalJsonDocument();
+ if (SerializedSecretNames) {
+ NJson::TJsonValue value;
+ if (!NJson::ReadJsonTree(*SerializedSecretNames, &value) || value.GetType() != NJson::JSON_ARRAY) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Script secret names are corrupted");
+ return;
+ }
+
+ for (auto i = 0; i < value.GetIntegerRobust(); i++) {
+ const NJson::TJsonValue* serializedSecretName;
+ value.GetValuePointer(i, &serializedSecretName);
+
+ SecretNames.push_back(serializedSecretName->GetString());
+ }
+ }
+
+ const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument();
+ if (!serializedMeta) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation meta information");
+ return;
+ }
+
+ const auto ttl = GetTtlFromSerializedMeta(*serializedMeta);
+ if (!ttl) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted");
+ return;
+ }
+
+ Request->Get()->OperationTtl = ttl->first;
+ Request->Get()->ResultsTtl = ttl->second;
+
+ if (operationStatus) {
+ FinalStatusAlreadySaved = true;
+ OperationAlreadyFinalized = !finalizationStatus;
+ CommitTransaction();
+ return;
+ }
+
+ ApplicateScriptExternalEffectRequired = ApplicateScriptExternalEffectRequired || HasExternalEffect();
+ FinishScriptExecution();
+ }
+
+ void FinishScriptExecution() {
+ TString sql = R"(
+ -- TSaveScriptFinalStatusActor::FinishScriptExecution
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $operation_status AS Int32;
+ DECLARE $execution_status AS Int32;
+ DECLARE $finalization_status AS Int32;
+ DECLARE $issues AS JsonDocument;
+ DECLARE $plan AS JsonDocument;
+ DECLARE $stats AS JsonDocument;
+ DECLARE $ast AS Text;
+ DECLARE $operation_ttl AS Interval;
+ DECLARE $results_ttl AS Interval;
+ DECLARE $customer_supplied_id AS Text;
+ DECLARE $user_token AS Text;
+ DECLARE $script_sinks AS Optional<JsonDocument>;
+ DECLARE $script_secret_names AS Optional<JsonDocument>;
+ DECLARE $applicate_script_external_effect_required AS Bool;
+
+ UPDATE `.metadata/script_executions`
+ SET
+ operation_status = $operation_status,
+ execution_status = $execution_status,
+ finalization_status = IF($applicate_script_external_effect_required, $finalization_status, NULL),
+ issues = $issues,
+ plan = $plan,
+ end_ts = CurrentUtcTimestamp(),
+ stats = $stats,
+ ast = $ast,
+ expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL),
+ customer_supplied_id = IF($applicate_script_external_effect_required, $customer_supplied_id, NULL),
+ user_token = IF($applicate_script_external_effect_required, $user_token, NULL),
+ script_sinks = IF($applicate_script_external_effect_required, $script_sinks, NULL),
+ script_secret_names = IF($applicate_script_external_effect_required, $script_secret_names, NULL)
+ WHERE database = $database AND execution_id = $execution_id;
+
+ DELETE FROM `.metadata/script_execution_leases`
+ WHERE database = $database AND execution_id = $execution_id;
+
+ UPDATE `.metadata/result_sets`
+ SET expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL)
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ TString serializedStats = "{}";
+ if (Request->Get()->QueryStats) {
+ NJson::TJsonValue statsJson;
+ Ydb::TableStats::QueryStats queryStats;
+ NGRpcService::FillQueryStats(queryStats, *Request->Get()->QueryStats);
+ NProtobufJson::Proto2Json(queryStats, statsJson, NProtobufJson::TProto2JsonConfig());
+ serializedStats = NJson::WriteJson(statsJson);
+ }
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Request->Get()->Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(Request->Get()->ExecutionId)
+ .Build()
+ .AddParam("$operation_status")
+ .Int32(Request->Get()->OperationStatus)
+ .Build()
+ .AddParam("$execution_status")
+ .Int32(Request->Get()->ExecStatus)
+ .Build()
+ .AddParam("$finalization_status")
+ .Int32(Request->Get()->FinalizationStatus)
+ .Build()
+ .AddParam("$issues")
+ .JsonDocument(SerializeIssues(Request->Get()->Issues))
+ .Build()
+ .AddParam("$plan")
+ .JsonDocument(Request->Get()->QueryPlan.value_or("{}"))
+ .Build()
+ .AddParam("$stats")
+ .JsonDocument(serializedStats)
+ .Build()
+ .AddParam("$ast")
+ .Utf8(Request->Get()->QueryAst.value_or(""))
+ .Build()
+ .AddParam("$operation_ttl")
+ .Interval(static_cast<i64>(Request->Get()->OperationTtl.MicroSeconds()))
+ .Build()
+ .AddParam("$results_ttl")
+ .Interval(static_cast<i64>(Request->Get()->ResultsTtl.MicroSeconds()))
+ .Build()
+ .AddParam("$customer_supplied_id")
+ .Utf8(CustomerSuppliedId)
+ .Build()
+ .AddParam("$user_token")
+ .Utf8(UserToken)
+ .Build()
+ .AddParam("$script_sinks")
+ .OptionalJsonDocument(SerializedSinks)
+ .Build()
+ .AddParam("$script_secret_names")
+ .OptionalJsonDocument(SerializedSecretNames)
+ .Build()
+ .AddParam("$applicate_script_external_effect_required")
+ .Bool(ApplicateScriptExternalEffectRequired)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
+ SetQueryResultHandler(&TSaveScriptFinalStatusActor::OnQueryResult);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (!FinalStatusAlreadySaved) {
+ KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << Request->Get()->ExecutionId
+ << ". " << Ydb::StatusIds::StatusCode_Name(Request->Get()->OperationStatus)
+ << ". Issues: " << Request->Get()->Issues.ToOneLineString() << ". Plan: " << Request->Get()->QueryPlan.value_or(""));
+ }
+
+ if (!ApplicateScriptExternalEffectRequired || status != Ydb::StatusIds::SUCCESS) {
+ Send(Owner, new TEvScriptExecutionFinished(OperationAlreadyFinalized, status, issues));
+ return;
+ }
+
+ auto response = std::make_unique<TEvSaveScriptFinalStatusResponse>(CustomerSuppliedId, UserToken);
+ response->Sinks = std::move(Sinks);
+ response->SecretNames = std::move(SecretNames);
+
+ Send(Owner, response.release());
+ }
+
+private:
+ bool HasExternalEffect() const {
+ return !Sinks.empty();
+ }
+
+private:
+ TEvScriptFinalizeRequest::TPtr Request;
+
+ bool OperationAlreadyFinalized = false;
+ bool FinalStatusAlreadySaved = false;
+ bool ApplicateScriptExternalEffectRequired = false;
+
+ TString CustomerSuppliedId;
+ TString UserToken;
+ TMaybe<TString> SerializedSinks;
+ std::vector<NKqpProto::TKqpExternalSink> Sinks;
+ TMaybe<TString> SerializedSecretNames;
+ std::vector<TString> SecretNames;
+};
+
+class TScriptFinalizationFinisherActor : public TQueryBase {
+public:
+ TScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues)
+ : ExecutionId(executionId)
+ , Database(database)
+ , OperationStatus(operationStatus)
+ , OperationIssues(std::move(operationIssues))
+ {}
+
+ void OnRunQuery() override {
+ TString sql = R"(
+ -- TScriptFinalizationFinisherActor::OnRunQuery
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ SELECT finalization_status
+ FROM `.metadata/script_executions`
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::BeginTx());
+ SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnGetInfo);
+ }
+
+ void OnGetInfo() {
+ if (ResultSets.size() != 1) {
+ Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response");
+ return;
+ }
+
+ NYdb::TResultSetParser result(ResultSets[0]);
+ if (result.RowsCount() == 0) {
+ Finish(Ydb::StatusIds::NOT_FOUND, "No such execution");
+ return;
+ }
+
+ result.TryNextRow();
+
+ TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32();
+ if (!finalizationStatus) {
+ Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished");
+ return;
+ }
+
+ if (OperationStatus) {
+ UpdateOperationFinalStatus();
+ } else {
+ UpdateOnlyFinalizationStatus();
+ }
+ }
+
+ void UpdateOperationFinalStatus() {
+ TString sql = R"(
+ -- TScriptFinalizationFinisherActor::UpdateOperationFinalStatus
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+ DECLARE $operation_status AS Int32;
+ DECLARE $execution_status AS Int32;
+ DECLARE $issues AS JsonDocument;
+
+ UPDATE `.metadata/script_executions`
+ SET
+ operation_status = $operation_status,
+ execution_status = $execution_status,
+ finalization_status = NULL,
+ issues = $issues,
+ customer_supplied_id = NULL,
+ user_token = NULL,
+ script_sinks = NULL,
+ script_secret_names = NULL
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build()
+ .AddParam("$operation_status")
+ .Int32(*OperationStatus)
+ .Build()
+ .AddParam("$execution_status")
+ .Int32(Ydb::Query::EXEC_STATUS_FAILED)
+ .Build()
+ .AddParam("$issues")
+ .JsonDocument(SerializeIssues(OperationIssues))
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
+ SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnQueryResult);
+ }
+
+ void UpdateOnlyFinalizationStatus() {
+ TString sql = R"(
+ -- TScriptFinalizationFinisherActor::UpdateOnlyFinalizationStatus
+ DECLARE $database AS Text;
+ DECLARE $execution_id AS Text;
+
+ UPDATE `.metadata/script_executions`
+ SET
+ finalization_status = NULL,
+ customer_supplied_id = NULL,
+ user_token = NULL,
+ script_sinks = NULL,
+ script_secret_names = NULL
+ WHERE database = $database AND execution_id = $execution_id;
+ )";
+
+ NYdb::TParamsBuilder params;
+ params
+ .AddParam("$execution_id")
+ .Utf8(ExecutionId)
+ .Build()
+ .AddParam("$database")
+ .Utf8(Database)
+ .Build();
+
+ RunDataQuery(sql, &params, TTxControl::ContinueAndCommitTx());
+ SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnQueryResult);
+ }
+
+ void OnQueryResult() override {
+ Finish();
+ }
+
+ void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override {
+ if (OperationStatus) {
+ OperationIssues.AddIssues(std::move(issues));
+ Send(Owner, new TEvScriptExecutionFinished(status != Ydb::StatusIds::SUCCESS, *OperationStatus, std::move(OperationIssues)));
+ } else {
+ Send(Owner, new TEvScriptExecutionFinished(status != Ydb::StatusIds::SUCCESS, Ydb::StatusIds::SUCCESS, std::move(issues)));
+ }
+ }
+
+private:
+ TString ExecutionId;
+ TString Database;
+ std::optional<Ydb::StatusIds::StatusCode> OperationStatus;
+ NYql::TIssues OperationIssues;
+};
+
} // anonymous namespace
NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) {
@@ -2132,21 +2610,6 @@ NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase
return new TScriptExecutionsTablesCreator(std::move(resultEvent));
}
-NActors::IActor* CreateScriptExecutionFinisher(
- const TString& executionId,
- const TString& database,
- ui64 leaseGeneration,
- Ydb::StatusIds::StatusCode operationStatus,
- Ydb::Query::ExecStatus execStatus,
- NYql::TIssues issues,
- TMaybe<NKqpProto::TKqpStatsQuery> queryStats,
- TMaybe<TString> queryPlan,
- TMaybe<TString> queryAst
- )
-{
- return new TScriptExecutionFinisher(executionId, database, leaseGeneration, operationStatus, execStatus, std::move(issues), std::move(queryStats), std::move(queryPlan), std::move(queryAst));
-}
-
NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) {
return new TForgetScriptExecutionOperationActor(std::move(ev));
}
@@ -2179,14 +2642,26 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re
return new TGetScriptExecutionResultActor(replyActorId, database, executionId, resultSetIndex, offset, limit);
}
+NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev) {
+ return new TSaveScriptExternalEffectActor(std::move(ev));
+}
+
+NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev) {
+ return new TSaveScriptFinalStatusActor(std::move(ev));
+}
+
+NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues) {
+ return new TScriptFinalizationFinisherActor(executionId, database, operationStatus, std::move(operationIssues));
+}
+
namespace NPrivate {
NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) {
return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, operationTtl, resultsTtl, leaseDuration);
}
-NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie) {
- return new TCheckLeaseStatusActor(database, executionId, cookie);
+NActors::IActor* CreateCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie) {
+ return new TCheckLeaseStatusActor(replyActorId, database, executionId, cookie);
}
} // namespace NPrivate
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h
index cda7cbe179..e0f526612d 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h
@@ -23,19 +23,6 @@ NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOpe
NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev);
NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev);
-// Updates status in database.
-NActors::IActor* CreateScriptExecutionFinisher(
- const TString& executionId,
- const TString& database,
- ui64 leaseGeneration,
- Ydb::StatusIds::StatusCode operationStatus,
- Ydb::Query::ExecStatus execStatus,
- NYql::TIssues issues,
- TMaybe<NKqpProto::TKqpStatsQuery> queryStats = Nothing(),
- TMaybe<TString> queryPlan = Nothing(),
- TMaybe<TString> queryAst = Nothing()
-);
-
// Updates lease deadline in database.
NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters);
@@ -44,4 +31,9 @@ NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorI
NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows);
NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit);
+// Compute external effects and updates status in database
+NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev);
+NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev);
+NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues);
+
} // namespace NKikimr::NKqp
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
index 5a33c23802..27592314b6 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h
@@ -45,26 +45,33 @@ struct TEvPrivate {
TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues)
: Status(statusCode)
, Issues(std::move(issues))
+ , LeaseExpired(false)
{
}
TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus,
TMaybe<Ydb::Query::ExecStatus> executionStatus,
TMaybe<NYql::TIssues> operationIssues,
- const NActors::TActorId& runScriptActorId)
+ const NActors::TActorId& runScriptActorId,
+ bool leaseExpired,
+ TMaybe<EFinalizationStatus> finalizationStatus)
: Status(Ydb::StatusIds::SUCCESS)
, OperationStatus(operationStatus)
, ExecutionStatus(executionStatus)
, OperationIssues(operationIssues)
, RunScriptActorId(runScriptActorId)
+ , LeaseExpired(leaseExpired)
+ , FinalizationStatus(finalizationStatus)
{}
const Ydb::StatusIds::StatusCode Status;
const NYql::TIssues Issues;
- const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
- const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
- const TMaybe<NYql::TIssues> OperationIssues;
+ TMaybe<Ydb::StatusIds::StatusCode> OperationStatus;
+ TMaybe<Ydb::Query::ExecStatus> ExecutionStatus;
+ TMaybe<NYql::TIssues> OperationIssues;
const NActors::TActorId RunScriptActorId;
+ const bool LeaseExpired;
+ const TMaybe<EFinalizationStatus> FinalizationStatus;
};
};
@@ -74,6 +81,6 @@ NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionI
TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero());
// Checks lease of execution, finishes execution if its lease is off, returns current status
-NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie = 0);
+NActors::IActor* CreateCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie = 0);
} // namespace NKikimr::NKqp::NPrivate
diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
index cd4eec4f4b..e3de144911 100644
--- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
+++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp
@@ -189,7 +189,7 @@ struct TScriptExecutionsYdbSetup {
NPrivate::TEvPrivate::TEvLeaseCheckResult::TPtr CheckLeaseStatus(const TString& executionId) {
const ui32 node = 0;
TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node);
- GetRuntime()->Register(NPrivate::CreateCheckLeaseStatusActor(TestDatabase, executionId), 0, 0, TMailboxType::Simple, 0, edgeActor);
+ GetRuntime()->Register(NPrivate::CreateCheckLeaseStatusActor(edgeActor, TestDatabase, executionId));
auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvLeaseCheckResult>(edgeActor);
UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS);
diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
index 99ad58c00a..fe97c82bce 100644
--- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
+++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp
@@ -128,6 +128,7 @@ private:
auto ev = MakeHolder<TEvKqp::TEvQueryRequest>();
ev->Record = Request;
ev->Record.MutableRequest()->SetSessionId(SessionId);
+ ev->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, SessionId, ExecutionId, Request.GetTraceId()));
NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId());
@@ -170,10 +171,17 @@ private:
}
void RunScriptExecutionFinisher() {
+ if (RunState == ERunState::Cancelling) {
+ Issues.AddIssue("Script execution is cancelled");
+ }
+
if (!FinalStatusIsSaved) {
FinalStatusIsSaved = true;
- Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, Status, GetExecStatusFromStatusCode(Status),
- Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst)));
+ auto scriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(
+ GetFinalizationStatusFromRunState(), ExecutionId, Database, Status, GetExecStatusFromStatusCode(Status),
+ Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst), LeaseGeneration
+ );
+ Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), scriptFinalizeRequest.release());
return;
}
@@ -439,6 +447,12 @@ private:
}
}
+ EFinalizationStatus GetFinalizationStatusFromRunState() const {
+ if (Status == Ydb::StatusIds::SUCCESS && (RunState == ERunState::Finishing || RunState == ERunState::Finished)) {
+ return EFinalizationStatus::FS_COMMIT;
+ }
+ return EFinalizationStatus::FS_ROLLBACK;
+ }
void CheckInflight() {
if (Status == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED || (Status == Ydb::StatusIds::SUCCESS && RunState == ERunState::Finishing && (SaveResultMetaInflight || SaveResultInflight))) {
@@ -451,10 +465,6 @@ private:
} else {
FinishAfterLeaseUpdate = true;
}
-
- if (RunState == ERunState::Cancelling) {
- Issues.AddIssue("Script execution is cancelled");
- }
}
void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) {
@@ -513,9 +523,9 @@ private:
ui32 SaveResultInflight = 0;
ui32 SaveResultMetaInflight = 0;
bool PendingResultMeta = false;
- TMaybe<TString> QueryPlan;
- TMaybe<TString> QueryAst;
- TMaybe<NKqpProto::TKqpStatsQuery> QueryStats;
+ std::optional<TString> QueryPlan;
+ std::optional<TString> QueryAst;
+ std::optional<NKqpProto::TKqpStatsQuery> QueryStats;
};
} // namespace
diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h
index 26e84891f4..6dcc44118c 100644
--- a/ydb/core/kqp/session_actor/kqp_query_state.h
+++ b/ydb/core/kqp/session_actor/kqp_query_state.h
@@ -61,7 +61,11 @@ public:
KqpSessionSpan = NWilson::TSpan(
TWilsonKqp::KqpSession, std::move(traceId),
"Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END);
- UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId);
+ if (RequestEv->GetUserRequestContext()) {
+ UserRequestContext = RequestEv->GetUserRequestContext();
+ } else {
+ UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId);
+ }
}
// the monotonously growing counter, the ordinal number of the query,
diff --git a/ydb/core/kqp/ya.make b/ydb/core/kqp/ya.make
index 0c70ffb04c..a45a8d67cd 100644
--- a/ydb/core/kqp/ya.make
+++ b/ydb/core/kqp/ya.make
@@ -62,6 +62,7 @@ RECURSE(
executer_actor
expr_nodes
federated_query
+ finalize_script_service
gateway
host
node_service
diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto
index 77629404d0..5e5e874d98 100644
--- a/ydb/core/protos/config.proto
+++ b/ydb/core/protos/config.proto
@@ -1013,6 +1013,11 @@ message TTableProfilesConfig {
repeated TCachingPolicy CachingPolicies = 7;
}
+message TFinalizeScriptServiceConfig {
+ optional uint64 ScriptFinalizationTimeoutSeconds = 1 [default = 60];
+ optional uint32 MaxInFlightFinalizationsCount = 2 [default = 10];
+};
+
message TQueryServiceConfig {
optional uint64 ScriptOperationTimeoutDefaultSeconds = 1 [default = 604800]; // default = 1 week
optional uint64 ScriptForgetAfterDefaultSeconds = 2 [default = 31536000]; // default = 1 year; 0 = infinity
@@ -1026,6 +1031,7 @@ message TQueryServiceConfig {
optional string MdbGateway = 9 [deprecated=true];
optional bool MdbTransformHost = 10;
optional NYql.TGenericGatewayConfig Generic = 11;
+ optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12;
}
// Config describes immediate controls and allows
diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
index 196f5b664a..374379e37b 100644
--- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt
@@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-core-keyvalue
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
index 80814daf46..43e9c7e52c 100644
--- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt
@@ -49,6 +49,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-core-keyvalue
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
index 80814daf46..43e9c7e52c 100644
--- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt
@@ -49,6 +49,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-core-keyvalue
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
index 196f5b664a..374379e37b 100644
--- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt
@@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC
ydb-core-keyvalue
ydb-core-kqp
core-kqp-federated_query
+ core-kqp-finalize_script_service
ydb-core-metering
ydb-core-mind
core-mind-address_classification
diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp
index 786808273e..a7a923cfbb 100644
--- a/ydb/core/testlib/test_client.cpp
+++ b/ydb/core/testlib/test_client.cpp
@@ -57,6 +57,7 @@
#include <ydb/core/kqp/common/kqp.h>
#include <ydb/core/kqp/rm_service/kqp_rm_service.h>
#include <ydb/core/kqp/proxy_service/kqp_proxy_service.h>
+#include <ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h>
#include <ydb/core/metering/metering.h>
#include <ydb/library/services/services.pb.h>
#include <ydb/core/tablet_flat/tablet_flat_executed.h>
@@ -889,6 +890,10 @@ namespace Tests {
federatedQuerySetupFactory);
TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx);
Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx);
+
+ IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService(Settings->AppConfig.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Settings->AppConfig.GetMetadataProviderConfig(), federatedQuerySetupFactory);
+ TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx);
+ Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx);
}
{
diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make
index af028b07f5..1df7391d65 100644
--- a/ydb/core/testlib/ya.make
+++ b/ydb/core/testlib/ya.make
@@ -52,6 +52,7 @@ PEERDIR(
ydb/core/keyvalue
ydb/core/kqp
ydb/core/kqp/federated_query
+ ydb/core/kqp/finalize_script_service
ydb/core/metering
ydb/core/mind
ydb/core/mind/address_classification
diff --git a/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp b/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp
index 69f8a577b7..37ae5af9f3 100644
--- a/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp
+++ b/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp
@@ -45,7 +45,7 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::SetNoAuth() {
return *this;
}
-TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const TMap<TString, TString> secrets) {
+TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const std::map<TString, TString>& secrets) {
if (Data.HasField("basic_password_ref")) {
auto reference = Data.GetField("basic_password_ref");
Data.ClearField("basic_password_ref");
diff --git a/ydb/library/yql/providers/common/structured_token/yql_token_builder.h b/ydb/library/yql/providers/common/structured_token/yql_token_builder.h
index 0661cec10e..c9f24b67c0 100644
--- a/ydb/library/yql/providers/common/structured_token/yql_token_builder.h
+++ b/ydb/library/yql/providers/common/structured_token/yql_token_builder.h
@@ -18,7 +18,7 @@ public:
TStructuredTokenBuilder& SetBasicAuthWithSecret(const TString& login, const TString& passwordReference);
TStructuredTokenBuilder& SetIAMToken(const TString& token);
TStructuredTokenBuilder& SetNoAuth();
- TStructuredTokenBuilder& ReplaceReferences(const TMap<TString, TString> secrets);
+ TStructuredTokenBuilder& ReplaceReferences(const std::map<TString, TString>& secrets);
TStructuredTokenBuilder& RemoveSecrets();
TString ToJson() const;
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
index 7e170a8ec3..d56fe3c5af 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp
@@ -204,7 +204,7 @@ public:
IHTTPGateway::TPtr gateway,
const TString& queryId,
const TString& jobId,
- ui32 restartNumber,
+ std::optional<ui32> restartNumber,
bool commit,
const THashMap<TString, TString>& secureParams,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
@@ -213,7 +213,7 @@ public:
, Gateway(gateway)
, QueryId(queryId)
, KeyPrefix(jobId + "_")
- , KeySubPrefix(ToString(restartNumber) + "_")
+ , KeySubPrefix(restartNumber ? ToString(*restartNumber) + "_" : "")
, Commit(commit)
, SecureParams(secureParams)
, CredentialsFactory(credentialsFactory)
@@ -326,6 +326,11 @@ public:
}
void Finish(bool fatal = false, const TString& message = "") {
+ if (ApplicationFinished) {
+ return;
+ }
+ ApplicationFinished = true;
+
if (message) {
Issues.AddIssue(TIssue(message));
}
@@ -442,9 +447,15 @@ public:
void Process(TEvPrivate::TEvAbortMultipartUpload::TPtr& ev) {
auto& result = ev->Get()->Result;
- if (!result.Issues && result.Content.HttpResponseCode >= 200 && result.Content.HttpResponseCode < 300) {
- LOG_D("AbortMultipartUpload SUCCESS " << ev->Get()->State->BuildUrl());
- return;
+ if (!result.Issues) {
+ if (result.Content.HttpResponseCode == 404) {
+ LOG_W("AbortMultipartUpload NOT FOUND " << ev->Get()->State->BuildUrl() << " (may be aborted already)");
+ return;
+ }
+ if (result.Content.HttpResponseCode >= 200 && result.Content.HttpResponseCode < 300) {
+ LOG_D("AbortMultipartUpload SUCCESS " << ev->Get()->State->BuildUrl());
+ return;
+ }
}
LOG_D("AbortMultipartUpload ERROR " << ev->Get()->State->BuildUrl());
if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) {
@@ -572,6 +583,7 @@ private:
THashSet<TString> CommitUploads;
NYql::TIssues Issues;
std::queue<TObjectStorageRequest> RequestQueue;
+ bool ApplicationFinished = false;
};
} // namespace
@@ -581,7 +593,7 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
IHTTPGateway::TPtr gateway,
const TString& queryId,
const TString& jobId,
- ui32 restartNumber,
+ std::optional<ui32> restartNumber,
bool commit,
const THashMap<TString, TString>& secureParams,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h
index f4cfd53b7c..45e44e602f 100644
--- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h
+++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h
@@ -13,7 +13,7 @@ THolder<NActors::IActor> MakeS3ApplicatorActor(
IHTTPGateway::TPtr gateway,
const TString& queryId,
const TString& jobId,
- ui32 restartNumber,
+ std::optional<ui32> restartNumber,
bool commit,
const THashMap<TString, TString>& secureParams,
ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory,
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
index 03b76d30b8..4936f48f45 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp
@@ -407,7 +407,7 @@ public:
sinkDesc.SetCompression(TString(compression));
sinkDesc.SetMultipart(GetMultipart(settings.Settings().Ref()));
- sinkDesc.SetAtomicUploadCommit(State_->Configuration->AtomicUploadCommit.Get().GetOrElse(false));
+ sinkDesc.SetAtomicUploadCommit(State_->Configuration->AllowAtomicUploadCommit && State_->Configuration->AtomicUploadCommit.Get().GetOrElse(false));
protoSettings.PackFrom(sinkDesc);
sinkType = "S3Sink";
diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
index 4f4a8734d6..2d19d65582 100644
--- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
+++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h
@@ -61,6 +61,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher
ui64 GeneratorPathsLimit = 0;
bool WriteThroughDqIntegration = false;
ui64 MaxListingResultSizePerPhysicalPartition;
+ bool AllowAtomicUploadCommit = true;
};
} // NYql
diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp
index 3594b60772..325c38c760 100644
--- a/ydb/tests/tools/kqprun/kqprun.cpp
+++ b/ydb/tests/tools/kqprun/kqprun.cpp
@@ -17,6 +17,8 @@ struct TExecutionOptions {
TString SchemeQuery;
NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE;
+
+ TString ScriptTraceId = "kqprun";
};
@@ -35,7 +37,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner
if (executionOptions.ScriptQuery) {
Cout << colors.Yellow() << "Executing script..." << colors.Default() << Endl;
- if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction)) {
+ if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) {
ythrow yexception() << "Script execution failed";
}
}
diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
index 1d118a93d0..7677fd5eee 100644
--- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp
+++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp
@@ -37,8 +37,8 @@ public:
return true;
}
- bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action) {
- TRequestResult status = YdbSetup_.ScriptQueryRequest(script, action, ExecutionOperation_);
+ bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) {
+ TRequestResult status = YdbSetup_.ScriptQueryRequest(script, action, traceId, ExecutionOperation_);
if (!status.IsSuccess()) {
Cerr << CerrColors_.Red() << "Failed to start script execution, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl;
@@ -124,8 +124,8 @@ bool TKqpRunner::ExecuteSchemeQuery(const TString& query) const {
return Impl_->ExecuteSchemeQuery(query);
}
-bool TKqpRunner::ExecuteScript(const TString& query, NKikimrKqp::EQueryAction action) const {
- return Impl_->ExecuteScript(query, action);
+bool TKqpRunner::ExecuteScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const {
+ return Impl_->ExecuteScript(query, action, traceId);
}
bool TKqpRunner::WriteScriptResults() const {
diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h
index 233cdd0475..09d3c1c657 100644
--- a/ydb/tests/tools/kqprun/src/kqp_runner.h
+++ b/ydb/tests/tools/kqprun/src/kqp_runner.h
@@ -11,7 +11,7 @@ public:
bool ExecuteSchemeQuery(const TString& query) const;
- bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action) const;
+ bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const;
bool WriteScriptResults() const;
diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
index 31ff8f78b2..ff105e1655 100644
--- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp
+++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp
@@ -147,8 +147,10 @@ public:
return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvQueryRequest, NKikimr::NKqp::TEvKqp::TEvQueryResponse>(std::move(event));
}
- NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action) const {
+ NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const {
auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>();
+ event->Record.SetTraceId(traceId);
+
FillScriptRequest(script, action, *event->Record.MutableRequest());
return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvScriptRequest, NKikimr::NKqp::TEvKqp::TEvScriptResponse>(std::move(event));
@@ -172,11 +174,6 @@ public:
return GetRuntime()->GrabEdgeEvent<NKikimr::NKqp::TEvKqp::TEvFetchScriptResultsResponse>(edgeActor);
}
- ~TImpl() {
- Server_.Reset();
- Client_.Reset();
- }
-
private:
NActors::TTestActorRuntime* GetRuntime() const {
return Server_->GetRuntime();
@@ -262,8 +259,8 @@ TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, TSchemeMeta&
return TRequestResult(schemeQueryOperationResponse.GetYdbStatus(), issues);
}
-TRequestResult TYdbSetup::ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, TString& operation) const {
- auto scriptExecutionOperation = Impl_->ScriptQueryRequest(script, action);
+TRequestResult TYdbSetup::ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const {
+ auto scriptExecutionOperation = Impl_->ScriptQueryRequest(script, action, traceId);
operation = scriptExecutionOperation->Get()->OperationId;
diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h
index c918ef25e3..3e22389b85 100644
--- a/ydb/tests/tools/kqprun/src/ydb_setup.h
+++ b/ydb/tests/tools/kqprun/src/ydb_setup.h
@@ -43,7 +43,7 @@ public:
TRequestResult SchemeQueryRequest(const TString& query, TSchemeMeta& meta) const;
- TRequestResult ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, TString& operation) const;
+ TRequestResult ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const;
TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const;