diff options
author | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-11-13 10:42:35 +0300 |
---|---|---|
committer | grigoriypisar <grigoriypisar@yandex-team.com> | 2023-11-13 11:17:32 +0300 |
commit | 005735009aadf431092af0ecbb430dafee3f58ea (patch) | |
tree | 0544f6e0d758770b3ccbe01a24777a36c4047c7e | |
parent | 977f49e54c144e59ce1299caf8e8249d09909e5b (diff) | |
download | ydb-005735009aadf431092af0ecbb430dafee3f58ea.tar.gz |
-commit-and-rollback-for-pending-loads
Created first commit version
65 files changed, 1851 insertions, 642 deletions
diff --git a/.mapping.json b/.mapping.json index b1d5ec9df7..2297dce1d8 100644 --- a/.mapping.json +++ b/.mapping.json @@ -4624,6 +4624,11 @@ "ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt":"", "ydb/core/kqp/federated_query/CMakeLists.txt":"", "ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt":"", + "ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt":"", + "ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt":"", + "ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt":"", + "ydb/core/kqp/finalize_script_service/CMakeLists.txt":"", + "ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt":"", "ydb/core/kqp/gateway/CMakeLists.darwin-x86_64.txt":"", "ydb/core/kqp/gateway/CMakeLists.linux-aarch64.txt":"", "ydb/core/kqp/gateway/CMakeLists.linux-x86_64.txt":"", diff --git a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt index 105a543cf8..8439de3a0c 100644 --- a/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.darwin-x86_64.txt @@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC ydb-core-kafka_proxy ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt index 43ad518bdc..fb358d77d5 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-aarch64.txt @@ -70,6 +70,7 @@ target_link_libraries(run PUBLIC ydb-core-kafka_proxy ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt index 43ad518bdc..fb358d77d5 100644 --- a/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.linux-x86_64.txt @@ -70,6 +70,7 @@ target_link_libraries(run PUBLIC ydb-core-kafka_proxy ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt index 105a543cf8..8439de3a0c 100644 --- a/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt +++ b/ydb/core/driver_lib/run/CMakeLists.windows-x86_64.txt @@ -69,6 +69,7 @@ target_link_libraries(run PUBLIC ydb-core-kafka_proxy ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service core-kqp-rm_service ydb-core-load_test ydb-core-local_pgwire diff --git a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp index bd8fe08b55..13d147c149 100644 --- a/ydb/core/driver_lib/run/kikimr_services_initializers.cpp +++ b/ydb/core/driver_lib/run/kikimr_services_initializers.cpp @@ -73,6 +73,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> +#include <ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h> #include <ydb/core/load_test/service_actor.h> @@ -2109,13 +2110,21 @@ void TKqpServiceInitializer::InitializeServices(NActors::TActorSystemSetup* setu GlobalObjects.AddGlobalObject(std::make_shared<NYql::NLog::YqlLoggerScope>( new NYql::NLog::TTlsLogBackend(new TNullLogBackend()))); + auto federatedQuerySetupFactory = NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config); + auto proxy = NKqp::CreateKqpProxyService(Config.GetLogConfig(), Config.GetTableServiceConfig(), Config.GetQueryServiceConfig(), Config.GetMetadataProviderConfig(), std::move(settings), Factories->QueryReplayBackendFactory, std::move(kqpProxySharedResources), - NKqp::MakeKqpFederatedQuerySetupFactory(setup, appData, Config) + federatedQuerySetupFactory ); setup->LocalServices.push_back(std::make_pair( NKqp::MakeKqpProxyID(NodeId), TActorSetupCmd(proxy, TMailboxType::HTSwap, appData->UserPoolId))); + + // Create finalize script service + auto finalize = NKqp::CreateKqpFinalizeScriptService(Config.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Config.GetMetadataProviderConfig(), federatedQuerySetupFactory); + setup->LocalServices.push_back(std::make_pair( + NKqp::MakeKqpFinalizeScriptServiceId(NodeId), + TActorSetupCmd(finalize, TMailboxType::HTSwap, appData->UserPoolId))); } } diff --git a/ydb/core/driver_lib/run/ya.make b/ydb/core/driver_lib/run/ya.make index 9ae7e14eac..cf75b16e98 100644 --- a/ydb/core/driver_lib/run/ya.make +++ b/ydb/core/driver_lib/run/ya.make @@ -78,6 +78,7 @@ PEERDIR( ydb/core/kafka_proxy ydb/core/kqp ydb/core/kqp/federated_query + ydb/core/kqp/finalize_script_service ydb/core/kqp/rm_service ydb/core/load_test ydb/core/local_pgwire diff --git a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt index 1bad2f5045..54cac3e15d 100644 --- a/ydb/core/kqp/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.darwin-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) add_subdirectory(federated_query) +add_subdirectory(finalize_script_service) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/CMakeLists.linux-aarch64.txt index 5337a48cda..f064e17c70 100644 --- a/ydb/core/kqp/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/CMakeLists.linux-aarch64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) add_subdirectory(federated_query) +add_subdirectory(finalize_script_service) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/CMakeLists.linux-x86_64.txt index 5337a48cda..f064e17c70 100644 --- a/ydb/core/kqp/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.linux-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) add_subdirectory(federated_query) +add_subdirectory(finalize_script_service) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/CMakeLists.windows-x86_64.txt index 1bad2f5045..54cac3e15d 100644 --- a/ydb/core/kqp/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/CMakeLists.windows-x86_64.txt @@ -13,6 +13,7 @@ add_subdirectory(counters) add_subdirectory(executer_actor) add_subdirectory(expr_nodes) add_subdirectory(federated_query) +add_subdirectory(finalize_script_service) add_subdirectory(gateway) add_subdirectory(host) add_subdirectory(node_service) diff --git a/ydb/core/kqp/common/events/query.h b/ydb/core/kqp/common/events/query.h index 59fbac0e00..d40c9ecea3 100644 --- a/ydb/core/kqp/common/events/query.h +++ b/ydb/core/kqp/common/events/query.h @@ -1,6 +1,7 @@ #pragma once #include <ydb/core/protos/kqp.pb.h> #include <ydb/core/kqp/common/simple/kqp_event_ids.h> +#include <ydb/core/kqp/common/kqp_user_request_context.h> #include <ydb/core/grpc_services/base/base.h> #include <ydb/core/grpc_services/cancelation/cancelation_event.h> #include <ydb/core/grpc_services/cancelation/cancelation.h> @@ -265,6 +266,14 @@ public: } } + void SetUserRequestContext(TIntrusivePtr<TUserRequestContext> userRequestContext) { + UserRequestContext = userRequestContext; + } + + TIntrusivePtr<TUserRequestContext> GetUserRequestContext() const { + return UserRequestContext; + } + mutable NKikimrKqp::TEvQueryRequest Record; private: @@ -291,6 +300,7 @@ private: TDuration OperationTimeout; TDuration CancelAfter; const ::Ydb::Query::Syntax Syntax = Ydb::Query::Syntax::SYNTAX_UNSPECIFIED; + TIntrusivePtr<TUserRequestContext> UserRequestContext; }; struct TEvDataQueryStreamPart: public TEventPB<TEvDataQueryStreamPart, diff --git a/ydb/core/kqp/common/events/script_executions.h b/ydb/core/kqp/common/events/script_executions.h index 94ff1de909..893fae46da 100644 --- a/ydb/core/kqp/common/events/script_executions.h +++ b/ydb/core/kqp/common/events/script_executions.h @@ -14,6 +14,11 @@ namespace NKikimr::NKqp { +enum EFinalizationStatus : i32 { + FS_COMMIT, + FS_ROLLBACK, +}; + struct TEvForgetScriptExecutionOperation : public NActors::TEventLocal<TEvForgetScriptExecutionOperation, TKqpScriptExecutionEvents::EvForgetScriptExecutionOperation> { explicit TEvForgetScriptExecutionOperation(const TString& database, const NOperationId::TOperationId& id) : Database(database) @@ -47,28 +52,44 @@ struct TEvGetScriptExecutionOperation : public NActors::TEventLocal<TEvGetScript NOperationId::TOperationId OperationId; }; -struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> { - TEvGetScriptExecutionOperationResponse(bool ready, bool leaseExpired, TActorId runScriptActorId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata) +struct TEvGetScriptExecutionOperationQueryResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationQueryResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationQueryResponse> { + TEvGetScriptExecutionOperationQueryResponse(bool ready, bool leaseExpired, std::optional<EFinalizationStatus> finalizationStatus, TActorId runScriptActorId, + const TString& executionId, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, Ydb::Query::ExecuteScriptMetadata metadata) : Ready(ready) , LeaseExpired(leaseExpired) + , FinalizationStatus(finalizationStatus) , RunScriptActorId(runScriptActorId) + , ExecutionId(executionId) , Status(status) , Issues(std::move(issues)) , Metadata(std::move(metadata)) - { - } + {} + + bool Ready; + bool LeaseExpired; + std::optional<EFinalizationStatus> FinalizationStatus; + TActorId RunScriptActorId; + TString ExecutionId; + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + Ydb::Query::ExecuteScriptMetadata Metadata; +}; + +struct TEvGetScriptExecutionOperationResponse : public NActors::TEventLocal<TEvGetScriptExecutionOperationResponse, TKqpScriptExecutionEvents::EvGetScriptExecutionOperationResponse> { + TEvGetScriptExecutionOperationResponse(bool ready, Ydb::StatusIds::StatusCode status, NYql::TIssues issues, TMaybe<google::protobuf::Any> metadata) + : Ready(ready) + , Status(status) + , Issues(std::move(issues)) + , Metadata(std::move(metadata)) + {} TEvGetScriptExecutionOperationResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) : Ready(false) - , LeaseExpired(false) , Status(status) , Issues(std::move(issues)) - { - } + {} bool Ready; - bool LeaseExpired; - TActorId RunScriptActorId; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; TMaybe<google::protobuf::Any> Metadata; @@ -153,12 +174,19 @@ struct TEvCancelScriptExecutionOperationResponse : public NActors::TEventLocal<T }; struct TEvScriptExecutionFinished : public NActors::TEventLocal<TEvScriptExecutionFinished, TKqpScriptExecutionEvents::EvScriptExecutionFinished> { - TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) - : Status(status) + explicit TEvScriptExecutionFinished(Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : OperationAlreadyFinalized(false) + , Status(status) , Issues(std::move(issues)) - { - } + {} + + TEvScriptExecutionFinished(bool operationAlreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues issues = {}) + : OperationAlreadyFinalized(operationAlreadyFinalized) + , Status(status) + , Issues(std::move(issues)) + {} + bool OperationAlreadyFinalized; Ydb::StatusIds::StatusCode Status; NYql::TIssues Issues; }; @@ -196,4 +224,105 @@ struct TEvFetchScriptResultsQueryResponse : public NActors::TEventLocal<TEvFetch NKikimrKqp::TEvFetchScriptResultsResponse Results; }; +struct TEvSaveScriptExternalEffectRequest : public NActors::TEventLocal<TEvSaveScriptExternalEffectRequest, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectRequest> { + TEvSaveScriptExternalEffectRequest(const TString& executionId, const TString& database, const TString& customerSuppliedId, const TString& userToken) + : ExecutionId(executionId) + , Database(database) + , CustomerSuppliedId(customerSuppliedId) + , UserToken(userToken) + {} + + TString ExecutionId; + TString Database; + + TString CustomerSuppliedId; + TString UserToken; + std::vector<NKqpProto::TKqpExternalSink> Sinks; + std::vector<TString> SecretNames; +}; + +struct TEvSaveScriptExternalEffectResponse : public NActors::TEventLocal<TEvSaveScriptExternalEffectResponse, TKqpScriptExecutionEvents::EvSaveScriptExternalEffectResponse> { + TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + {} + + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; +}; + +struct TEvScriptFinalizeRequest : public NActors::TEventLocal<TEvScriptFinalizeRequest, TKqpScriptExecutionEvents::EvScriptFinalizeRequest> { + TEvScriptFinalizeRequest(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database, + Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, NYql::TIssues issues = {}, std::optional<NKqpProto::TKqpStatsQuery> queryStats = std::nullopt, + std::optional<TString> queryPlan = std::nullopt, std::optional<TString> queryAst = std::nullopt, std::optional<ui64> leaseGeneration = std::nullopt) + : FinalizationStatus(finalizationStatus) + , ExecutionId(executionId) + , Database(database) + , OperationStatus(operationStatus) + , ExecStatus(execStatus) + , Issues(std::move(issues)) + , QueryStats(std::move(queryStats)) + , QueryPlan(std::move(queryPlan)) + , QueryAst(std::move(queryAst)) + , LeaseGeneration(leaseGeneration) + {} + + EFinalizationStatus FinalizationStatus; + TString ExecutionId; + TString Database; + Ydb::StatusIds::StatusCode OperationStatus; + Ydb::Query::ExecStatus ExecStatus; + NYql::TIssues Issues; + std::optional<NKqpProto::TKqpStatsQuery> QueryStats; + std::optional<TString> QueryPlan; + std::optional<TString> QueryAst; + std::optional<ui64> LeaseGeneration; + TDuration OperationTtl; + TDuration ResultsTtl; +}; + +struct TEvScriptFinalizeResponse : public NActors::TEventLocal<TEvScriptFinalizeResponse, TKqpScriptExecutionEvents::EvScriptFinalizeResponse> { + explicit TEvScriptFinalizeResponse(const TString& executionId) + : ExecutionId(executionId) + {} + + TString ExecutionId; +}; + +struct TEvSaveScriptFinalStatusResponse : public NActors::TEventLocal<TEvSaveScriptFinalStatusResponse, TKqpScriptExecutionEvents::EvSaveScriptFinalStatusResponse> { + TEvSaveScriptFinalStatusResponse(const TString& customerSuppliedId, const TString& userToken) + : CustomerSuppliedId(customerSuppliedId) + , UserToken(userToken) + {} + + TString CustomerSuppliedId; + TString UserToken; + std::vector<NKqpProto::TKqpExternalSink> Sinks; + std::vector<TString> SecretNames; +}; + +struct TEvDescribeSecretsResponse : public NActors::TEventLocal<TEvDescribeSecretsResponse, TKqpScriptExecutionEvents::EvDescribeSecretsResponse> { + struct TDescription { + TDescription(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) + : Status(status) + , Issues(std::move(issues)) + {} + + TDescription(const std::vector<TString>& secretValues) + : SecretValues(secretValues) + , Status(Ydb::StatusIds::SUCCESS) + {} + + std::vector<TString> SecretValues; + Ydb::StatusIds::StatusCode Status; + NYql::TIssues Issues; + }; + + TEvDescribeSecretsResponse(const TDescription& description) + : Description(description) + {} + + TDescription Description; +}; + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/common/kqp_user_request_context.cpp b/ydb/core/kqp/common/kqp_user_request_context.cpp index 48dcacde95..685dc0ffe1 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.cpp +++ b/ydb/core/kqp/common/kqp_user_request_context.cpp @@ -3,12 +3,14 @@ namespace NKikimr::NKqp { void TUserRequestContext::Out(IOutputStream& o) const { - o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << "}"; + o << "{" << " TraceId: " << TraceId << ", Database: " << Database << ", SessionId: " << SessionId << ", CurrentExecutionId: " << CurrentExecutionId << ", CustomerSuppliedId: " << CustomerSuppliedId << "}"; } void SerializeCtxToMap(const TUserRequestContext& ctx, google::protobuf::Map<TString, TString>& resultMap) { resultMap["TraceId"] = ctx.TraceId; resultMap["Database"] = ctx.Database; resultMap["SessionId"] = ctx.SessionId; + resultMap["CurrentExecutionId"] = ctx.CurrentExecutionId; + resultMap["CustomerSuppliedId"] = ctx.CustomerSuppliedId; } } diff --git a/ydb/core/kqp/common/kqp_user_request_context.h b/ydb/core/kqp/common/kqp_user_request_context.h index 62bf4402fa..d091f7aa19 100644 --- a/ydb/core/kqp/common/kqp_user_request_context.h +++ b/ydb/core/kqp/common/kqp_user_request_context.h @@ -10,6 +10,8 @@ namespace NKikimr::NKqp { TString TraceId; TString Database; TString SessionId; + TString CurrentExecutionId; + TString CustomerSuppliedId; TUserRequestContext() = default; @@ -18,6 +20,12 @@ namespace NKikimr::NKqp { , Database(database) , SessionId(sessionId) {} + TUserRequestContext(const TString& traceId, const TString& database, const TString& sessionId, const TString& currentExecutionId, const TString& customerSuppliedId) + : TraceId(traceId) + , Database(database) + , SessionId(sessionId) + , CurrentExecutionId(currentExecutionId) + , CustomerSuppliedId(customerSuppliedId) {} void Out(IOutputStream& o) const; }; diff --git a/ydb/core/kqp/common/simple/kqp_event_ids.h b/ydb/core/kqp/common/simple/kqp_event_ids.h index 4aecb291a3..2bbbfe6ee1 100644 --- a/ydb/core/kqp/common/simple/kqp_event_ids.h +++ b/ydb/core/kqp/common/simple/kqp_event_ids.h @@ -142,6 +142,13 @@ struct TKqpScriptExecutionEvents { EvCheckAliveRequest, EvCheckAliveResponse, EvFetchScriptResultsQueryResponse, + EvSaveScriptExternalEffectRequest, + EvSaveScriptExternalEffectResponse, + EvScriptFinalizeRequest, + EvScriptFinalizeResponse, + EvSaveScriptFinalStatusResponse, + EvGetScriptExecutionOperationQueryResponse, + EvDescribeSecretsResponse, }; }; diff --git a/ydb/core/kqp/common/simple/services.h b/ydb/core/kqp/common/simple/services.h index ebee571eb0..21746985c9 100644 --- a/ydb/core/kqp/common/simple/services.h +++ b/ydb/core/kqp/common/simple/services.h @@ -36,4 +36,9 @@ inline NActors::TActorId MakeKqpCompileComputationPatternServiceID(ui32 nodeId) return NActors::TActorId(nodeId, TStringBuf(name, 12)); } +inline NActors::TActorId MakeKqpFinalizeScriptServiceId(ui32 nodeId) { + const char name[12] = "kqp_sfinal"; + return NActors::TActorId(nodeId, TStringBuf(name, 12)); +} + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp index e09aab4a57..ddc2b6e622 100644 --- a/ydb/core/kqp/executer_actor/kqp_data_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_data_executer.cpp @@ -313,9 +313,9 @@ public: hFunc(TEvKqpExecuter::TEvTableResolveStatus, HandleResolve); hFunc(TEvKqpExecuter::TEvShardsResolveStatus, HandleResolve); hFunc(TEvPrivate::TEvResourcesSnapshot, HandleResolve); + hFunc(TEvSaveScriptExternalEffectResponse, HandleResolve); + hFunc(TEvDescribeSecretsResponse, HandleResolve); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, HandleRefreshSubscriberData); - hFunc(NActors::TEvents::TEvWakeup, HandleSecretsWaitingTimeout); default: UnexpectedEvent("WaitResolveState", ev->GetTypeRewrite()); } @@ -363,7 +363,6 @@ private: hFunc(TEvInterconnect::TEvNodeDisconnected, HandleDisconnected); hFunc(TEvKqpNode::TEvStartKqpTasksResponse, HandleStartKqpTasksResponse); IgnoreFunc(TEvInterconnect::TEvNodeConnected); - IgnoreFunc(NActors::TEvents::TEvWakeup); default: { CancelProposal(0); UnexpectedEvent("PrepareState", ev->GetTypeRewrite()); @@ -953,7 +952,6 @@ private: hFunc(TEvDqCompute::TEvChannelData, HandleExecute); hFunc(TEvKqp::TEvAbortExecution, HandleExecute); IgnoreFunc(TEvInterconnect::TEvNodeConnected); - IgnoreFunc(NActors::TEvents::TEvWakeup); default: UnexpectedEvent("ExecuteState", ev->GetTypeRewrite()); } @@ -1360,7 +1358,7 @@ private: return true; } - void BuildDatashardTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { + void BuildDatashardTasks(TStageInfo& stageInfo) { THashMap<ui64, ui64> shardTasks; // shardId -> taskId auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -1375,7 +1373,7 @@ private: task.Meta.ShardId = shardId; shardTasks.emplace(shardId, task.Id); - BuildSinks(stage, task, secureParams); + BuildSinks(stage, task); return task; }; @@ -1588,47 +1586,82 @@ private: YQL_ENSURE(result.second); } + bool WaitRequired() const { + return SecretSnapshotRequired || ResourceSnapshotRequired || SaveScriptExternalEffectRequired; + } + + void HandleResolve(TEvDescribeSecretsResponse::TPtr& ev) { + YQL_ENSURE(ev->Get()->Description.Status == Ydb::StatusIds::SUCCESS, "failed to get secrets snapshot with issues: " << ev->Get()->Description.Issues.ToOneLineString()); + + for (size_t i = 0; i < SecretNames.size(); ++i) { + SecureParams.emplace(SecretNames[i], ev->Get()->Description.SecretValues[i]); + } + + SecretSnapshotRequired = false; + if (!WaitRequired()) { + Execute(); + } + } + void HandleResolve(TEvPrivate::TEvResourcesSnapshot::TPtr& ev) { if (ev->Get()->Snapshot.empty()) { LOG_E("Can not find default state storage group for database " << Database); } ResourceSnapshot = std::move(ev->Get()->Snapshot); ResourceSnapshotRequired = false; - if (!SecretSnapshotRequired) { + if (!WaitRequired()) { + Execute(); + } + } + + void HandleResolve(TEvSaveScriptExternalEffectResponse::TPtr& ev) { + YQL_ENSURE(ev->Get()->Status == Ydb::StatusIds::SUCCESS, "failed to save script external effect with issues: " << ev->Get()->Issues.ToOneLineString()); + + SaveScriptExternalEffectRequired = false; + if (!WaitRequired()) { Execute(); } } void DoExecute() { - TVector<TString> secretNames; + const auto& requestContext = GetUserRequestContext(); + auto scriptExternalEffect = std::make_unique<TEvSaveScriptExternalEffectRequest>( + requestContext->CurrentExecutionId, requestContext->Database, + requestContext->CustomerSuppliedId, UserToken ? UserToken->GetUserSID() : "" + ); for (const auto& transaction : Request.Transactions) { for (const auto& secretName : transaction.Body->GetSecretNames()) { SecretSnapshotRequired = true; - secretNames.push_back(secretName); + SecretNames.push_back(secretName); } for (const auto& stage : transaction.Body->GetStages()) { if (stage.SourcesSize() > 0 && stage.GetSources(0).GetTypeCase() == NKqpProto::TKqpSource::kExternalSource) { ResourceSnapshotRequired = true; HasExternalSources = true; } + if (requestContext->CurrentExecutionId) { + for (const auto& sink : stage.GetSinks()) { + if (sink.GetTypeCase() == NKqpProto::TKqpSink::kExternalSink) { + SaveScriptExternalEffectRequired = true; + scriptExternalEffect->Sinks.push_back(sink.GetExternalSink()); + } + } + } } } + scriptExternalEffect->SecretNames = SecretNames; - if (!SecretSnapshotRequired && !ResourceSnapshotRequired) { + if (!WaitRequired()) { return Execute(); } if (SecretSnapshotRequired) { - FetchSecrets(std::move(secretNames)); + GetSecretsSnapshot(); } if (ResourceSnapshotRequired) { GetResourcesSnapshot(); } - } - - void OnSecretsFetched() override { - SecretSnapshotRequired = false; - if (!ResourceSnapshotRequired) { - Execute(); + if (SaveScriptExternalEffectRequired) { + SaveScriptExternalEffect(std::move(scriptExternalEffect)); } } @@ -1651,8 +1684,6 @@ private: size_t readActors = 0; for (ui32 txIdx = 0; txIdx < Request.Transactions.size(); ++txIdx) { auto& tx = Request.Transactions[txIdx]; - TMap<TString, TString> secureParams = ResolveSecretNames(tx.Body->GetSecretNames()); - for (ui32 stageIdx = 0; stageIdx < tx.Body->StagesSize(); ++stageIdx) { auto& stage = tx.Body->GetStages(stageIdx); auto& stageInfo = TasksGraph.GetStageInfo(TStageId(txIdx, stageIdx)); @@ -1689,14 +1720,14 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - if (auto actors = BuildScanTasksFromSource(stageInfo, secureParams)) { + if (auto actors = BuildScanTasksFromSource(stageInfo)) { readActors += *actors; } else { UnknownAffectedShardCount = true; } break; case NKqpProto::TKqpSource::kExternalSource: - BuildReadTasksFromSource(stageInfo, secureParams, ResourceSnapshot); + BuildReadTasksFromSource(stageInfo, ResourceSnapshot); break; default: YQL_ENSURE(false, "unknown source type"); @@ -1704,11 +1735,11 @@ private: } else if (StreamResult && stageInfo.Meta.IsOlap()) { BuildScanTasksFromShards(stageInfo); } else if (stageInfo.Meta.ShardOperations.empty()) { - BuildComputeTasks(stageInfo, secureParams); + BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { - BuildSysViewScanTasks(stageInfo, secureParams); + BuildSysViewScanTasks(stageInfo); } else { - BuildDatashardTasks(stageInfo, secureParams); + BuildDatashardTasks(stageInfo); } if (stage.GetIsSinglePartition()) { @@ -1910,7 +1941,6 @@ private: switch (ev->GetTypeRewrite()) { hFunc(NLongTxService::TEvLongTxService::TEvAcquireReadSnapshotResult, Handle); hFunc(TEvKqp::TEvAbortExecution, HandleAbortExecution); - IgnoreFunc(NActors::TEvents::TEvWakeup); default: UnexpectedEvent("WaitSnapshotState", ev->GetTypeRewrite()); } @@ -2368,6 +2398,7 @@ private: bool HasExternalSources = false; bool SecretSnapshotRequired = false; bool ResourceSnapshotRequired = false; + bool SaveScriptExternalEffectRequired = false; TVector<NKikimrKqp::TKqpNodeResources> ResourceSnapshot; ui64 TxCoordinator = 0; diff --git a/ydb/core/kqp/executer_actor/kqp_executer_impl.h b/ydb/core/kqp/executer_actor/kqp_executer_impl.h index b613d0b796..7654c0aa98 100644 --- a/ydb/core/kqp/executer_actor/kqp_executer_impl.h +++ b/ydb/core/kqp/executer_actor/kqp_executer_impl.h @@ -21,6 +21,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/common/kqp_yql.h> #include <ydb/core/kqp/common/kqp_user_request_context.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h> #include <ydb/core/grpc_services/local_rate_limiter.h> #include <ydb/services/metadata/secret/fetcher.h> @@ -386,71 +387,6 @@ protected: return res; } - NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() { - return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>(); - } - - void FetchSecrets(TVector<TString>&& secretNames) { - YQL_ENSURE(NMetadata::NProvider::TServiceOperator::IsEnabled(), "metadata service is not active"); - SecretNames = std::move(secretNames); - - SubscribedOnSecrets = true; - this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); - this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); - } - - void HandleRefreshSubscriberData(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { - if (!SubscribedOnSecrets) { - return; - } - - Secrets = ev->Get()->GetSnapshotPtrAs<NMetadata::NSecret::TSnapshot>(); - - TString secretValue; - for (const TString& secretName : SecretNames) { - auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName); - if (!Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue)) { - return; - } - } - - UnsubscribeFromSecrets(); - OnSecretsFetched(); - } - - void HandleSecretsWaitingTimeout(NActors::TEvents::TEvWakeup::TPtr&) { - if (!SubscribedOnSecrets) { - return; - } - - YQL_ENSURE(Secrets != nullptr, "secrets snapshot fetching timeout"); - UnsubscribeFromSecrets(); - OnSecretsFetched(); - } - - void UnsubscribeFromSecrets() { - if (SubscribedOnSecrets) { - SubscribedOnSecrets = false; - this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); - } - } - - virtual void OnSecretsFetched() {} - - TMap<TString, TString> ResolveSecretNames(const google::protobuf::RepeatedPtrField<TProtoStringType>& secretNames) { - TMap<TString, TString> secureParams; - for (const auto& secretName : secretNames) { - auto secretId = NMetadata::NSecret::TSecretId(UserToken->GetUserSID(), secretName); - - TString secretValue; - YQL_ENSURE(Secrets->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue), "secret with name '" << secretName << "' not found"); - - secureParams[secretName] = secretValue; - } - - return secureParams; - } - protected: bool CheckExecutionComplete() { if (Planner && Planner->GetPendingComputeActors().empty() && Planner->GetPendingComputeTasks().empty()) { @@ -735,7 +671,7 @@ protected: protected: - void BuildSysViewScanTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { + void BuildSysViewScanTasks(TStageInfo& stageInfo) { Y_DEBUG_ABORT_UNLESS(stageInfo.Meta.IsSysView()); auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); @@ -778,13 +714,13 @@ protected: task.Meta.ReadInfo.Reverse = op.GetReadRange().GetReverse(); task.Meta.Type = TTaskMeta::TTaskType::Compute; - BuildSinks(stage, task, secureParams); + BuildSinks(stage, task); LOG_D("Stage " << stageInfo.Id << " create sysview scan task: " << task.Id); } } - void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task, const TMap<TString, TString>& secureParams) { + void BuildSinks(const NKqpProto::TKqpPhyStage& stage, TKqpTasksGraph::TTaskType& task) { if (stage.SinksSize() > 0) { YQL_ENSURE(stage.SinksSize() == 1, "multiple sinks are not supported"); const auto& sink = stage.GetSinks(0); @@ -794,10 +730,10 @@ protected: auto sinkName = extSink.GetSinkName(); if (sinkName) { - auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + auto structuredToken = NYql::CreateStructuredTokenParser(extSink.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson(); task.Meta.SecureParams.emplace(sinkName, structuredToken); if (GetUserRequestContext()->TraceId) { - task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->TraceId); + task.Meta.TaskParams.emplace("fq.job_id", GetUserRequestContext()->CustomerSuppliedId); // "fq.restart_count" } } @@ -809,7 +745,7 @@ protected: } } - void BuildReadTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) { + void BuildReadTasksFromSource(TStageInfo& stageInfo, const TVector<NKikimrKqp::TKqpNodeResources>& resourceSnapshot) { const auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); YQL_ENSURE(stage.GetSources(0).HasExternalSource()); @@ -830,7 +766,7 @@ protected: auto sourceName = externalSource.GetSourceName(); TString structuredToken; if (sourceName) { - structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(secureParams).ToJson(); + structuredToken = NYql::CreateStructuredTokenParser(externalSource.GetAuthInfo()).ToBuilder().ReplaceReferences(SecureParams).ToJson(); } TVector<ui64> tasksIds; @@ -869,11 +805,11 @@ protected: // finish building for (auto taskId : tasksIds) { - BuildSinks(stage, TasksGraph.GetTask(taskId), secureParams); + BuildSinks(stage, TasksGraph.GetTask(taskId)); } } - TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { + TMaybe<size_t> BuildScanTasksFromSource(TStageInfo& stageInfo) { THashMap<ui64, std::vector<ui64>> nodeTasks; THashMap<ui64, ui64> assignedShardsCount; @@ -983,7 +919,7 @@ protected: settings->SetLockNodeId(self.NodeId()); } - BuildSinks(stage, task, secureParams); + BuildSinks(stage, task); }; THashMap<ui64, TShardInfo> partitions = PrunePartitions(source, stageInfo, HolderFactory(), TypeEnv()); @@ -1019,7 +955,7 @@ protected: } } - void BuildComputeTasks(TStageInfo& stageInfo, const TMap<TString, TString>& secureParams) { + void BuildComputeTasks(TStageInfo& stageInfo) { auto& stage = stageInfo.Meta.GetStage(stageInfo.Id); ui32 partitionsCount = 1; @@ -1077,7 +1013,7 @@ protected: auto& task = TasksGraph.AddTask(stageInfo); task.Meta.Type = TTaskMeta::TTaskType::Compute; task.Meta.ExecuterId = SelfId(); - BuildSinks(stage, task, secureParams); + BuildSinks(stage, task); LOG_D("Stage " << stageInfo.Id << " create compute task: " << task.Id); } } @@ -1347,6 +1283,10 @@ protected: }); } + void GetSecretsSnapshot() { + RegisterDescribeSecretsActor(this->SelfId(), UserToken ? UserToken->GetUserSID() : "", SecretNames, this->ActorContext(), MaximalSecretsSnapshotWaitTime); + } + void GetResourcesSnapshot() { GetKqpResourceManager()->RequestClusterResourcesInfo( [as = TlsActivationContext->ActorSystem(), self = SelfId()](TVector<NKikimrKqp::TKqpNodeResources>&& resources) { @@ -1355,6 +1295,10 @@ protected: }); } + void SaveScriptExternalEffect(std::unique_ptr<TEvSaveScriptExternalEffectRequest> scriptEffects) { + this->Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), scriptEffects.release()); + } + protected: void TerminateComputeActors(Ydb::StatusIds::StatusCode code, const NYql::TIssues& issues) { for (const auto& task : this->TasksGraph.GetTasks()) { @@ -1542,8 +1486,6 @@ protected: protected: void PassAway() override { - UnsubscribeFromSecrets(); - for (auto channelPair: ResultChannelProxies) { LOG_D("terminate result channel " << channelPair.first << " proxy at " << channelPair.second->SelfId()); @@ -1655,10 +1597,9 @@ protected: std::unique_ptr<TKqpPlanner> Planner; const NKikimrConfig::TTableServiceConfig::TExecuterRetriesConfig ExecuterRetriesConfig; - std::shared_ptr<NMetadata::NSecret::TSnapshot> Secrets; - TVector<TString> SecretNames; + std::vector<TString> SecretNames; + std::map<TString, TString> SecureParams; TDuration MaximalSecretsSnapshotWaitTime; - bool SubscribedOnSecrets = false; const NKikimrConfig::TTableServiceConfig::TAggregationConfig AggregationSettings; TVector<NKikimrKqp::TKqpNodeResources> ResourcesSnapshot; diff --git a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp index 02208b7041..958c32fbb7 100644 --- a/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp +++ b/ydb/core/kqp/executer_actor/kqp_scan_executer.cpp @@ -197,15 +197,15 @@ private: if (stage.SourcesSize() > 0) { switch (stage.GetSources(0).GetTypeCase()) { case NKqpProto::TKqpSource::kReadRangesSource: - BuildScanTasksFromSource(stageInfo, {}); + BuildScanTasksFromSource(stageInfo); break; default: YQL_ENSURE(false, "unknown source type"); } } else if (stageInfo.Meta.ShardOperations.empty()) { - BuildComputeTasks(stageInfo, {}); + BuildComputeTasks(stageInfo); } else if (stageInfo.Meta.IsSysView()) { - BuildSysViewScanTasks(stageInfo, {}); + BuildSysViewScanTasks(stageInfo); } else if (stageInfo.Meta.IsOlap() || stageInfo.Meta.IsDatashard()) { HasOlapTable = true; BuildScanTasksFromShards(stageInfo); diff --git a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt index 551388ba4e..4161013b00 100644 --- a/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/kqp/federated_query/CMakeLists.darwin-x86_64.txt @@ -22,5 +22,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC generic-connector-libcpp ) target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp ) diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt index a662d35a7c..ae88183731 100644 --- a/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt +++ b/ydb/core/kqp/federated_query/CMakeLists.linux-aarch64.txt @@ -23,5 +23,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC generic-connector-libcpp ) target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp ) diff --git a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt index a662d35a7c..ae88183731 100644 --- a/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt +++ b/ydb/core/kqp/federated_query/CMakeLists.linux-x86_64.txt @@ -23,5 +23,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC generic-connector-libcpp ) target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp ) diff --git a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt index 551388ba4e..4161013b00 100644 --- a/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt +++ b/ydb/core/kqp/federated_query/CMakeLists.windows-x86_64.txt @@ -22,5 +22,6 @@ target_link_libraries(core-kqp-federated_query PUBLIC generic-connector-libcpp ) target_sources(core-kqp-federated_query PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp ${CMAKE_SOURCE_DIR}/ydb/core/kqp/federated_query/kqp_federated_query_helpers.cpp ) diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp new file mode 100644 index 0000000000..3c6d883668 --- /dev/null +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.cpp @@ -0,0 +1,103 @@ +#include "kqp_federated_query_actors.h" + +#include <ydb/services/metadata/secret/fetcher.h> +#include <ydb/services/metadata/secret/snapshot.h> + + +namespace NKikimr::NKqp { + +namespace { + +class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> { + STRICT_STFUNC(StateFunc, + hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); + hFunc(NActors::TEvents::TEvWakeup, Handle); + ) + + void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { + auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); + + std::vector<TString> secretValues; + secretValues.reserve(SecretIds.size()); + for (const auto& secretId: SecretIds) { + TString secretValue; + const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue); + if (!isFound) { + LastResponse = TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") }); + return; + } + secretValues.push_back(secretValue); + } + Promise.SetValue(TEvDescribeSecretsResponse::TDescription(secretValues)); + + UnsubscribeFromSecrets(); + PassAway(); + } + + void Handle(NActors::TEvents::TEvWakeup::TPtr&) { + Promise.SetValue(LastResponse); + + UnsubscribeFromSecrets(); + PassAway(); + } + + NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() { + return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>(); + } + + void UnsubscribeFromSecrets() { + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); + } + +public: + TDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime) + : SecretIds(CreateSecretIds(ownerUserId, secretIds)) + , Promise(promise) + , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") }) + , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) + {} + + void Bootstrap() { + if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { + Promise.SetValue(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") })); + PassAway(); + return; + } + + this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); + this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); + Become(&TDescribeSecretsActor::StateFunc); + } + +private: + static std::vector<NMetadata::NSecret::TSecretId> CreateSecretIds(const TString& ownerUserId, const std::vector<TString>& secretIds) { + std::vector<NMetadata::NSecret::TSecretId> result; + for (const auto& secretId: secretIds) { + result.emplace_back(ownerUserId, secretId); + } + return result; + } + +private: + const std::vector<NMetadata::NSecret::TSecretId> SecretIds; + NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> Promise; + TEvDescribeSecretsResponse::TDescription LastResponse; + TDuration MaximalSecretsSnapshotWaitTime; +}; + +} // anonymous namespace + +IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime) { + return new TDescribeSecretsActor(ownerUserId, secretIds, promise, maximalSecretsSnapshotWaitTime); +} + +void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, const TActorContext& actorContext, TDuration maximalSecretsSnapshotWaitTime) { + auto promise = NThreading::NewPromise<TEvDescribeSecretsResponse::TDescription>(); + actorContext.Register(CreateDescribeSecretsActor(ownerUserId, secretIds, promise, maximalSecretsSnapshotWaitTime)); + + promise.GetFuture().Subscribe([actorContext, replyActorId](const NThreading::TFuture<TEvDescribeSecretsResponse::TDescription>& result){ + actorContext.Send(replyActorId, new TEvDescribeSecretsResponse(result.GetValue())); + }); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/federated_query/kqp_federated_query_actors.h b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h new file mode 100644 index 0000000000..6682c63f87 --- /dev/null +++ b/ydb/core/kqp/federated_query/kqp_federated_query_actors.h @@ -0,0 +1,13 @@ +#pragma once + +#include <ydb/core/kqp/common/events/script_executions.h> + +#include <library/cpp/actors/core/actor.h> + + +namespace NKikimr::NKqp { + +NActors::IActor* CreateDescribeSecretsActor(const TString& ownerUserId, const std::vector<TString>& secretIds, NThreading::TPromise<TEvDescribeSecretsResponse::TDescription> promise, TDuration maximalSecretsSnapshotWaitTime); +void RegisterDescribeSecretsActor(const NActors::TActorId& replyActorId, const TString& ownerUserId, const std::vector<TString>& secretIds, const TActorContext& actorContext, TDuration maximalSecretsSnapshotWaitTime); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/federated_query/ya.make b/ydb/core/kqp/federated_query/ya.make index 4af74496ca..41397c2ee5 100644 --- a/ydb/core/kqp/federated_query/ya.make +++ b/ydb/core/kqp/federated_query/ya.make @@ -1,6 +1,7 @@ LIBRARY() SRCS( + kqp_federated_query_actors.cpp kqp_federated_query_helpers.cpp ) diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt new file mode 100644 index 0000000000..985367871a --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/CMakeLists.darwin-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-finalize_script_service) +target_compile_options(core-kqp-finalize_script_service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-finalize_script_service PUBLIC + contrib-libs-cxxsupp + yutil + core-kqp-proxy_service +) +target_sources(core-kqp-finalize_script_service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +) diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt new file mode 100644 index 0000000000..fa886df9b9 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-aarch64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-finalize_script_service) +target_compile_options(core-kqp-finalize_script_service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-finalize_script_service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + core-kqp-proxy_service +) +target_sources(core-kqp-finalize_script_service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +) diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt new file mode 100644 index 0000000000..fa886df9b9 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/CMakeLists.linux-x86_64.txt @@ -0,0 +1,23 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-finalize_script_service) +target_compile_options(core-kqp-finalize_script_service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-finalize_script_service PUBLIC + contrib-libs-linux-headers + contrib-libs-cxxsupp + yutil + core-kqp-proxy_service +) +target_sources(core-kqp-finalize_script_service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +) diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.txt new file mode 100644 index 0000000000..f8b31df0c1 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/CMakeLists.txt @@ -0,0 +1,17 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + +if (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "aarch64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-aarch64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Darwin" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64") + include(CMakeLists.darwin-x86_64.txt) +elseif (WIN32 AND CMAKE_SYSTEM_PROCESSOR STREQUAL "AMD64" AND NOT HAVE_CUDA) + include(CMakeLists.windows-x86_64.txt) +elseif (CMAKE_SYSTEM_NAME STREQUAL "Linux" AND CMAKE_SYSTEM_PROCESSOR STREQUAL "x86_64" AND NOT HAVE_CUDA) + include(CMakeLists.linux-x86_64.txt) +endif() diff --git a/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt b/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt new file mode 100644 index 0000000000..985367871a --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/CMakeLists.windows-x86_64.txt @@ -0,0 +1,22 @@ + +# This file was generated by the build system used internally in the Yandex monorepo. +# Only simple modifications are allowed (adding source-files to targets, adding simple properties +# like target_include_directories). These modifications will be ported to original +# ya.make files by maintainers. Any complex modifications which can't be ported back to the +# original buildsystem will not be accepted. + + + +add_library(core-kqp-finalize_script_service) +target_compile_options(core-kqp-finalize_script_service PRIVATE + -DUSE_CURRENT_UDF_ABI_VERSION +) +target_link_libraries(core-kqp-finalize_script_service PUBLIC + contrib-libs-cxxsupp + yutil + core-kqp-proxy_service +) +target_sources(core-kqp-finalize_script_service PRIVATE + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp +) diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp new file mode 100644 index 0000000000..03d2a475bb --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.cpp @@ -0,0 +1,218 @@ +#include "kqp_finalize_script_actor.h" + +#include <ydb/core/fq/libs/events/events.h> + +#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h> +#include <ydb/core/kqp/proxy_service/kqp_script_executions.h> + +#include <ydb/library/yql/providers/common/provider/yql_provider_names.h> +#include <ydb/library/yql/providers/common/structured_token/yql_token_builder.h> +#include <ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h> +#include <ydb/library/yql/providers/s3/proto/sink.pb.h> + + +namespace NKikimr::NKqp { + +namespace { + +class TScriptFinalizerActor : public TActorBootstrapped<TScriptFinalizerActor> { +public: + TScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request, + const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) + : ReplyActor_(request->Sender) + , ExecutionId_(request->Get()->ExecutionId) + , Database_(request->Get()->Database) + , FinalizationStatus_(request->Get()->FinalizationStatus) + , Request_(std::move(request)) + , FinalizationTimeout_(TDuration::Seconds(finalizeScriptServiceConfig.GetScriptFinalizationTimeoutSeconds())) + , MaximalSecretsSnapshotWaitTime_(2 * TDuration::Seconds(metadataProviderConfig.GetRefreshPeriodSeconds())) + , FederatedQuerySetup_(federatedQuerySetup) + {} + + void Bootstrap() { + Register(CreateSaveScriptFinalStatusActor(std::move(Request_))); + Become(&TScriptFinalizerActor::FetchState); + } + + STRICT_STFUNC(FetchState, + hFunc(TEvSaveScriptFinalStatusResponse, Handle); + hFunc(TEvScriptExecutionFinished, Handle); + ) + + void Handle(TEvSaveScriptFinalStatusResponse::TPtr& ev) { + Schedule(FinalizationTimeout_, new TEvents::TEvWakeup()); + Become(&TScriptFinalizerActor::PrepareState); + + CustomerSuppliedId_ = ev->Get()->CustomerSuppliedId; + Sinks_ = std::move(ev->Get()->Sinks); + UserToken_ = ev->Get()->UserToken; + SecretNames_ = std::move(ev->Get()->SecretNames); + + if (Sinks_.empty()) { + FinishScriptFinalization(); + } else if (SecretNames_.empty()) { + ComputeScriptExternalEffect(); + } else { + FetchSecrets(); + } + } + +private: + STRICT_STFUNC(PrepareState, + hFunc(TEvents::TEvWakeup, Handle); + hFunc(TEvDescribeSecretsResponse, Handle); + hFunc(NFq::TEvents::TEvEffectApplicationResult, Handle); + ) + + void Handle(TEvents::TEvWakeup::TPtr&) { + FinishScriptFinalization(Ydb::StatusIds::TIMEOUT, "Script finalization timeout"); + } + + void FetchSecrets() { + RegisterDescribeSecretsActor(SelfId(), UserToken_, SecretNames_, ActorContext(), MaximalSecretsSnapshotWaitTime_); + } + + void Handle(TEvDescribeSecretsResponse::TPtr& ev) { + if (ev->Get()->Description.Status != Ydb::StatusIds::SUCCESS) { + FinishScriptFinalization(ev->Get()->Description.Status, std::move(ev->Get()->Description.Issues)); + return; + } + + FillSecureParams(ev->Get()->Description.SecretValues); + } + + void FillSecureParams(const std::vector<TString>& secretValues) { + std::map<TString, TString> secretsMap; + for (size_t i = 0; i < secretValues.size(); ++i) { + secretsMap.emplace(SecretNames_[i], secretValues[i]); + } + + for (const auto& sink : Sinks_) { + auto sinkName = sink.GetSinkName(); + + if (sinkName) { + const auto& structuredToken = NYql::CreateStructuredTokenParser(sink.GetAuthInfo()).ToBuilder().ReplaceReferences(secretsMap).ToJson(); + SecureParams_.emplace(sinkName, structuredToken); + } + } + + ComputeScriptExternalEffect(); + } + +private: + static void AddExternalEffectS3(const NKqpProto::TKqpExternalSink& sink, NYql::NDqProto::TExternalEffect& externalEffectS3) { + NYql::NS3::TSink sinkSettings; + sink.GetSettings().UnpackTo(&sinkSettings); + + NYql::NS3::TEffect sinkEffect; + sinkEffect.SetToken(sink.GetSinkName()); + sinkEffect.MutableCleanup()->SetUrl(sinkSettings.GetUrl()); + sinkEffect.MutableCleanup()->SetPrefix(sinkSettings.GetPath()); + + externalEffectS3.AddEffects()->SetData(sinkEffect.SerializeAsString()); + } + + void ComputeScriptExternalEffect() { + NYql::NDqProto::TExternalEffect externalEffectS3; + externalEffectS3.SetProviderName(TString(NYql::S3ProviderName)); + + for (const auto& sink : Sinks_) { + const TString& sinkType = sink.GetType(); + + if (sinkType == "S3Sink") { + AddExternalEffectS3(sink, externalEffectS3); + } else { + FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, TStringBuilder() << "unknown effect sink type: " << sinkType); + return; + } + } + + RunS3ApplicatorActor(externalEffectS3); + } + + void RunS3ApplicatorActor(const NYql::NDqProto::TExternalEffect& externalEffect) { + if (!FederatedQuerySetup_) { + FinishScriptFinalization(Ydb::StatusIds::INTERNAL_ERROR, "unable to aplicate s3 external effect, invalid federated query setup"); + return; + } + + Register(NYql::NDq::MakeS3ApplicatorActor( + SelfId(), + FederatedQuerySetup_->HttpGateway, + CreateGuidAsString(), + CustomerSuppliedId_, + std::nullopt, + FinalizationStatus_ == EFinalizationStatus::FS_COMMIT, + THashMap<TString, TString>(SecureParams_.begin(), SecureParams_.end()), + FederatedQuerySetup_->CredentialsFactory, + externalEffect + ).Release()); + } + + void Handle(NFq::TEvents::TEvEffectApplicationResult::TPtr& ev) { + if (ev->Get()->FatalError) { + FinishScriptFinalization(Ydb::StatusIds::BAD_REQUEST, std::move(ev->Get()->Issues)); + } else { + FinishScriptFinalization(); + } + } + +private: + STRICT_STFUNC(FinishState, + IgnoreFunc(TEvents::TEvWakeup); + IgnoreFunc(TEvDescribeSecretsResponse); + IgnoreFunc(NFq::TEvents::TEvEffectApplicationResult); + hFunc(TEvScriptExecutionFinished, Handle); + ) + + void FinishScriptFinalization(std::optional<Ydb::StatusIds::StatusCode> status, NYql::TIssues issues) { + Register(CreateScriptFinalizationFinisherActor(ExecutionId_, Database_, status, std::move(issues))); + Become(&TScriptFinalizerActor::FinishState); + } + + void FinishScriptFinalization(Ydb::StatusIds::StatusCode status, const TString& message) { + FinishScriptFinalization(status, { NYql::TIssue(message) }); + } + + void FinishScriptFinalization() { + FinishScriptFinalization(std::nullopt, {}); + } + + void Handle(TEvScriptExecutionFinished::TPtr& ev) { + Send(ReplyActor_, ev->Release()); + Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), new TEvScriptFinalizeResponse(ExecutionId_)); + + PassAway(); + } + +private: + TActorId ReplyActor_; + TString ExecutionId_; + TString Database_; + EFinalizationStatus FinalizationStatus_; + TEvScriptFinalizeRequest::TPtr Request_; + + TDuration FinalizationTimeout_; + TDuration MaximalSecretsSnapshotWaitTime_; + const std::optional<TKqpFederatedQuerySetup>& FederatedQuerySetup_; + + TString CustomerSuppliedId_; + std::vector<NKqpProto::TKqpExternalSink> Sinks_; + + TString UserToken_; + std::vector<TString> SecretNames_; + std::unordered_map<TString, TString> SecureParams_; +}; + +} // anonymous namespace + +IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request, + const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup) { + return new TScriptFinalizerActor(std::move(request), finalizeScriptServiceConfig, metadataProviderConfig, federatedQuerySetup); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h new file mode 100644 index 0000000000..b47e553957 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_actor.h @@ -0,0 +1,14 @@ +#pragma once + +#include <ydb/core/kqp/common/events/script_executions.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> + + +namespace NKikimr::NKqp { + +IActor* CreateScriptFinalizerActor(TEvScriptFinalizeRequest::TPtr request, + const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + const std::optional<TKqpFederatedQuerySetup>& federatedQuerySetup); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp new file mode 100644 index 0000000000..cf6c66d5b5 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.cpp @@ -0,0 +1,143 @@ +#include "kqp_finalize_script_service.h" +#include "kqp_finalize_script_actor.h" + +#include <ydb/core/kqp/proxy_service/kqp_script_executions.h> + +#include <ydb/library/yql/providers/s3/proto/sink.pb.h> + + +namespace NKikimr::NKqp { + +namespace { + +class TKqpFinalizeScriptService : public TActorBootstrapped<TKqpFinalizeScriptService> { +public: + TKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) + : FinalizeScriptServiceConfig_(finalizeScriptServiceConfig) + , MetadataProviderConfig_(metadataProviderConfig) + , FederatedQuerySetupFactory_(federatedQuerySetupFactory) + {} + + void Bootstrap(const TActorContext &ctx) { + FederatedQuerySetup_ = FederatedQuerySetupFactory_->Make(ctx.ActorSystem()); + + Become(&TKqpFinalizeScriptService::MainState); + } + + void Handle(TEvSaveScriptExternalEffectRequest::TPtr& ev) { + ev->Get()->Sinks = FilterExternalSinks(ev->Get()->Sinks); + + if (!ev->Get()->Sinks.empty()) { + Register(CreateSaveScriptExternalEffectActor(std::move(ev))); + } else { + Send(ev->Sender, new TEvSaveScriptExternalEffectResponse(Ydb::StatusIds::SUCCESS, {})); + } + } + + void Handle(TEvScriptFinalizeRequest::TPtr& ev) { + TString executionId = ev->Get()->ExecutionId; + + if (!FinalizationRequestsQueue_.contains(executionId)) { + WaitingFinalizationExecutions_.push(executionId); + } + FinalizationRequestsQueue_[executionId].emplace_back(std::move(ev)); + + TryStartFinalizeRequest(); + } + + STATEFN(MainState) { + switch (ev->GetTypeRewrite()) { + hFunc(TEvSaveScriptExternalEffectRequest, Handle); + hFunc(TEvScriptFinalizeRequest, Handle); + hFunc(TEvScriptFinalizeResponse, Handle); + default: + Y_ABORT("TKqpScriptFinalizeService: unexpected event type: %" PRIx32 " event: %s", ev->GetTypeRewrite(), ev->ToString().data()); + } + } + +private: + void TryStartFinalizeRequest() { + if (FinalizationRequestsInFlight_ >= FinalizeScriptServiceConfig_.GetMaxInFlightFinalizationsCount() || WaitingFinalizationExecutions_.empty()) { + return; + } + + TString executionId = WaitingFinalizationExecutions_.front(); + WaitingFinalizationExecutions_.pop(); + + auto& queue = FinalizationRequestsQueue_[executionId]; + Y_ENSURE(!queue.empty()); + + StartFinalizeRequest(std::move(queue.back())); + queue.pop_back(); + } + + void StartFinalizeRequest(TEvScriptFinalizeRequest::TPtr request) { + ++FinalizationRequestsInFlight_; + + Register(CreateScriptFinalizerActor( + std::move(request), + FinalizeScriptServiceConfig_, + MetadataProviderConfig_, + FederatedQuerySetup_ + )); + } + + void Handle(TEvScriptFinalizeResponse::TPtr& ev) { + --FinalizationRequestsInFlight_; + TString executionId = ev->Get()->ExecutionId; + + if (!FinalizationRequestsQueue_[executionId].empty()) { + WaitingFinalizationExecutions_.push(executionId); + } else { + FinalizationRequestsQueue_.erase(executionId); + } + TryStartFinalizeRequest(); + } + +private: + bool ValidateExternalSink(const NKqpProto::TKqpExternalSink& sink) { + if (sink.GetType() != "S3Sink") { + return false; + } + + NYql::NS3::TSink sinkSettings; + sink.GetSettings().UnpackTo(&sinkSettings); + + return sinkSettings.GetAtomicUploadCommit(); + } + + std::vector<NKqpProto::TKqpExternalSink> FilterExternalSinks(const std::vector<NKqpProto::TKqpExternalSink>& sinks) { + std::vector<NKqpProto::TKqpExternalSink> filteredSinks; + filteredSinks.reserve(sinks.size()); + for (const auto& sink : sinks) { + if (ValidateExternalSink(sink)) { + filteredSinks.push_back(sink); + } + } + + return filteredSinks; + } + +private: + NKikimrConfig::TFinalizeScriptServiceConfig FinalizeScriptServiceConfig_; + NKikimrConfig::TMetadataProviderConfig MetadataProviderConfig_; + + IKqpFederatedQuerySetupFactory::TPtr FederatedQuerySetupFactory_; + std::optional<TKqpFederatedQuerySetup> FederatedQuerySetup_; + + ui32 FinalizationRequestsInFlight_ = 0; + std::queue<TString> WaitingFinalizationExecutions_; + std::unordered_map<TString, std::vector<TEvScriptFinalizeRequest::TPtr>> FinalizationRequestsQueue_; +}; + +} // anonymous namespace + +IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory) { + return new TKqpFinalizeScriptService(finalizeScriptServiceConfig, metadataProviderConfig, std::move(federatedQuerySetupFactory)); +} + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h new file mode 100644 index 0000000000..53f0c2ea5f --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h @@ -0,0 +1,12 @@ +#pragma once + +#include <ydb/core/kqp/federated_query/kqp_federated_query_helpers.h> + + +namespace NKikimr::NKqp { + +IActor* CreateKqpFinalizeScriptService(const NKikimrConfig::TFinalizeScriptServiceConfig& finalizeScriptServiceConfig, + const NKikimrConfig::TMetadataProviderConfig& metadataProviderConfig, + IKqpFederatedQuerySetupFactory::TPtr federatedQuerySetupFactory); + +} // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/finalize_script_service/ya.make b/ydb/core/kqp/finalize_script_service/ya.make new file mode 100644 index 0000000000..935b275fa5 --- /dev/null +++ b/ydb/core/kqp/finalize_script_service/ya.make @@ -0,0 +1,14 @@ +LIBRARY() + +SRCS( + kqp_finalize_script_actor.cpp + kqp_finalize_script_service.cpp +) + +PEERDIR( + ydb/core/kqp/proxy_service +) + +YQL_LAST_ABI_VERSION() + +END() diff --git a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h index 9c9ab9b85a..966f444a73 100644 --- a/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h +++ b/ydb/core/kqp/gateway/actors/kqp_ic_gateway_actors.h @@ -3,8 +3,6 @@ #include <ydb/core/base/tablet_pipe.h> #include <ydb/core/base/appdata.h> #include <ydb/library/yql/providers/common/gateway/yql_provider_gateway.h> -#include <ydb/services/metadata/secret/fetcher.h> -#include <ydb/services/metadata/secret/snapshot.h> #include <library/cpp/actors/core/actor_bootstrapped.h> #include <library/cpp/actors/core/hfunc.h> @@ -113,97 +111,4 @@ private: TActorId ActorId; }; -struct TDescribeSecretsResponse { - TDescribeSecretsResponse(Ydb::StatusIds::StatusCode status, NYql::TIssues issues) - : Status(status) - , Issues(std::move(issues)) - {} - - TDescribeSecretsResponse(const TVector<TString>& secretValues) - : SecretValues(secretValues) - , Status(Ydb::StatusIds::SUCCESS) - {} - - TVector<TString> SecretValues; - Ydb::StatusIds::StatusCode Status; - NYql::TIssues Issues; -}; - -class TDescribeSecretsActor: public NActors::TActorBootstrapped<TDescribeSecretsActor> { - STRICT_STFUNC(StateFunc, - hFunc(NMetadata::NProvider::TEvRefreshSubscriberData, Handle); - hFunc(NActors::TEvents::TEvWakeup, Handle); - ) - - void Handle(NMetadata::NProvider::TEvRefreshSubscriberData::TPtr& ev) { - auto snapshot = ev->Get()->GetSnapshotAs<NMetadata::NSecret::TSnapshot>(); - - TVector<TString> secretValues; - secretValues.reserve(SecretIds.size()); - for (const auto& secretId: SecretIds) { - TString secretValue; - const bool isFound = snapshot->GetSecretValue(NMetadata::NSecret::TSecretIdOrValue::BuildAsId(secretId), secretValue); - if (!isFound) { - LastResponse = TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("secret with name '" + secretId.GetSecretId() + "' not found") }); - return; - } - secretValues.push_back(secretValue); - } - Promise.SetValue(TDescribeSecretsResponse(secretValues)); - - UnsubscribeFromSecrets(); - PassAway(); - } - - void Handle(NActors::TEvents::TEvWakeup::TPtr&) { - Promise.SetValue(LastResponse); - - UnsubscribeFromSecrets(); - PassAway(); - } - - NMetadata::NFetcher::ISnapshotsFetcher::TPtr GetSecretsSnapshotParser() { - return std::make_shared<NMetadata::NSecret::TSnapshotsFetcher>(); - } - - void UnsubscribeFromSecrets() { - this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvUnsubscribeExternal(GetSecretsSnapshotParser())); - } - -public: - TDescribeSecretsActor(const TString& ownerUserId, const TVector<TString>& secretIds, NThreading::TPromise<TDescribeSecretsResponse> promise, TDuration maximalSecretsSnapshotWaitTime) - : SecretIds(CreateSecretIds(ownerUserId, secretIds)) - , Promise(promise) - , LastResponse(Ydb::StatusIds::TIMEOUT, { NYql::TIssue("secrets snapshot fetching timeout") }) - , MaximalSecretsSnapshotWaitTime(maximalSecretsSnapshotWaitTime) - {} - - void Bootstrap() { - if (!NMetadata::NProvider::TServiceOperator::IsEnabled()) { - Promise.SetValue(TDescribeSecretsResponse(Ydb::StatusIds::INTERNAL_ERROR, { NYql::TIssue("metadata service is not active") })); - PassAway(); - return; - } - - this->Send(NMetadata::NProvider::MakeServiceId(SelfId().NodeId()), new NMetadata::NProvider::TEvSubscribeExternal(GetSecretsSnapshotParser())); - this->Schedule(MaximalSecretsSnapshotWaitTime, new NActors::TEvents::TEvWakeup()); - Become(&TDescribeSecretsActor::StateFunc); - } - -private: - static TVector<NMetadata::NSecret::TSecretId> CreateSecretIds(const TString& ownerUserId, const TVector<TString>& secretIds) { - TVector<NMetadata::NSecret::TSecretId> result; - for (const auto& secretId: secretIds) { - result.emplace_back(ownerUserId, secretId); - } - return result; - } - -private: - const TVector<NMetadata::NSecret::TSecretId> SecretIds; - NThreading::TPromise<TDescribeSecretsResponse> Promise; - TDescribeSecretsResponse LastResponse; - TDuration MaximalSecretsSnapshotWaitTime; -}; - } diff --git a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp index f996e69885..8d075b9b3c 100644 --- a/ydb/core/kqp/gateway/kqp_metadata_loader.cpp +++ b/ydb/core/kqp/gateway/kqp_metadata_loader.cpp @@ -2,6 +2,7 @@ #include "actors/kqp_ic_gateway_actors.h" #include <ydb/core/base/path.h> +#include <ydb/core/kqp/federated_query/kqp_federated_query_actors.h> #include <ydb/core/statistics/events.h> #include <ydb/core/statistics/stat_service.h> @@ -361,7 +362,7 @@ void SetError(TTableMetadataResult& externalDataSourceMetadata, const TString& e externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(Ydb::StatusIds::BAD_REQUEST)); } -void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSourceMetadata, const TDescribeSecretsResponse& objectDescription) { +void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSourceMetadata, const TEvDescribeSecretsResponse::TDescription& objectDescription) { if (objectDescription.Status != Ydb::StatusIds::SUCCESS) { externalDataSourceMetadata.AddIssues(objectDescription.Issues); externalDataSourceMetadata.SetStatus(NYql::YqlStatusFromYdbStatus(objectDescription.Status)); @@ -419,44 +420,44 @@ void UpdateExternalDataSourceSecretsValue(TTableMetadataResult& externalDataSour } } -NThreading::TFuture<TDescribeSecretsResponse> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) { +NThreading::TFuture<TEvDescribeSecretsResponse::TDescription> LoadExternalDataSourceSecretValues(const NSchemeCache::TSchemeCacheNavigate::TEntry& entry, const TIntrusiveConstPtr<NACLib::TUserToken>& userToken, TDuration maximalSecretsSnapshotWaitTime, TActorSystem* actorSystem) { const auto& authDescription = entry.ExternalDataSourceInfo->Description.GetAuth(); switch (authDescription.identity_case()) { case NKikimrSchemeOp::TAuth::kServiceAccount: { const TString& saSecretId = authDescription.GetServiceAccount().GetSecretName(); - auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime)); + auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>(); + actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::kNone: - return MakeFuture(TDescribeSecretsResponse({})); + return MakeFuture(TEvDescribeSecretsResponse::TDescription({})); case NKikimrSchemeOp::TAuth::kBasic: { const TString& passwordSecretId = authDescription.GetBasic().GetPasswordSecretName(); - auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime)); + auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>(); + actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {passwordSecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::kMdbBasic: { const TString& saSecretId = authDescription.GetMdbBasic().GetServiceAccountSecretName(); const TString& passwordSecreId = authDescription.GetMdbBasic().GetPasswordSecretName(); - auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime)); + auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>(); + actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {saSecretId, passwordSecreId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::kAws: { const TString& awsAccessKeyIdSecretId = authDescription.GetAws().GetAwsAccessKeyIdSecretName(); const TString& awsAccessKeyKeySecretId = authDescription.GetAws().GetAwsSecretAccessKeySecretName(); - auto promise = NewPromise<TDescribeSecretsResponse>(); - actorSystem->Register(new TDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime)); + auto promise = NewPromise<TEvDescribeSecretsResponse::TDescription>(); + actorSystem->Register(CreateDescribeSecretsActor(userToken ? userToken->GetUserSID() : "", {awsAccessKeyIdSecretId, awsAccessKeyKeySecretId}, promise, maximalSecretsSnapshotWaitTime)); return promise.GetFuture(); } case NKikimrSchemeOp::TAuth::IDENTITY_NOT_SET: - return MakeFuture(TDescribeSecretsResponse(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") })); + return MakeFuture(TEvDescribeSecretsResponse::TDescription(Ydb::StatusIds::BAD_REQUEST, { NYql::TIssue("identity case is not specified") })); } } @@ -733,7 +734,7 @@ NThreading::TFuture<TTableMetadataResult> TKqpTableMetadataLoader::LoadTableMeta return; } LoadExternalDataSourceSecretValues(entry, userToken, MaximalSecretsSnapshotWaitTime, ActorSystem) - .Subscribe([promise, externalDataSourceMetadata](const TFuture<TDescribeSecretsResponse>& result) mutable + .Subscribe([promise, externalDataSourceMetadata](const TFuture<TEvDescribeSecretsResponse::TDescription>& result) mutable { UpdateExternalDataSourceSecretsValue(externalDataSourceMetadata, result.GetValue()); promise.SetValue(externalDataSourceMetadata); diff --git a/ydb/core/kqp/host/kqp_host.cpp b/ydb/core/kqp/host/kqp_host.cpp index dd7f40f13b..cfac07d4bb 100644 --- a/ydb/core/kqp/host/kqp_host.cpp +++ b/ydb/core/kqp/host/kqp_host.cpp @@ -1504,12 +1504,13 @@ private: *PlanBuilder, sqlVersion, true /* UseDqExplain */); } - void InitS3Provider() { + void InitS3Provider(EKikimrQueryType queryType) { auto state = MakeIntrusive<NYql::TS3State>(); state->Types = TypesCtx.Get(); state->FunctionRegistry = FuncRegistry; state->CredentialsFactory = FederatedQuerySetup->CredentialsFactory; state->Configuration->WriteThroughDqIntegration = true; + state->Configuration->AllowAtomicUploadCommit = queryType == EKikimrQueryType::Script; state->Configuration->Init(FederatedQuerySetup->S3GatewayConfig, TypesCtx); @@ -1565,7 +1566,7 @@ private: TypesCtx->AddDataSink(providerNames, kikimrDataSink); if ((queryType == EKikimrQueryType::Script || queryType == EKikimrQueryType::Query) && FederatedQuerySetup) { - InitS3Provider(); + InitS3Provider(queryType); InitGenericProvider(); } diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp index 3070b4feab..c8050700bb 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.cpp @@ -127,6 +127,7 @@ private: Col("run_script_actor_id", NScheme::NTypeIds::Text), Col("operation_status", NScheme::NTypeIds::Int32), Col("execution_status", NScheme::NTypeIds::Int32), + Col("finalization_status", NScheme::NTypeIds::Int32), Col("execution_mode", NScheme::NTypeIds::Int32), Col("start_ts", NScheme::NTypeIds::Timestamp), Col("end_ts", NScheme::NTypeIds::Timestamp), @@ -140,6 +141,10 @@ private: Col("result_set_metas", NScheme::NTypeIds::JsonDocument), Col("stats", NScheme::NTypeIds::JsonDocument), Col("expire_at", NScheme::NTypeIds::Timestamp), // Will be deleted from database after this deadline. + Col("customer_supplied_id", NScheme::NTypeIds::Text), + Col("user_token", NScheme::NTypeIds::Text), + Col("script_sinks", NScheme::NTypeIds::JsonDocument), + Col("script_secret_names", NScheme::NTypeIds::JsonDocument), }, { "database", "execution_id" }, TtlCol("expire_at") @@ -559,241 +564,119 @@ private: IRetryPolicy::IRetryState::TPtr RetryState = nullptr; }; +class TCheckLeaseStatusActorBase : public TActorBootstrapped<TCheckLeaseStatusActorBase> { + using TBase = TActorBootstrapped<TCheckLeaseStatusActorBase>; -class TScriptExecutionFinisherBase : public TQueryBase { -public: - using TQueryBase::TQueryBase; - - void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, - TDuration operationTtl, TDuration resultsTtl, const NYql::TIssues& issues = LeaseExpiredIssues(), TTxControl txControl = TTxControl::ContinueAndCommitTx(), - TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) { - TString sql = R"( - -- TScriptExecutionFinisherBase::FinishScriptExecution - DECLARE $database AS Text; - DECLARE $execution_id AS Text; - DECLARE $operation_status AS Int32; - DECLARE $execution_status AS Int32; - DECLARE $issues AS JsonDocument; - DECLARE $plan AS JsonDocument; - DECLARE $stats AS JsonDocument; - DECLARE $ast AS Text; - DECLARE $operation_ttl AS Interval; - DECLARE $results_ttl AS Interval; - - UPDATE `.metadata/script_executions` - SET - operation_status = $operation_status, - execution_status = $execution_status, - issues = $issues, - plan = $plan, - end_ts = CurrentUtcTimestamp(), - stats = $stats, - ast = $ast, - expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL) - WHERE database = $database AND execution_id = $execution_id; - - DELETE FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; - - UPDATE `.metadata/result_sets` - SET - expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL) - where database = $database AND execution_id = $execution_id; - )"; - - TString serializedStats = "{}"; - if (kqpStats) { - NJson::TJsonValue statsJson; - Ydb::TableStats::QueryStats queryStats; - NGRpcService::FillQueryStats(queryStats, *kqpStats); - NProtobufJson::Proto2Json(queryStats, statsJson, NProtobufJson::TProto2JsonConfig()); - serializedStats = NJson::WriteJson(statsJson); - } - - NYdb::TParamsBuilder params; - params - .AddParam("$database") - .Utf8(database) - .Build() - .AddParam("$execution_id") - .Utf8(executionId) - .Build() - .AddParam("$operation_status") - .Int32(operationStatus) - .Build() - .AddParam("$execution_status") - .Int32(execStatus) - .Build() - .AddParam("$issues") - .JsonDocument(SerializeIssues(issues)) - .Build() - .AddParam("$plan") - .JsonDocument(queryPlan.GetOrElse("{}")) - .Build() - .AddParam("$stats") - .JsonDocument(serializedStats) - .Build() - .AddParam("$ast") - .Utf8(queryAst.GetOrElse("")) - .Build() - .AddParam("$operation_ttl") - .Interval(static_cast<i64>(operationTtl.MicroSeconds())) - .Build() - .AddParam("$results_ttl") - .Interval(static_cast<i64>(resultsTtl.MicroSeconds())) - .Build(); + inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60); - RunDataQuery(sql, ¶ms, txControl); +public: + void Bootstrap() { + OnBootstrap(); } - void FinishScriptExecution(const TString& database, const TString& executionId, Ydb::StatusIds::StatusCode operationStatus, Ydb::Query::ExecStatus execStatus, - TDuration operationTtl, TDuration resultsTtl, const TString& message, TTxControl txControl = TTxControl::ContinueAndCommitTx(), - TMaybe<NKqpProto::TKqpStatsQuery> kqpStats = Nothing(), TMaybe<TString> queryPlan = Nothing(), TMaybe<TString> queryAst = Nothing()) { - FinishScriptExecution(database, executionId, operationStatus, execStatus, operationTtl, resultsTtl, IssuesFromMessage(message), txControl, kqpStats, queryPlan, queryAst); + Ydb::StatusIds::StatusCode GetOperationStatus() const { + return FinalOperationStatus; } - static NYql::TIssues IssuesFromMessage(const TString& message) { - NYql::TIssues issues; - issues.AddIssue(message); - return issues; + Ydb::Query::ExecStatus GetExecStatus() const { + return FinalExecStatus; } - static NYql::TIssues LeaseExpiredIssues() { - return IssuesFromMessage("Lease expired"); + NYql::TIssues GetIssues() const { + return FinalIssues; } -}; -TMaybe<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) { - NKikimrKqp::TScriptExecutionOperationMeta meta; - try { - NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig()); - return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl())); - } catch (NJson::TJsonException &e) { - return Nothing(); - } -} + void StartScriptFinalization(EFinalizationStatus finalizationStatus, const TString& executionId, const TString& database, TMaybe<Ydb::StatusIds::StatusCode> status, TMaybe<Ydb::Query::ExecStatus> execStatus, NYql::TIssues issues) { + if (!status || !execStatus) { + issues.AddIssue("Finalization is not complete"); + } -class TScriptExecutionFinisher : public TScriptExecutionFinisherBase { -public: - TScriptExecutionFinisher( - const TString& executionId, - const TString& database, - ui64 leaseGeneration, - Ydb::StatusIds::StatusCode operationStatus, - Ydb::Query::ExecStatus execStatus, - NYql::TIssues issues, - TMaybe<NKqpProto::TKqpStatsQuery> queryStats = Nothing(), - TMaybe<TString> queryPlan = Nothing(), - TMaybe<TString> queryAst = Nothing() - ) - : Database(database) - , ExecutionId(executionId) - , LeaseGeneration(leaseGeneration) - , OperationStatus(operationStatus) - , ExecStatus(execStatus) - , Issues(std::move(issues)) - , QueryStats(std::move(queryStats)) - , QueryPlan(std::move(queryPlan)) - , QueryAst(std::move(queryAst)) - { + ScriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(finalizationStatus, executionId, database, status ? *status : Ydb::StatusIds::UNAVAILABLE, execStatus ? *execStatus : Ydb::Query::EXEC_STATUS_ABORTED, std::move(issues)); + RunScriptFinalizeRequest(); + + Become(&TCheckLeaseStatusActorBase::StateFunc); } - void OnRunQuery() override { - TString sql = R"( - -- TScriptExecutionFinisher::OnRunQuery - DECLARE $database AS Text; - DECLARE $execution_id AS Text; + void StartLeaseChecking(TActorId runScriptActorId, const TString& executionId, const TString& database) { + ScriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>(EFinalizationStatus::FS_ROLLBACK, executionId, database, Ydb::StatusIds::UNAVAILABLE, Ydb::Query::EXEC_STATUS_ABORTED, NYql::TIssues{ NYql::TIssue("Lease expired") }); - SELECT lease_generation FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; + Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup()); - SELECT meta FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id; - )"; + ui64 flags = IEventHandle::FlagTrackDelivery; + if (runScriptActorId.NodeId() != SelfId().NodeId()) { + flags |= IEventHandle::FlagSubscribeOnSession; + SubscribedOnSession = runScriptActorId.NodeId(); + } + Send(runScriptActorId, new TEvCheckAliveRequest(), flags); - NYdb::TParamsBuilder params; - params - .AddParam("$database") - .Utf8(Database) - .Build() - .AddParam("$execution_id") - .Utf8(ExecutionId) - .Build(); + Become(&TCheckLeaseStatusActorBase::StateFunc); + } - RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + void PassAway() override { + if (SubscribedOnSession) { + Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); + } + TBase::PassAway(); } - void OnQueryResult() override { - if (!FinishWasRun) { - if (ResultSets.size() != 2) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); - return; - } - NYdb::TResultSetParser result(ResultSets[0]); - if (result.RowsCount() == 0) { - Finish(Ydb::StatusIds::BAD_REQUEST, "No such execution"); - return; - } + virtual void OnBootstrap() = 0; + virtual void OnLeaseVerified() = 0; + virtual void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) = 0; - result.TryNextRow(); +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvCheckAliveResponse, Handle); + hFunc(TEvents::TEvWakeup, Handle); + hFunc(NActors::TEvents::TEvUndelivered, Handle); + hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); + hFunc(TEvScriptExecutionFinished, Handle); + ) - const TMaybe<i64> leaseGenerationInDatabase = result.ColumnParser(0).GetOptionalInt64(); - if (!leaseGenerationInDatabase) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation"); - return; - } + void RunScriptFinalizeRequest() { + if (WaitFinishQuery) { + return; + } - if (LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) { - Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost"); - return; - } + WaitFinishQuery = true; + FinalOperationStatus = ScriptFinalizeRequest->OperationStatus; + FinalExecStatus = ScriptFinalizeRequest->ExecStatus; + FinalIssues = ScriptFinalizeRequest->Issues; + Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), ScriptFinalizeRequest.release()); + } - NYdb::TResultSetParser metaResult(ResultSets[1]); - metaResult.TryNextRow(); + void Handle(TEvCheckAliveResponse::TPtr&) { + OnLeaseVerified(); + } - const auto serializedMeta = metaResult.ColumnParser("meta").GetOptionalJsonDocument(); - if (!serializedMeta) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation"); - return; - } + void Handle(TEvents::TEvWakeup::TPtr&) { + RunScriptFinalizeRequest(); + } - const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } + void Handle(NActors::TEvents::TEvUndelivered::TPtr&) { + RunScriptFinalizeRequest(); + } - const auto [operationTtl, resultsTtl] = *ttl; - FinishScriptExecution(Database, ExecutionId, OperationStatus, ExecStatus, operationTtl, resultsTtl, - Issues, TTxControl::ContinueAndCommitTx(), std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst)); - FinishWasRun = true; - } else { - Finish(); - } + void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { + RunScriptFinalizeRequest(); } - void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { - KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << ExecutionId << ". Lease generation: " << - LeaseGeneration << ": " << Ydb::StatusIds::StatusCode_Name(status) << ". Issues: " << issues.ToOneLineString() << ". Plan: " << QueryPlan); - Send(Owner, new TEvScriptExecutionFinished(status, std::move(issues))); + void Handle(TEvScriptExecutionFinished::TPtr& ev) { + OnScriptExecutionFinished(ev->Get()->OperationAlreadyFinalized, ev->Get()->Status, std::move(ev->Get()->Issues)); } private: - const TString Database; - const TString ExecutionId; - const ui64 LeaseGeneration; - const Ydb::StatusIds::StatusCode OperationStatus; - const Ydb::Query::ExecStatus ExecStatus; - const NYql::TIssues Issues; - const TMaybe<NKqpProto::TKqpStatsQuery> QueryStats; - const TMaybe<TString> QueryPlan; - const TMaybe<TString> QueryAst; - bool FinishWasRun = false; + std::unique_ptr<TEvScriptFinalizeRequest> ScriptFinalizeRequest; + Ydb::StatusIds::StatusCode FinalOperationStatus; + Ydb::Query::ExecStatus FinalExecStatus; + NYql::TIssues FinalIssues; + + bool WaitFinishQuery = false; + std::optional<ui32> SubscribedOnSession; }; -class TCheckLeaseStatusActor : public TScriptExecutionFinisherBase { +class TCheckLeaseStatusQueryActor : public TQueryBase { public: - TCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie = 0) + TCheckLeaseStatusQueryActor(const TString& database, const TString& executionId, ui64 cookie = 0) : Database(database) , ExecutionId(executionId) , Cookie(cookie) @@ -801,16 +684,23 @@ public: void OnRunQuery() override { const TString sql = R"( - -- TCheckLeaseStatusActor::OnRunQuery + -- TCheckLeaseStatusQueryActor::OnRunQuery DECLARE $database AS Text; DECLARE $execution_id AS Text; - SELECT operation_status, execution_status, issues, run_script_actor_id, meta FROM `.metadata/script_executions` - WHERE database = $database AND execution_id = $execution_id AND - (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); + SELECT + operation_status, + execution_status, + finalization_status, + issues, + run_script_actor_id + FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id AND + (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); - SELECT lease_deadline FROM `.metadata/script_execution_leases` - WHERE database = $database AND execution_id = $execution_id; + SELECT lease_deadline + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; )"; NYdb::TParamsBuilder params; @@ -822,11 +712,10 @@ public: .Utf8(ExecutionId) .Build(); - RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); - SetQueryResultHandler(&TCheckLeaseStatusActor::OnResult); + RunDataQuery(sql, ¶ms); } - void OnResult() { + void OnQueryResult() override { if (ResultSets.size() != 2) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); return; @@ -850,6 +739,12 @@ public: } TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); + + const TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32(); + if (finalizationStatus) { + FinalizationStatus = static_cast<EFinalizationStatus>(*finalizationStatus); + } + TMaybe<TInstant> leaseDeadline; NYdb::TResultSetParser result2(ResultSets[1]); @@ -864,22 +759,8 @@ public: if (operationStatus) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); } else if (*leaseDeadline < RunStartTime) { - auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); - if (!serializedMeta) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing opeartion metainformation"); - return; - } - const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } - const auto [operationTtl, resultsTtl] = *ttl; - FinishScriptExecution(Database, ExecutionId, Ydb::StatusIds::UNAVAILABLE, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl); - SetQueryResultHandler(&TCheckLeaseStatusActor::OnFinishScriptExecution); - } else { - // OperationStatus is Nothing(): currently running - CommitTransaction(); + LeaseExpired = true; + FinalizationStatus = EFinalizationStatus::FS_ROLLBACK; } } else if (operationStatus) { OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); @@ -891,22 +772,16 @@ public: if (issuesSerialized) { OperationIssues = DeserializeIssues(*issuesSerialized); } - CommitTransaction(); } else { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Invalid operation state"); } - } - void OnFinishScriptExecution() { - OperationStatus = Ydb::StatusIds::UNAVAILABLE; - ExecutionStatus = Ydb::Query::EXEC_STATUS_ABORTED; - OperationIssues = LeaseExpiredIssues(); Finish(); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { if (status == Ydb::StatusIds::SUCCESS) { - Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId), 0, Cookie); + Send(Owner, new TEvPrivate::TEvLeaseCheckResult(OperationStatus, ExecutionStatus, std::move(OperationIssues), RunScriptActorId, LeaseExpired, FinalizationStatus), 0, Cookie); } else { Send(Owner, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues)), 0, Cookie); } @@ -919,8 +794,83 @@ private: const ui64 Cookie; TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; + TMaybe<EFinalizationStatus> FinalizationStatus; TMaybe<NYql::TIssues> OperationIssues; NActors::TActorId RunScriptActorId; + bool LeaseExpired; +}; + +class TCheckLeaseStatusActor : public TCheckLeaseStatusActorBase { +public: + TCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie = 0) + : ReplyActorId(replyActorId) + , Database(database) + , ExecutionId(executionId) + , Cookie(cookie) + {} + + void OnBootstrap() override { + Register(new TCheckLeaseStatusQueryActor(Database, ExecutionId, Cookie)); + Become(&TCheckLeaseStatusActor::StateFunc); + } + + void OnLeaseVerified() override { + Reply(); + } + + void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (status != Ydb::StatusIds::SUCCESS) { + Reply(status, std::move(issues)); + return; + } + + if (alreadyFinalized) { + // Final status and issues are unknown, the operation must be repeated + Response->Get()->OperationStatus = Nothing(); + Response->Get()->ExecutionStatus = Nothing(); + Response->Get()->OperationIssues->Clear(); + } else { + Response->Get()->OperationStatus = GetOperationStatus(); + Response->Get()->ExecutionStatus = GetExecStatus(); + Response->Get()->OperationIssues = GetIssues(); + } + + Reply(); + } + +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvPrivate::TEvLeaseCheckResult, Handle); + ) + + void Handle(TEvPrivate::TEvLeaseCheckResult::TPtr& ev) { + Response = std::move(ev); + + if (!Response->Get()->FinalizationStatus) { + Reply(); + } else if (Response->Get()->LeaseExpired) { + StartLeaseChecking(Response->Get()->RunScriptActorId, ExecutionId, Database); + } else { + StartScriptFinalization(*Response->Get()->FinalizationStatus, ExecutionId, Database, Response->Get()->OperationStatus, Response->Get()->ExecutionStatus, Response->Get()->Issues); + } + } + + void Reply() { + Send(ReplyActorId, Response->Release()); + PassAway(); + } + + void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + Send(ReplyActorId, new TEvPrivate::TEvLeaseCheckResult(status, std::move(issues))); + PassAway(); + } + +private: + NActors::TActorId ReplyActorId; + TString Database; + TString ExecutionId; + ui64 Cookie; + TEvPrivate::TEvLeaseCheckResult::TPtr Response; }; class TForgetScriptExecutionOperationActor : public TQueryBase { @@ -1046,18 +996,17 @@ private: NYql::TIssues Issues; }; -class TGetScriptExecutionOperationQueryActor : public TScriptExecutionFinisherBase { +class TGetScriptExecutionOperationQueryActor : public TQueryBase { public: - TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId, bool finishIfLeaseExpired) + TGetScriptExecutionOperationQueryActor(const TString& database, const NOperationId::TOperationId& operationId) : Database(database) , OperationId(operationId) - , FinishIfLeaseExpired(finishIfLeaseExpired) , StartActorTime(TInstant::Now()) {} void OnRunQuery() override { TString sql = R"( - -- TGetScriptExecutionOperationActor::OnRunQuery + -- TGetScriptExecutionOperationQueryActor::OnRunQuery DECLARE $database AS Text; DECLARE $execution_id AS Text; @@ -1065,6 +1014,7 @@ public: run_script_actor_id, operation_status, execution_status, + finalization_status, query_text, syntax, execution_mode, @@ -1072,8 +1022,7 @@ public: plan, issues, stats, - ast, - meta + ast FROM `.metadata/script_executions` WHERE database = $database AND execution_id = $execution_id AND (expire_at > CurrentUtcTimestamp() OR expire_at IS NULL); @@ -1097,11 +1046,10 @@ public: .Utf8(ExecutionId) .Build(); - RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); - SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnGetInfo); + RunDataQuery(sql, ¶ms); } - void OnGetInfo() { + void OnQueryResult() override { if (ResultSets.size() != 2) { Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); return; @@ -1119,6 +1067,11 @@ public: OperationStatus = static_cast<Ydb::StatusIds::StatusCode>(*operationStatus); } + const TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32(); + if (finalizationStatus) { + FinalizationStatus = static_cast<EFinalizationStatus>(*finalizationStatus); + } + Metadata.set_execution_id(*ScriptExecutionIdFromOperation(OperationId)); const TMaybe<i32> executionStatus = result.ColumnParser("execution_status").GetOptionalInt32(); @@ -1191,19 +1144,6 @@ public: return; } - auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); - if (!serializedMeta) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation metainformation"); - return; - } - - const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); - if (!ttl) { - Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); - return; - } - - const auto [operationTtl, resultsTtl] = *ttl; deadlineResult.TryNextRow(); TMaybe<TInstant> leaseDeadline = deadlineResult.ColumnParser(0).GetOptionalTimestamp(); @@ -1212,136 +1152,108 @@ public: return; } - LeaseExpired = *leaseDeadline < StartActorTime; - if (LeaseExpired && FinishIfLeaseExpired) { - FinishScriptExecution(Database, Metadata.execution_id(), Ydb::StatusIds::ABORTED, Ydb::Query::EXEC_STATUS_ABORTED, operationTtl, resultsTtl, Issues); - SetQueryResultHandler(&TGetScriptExecutionOperationQueryActor::OnFinishOperation); + if (*leaseDeadline < StartActorTime) { + LeaseExpired = true; + FinalizationStatus = EFinalizationStatus::FS_ROLLBACK; } } - if (!LeaseExpired || !FinishIfLeaseExpired) { - CommitTransaction(); - } - } - - void OnFinishOperation() { - OperationStatus = Ydb::StatusIds::UNAVAILABLE; - Issues = LeaseExpiredIssues(); - Metadata.set_exec_status(Ydb::Query::EXEC_STATUS_ABORTED); - - Finish(Ydb::StatusIds::SUCCESS, std::move(Issues)); + Finish(); } void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { if (OperationStatus) { - TMaybe<google::protobuf::Any> metadata; - metadata.ConstructInPlace().PackFrom(Metadata); - Send(Owner, new TEvGetScriptExecutionOperationResponse(true, LeaseExpired, RunScriptActorId, *OperationStatus, std::move(Issues), std::move(metadata))); + Send(Owner, new TEvGetScriptExecutionOperationQueryResponse(true, LeaseExpired, FinalizationStatus, RunScriptActorId, ExecutionId, *OperationStatus, std::move(Issues), std::move(Metadata))); } else { - Send(Owner, new TEvGetScriptExecutionOperationResponse(false, LeaseExpired, RunScriptActorId, status, std::move(issues), Nothing())); + Send(Owner, new TEvGetScriptExecutionOperationQueryResponse(false, LeaseExpired, FinalizationStatus, RunScriptActorId, ExecutionId, status, std::move(issues), std::move(Metadata))); } } private: TString Database; NOperationId::TOperationId OperationId; - bool FinishIfLeaseExpired; TInstant StartActorTime; TString ExecutionId; - TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + std::optional<Ydb::StatusIds::StatusCode> OperationStatus; + std::optional<EFinalizationStatus> FinalizationStatus; bool LeaseExpired = false; TActorId RunScriptActorId; NYql::TIssues Issues; Ydb::Query::ExecuteScriptMetadata Metadata; }; -class TGetScriptExecutionOperationActor : public TActorBootstrapped<TGetScriptExecutionOperationActor> { - using TBase = TActorBootstrapped<TGetScriptExecutionOperationActor>; - - inline static const TDuration CHECK_ALIVE_REQUEST_TIMEOUT = TDuration::Seconds(60); - +class TGetScriptExecutionOperationActor : public TCheckLeaseStatusActorBase { public: explicit TGetScriptExecutionOperationActor(TEvGetScriptExecutionOperation::TPtr ev) : Request(std::move(ev)) {} - void Bootstrap() { - CreateGetScriptExecutionOperationQuery(false); + void OnBootstrap() override { + Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId)); Become(&TGetScriptExecutionOperationActor::StateFunc); } -private: - STRICT_STFUNC(StateFunc, - hFunc(TEvGetScriptExecutionOperationResponse, Handle); - hFunc(TEvCheckAliveResponse, Handle); - hFunc(TEvents::TEvWakeup, Handle); - hFunc(NActors::TEvents::TEvUndelivered, Handle); - hFunc(NActors::TEvInterconnect::TEvNodeDisconnected, Handle); - ) - - void CreateGetScriptExecutionOperationQuery(bool finishIfLeaseExpired) { - Register(new TGetScriptExecutionOperationQueryActor(Request->Get()->Database, Request->Get()->OperationId, finishIfLeaseExpired)); - } - - void CreateFinishScriptExecutionOperationQuery() { - if (!WaitFinishQuery) { - WaitFinishQuery = true; - CreateGetScriptExecutionOperationQuery(true); - } + void OnLeaseVerified() override { + Reply(); } - void Handle(TEvGetScriptExecutionOperationResponse::TPtr& ev) { - Response = std::move(ev); - - if (WaitFinishQuery || !Response->Get()->LeaseExpired) { - Reply(); + void OnScriptExecutionFinished(bool alreadyFinalized, Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (status != Ydb::StatusIds::SUCCESS) { + Reply(status, std::move(issues)); return; } - Schedule(CHECK_ALIVE_REQUEST_TIMEOUT, new TEvents::TEvWakeup()); - - NActors::TActorId runScriptActor = Response->Get()->RunScriptActorId; - ui64 flags = IEventHandle::FlagTrackDelivery; - if (runScriptActor.NodeId() != SelfId().NodeId()) { - flags |= IEventHandle::FlagSubscribeOnSession; - SubscribedOnSession = runScriptActor.NodeId(); + if (alreadyFinalized) { + // Final status and issues are unknown, the operation must be repeated + Response->Get()->Ready = false; + Response->Get()->Status = Ydb::StatusIds::SUCCESS; + Response->Get()->Issues.Clear(); + } else { + Response->Get()->Ready = true; + Response->Get()->Status = GetOperationStatus(); + Response->Get()->Issues = GetIssues(); + Response->Get()->Metadata.set_exec_status(GetExecStatus()); } - Send(runScriptActor, new TEvCheckAliveRequest(), flags); - } - void Handle(TEvCheckAliveResponse::TPtr&) { Reply(); } - void Handle(TEvents::TEvWakeup::TPtr&) { - CreateFinishScriptExecutionOperationQuery(); - } +private: + STRICT_STFUNC(StateFunc, + hFunc(TEvGetScriptExecutionOperationQueryResponse, Handle); + ) - void Handle(NActors::TEvents::TEvUndelivered::TPtr&) { - CreateFinishScriptExecutionOperationQuery(); - } + void Handle(TEvGetScriptExecutionOperationQueryResponse::TPtr& ev) { + Response = std::move(ev); - void Handle(NActors::TEvInterconnect::TEvNodeDisconnected::TPtr&) { - CreateFinishScriptExecutionOperationQuery(); + if (!Response->Get()->FinalizationStatus) { + Reply(); + } else if (Response->Get()->LeaseExpired) { + StartLeaseChecking(Response->Get()->RunScriptActorId, Response->Get()->ExecutionId, Request->Get()->Database); + } else { + TMaybe<Ydb::Query::ExecStatus> execStatus; + if (Response->Get()->Ready) { + execStatus = Response->Get()->Metadata.exec_status(); + } + StartScriptFinalization(*Response->Get()->FinalizationStatus, Response->Get()->ExecutionId, Request->Get()->Database, Response->Get()->Status, execStatus, Response->Get()->Issues); + } } void Reply() { - Send(Request->Sender, Response->Release().Release()); + TMaybe<google::protobuf::Any> metadata; + metadata.ConstructInPlace().PackFrom(Response->Get()->Metadata); + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(Response->Get()->Ready, Response->Get()->Status, std::move(Response->Get()->Issues), std::move(metadata))); PassAway(); } - void PassAway() override { - if (SubscribedOnSession) { - Send(TActivationContext::InterconnectProxy(*SubscribedOnSession), new TEvents::TEvUnsubscribe()); - } - TBase::PassAway(); + void Reply(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) { + Send(Request->Sender, new TEvGetScriptExecutionOperationResponse(status, std::move(issues))); + PassAway(); } private: TEvGetScriptExecutionOperation::TPtr Request; - TEvGetScriptExecutionOperationResponse::TPtr Response; - bool WaitFinishQuery = false; - TMaybe<ui32> SubscribedOnSession; + TEvGetScriptExecutionOperationQueryResponse::TPtr Response; }; class TListScriptExecutionOperationsQuery : public TQueryBase { @@ -1535,7 +1447,7 @@ public: if (!op.ready()) { Ydb::Query::ExecuteScriptMetadata metadata; op.metadata().UnpackTo(&metadata); - Register(new TCheckLeaseStatusActor(Request->Get()->Database, metadata.execution_id(), i)); + Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, metadata.execution_id(), i)); ++OperationsToCheck; } } @@ -1603,7 +1515,7 @@ public: ExecutionId = *executionId; Become(&TCancelScriptExecutionOperationActor::StateFunc); - Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); + Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId)); } STRICT_STFUNC(StateFunc, @@ -1648,7 +1560,7 @@ public: void Handle(NActors::TEvents::TEvUndelivered::TPtr& ev) { if (ev->Get()->Reason == NActors::TEvents::TEvUndelivered::ReasonActorUnknown) { // The actor probably had finished before our cancel message arrived. - Register(new TCheckLeaseStatusActor(Request->Get()->Database, ExecutionId)); // Check if the operation has finished. + Register(new TCheckLeaseStatusActor(SelfId(), Request->Get()->Database, ExecutionId)); // Check if the operation has finished. } else { Reply(Ydb::StatusIds::UNAVAILABLE, "Failed to deliver cancel request to destination"); } @@ -1880,6 +1792,16 @@ private: std::vector<TString> SerializedRows; }; +std::optional<std::pair<TDuration, TDuration>> GetTtlFromSerializedMeta(const TString& serializedMeta) { + NKikimrKqp::TScriptExecutionOperationMeta meta; + try { + NProtobufJson::Json2Proto(serializedMeta, meta, NProtobufJson::TJson2ProtoConfig()); + return std::pair(GetDuration(meta.GetOperationTtl()), GetDuration(meta.GetResultsTtl())); + } catch (const NJson::TJsonException& e) { + return std::nullopt; + } +} + class TGetScriptExecutionResultQuery : public TQueryBase { public: TGetScriptExecutionResultQuery(const TString& database, const TString& executionId, i32 resultSetIndex, i64 offset, i64 limit) @@ -2122,6 +2044,562 @@ private: THolder<TEvKqp::TEvFetchScriptResultsResponse> Response; }; +class TSaveScriptExternalEffectActor : public TQueryBase { +public: + explicit TSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev) + : Request(std::move(ev)) + {} + + void OnRunQuery() override { + TString sql = R"( + -- TSaveScriptExternalEffectActor::OnRunQuery + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $customer_supplied_id AS Text; + DECLARE $user_token AS Text; + DECLARE $script_sinks AS JsonDocument; + DECLARE $script_secret_names AS JsonDocument; + + UPDATE `.metadata/script_executions` + SET + customer_supplied_id = $customer_supplied_id, + user_token = $user_token, + script_sinks = $script_sinks, + script_secret_names = $script_secret_names + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Request->Get()->Database) + .Build() + .AddParam("$execution_id") + .Utf8(Request->Get()->ExecutionId) + .Build() + .AddParam("$customer_supplied_id") + .Utf8(Request->Get()->CustomerSuppliedId) + .Build() + .AddParam("$user_token") + .Utf8(Request->Get()->UserToken) + .Build() + .AddParam("$script_sinks") + .JsonDocument(SerializeSinks(Request->Get()->Sinks)) + .Build() + .AddParam("$script_secret_names") + .JsonDocument(SerializeSecretNames(Request->Get()->SecretNames)) + .Build(); + + RunDataQuery(sql, ¶ms); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + Send(Request->Sender, new TEvSaveScriptExternalEffectResponse(status, std::move(issues))); + } + +private: + static TString SerializeSinks(const std::vector<NKqpProto::TKqpExternalSink>& sinks) { + NJson::TJsonValue value; + value.SetType(NJson::EJsonValueType::JSON_ARRAY); + + NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe(); + jsonArray.resize(sinks.size()); + for (size_t i = 0; i < sinks.size(); ++i) { + NProtobufJson::Proto2Json(sinks[i], jsonArray[i], NProtobufJson::TProto2JsonConfig()); + } + + NJsonWriter::TBuf serializedSinks; + serializedSinks.WriteJsonValue(&value); + + return serializedSinks.Str(); + } + + static TString SerializeSecretNames(const std::vector<TString>& secretNames) { + NJson::TJsonValue value; + value.SetType(NJson::EJsonValueType::JSON_ARRAY); + + NJson::TJsonValue::TArray& jsonArray = value.GetArraySafe(); + jsonArray.resize(secretNames.size()); + for (size_t i = 0; i < secretNames.size(); ++i) { + jsonArray[i] = NJson::TJsonValue(secretNames[i]); + } + + NJsonWriter::TBuf serializedSecretNames; + serializedSecretNames.WriteJsonValue(&value); + + return serializedSecretNames.Str(); + } + +private: + TEvSaveScriptExternalEffectRequest::TPtr Request; +}; + +class TSaveScriptFinalStatusActor : public TQueryBase { +public: + explicit TSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev) + : Request(ev) + {} + + void OnRunQuery() override { + TString sql = R"( + -- TSaveScriptFinalStatusActor::OnRunQuery + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + SELECT + operation_status, + finalization_status, + meta, + customer_supplied_id, + user_token, + script_sinks, + script_secret_names + FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + + SELECT lease_generation + FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Request->Get()->Database) + .Build() + .AddParam("$execution_id") + .Utf8(Request->Get()->ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + SetQueryResultHandler(&TSaveScriptFinalStatusActor::OnGetInfo); + } + + void OnGetInfo() { + if (ResultSets.size() != 2) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::NOT_FOUND, "No such execution"); + return; + } + + result.TryNextRow(); + + TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32(); + if (finalizationStatus) { + if (Request->Get()->FinalizationStatus != *finalizationStatus) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Execution already have different finalization status"); + return; + } + ApplicateScriptExternalEffectRequired = true; + } + + TMaybe<i32> operationStatus = result.ColumnParser("operation_status").GetOptionalInt32(); + + if (Request->Get()->LeaseGeneration && !operationStatus) { + NYdb::TResultSetParser leaseResult(ResultSets[1]); + if (leaseResult.RowsCount() == 0) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected operation state"); + return; + } + + leaseResult.TryNextRow(); + + TMaybe<i64> leaseGenerationInDatabase = leaseResult.ColumnParser("lease_generation").GetOptionalInt64(); + if (!leaseGenerationInDatabase) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unknown lease generation"); + return; + } + + if (*Request->Get()->LeaseGeneration != static_cast<ui64>(*leaseGenerationInDatabase)) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Lease was lost"); + return; + } + } + + TMaybe<TString> customerSuppliedId = result.ColumnParser("customer_supplied_id").GetOptionalUtf8(); + if (customerSuppliedId) { + CustomerSuppliedId = *customerSuppliedId; + } + + TMaybe<TString> userToken = result.ColumnParser("user_token").GetOptionalUtf8(); + if (userToken) { + UserToken = *userToken; + } + + SerializedSinks = result.ColumnParser("script_sinks").GetOptionalJsonDocument(); + if (SerializedSinks) { + NJson::TJsonValue value; + if (!NJson::ReadJsonTree(*SerializedSinks, &value) || value.GetType() != NJson::JSON_ARRAY) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Script sinks are corrupted"); + return; + } + + for (auto i = 0; i < value.GetIntegerRobust(); i++) { + const NJson::TJsonValue* serializedSink; + value.GetValuePointer(i, &serializedSink); + + NKqpProto::TKqpExternalSink sink; + NProtobufJson::Json2Proto(*serializedSink, sink); + Sinks.push_back(sink); + } + } + + SerializedSecretNames = result.ColumnParser("script_secret_names").GetOptionalJsonDocument(); + if (SerializedSecretNames) { + NJson::TJsonValue value; + if (!NJson::ReadJsonTree(*SerializedSecretNames, &value) || value.GetType() != NJson::JSON_ARRAY) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Script secret names are corrupted"); + return; + } + + for (auto i = 0; i < value.GetIntegerRobust(); i++) { + const NJson::TJsonValue* serializedSecretName; + value.GetValuePointer(i, &serializedSecretName); + + SecretNames.push_back(serializedSecretName->GetString()); + } + } + + const auto serializedMeta = result.ColumnParser("meta").GetOptionalJsonDocument(); + if (!serializedMeta) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Missing operation meta information"); + return; + } + + const auto ttl = GetTtlFromSerializedMeta(*serializedMeta); + if (!ttl) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Metainformation is corrupted"); + return; + } + + Request->Get()->OperationTtl = ttl->first; + Request->Get()->ResultsTtl = ttl->second; + + if (operationStatus) { + FinalStatusAlreadySaved = true; + OperationAlreadyFinalized = !finalizationStatus; + CommitTransaction(); + return; + } + + ApplicateScriptExternalEffectRequired = ApplicateScriptExternalEffectRequired || HasExternalEffect(); + FinishScriptExecution(); + } + + void FinishScriptExecution() { + TString sql = R"( + -- TSaveScriptFinalStatusActor::FinishScriptExecution + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $operation_status AS Int32; + DECLARE $execution_status AS Int32; + DECLARE $finalization_status AS Int32; + DECLARE $issues AS JsonDocument; + DECLARE $plan AS JsonDocument; + DECLARE $stats AS JsonDocument; + DECLARE $ast AS Text; + DECLARE $operation_ttl AS Interval; + DECLARE $results_ttl AS Interval; + DECLARE $customer_supplied_id AS Text; + DECLARE $user_token AS Text; + DECLARE $script_sinks AS Optional<JsonDocument>; + DECLARE $script_secret_names AS Optional<JsonDocument>; + DECLARE $applicate_script_external_effect_required AS Bool; + + UPDATE `.metadata/script_executions` + SET + operation_status = $operation_status, + execution_status = $execution_status, + finalization_status = IF($applicate_script_external_effect_required, $finalization_status, NULL), + issues = $issues, + plan = $plan, + end_ts = CurrentUtcTimestamp(), + stats = $stats, + ast = $ast, + expire_at = IF($operation_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $operation_ttl, NULL), + customer_supplied_id = IF($applicate_script_external_effect_required, $customer_supplied_id, NULL), + user_token = IF($applicate_script_external_effect_required, $user_token, NULL), + script_sinks = IF($applicate_script_external_effect_required, $script_sinks, NULL), + script_secret_names = IF($applicate_script_external_effect_required, $script_secret_names, NULL) + WHERE database = $database AND execution_id = $execution_id; + + DELETE FROM `.metadata/script_execution_leases` + WHERE database = $database AND execution_id = $execution_id; + + UPDATE `.metadata/result_sets` + SET expire_at = IF($results_ttl > CAST(0 AS Interval), CurrentUtcTimestamp() + $results_ttl, NULL) + WHERE database = $database AND execution_id = $execution_id; + )"; + + TString serializedStats = "{}"; + if (Request->Get()->QueryStats) { + NJson::TJsonValue statsJson; + Ydb::TableStats::QueryStats queryStats; + NGRpcService::FillQueryStats(queryStats, *Request->Get()->QueryStats); + NProtobufJson::Proto2Json(queryStats, statsJson, NProtobufJson::TProto2JsonConfig()); + serializedStats = NJson::WriteJson(statsJson); + } + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Request->Get()->Database) + .Build() + .AddParam("$execution_id") + .Utf8(Request->Get()->ExecutionId) + .Build() + .AddParam("$operation_status") + .Int32(Request->Get()->OperationStatus) + .Build() + .AddParam("$execution_status") + .Int32(Request->Get()->ExecStatus) + .Build() + .AddParam("$finalization_status") + .Int32(Request->Get()->FinalizationStatus) + .Build() + .AddParam("$issues") + .JsonDocument(SerializeIssues(Request->Get()->Issues)) + .Build() + .AddParam("$plan") + .JsonDocument(Request->Get()->QueryPlan.value_or("{}")) + .Build() + .AddParam("$stats") + .JsonDocument(serializedStats) + .Build() + .AddParam("$ast") + .Utf8(Request->Get()->QueryAst.value_or("")) + .Build() + .AddParam("$operation_ttl") + .Interval(static_cast<i64>(Request->Get()->OperationTtl.MicroSeconds())) + .Build() + .AddParam("$results_ttl") + .Interval(static_cast<i64>(Request->Get()->ResultsTtl.MicroSeconds())) + .Build() + .AddParam("$customer_supplied_id") + .Utf8(CustomerSuppliedId) + .Build() + .AddParam("$user_token") + .Utf8(UserToken) + .Build() + .AddParam("$script_sinks") + .OptionalJsonDocument(SerializedSinks) + .Build() + .AddParam("$script_secret_names") + .OptionalJsonDocument(SerializedSecretNames) + .Build() + .AddParam("$applicate_script_external_effect_required") + .Bool(ApplicateScriptExternalEffectRequired) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::ContinueAndCommitTx()); + SetQueryResultHandler(&TSaveScriptFinalStatusActor::OnQueryResult); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (!FinalStatusAlreadySaved) { + KQP_PROXY_LOG_D("Finish script execution operation. ExecutionId: " << Request->Get()->ExecutionId + << ". " << Ydb::StatusIds::StatusCode_Name(Request->Get()->OperationStatus) + << ". Issues: " << Request->Get()->Issues.ToOneLineString() << ". Plan: " << Request->Get()->QueryPlan.value_or("")); + } + + if (!ApplicateScriptExternalEffectRequired || status != Ydb::StatusIds::SUCCESS) { + Send(Owner, new TEvScriptExecutionFinished(OperationAlreadyFinalized, status, issues)); + return; + } + + auto response = std::make_unique<TEvSaveScriptFinalStatusResponse>(CustomerSuppliedId, UserToken); + response->Sinks = std::move(Sinks); + response->SecretNames = std::move(SecretNames); + + Send(Owner, response.release()); + } + +private: + bool HasExternalEffect() const { + return !Sinks.empty(); + } + +private: + TEvScriptFinalizeRequest::TPtr Request; + + bool OperationAlreadyFinalized = false; + bool FinalStatusAlreadySaved = false; + bool ApplicateScriptExternalEffectRequired = false; + + TString CustomerSuppliedId; + TString UserToken; + TMaybe<TString> SerializedSinks; + std::vector<NKqpProto::TKqpExternalSink> Sinks; + TMaybe<TString> SerializedSecretNames; + std::vector<TString> SecretNames; +}; + +class TScriptFinalizationFinisherActor : public TQueryBase { +public: + TScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues) + : ExecutionId(executionId) + , Database(database) + , OperationStatus(operationStatus) + , OperationIssues(std::move(operationIssues)) + {} + + void OnRunQuery() override { + TString sql = R"( + -- TScriptFinalizationFinisherActor::OnRunQuery + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + SELECT finalization_status + FROM `.metadata/script_executions` + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::BeginTx()); + SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnGetInfo); + } + + void OnGetInfo() { + if (ResultSets.size() != 1) { + Finish(Ydb::StatusIds::INTERNAL_ERROR, "Unexpected database response"); + return; + } + + NYdb::TResultSetParser result(ResultSets[0]); + if (result.RowsCount() == 0) { + Finish(Ydb::StatusIds::NOT_FOUND, "No such execution"); + return; + } + + result.TryNextRow(); + + TMaybe<i32> finalizationStatus = result.ColumnParser("finalization_status").GetOptionalInt32(); + if (!finalizationStatus) { + Finish(Ydb::StatusIds::PRECONDITION_FAILED, "Already finished"); + return; + } + + if (OperationStatus) { + UpdateOperationFinalStatus(); + } else { + UpdateOnlyFinalizationStatus(); + } + } + + void UpdateOperationFinalStatus() { + TString sql = R"( + -- TScriptFinalizationFinisherActor::UpdateOperationFinalStatus + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + DECLARE $operation_status AS Int32; + DECLARE $execution_status AS Int32; + DECLARE $issues AS JsonDocument; + + UPDATE `.metadata/script_executions` + SET + operation_status = $operation_status, + execution_status = $execution_status, + finalization_status = NULL, + issues = $issues, + customer_supplied_id = NULL, + user_token = NULL, + script_sinks = NULL, + script_secret_names = NULL + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$database") + .Utf8(Database) + .Build() + .AddParam("$operation_status") + .Int32(*OperationStatus) + .Build() + .AddParam("$execution_status") + .Int32(Ydb::Query::EXEC_STATUS_FAILED) + .Build() + .AddParam("$issues") + .JsonDocument(SerializeIssues(OperationIssues)) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::ContinueAndCommitTx()); + SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnQueryResult); + } + + void UpdateOnlyFinalizationStatus() { + TString sql = R"( + -- TScriptFinalizationFinisherActor::UpdateOnlyFinalizationStatus + DECLARE $database AS Text; + DECLARE $execution_id AS Text; + + UPDATE `.metadata/script_executions` + SET + finalization_status = NULL, + customer_supplied_id = NULL, + user_token = NULL, + script_sinks = NULL, + script_secret_names = NULL + WHERE database = $database AND execution_id = $execution_id; + )"; + + NYdb::TParamsBuilder params; + params + .AddParam("$execution_id") + .Utf8(ExecutionId) + .Build() + .AddParam("$database") + .Utf8(Database) + .Build(); + + RunDataQuery(sql, ¶ms, TTxControl::ContinueAndCommitTx()); + SetQueryResultHandler(&TScriptFinalizationFinisherActor::OnQueryResult); + } + + void OnQueryResult() override { + Finish(); + } + + void OnFinish(Ydb::StatusIds::StatusCode status, NYql::TIssues&& issues) override { + if (OperationStatus) { + OperationIssues.AddIssues(std::move(issues)); + Send(Owner, new TEvScriptExecutionFinished(status != Ydb::StatusIds::SUCCESS, *OperationStatus, std::move(OperationIssues))); + } else { + Send(Owner, new TEvScriptExecutionFinished(status != Ydb::StatusIds::SUCCESS, Ydb::StatusIds::SUCCESS, std::move(issues))); + } + } + +private: + TString ExecutionId; + TString Database; + std::optional<Ydb::StatusIds::StatusCode> OperationStatus; + NYql::TIssues OperationIssues; +}; + } // anonymous namespace NActors::IActor* CreateScriptExecutionCreatorActor(TEvKqp::TEvScriptRequest::TPtr&& ev, const NKikimrConfig::TQueryServiceConfig& queryServiceConfig, TIntrusivePtr<TKqpCounters> counters, TDuration maxRunTime) { @@ -2132,21 +2610,6 @@ NActors::IActor* CreateScriptExecutionsTablesCreator(THolder<NActors::IEventBase return new TScriptExecutionsTablesCreator(std::move(resultEvent)); } -NActors::IActor* CreateScriptExecutionFinisher( - const TString& executionId, - const TString& database, - ui64 leaseGeneration, - Ydb::StatusIds::StatusCode operationStatus, - Ydb::Query::ExecStatus execStatus, - NYql::TIssues issues, - TMaybe<NKqpProto::TKqpStatsQuery> queryStats, - TMaybe<TString> queryPlan, - TMaybe<TString> queryAst - ) -{ - return new TScriptExecutionFinisher(executionId, database, leaseGeneration, operationStatus, execStatus, std::move(issues), std::move(queryStats), std::move(queryPlan), std::move(queryAst)); -} - NActors::IActor* CreateForgetScriptExecutionOperationActor(TEvForgetScriptExecutionOperation::TPtr ev) { return new TForgetScriptExecutionOperationActor(std::move(ev)); } @@ -2179,14 +2642,26 @@ NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& re return new TGetScriptExecutionResultActor(replyActorId, database, executionId, resultSetIndex, offset, limit); } +NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev) { + return new TSaveScriptExternalEffectActor(std::move(ev)); +} + +NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev) { + return new TSaveScriptFinalStatusActor(std::move(ev)); +} + +NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues) { + return new TScriptFinalizationFinisherActor(executionId, database, operationStatus, std::move(operationIssues)); +} + namespace NPrivate { NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionId, const NActors::TActorId& runScriptActorId, const NKikimrKqp::TEvQueryRequest& record, TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration) { return new TCreateScriptOperationQuery(executionId, runScriptActorId, record, operationTtl, resultsTtl, leaseDuration); } -NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie) { - return new TCheckLeaseStatusActor(database, executionId, cookie); +NActors::IActor* CreateCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie) { + return new TCheckLeaseStatusActor(replyActorId, database, executionId, cookie); } } // namespace NPrivate diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions.h b/ydb/core/kqp/proxy_service/kqp_script_executions.h index cda7cbe179..e0f526612d 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions.h @@ -23,19 +23,6 @@ NActors::IActor* CreateGetScriptExecutionOperationActor(TEvGetScriptExecutionOpe NActors::IActor* CreateListScriptExecutionOperationsActor(TEvListScriptExecutionOperations::TPtr ev); NActors::IActor* CreateCancelScriptExecutionOperationActor(TEvCancelScriptExecutionOperation::TPtr ev); -// Updates status in database. -NActors::IActor* CreateScriptExecutionFinisher( - const TString& executionId, - const TString& database, - ui64 leaseGeneration, - Ydb::StatusIds::StatusCode operationStatus, - Ydb::Query::ExecStatus execStatus, - NYql::TIssues issues, - TMaybe<NKqpProto::TKqpStatsQuery> queryStats = Nothing(), - TMaybe<TString> queryPlan = Nothing(), - TMaybe<TString> queryAst = Nothing() -); - // Updates lease deadline in database. NActors::IActor* CreateScriptLeaseUpdateActor(const TActorId& runScriptActorId, const TString& database, const TString& executionId, TDuration leaseDuration, TIntrusivePtr<TKqpCounters> counters); @@ -44,4 +31,9 @@ NActors::IActor* CreateSaveScriptExecutionResultMetaActor(const NActors::TActorI NActors::IActor* CreateSaveScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, TInstant expireAt, i64 firstRow, std::vector<TString>&& serializedRows); NActors::IActor* CreateGetScriptExecutionResultActor(const NActors::TActorId& runScriptActorId, const TString& database, const TString& executionId, i32 resultSetId, i64 offset, i64 limit); +// Compute external effects and updates status in database +NActors::IActor* CreateSaveScriptExternalEffectActor(TEvSaveScriptExternalEffectRequest::TPtr ev); +NActors::IActor* CreateSaveScriptFinalStatusActor(TEvScriptFinalizeRequest::TPtr ev); +NActors::IActor* CreateScriptFinalizationFinisherActor(const TString& executionId, const TString& database, std::optional<Ydb::StatusIds::StatusCode> operationStatus, NYql::TIssues operationIssues); + } // namespace NKikimr::NKqp diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h index 5a33c23802..27592314b6 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_impl.h @@ -45,26 +45,33 @@ struct TEvPrivate { TEvLeaseCheckResult(Ydb::StatusIds::StatusCode statusCode, NYql::TIssues&& issues) : Status(statusCode) , Issues(std::move(issues)) + , LeaseExpired(false) { } TEvLeaseCheckResult(TMaybe<Ydb::StatusIds::StatusCode> operationStatus, TMaybe<Ydb::Query::ExecStatus> executionStatus, TMaybe<NYql::TIssues> operationIssues, - const NActors::TActorId& runScriptActorId) + const NActors::TActorId& runScriptActorId, + bool leaseExpired, + TMaybe<EFinalizationStatus> finalizationStatus) : Status(Ydb::StatusIds::SUCCESS) , OperationStatus(operationStatus) , ExecutionStatus(executionStatus) , OperationIssues(operationIssues) , RunScriptActorId(runScriptActorId) + , LeaseExpired(leaseExpired) + , FinalizationStatus(finalizationStatus) {} const Ydb::StatusIds::StatusCode Status; const NYql::TIssues Issues; - const TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; - const TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; - const TMaybe<NYql::TIssues> OperationIssues; + TMaybe<Ydb::StatusIds::StatusCode> OperationStatus; + TMaybe<Ydb::Query::ExecStatus> ExecutionStatus; + TMaybe<NYql::TIssues> OperationIssues; const NActors::TActorId RunScriptActorId; + const bool LeaseExpired; + const TMaybe<EFinalizationStatus> FinalizationStatus; }; }; @@ -74,6 +81,6 @@ NActors::IActor* CreateCreateScriptOperationQueryActor(const TString& executionI TDuration operationTtl, TDuration resultsTtl, TDuration leaseDuration = TDuration::Zero()); // Checks lease of execution, finishes execution if its lease is off, returns current status -NActors::IActor* CreateCheckLeaseStatusActor(const TString& database, const TString& executionId, ui64 cookie = 0); +NActors::IActor* CreateCheckLeaseStatusActor(const NActors::TActorId& replyActorId, const TString& database, const TString& executionId, ui64 cookie = 0); } // namespace NKikimr::NKqp::NPrivate diff --git a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp index cd4eec4f4b..e3de144911 100644 --- a/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp +++ b/ydb/core/kqp/proxy_service/kqp_script_executions_ut.cpp @@ -189,7 +189,7 @@ struct TScriptExecutionsYdbSetup { NPrivate::TEvPrivate::TEvLeaseCheckResult::TPtr CheckLeaseStatus(const TString& executionId) { const ui32 node = 0; TActorId edgeActor = GetRuntime()->AllocateEdgeActor(node); - GetRuntime()->Register(NPrivate::CreateCheckLeaseStatusActor(TestDatabase, executionId), 0, 0, TMailboxType::Simple, 0, edgeActor); + GetRuntime()->Register(NPrivate::CreateCheckLeaseStatusActor(edgeActor, TestDatabase, executionId)); auto reply = GetRuntime()->GrabEdgeEvent<NPrivate::TEvPrivate::TEvLeaseCheckResult>(edgeActor); UNIT_ASSERT(reply->Get()->Status == Ydb::StatusIds::SUCCESS); diff --git a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp index 99ad58c00a..fe97c82bce 100644 --- a/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp +++ b/ydb/core/kqp/run_script_actor/kqp_run_script_actor.cpp @@ -128,6 +128,7 @@ private: auto ev = MakeHolder<TEvKqp::TEvQueryRequest>(); ev->Record = Request; ev->Record.MutableRequest()->SetSessionId(SessionId); + ev->SetUserRequestContext(MakeIntrusive<TUserRequestContext>(Request.GetTraceId(), Database, SessionId, ExecutionId, Request.GetTraceId())); NActors::ActorIdToProto(SelfId(), ev->Record.MutableRequestActorId()); @@ -170,10 +171,17 @@ private: } void RunScriptExecutionFinisher() { + if (RunState == ERunState::Cancelling) { + Issues.AddIssue("Script execution is cancelled"); + } + if (!FinalStatusIsSaved) { FinalStatusIsSaved = true; - Register(CreateScriptExecutionFinisher(ExecutionId, Database, LeaseGeneration, Status, GetExecStatusFromStatusCode(Status), - Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst))); + auto scriptFinalizeRequest = std::make_unique<TEvScriptFinalizeRequest>( + GetFinalizationStatusFromRunState(), ExecutionId, Database, Status, GetExecStatusFromStatusCode(Status), + Issues, std::move(QueryStats), std::move(QueryPlan), std::move(QueryAst), LeaseGeneration + ); + Send(MakeKqpFinalizeScriptServiceId(SelfId().NodeId()), scriptFinalizeRequest.release()); return; } @@ -439,6 +447,12 @@ private: } } + EFinalizationStatus GetFinalizationStatusFromRunState() const { + if (Status == Ydb::StatusIds::SUCCESS && (RunState == ERunState::Finishing || RunState == ERunState::Finished)) { + return EFinalizationStatus::FS_COMMIT; + } + return EFinalizationStatus::FS_ROLLBACK; + } void CheckInflight() { if (Status == Ydb::StatusIds::STATUS_CODE_UNSPECIFIED || (Status == Ydb::StatusIds::SUCCESS && RunState == ERunState::Finishing && (SaveResultMetaInflight || SaveResultInflight))) { @@ -451,10 +465,6 @@ private: } else { FinishAfterLeaseUpdate = true; } - - if (RunState == ERunState::Cancelling) { - Issues.AddIssue("Script execution is cancelled"); - } } void Finish(Ydb::StatusIds::StatusCode status, ERunState runState = ERunState::Finishing) { @@ -513,9 +523,9 @@ private: ui32 SaveResultInflight = 0; ui32 SaveResultMetaInflight = 0; bool PendingResultMeta = false; - TMaybe<TString> QueryPlan; - TMaybe<TString> QueryAst; - TMaybe<NKqpProto::TKqpStatsQuery> QueryStats; + std::optional<TString> QueryPlan; + std::optional<TString> QueryAst; + std::optional<NKqpProto::TKqpStatsQuery> QueryStats; }; } // namespace diff --git a/ydb/core/kqp/session_actor/kqp_query_state.h b/ydb/core/kqp/session_actor/kqp_query_state.h index 26e84891f4..6dcc44118c 100644 --- a/ydb/core/kqp/session_actor/kqp_query_state.h +++ b/ydb/core/kqp/session_actor/kqp_query_state.h @@ -61,7 +61,11 @@ public: KqpSessionSpan = NWilson::TSpan( TWilsonKqp::KqpSession, std::move(traceId), "Session.query." + NKikimrKqp::EQueryAction_Name(action), NWilson::EFlags::AUTO_END); - UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId); + if (RequestEv->GetUserRequestContext()) { + UserRequestContext = RequestEv->GetUserRequestContext(); + } else { + UserRequestContext = MakeIntrusive<TUserRequestContext>(RequestEv->GetTraceId(), Database, sessionId); + } } // the monotonously growing counter, the ordinal number of the query, diff --git a/ydb/core/kqp/ya.make b/ydb/core/kqp/ya.make index 0c70ffb04c..a45a8d67cd 100644 --- a/ydb/core/kqp/ya.make +++ b/ydb/core/kqp/ya.make @@ -62,6 +62,7 @@ RECURSE( executer_actor expr_nodes federated_query + finalize_script_service gateway host node_service diff --git a/ydb/core/protos/config.proto b/ydb/core/protos/config.proto index 77629404d0..5e5e874d98 100644 --- a/ydb/core/protos/config.proto +++ b/ydb/core/protos/config.proto @@ -1013,6 +1013,11 @@ message TTableProfilesConfig { repeated TCachingPolicy CachingPolicies = 7; } +message TFinalizeScriptServiceConfig { + optional uint64 ScriptFinalizationTimeoutSeconds = 1 [default = 60]; + optional uint32 MaxInFlightFinalizationsCount = 2 [default = 10]; +}; + message TQueryServiceConfig { optional uint64 ScriptOperationTimeoutDefaultSeconds = 1 [default = 604800]; // default = 1 week optional uint64 ScriptForgetAfterDefaultSeconds = 2 [default = 31536000]; // default = 1 year; 0 = infinity @@ -1026,6 +1031,7 @@ message TQueryServiceConfig { optional string MdbGateway = 9 [deprecated=true]; optional bool MdbTransformHost = 10; optional NYql.TGenericGatewayConfig Generic = 11; + optional TFinalizeScriptServiceConfig FinalizeScriptServiceConfig = 12; } // Config describes immediate controls and allows diff --git a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt index 196f5b664a..374379e37b 100644 --- a/ydb/core/testlib/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.darwin-x86_64.txt @@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-keyvalue ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.linux-aarch64.txt b/ydb/core/testlib/CMakeLists.linux-aarch64.txt index 80814daf46..43e9c7e52c 100644 --- a/ydb/core/testlib/CMakeLists.linux-aarch64.txt +++ b/ydb/core/testlib/CMakeLists.linux-aarch64.txt @@ -49,6 +49,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-keyvalue ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.linux-x86_64.txt b/ydb/core/testlib/CMakeLists.linux-x86_64.txt index 80814daf46..43e9c7e52c 100644 --- a/ydb/core/testlib/CMakeLists.linux-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.linux-x86_64.txt @@ -49,6 +49,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-keyvalue ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/CMakeLists.windows-x86_64.txt b/ydb/core/testlib/CMakeLists.windows-x86_64.txt index 196f5b664a..374379e37b 100644 --- a/ydb/core/testlib/CMakeLists.windows-x86_64.txt +++ b/ydb/core/testlib/CMakeLists.windows-x86_64.txt @@ -48,6 +48,7 @@ target_link_libraries(ydb-core-testlib PUBLIC ydb-core-keyvalue ydb-core-kqp core-kqp-federated_query + core-kqp-finalize_script_service ydb-core-metering ydb-core-mind core-mind-address_classification diff --git a/ydb/core/testlib/test_client.cpp b/ydb/core/testlib/test_client.cpp index 786808273e..a7a923cfbb 100644 --- a/ydb/core/testlib/test_client.cpp +++ b/ydb/core/testlib/test_client.cpp @@ -57,6 +57,7 @@ #include <ydb/core/kqp/common/kqp.h> #include <ydb/core/kqp/rm_service/kqp_rm_service.h> #include <ydb/core/kqp/proxy_service/kqp_proxy_service.h> +#include <ydb/core/kqp/finalize_script_service/kqp_finalize_script_service.h> #include <ydb/core/metering/metering.h> #include <ydb/library/services/services.pb.h> #include <ydb/core/tablet_flat/tablet_flat_executed.h> @@ -889,6 +890,10 @@ namespace Tests { federatedQuerySetupFactory); TActorId kqpProxyServiceId = Runtime->Register(kqpProxyService, nodeIdx); Runtime->RegisterService(NKqp::MakeKqpProxyID(Runtime->GetNodeId(nodeIdx)), kqpProxyServiceId, nodeIdx); + + IActor* scriptFinalizeService = NKqp::CreateKqpFinalizeScriptService(Settings->AppConfig.GetQueryServiceConfig().GetFinalizeScriptServiceConfig(), Settings->AppConfig.GetMetadataProviderConfig(), federatedQuerySetupFactory); + TActorId scriptFinalizeServiceId = Runtime->Register(scriptFinalizeService, nodeIdx); + Runtime->RegisterService(NKqp::MakeKqpFinalizeScriptServiceId(Runtime->GetNodeId(nodeIdx)), scriptFinalizeServiceId, nodeIdx); } { diff --git a/ydb/core/testlib/ya.make b/ydb/core/testlib/ya.make index af028b07f5..1df7391d65 100644 --- a/ydb/core/testlib/ya.make +++ b/ydb/core/testlib/ya.make @@ -52,6 +52,7 @@ PEERDIR( ydb/core/keyvalue ydb/core/kqp ydb/core/kqp/federated_query + ydb/core/kqp/finalize_script_service ydb/core/metering ydb/core/mind ydb/core/mind/address_classification diff --git a/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp b/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp index 69f8a577b7..37ae5af9f3 100644 --- a/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp +++ b/ydb/library/yql/providers/common/structured_token/yql_token_builder.cpp @@ -45,7 +45,7 @@ TStructuredTokenBuilder& TStructuredTokenBuilder::SetNoAuth() { return *this; } -TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const TMap<TString, TString> secrets) { +TStructuredTokenBuilder& TStructuredTokenBuilder::ReplaceReferences(const std::map<TString, TString>& secrets) { if (Data.HasField("basic_password_ref")) { auto reference = Data.GetField("basic_password_ref"); Data.ClearField("basic_password_ref"); diff --git a/ydb/library/yql/providers/common/structured_token/yql_token_builder.h b/ydb/library/yql/providers/common/structured_token/yql_token_builder.h index 0661cec10e..c9f24b67c0 100644 --- a/ydb/library/yql/providers/common/structured_token/yql_token_builder.h +++ b/ydb/library/yql/providers/common/structured_token/yql_token_builder.h @@ -18,7 +18,7 @@ public: TStructuredTokenBuilder& SetBasicAuthWithSecret(const TString& login, const TString& passwordReference); TStructuredTokenBuilder& SetIAMToken(const TString& token); TStructuredTokenBuilder& SetNoAuth(); - TStructuredTokenBuilder& ReplaceReferences(const TMap<TString, TString> secrets); + TStructuredTokenBuilder& ReplaceReferences(const std::map<TString, TString>& secrets); TStructuredTokenBuilder& RemoveSecrets(); TString ToJson() const; diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp index 7e170a8ec3..d56fe3c5af 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp +++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.cpp @@ -204,7 +204,7 @@ public: IHTTPGateway::TPtr gateway, const TString& queryId, const TString& jobId, - ui32 restartNumber, + std::optional<ui32> restartNumber, bool commit, const THashMap<TString, TString>& secureParams, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, @@ -213,7 +213,7 @@ public: , Gateway(gateway) , QueryId(queryId) , KeyPrefix(jobId + "_") - , KeySubPrefix(ToString(restartNumber) + "_") + , KeySubPrefix(restartNumber ? ToString(*restartNumber) + "_" : "") , Commit(commit) , SecureParams(secureParams) , CredentialsFactory(credentialsFactory) @@ -326,6 +326,11 @@ public: } void Finish(bool fatal = false, const TString& message = "") { + if (ApplicationFinished) { + return; + } + ApplicationFinished = true; + if (message) { Issues.AddIssue(TIssue(message)); } @@ -442,9 +447,15 @@ public: void Process(TEvPrivate::TEvAbortMultipartUpload::TPtr& ev) { auto& result = ev->Get()->Result; - if (!result.Issues && result.Content.HttpResponseCode >= 200 && result.Content.HttpResponseCode < 300) { - LOG_D("AbortMultipartUpload SUCCESS " << ev->Get()->State->BuildUrl()); - return; + if (!result.Issues) { + if (result.Content.HttpResponseCode == 404) { + LOG_W("AbortMultipartUpload NOT FOUND " << ev->Get()->State->BuildUrl() << " (may be aborted already)"); + return; + } + if (result.Content.HttpResponseCode >= 200 && result.Content.HttpResponseCode < 300) { + LOG_D("AbortMultipartUpload SUCCESS " << ev->Get()->State->BuildUrl()); + return; + } } LOG_D("AbortMultipartUpload ERROR " << ev->Get()->State->BuildUrl()); if (RetryOperation(result.CurlResponseCode, result.Content.HttpResponseCode)) { @@ -572,6 +583,7 @@ private: THashSet<TString> CommitUploads; NYql::TIssues Issues; std::queue<TObjectStorageRequest> RequestQueue; + bool ApplicationFinished = false; }; } // namespace @@ -581,7 +593,7 @@ THolder<NActors::IActor> MakeS3ApplicatorActor( IHTTPGateway::TPtr gateway, const TString& queryId, const TString& jobId, - ui32 restartNumber, + std::optional<ui32> restartNumber, bool commit, const THashMap<TString, TString>& secureParams, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h index f4cfd53b7c..45e44e602f 100644 --- a/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h +++ b/ydb/library/yql/providers/s3/actors/yql_s3_applicator_actor.h @@ -13,7 +13,7 @@ THolder<NActors::IActor> MakeS3ApplicatorActor( IHTTPGateway::TPtr gateway, const TString& queryId, const TString& jobId, - ui32 restartNumber, + std::optional<ui32> restartNumber, bool commit, const THashMap<TString, TString>& secureParams, ISecuredServiceAccountCredentialsFactory::TPtr credentialsFactory, diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp index 03b76d30b8..4936f48f45 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp +++ b/ydb/library/yql/providers/s3/provider/yql_s3_dq_integration.cpp @@ -407,7 +407,7 @@ public: sinkDesc.SetCompression(TString(compression)); sinkDesc.SetMultipart(GetMultipart(settings.Settings().Ref())); - sinkDesc.SetAtomicUploadCommit(State_->Configuration->AtomicUploadCommit.Get().GetOrElse(false)); + sinkDesc.SetAtomicUploadCommit(State_->Configuration->AllowAtomicUploadCommit && State_->Configuration->AtomicUploadCommit.Get().GetOrElse(false)); protoSettings.PackFrom(sinkDesc); sinkType = "S3Sink"; diff --git a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h index 4f4a8734d6..2d19d65582 100644 --- a/ydb/library/yql/providers/s3/provider/yql_s3_settings.h +++ b/ydb/library/yql/providers/s3/provider/yql_s3_settings.h @@ -61,6 +61,7 @@ struct TS3Configuration : public TS3Settings, public NCommon::TSettingDispatcher ui64 GeneratorPathsLimit = 0; bool WriteThroughDqIntegration = false; ui64 MaxListingResultSizePerPhysicalPartition; + bool AllowAtomicUploadCommit = true; }; } // NYql diff --git a/ydb/tests/tools/kqprun/kqprun.cpp b/ydb/tests/tools/kqprun/kqprun.cpp index 3594b60772..325c38c760 100644 --- a/ydb/tests/tools/kqprun/kqprun.cpp +++ b/ydb/tests/tools/kqprun/kqprun.cpp @@ -17,6 +17,8 @@ struct TExecutionOptions { TString SchemeQuery; NKikimrKqp::EQueryAction ScriptQueryAction = NKikimrKqp::QUERY_ACTION_EXECUTE; + + TString ScriptTraceId = "kqprun"; }; @@ -35,7 +37,7 @@ void RunScript(const TExecutionOptions& executionOptions, const NKqpRun::TRunner if (executionOptions.ScriptQuery) { Cout << colors.Yellow() << "Executing script..." << colors.Default() << Endl; - if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction)) { + if (!runner.ExecuteScript(executionOptions.ScriptQuery, executionOptions.ScriptQueryAction, executionOptions.ScriptTraceId)) { ythrow yexception() << "Script execution failed"; } } diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.cpp b/ydb/tests/tools/kqprun/src/kqp_runner.cpp index 1d118a93d0..7677fd5eee 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.cpp +++ b/ydb/tests/tools/kqprun/src/kqp_runner.cpp @@ -37,8 +37,8 @@ public: return true; } - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action) { - TRequestResult status = YdbSetup_.ScriptQueryRequest(script, action, ExecutionOperation_); + bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) { + TRequestResult status = YdbSetup_.ScriptQueryRequest(script, action, traceId, ExecutionOperation_); if (!status.IsSuccess()) { Cerr << CerrColors_.Red() << "Failed to start script execution, reason:" << CerrColors_.Default() << Endl << status.ToString() << Endl; @@ -124,8 +124,8 @@ bool TKqpRunner::ExecuteSchemeQuery(const TString& query) const { return Impl_->ExecuteSchemeQuery(query); } -bool TKqpRunner::ExecuteScript(const TString& query, NKikimrKqp::EQueryAction action) const { - return Impl_->ExecuteScript(query, action); +bool TKqpRunner::ExecuteScript(const TString& query, NKikimrKqp::EQueryAction action, const TString& traceId) const { + return Impl_->ExecuteScript(query, action, traceId); } bool TKqpRunner::WriteScriptResults() const { diff --git a/ydb/tests/tools/kqprun/src/kqp_runner.h b/ydb/tests/tools/kqprun/src/kqp_runner.h index 233cdd0475..09d3c1c657 100644 --- a/ydb/tests/tools/kqprun/src/kqp_runner.h +++ b/ydb/tests/tools/kqprun/src/kqp_runner.h @@ -11,7 +11,7 @@ public: bool ExecuteSchemeQuery(const TString& query) const; - bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action) const; + bool ExecuteScript(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const; bool WriteScriptResults() const; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.cpp b/ydb/tests/tools/kqprun/src/ydb_setup.cpp index 31ff8f78b2..ff105e1655 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.cpp +++ b/ydb/tests/tools/kqprun/src/ydb_setup.cpp @@ -147,8 +147,10 @@ public: return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvQueryRequest, NKikimr::NKqp::TEvKqp::TEvQueryResponse>(std::move(event)); } - NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action) const { + NKikimr::NKqp::TEvKqp::TEvScriptResponse::TPtr ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId) const { auto event = MakeHolder<NKikimr::NKqp::TEvKqp::TEvScriptRequest>(); + event->Record.SetTraceId(traceId); + FillScriptRequest(script, action, *event->Record.MutableRequest()); return RunKqpProxyRequest<NKikimr::NKqp::TEvKqp::TEvScriptRequest, NKikimr::NKqp::TEvKqp::TEvScriptResponse>(std::move(event)); @@ -172,11 +174,6 @@ public: return GetRuntime()->GrabEdgeEvent<NKikimr::NKqp::TEvKqp::TEvFetchScriptResultsResponse>(edgeActor); } - ~TImpl() { - Server_.Reset(); - Client_.Reset(); - } - private: NActors::TTestActorRuntime* GetRuntime() const { return Server_->GetRuntime(); @@ -262,8 +259,8 @@ TRequestResult TYdbSetup::SchemeQueryRequest(const TString& query, TSchemeMeta& return TRequestResult(schemeQueryOperationResponse.GetYdbStatus(), issues); } -TRequestResult TYdbSetup::ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, TString& operation) const { - auto scriptExecutionOperation = Impl_->ScriptQueryRequest(script, action); +TRequestResult TYdbSetup::ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const { + auto scriptExecutionOperation = Impl_->ScriptQueryRequest(script, action, traceId); operation = scriptExecutionOperation->Get()->OperationId; diff --git a/ydb/tests/tools/kqprun/src/ydb_setup.h b/ydb/tests/tools/kqprun/src/ydb_setup.h index c918ef25e3..3e22389b85 100644 --- a/ydb/tests/tools/kqprun/src/ydb_setup.h +++ b/ydb/tests/tools/kqprun/src/ydb_setup.h @@ -43,7 +43,7 @@ public: TRequestResult SchemeQueryRequest(const TString& query, TSchemeMeta& meta) const; - TRequestResult ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, TString& operation) const; + TRequestResult ScriptQueryRequest(const TString& script, NKikimrKqp::EQueryAction action, const TString& traceId, TString& operation) const; TRequestResult GetScriptExecutionOperationRequest(const TString& operation, TExecutionMeta& meta) const; |