diff options
author | auzhegov <[email protected]> | 2023-09-01 11:33:07 +0300 |
---|---|---|
committer | auzhegov <[email protected]> | 2023-09-01 11:55:24 +0300 |
commit | ce4f14f3b059b1b44b3c87a82abe5144c29ce650 (patch) | |
tree | f9b2f5248e5a3583ba869635d935973344b51c02 | |
parent | 804d5f57f707a59fe23a62426e5d454cb3c03115 (diff) |
CreateConnection/CreateBinding improvements
8 files changed, 517 insertions, 353 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp index be2c5c3ea43..5524f1921ce 100644 --- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp @@ -393,8 +393,7 @@ private: TDuration::Seconds(30), Counters, CommonConfig, - Signer, - true + Signer )); } if (Connections.empty()) { @@ -422,8 +421,7 @@ private: SelfId(), request, TDuration::Seconds(30), - Counters, - true + Counters )); } diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h index a90e0f44256..d3573a2e680 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h @@ -19,14 +19,8 @@ using namespace NActors; using namespace NThreading; using namespace NYdb; -template<typename T> -struct TBaseActorTypeTag { - using TRequest = typename T::TRequest; - using TResponse = typename T::TResponse; -}; - template<typename TDerived> -class TBaseActor : public NActors::TActorBootstrapped<TDerived> { +class TPlainBaseActor : public NActors::TActorBootstrapped<TDerived> { public: using TBase = NActors::TActorBootstrapped<TDerived>; using TBase::Become; @@ -34,18 +28,14 @@ public: using TBase::SelfId; using TBase::Send; - using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr; - using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse; - public: - TBaseActor(const TActorId& proxyActorId, - const TEventRequestPtr request, + TPlainBaseActor(const TActorId& successActorId, + const TActorId& errorActorId, TDuration requestTimeout, const NPrivate::TRequestCommonCountersPtr& counters) - : Request(std::move(request)) - , Counters(counters) - , ProxyActorId(proxyActorId) - , ResponseActorId(Request->Sender) + : Counters(counters) + , SuccessActorId(successActorId) + , ErrorActorId(errorActorId) , RequestTimeout(requestTimeout) { } void Bootstrap() { @@ -55,17 +45,24 @@ public: BootstrapImpl(); } - void SendErrorMessageToSender(const NYql::TIssue& issue) { - Counters->Error->Inc(); - NYql::TIssues issues; - issues.AddIssue(issue); - Send(ResponseActorId, new TEventResponse(issues, {}), 0, Request->Cookie); + template<class THandler> + void SendRequestToSender(TAutoPtr<THandler> handle) { + Counters->Ok->Inc(); + Send(handle->Forward(SuccessActorId)); PassAway(); } - void SendRequestToSender() { + void SendRequestToSender(IEventBase* event) { Counters->Ok->Inc(); - Send(Request->Forward(ProxyActorId)); + Send(SuccessActorId, event); + PassAway(); + } + + void SendErrorMessageToSender(IEventBase* event, + NActors::TActorIdentity::TEventFlags flags = 0, + ui64 cookie = 0) { + Counters->Error->Inc(); + Send(ErrorActorId, event, flags, cookie); PassAway(); } @@ -73,9 +70,55 @@ public: CPP_LOG_D("TBaseActor Timeout occurred. Actor id: " << SelfId()); Counters->Timeout->Inc(); - SendErrorMessageToSender(MakeErrorIssue( - TIssuesIds::TIMEOUT, - "Timeout occurred. Try repeating the request later")); + SendErrorMessageToSender(MakeTimeoutEventImpl( + MakeErrorIssue(TIssuesIds::TIMEOUT, + "Timeout occurred. Try repeating the request later"))); + } + +protected: + virtual void BootstrapImpl() = 0; + virtual IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) = 0; + +protected: + const NPrivate::TRequestCommonCountersPtr Counters; + const TActorId SuccessActorId; + const TActorId ErrorActorId; + const TDuration RequestTimeout; +}; + +template<typename T> +struct TBaseActorTypeTag { + using TRequest = typename T::TRequest; + using TResponse = typename T::TResponse; +}; + +template<typename TDerived> +class TBaseActor : public TPlainBaseActor<TDerived> { +public: + using TBase = TPlainBaseActor<TDerived>; + using TBase::Become; + using TBase::PassAway; + using TBase::SelfId; + using TBase::Send; + + using TEventRequestPtr = typename TBaseActorTypeTag<TDerived>::TRequest::TPtr; + using TEventResponse = typename TBaseActorTypeTag<TDerived>::TResponse; + +public: + TBaseActor(const TActorId& proxyActorId, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters) + : TPlainBaseActor<TDerived>(proxyActorId, + request->Sender, + std::move(requestTimeout), + counters) + , Request(std::move(request)) { } + + void SendErrorMessageToSender(const NYql::TIssue& issue) { + TBase::SendErrorMessageToSender(new TEventResponse({issue}, {}), + 0, + Request->Cookie); } void HandleError(const TString& message, EStatus status, const NYql::TIssues& issues) { @@ -92,15 +135,17 @@ public: SendErrorMessageToSender(std::move(issue)); } -private: - virtual void BootstrapImpl() = 0; + void SendRequestToSender() { + TBase::SendRequestToSender(Request); + } + +protected: + IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) final { + return new TEventResponse({std::move(issue)}, {}); + }; protected: const TEventRequestPtr Request; - const NPrivate::TRequestCommonCountersPtr Counters; - const TActorId ProxyActorId; - const TActorId ResponseActorId; - const TDuration RequestTimeout; }; } // namespace NPrivate diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp index d9666ad965c..c2de079c09e 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp @@ -132,47 +132,6 @@ TString DescribeConnectionErrorMessageFactoryMethod(const NYql::TIssues& issues) return "Couldn't resolve connection"; }; -NActors::IActor* MakeListConnectionActor( - const TActorId& proxyActorId, - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, - TCounters& counters, - TDuration requestTimeout, - TPermissions permissions) { - auto cpsRequestFactory = - [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event) { - FederatedQuery::ListConnectionsRequest result; - auto connectionName = event->Get()->Request.content().name(); - result.mutable_filter()->set_name(connectionName); - result.set_limit(2); - return result; - }; - auto cpsRequestPostProcessor = - [](TEvControlPlaneStorage::TEvListConnectionsRequest& event) { - event.IsExactNameMatch = true; - }; - auto entityNameExtractorFactoryMethod = - [](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& event, - const FederatedQuery::ListConnectionsResult& result) { - event->Get()->CPSListingFinished = true; - event->Get()->CPSConnectionCount = result.connection_size(); - }; - - return new TControlPlaneStorageRequesterActor< - TEvControlPlaneProxy::TEvCreateConnectionRequest, - TEvControlPlaneProxy::TEvCreateConnectionResponse, - TEvControlPlaneStorage::TEvListConnectionsRequest, - TEvControlPlaneStorage::TEvListConnectionsResponse>( - proxyActorId, - request, - requestTimeout, - counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), - permissions, - cpsRequestFactory, - DescribeConnectionErrorMessageFactoryMethod, - entityNameExtractorFactoryMethod, - cpsRequestPostProcessor); -} - NActors::IActor* MakeDiscoverYDBConnectionNameActor( const TActorId& proxyActorId, const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h index 60c89e9f95c..86252f6e3b3 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h @@ -39,13 +39,6 @@ NActors::IActor* MakeDiscoverYDBConnectionNameActor( TDuration requestTimeout, TPermissions permissions); -NActors::IActor* MakeListConnectionActor( - const TActorId& proxyActorId, - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, - TCounters& counters, - TDuration requestTimeout, - TPermissions permissions); - /// Discover binding_name NActors::IActor* MakeDiscoverYDBBindingNameActor( diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index 583c0285754..64102172cf0 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -6,6 +6,7 @@ #include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/config/yq_issue.h> #include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/core/fq/libs/control_plane_storage/control_plane_storage.h> #include <ydb/public/api/protos/draft/fq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> @@ -28,229 +29,427 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { using TResponse = TEventResponse; }; +using TScheduleErrorRecoverySQLGeneration = + std::function<bool(NActors::TActorId sender, const TStatus& issues)>; + struct TSchemaQueryTask { TString SQL; TMaybe<TString> RollbackSQL; + TScheduleErrorRecoverySQLGeneration ScheduleErrorRecoverySQLGeneration; +}; + +struct TEvPrivate { + enum EEv { + EvProcessNextTaskRequest = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), + EvQueryExecutionResponse, + EvRecoveryResponse, + EvEnd + }; + + static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), + "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); + + struct TEvProcessNextTaskRequest : + NActors::TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { }; + + struct TEvQueryExecutionResponse : + NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { + TEvQueryExecutionResponse(TStatus result) + : Result(std::move(result)) { } + + TStatus Result; + }; + + struct TEvRecoveryResponse : + NActors::TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> { + TEvRecoveryResponse(TMaybe<TString> recoverySQL, TStatus result) + : RecoverySQL(recoverySQL) + , Result(std::move(result)) { } + + TMaybe<TString> RecoverySQL; + TStatus Result; + }; }; template<class TEventRequest, class TEventResponse> class TSchemaQueryYDBActor : public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { private: - struct TEvPrivate { - enum EEv { - EvQueryExecutionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - EvEnd - }; - - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), - "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); - - struct TEvQueryExecutionResponse : - NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { - TStatus Result; - size_t TaskIndex = 0u; - bool Rollback = false; - TMaybe<TStatus> MaybeInitialStatus; - - TEvQueryExecutionResponse(TStatus result, - size_t taskIndex, - bool rollback, - TMaybe<TStatus> MaybeInitialStatus) - : Result(std::move(result)) - , TaskIndex(taskIndex) - , Rollback(rollback) - , MaybeInitialStatus(std::move(MaybeInitialStatus)) { } - }; - }; - using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>; - using TBase::SelfId; + using TBase::Become; using TBase::Request; + using TBase::SelfId; using TEventRequestPtr = typename TEventRequest::TPtr; public: - using TTasks = std::vector<TSchemaQueryTask>; - using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>; - using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; - using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>; + using TTasks = std::vector<TSchemaQueryTask>; + using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>; + using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; + using TErrorMessageFactoryMethod = + std::function<TString(const EStatus status, const NYql::TIssues& issues)>; TSchemaQueryYDBActor(const TActorId& proxyActorId, const TEventRequestPtr request, TDuration requestTimeout, const NPrivate::TRequestCommonCountersPtr& counters, TQueryFactoryMethod queryFactoryMethod, - TErrorMessageFactoryMethod errorMessageFactoryMethod, - bool successOnAlreadyExists = false) + TErrorMessageFactoryMethod errorMessageFactoryMethod) : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( proxyActorId, std::move(request), requestTimeout, counters) , Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}} - , ErrorMessageFactoryMethod(errorMessageFactoryMethod) - , SuccessOnAlreadyExists(successOnAlreadyExists) - { } + , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } TSchemaQueryYDBActor(const TActorId& proxyActorId, const TEventRequestPtr request, TDuration requestTimeout, const NPrivate::TRequestCommonCountersPtr& counters, TTasksFactoryMethod tasksFactoryMethod, - TErrorMessageFactoryMethod errorMessageFactoryMethod, - bool successOnAlreadyExists = false) + TErrorMessageFactoryMethod errorMessageFactoryMethod) : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( proxyActorId, std::move(request), requestTimeout, counters) , Tasks(tasksFactoryMethod(Request)) - , ErrorMessageFactoryMethod(errorMessageFactoryMethod) - , SuccessOnAlreadyExists(successOnAlreadyExists) - { } + , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR"; void BootstrapImpl() override { CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId()); - InitiateSchemaQueryExecution(0, false, Nothing()); + ScheduleNextTask(); + } + + // Normal state + void HandleProcessNextTask(typename TEvPrivate::TEvProcessNextTaskRequest::TPtr& event) { + Y_UNUSED(event); + auto schemeQuery = NormalSelectTask(); + if (schemeQuery) { + InitiateSchemaQueryExecution(*schemeQuery); + } else { + FinishSuccessfully(); + } + } + + TMaybe<TString> NormalSelectTask() { + if (CurrentTaskIndex < static_cast<i32>(Tasks.size())) { + return Tasks[CurrentTaskIndex].SQL; + } + return Nothing(); } - TMaybe<TString> SelectTask(size_t taskIndex, bool rollback) { - if (!rollback) { - if (taskIndex < Tasks.size()) { - return Tasks[taskIndex].SQL; + void NormalHandleExecutionResponse( + typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { + const auto& executeSchemeQueryStatus = event->Get()->Result; + + if (executeSchemeQueryStatus.IsSuccess()) { + CurrentTaskIndex++; + ScheduleNextTask(); + } else { + SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus); + + auto& task = Tasks[CurrentTaskIndex]; + if (task.ScheduleErrorRecoverySQLGeneration && + task.ScheduleErrorRecoverySQLGeneration(SelfId(), + executeSchemeQueryStatus)) { + TransitionToRecoveryState(); + return; } - return Nothing(); + TransitionToRollbackState(); + } + } + + // Rollback state + void RollbackHandleProcessNextTask( + typename TEvPrivate::TEvProcessNextTaskRequest::TPtr& event) { + Y_UNUSED(event); + + auto rollbackSchemeQuery = RollbackSelectTask(); + if (rollbackSchemeQuery) { + InitiateSchemaQueryExecution(*rollbackSchemeQuery); + } else { + SendError(); } + } - while (true) { - const auto& maybeRollback = Tasks[taskIndex].RollbackSQL; + TMaybe<TString> RollbackSelectTask() { + while (CurrentTaskIndex >= 0) { + const auto& maybeRollback = Tasks[CurrentTaskIndex].RollbackSQL; if (maybeRollback) { return maybeRollback; } - if (taskIndex == 0u) { - return Nothing(); - } - taskIndex--; + CurrentTaskIndex--; } + return Nothing(); } - bool InitiateSchemaQueryExecution(size_t taskIndex, - bool rollback, - const TMaybe<TStatus>& maybeInitialStatus) { - CPP_LOG_I( - "TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId()); - auto schemeQuery = SelectTask(taskIndex, rollback); - if (schemeQuery) { - CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. schemeQuery: " - << schemeQuery); - Request->Get() - ->YDBClient - ->RetryOperation([query = *schemeQuery](TSession session) { - return session.ExecuteSchemeQuery(query); - }) - .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId(), - taskIndex, - rollback, - maybeInitialStatus](const TAsyncStatus& future) { - actorSystem->Send(self, - new typename TEvPrivate::TEvQueryExecutionResponse{ - std::move(future.GetValueSync()), - taskIndex, - rollback, - std::move(maybeInitialStatus)}); - }); + void RollbackHandleExecutionResponse( + typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { + const auto& executeSchemeQueryStatus = event->Get()->Result; + + if (executeSchemeQueryStatus.IsSuccess()) { + CurrentTaskIndex--; + ScheduleNextTask(); + return; + } else { + SaveIssues("Couldn't execute rollback SQL", executeSchemeQueryStatus); + SendError(); + return; + } + } + + // Recovery state + void Handle(typename TEvPrivate::TEvRecoveryResponse::TPtr& event) { + if (event->Get()->Result.IsSuccess()) { + InitiateSchemaQueryExecution(*event->Get()->RecoverySQL); + } else { + SaveIssues("Failed to generate recovery SQL", event->Get()->Result); + TransitionToRollbackState(); + } + } + + void RecoveryHandleExecutionResponse( + typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { + const auto& executeSchemeQueryStatus = event->Get()->Result; + + if (executeSchemeQueryStatus.IsSuccess()) { + ClearIssues(); + TransitionToNormalState(); + } else { + SaveIssues("Failed to execute recovery SQL", event->Get()->Result); + TransitionToRollbackState(); } - return schemeQuery.Defined(); } - STRICT_STFUNC(StateFunc, + // FSM description + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvPrivate::TEvQueryExecutionResponse, + NormalHandleExecutionResponse); + hFunc(TEvPrivate::TEvProcessNextTaskRequest, HandleProcessNextTask);) + + STRICT_STFUNC( + RollbackStateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvPrivate::TEvQueryExecutionResponse, RollbackHandleExecutionResponse); + hFunc(TEvPrivate::TEvProcessNextTaskRequest, RollbackHandleProcessNextTask);) + + STRICT_STFUNC(RecoveryStateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); - hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle); - ) + hFunc(TEvPrivate::TEvRecoveryResponse, Handle); + hFunc(TEvPrivate::TEvQueryExecutionResponse, + RecoveryHandleExecutionResponse);) + + void ScheduleNextTask() { + TBase::Send(SelfId(), new typename TEvPrivate::TEvProcessNextTaskRequest{}); + } + + void TransitionToRollbackState() { + CPP_LOG_I("TSchemaQueryYDBActor TransitionToRollbackState. Actor id: " + << TBase::SelfId()); + CurrentTaskIndex--; + Become(&TSchemaQueryYDBActor::RollbackStateFunc); + ScheduleNextTask(); + } + + void TransitionToNormalState() { + CPP_LOG_I("TSchemaQueryYDBActor TransitionToNormalState. Actor id: " + << TBase::SelfId()); + Become(&TSchemaQueryYDBActor::StateFunc); + ScheduleNextTask(); + } + + void TransitionToRecoveryState() { + CPP_LOG_I("TSchemaQueryYDBActor TransitionToRecoveryState. Actor id: " + << TBase::SelfId()); + Become(&TSchemaQueryYDBActor::RecoveryStateFunc); + } - void FinishSuccessfully(bool isAlreadyExistSuccessStatus) { + void FinishSuccessfully() { CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId()); Request->Get()->ComputeYDBOperationWasPerformed = true; - Request->Get()->ComputeYDBIsAlreadyExistFlag = isAlreadyExistSuccessStatus; TBase::SendRequestToSender(); } - void SendError(const TStatus& executeSchemeQueryStatus) { + void SendError() { CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " << TBase::SelfId()); - TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus); + TString errorMessage = ErrorMessageFactoryMethod(*FirstStatus, Issues); - TBase::HandleError(errorMessage, - executeSchemeQueryStatus.GetStatus(), - executeSchemeQueryStatus.GetIssues()); + TBase::HandleError(errorMessage, *FirstStatus, std::move(Issues)); } + void SaveIssues(const TString& message, const TStatus& status) { + auto issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message); + for (const auto& subIssue : status.GetIssues()) { + issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + Issues.AddIssue(std::move(issue)); + if (!FirstStatus) { + FirstStatus = status.GetStatus(); + } + } - void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { - const auto& executeSchemeQueryStatus = event->Get()->Result; - auto isRollback = event->Get()->Rollback; - auto isAlreadyExistSuccessStatus = - IsAlreadyExistSuccessStatus(executeSchemeQueryStatus); - auto isExecuteStatementSuccessful = - executeSchemeQueryStatus.IsSuccess() || isAlreadyExistSuccessStatus; - auto successExecutionRunMode = isExecuteStatementSuccessful && !isRollback; - auto successExecutionRollbackMode = isExecuteStatementSuccessful && isRollback; - auto failedExecutionRunMode = !isExecuteStatementSuccessful && !isRollback; - - if (successExecutionRunMode) { - if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) { - FinishSuccessfully(isAlreadyExistSuccessStatus); - return; - } - } else if (successExecutionRollbackMode) { - if (event->Get()->TaskIndex == 0 || - !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1, - true, - std::move(event->Get()->MaybeInitialStatus))) { - SendError(*event->Get()->MaybeInitialStatus); - return; - } - } else if (failedExecutionRunMode) { - if (event->Get()->TaskIndex == 0 || - !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1, - true, - std::move(event->Get()->Result))) { - SendError(event->Get()->Result); - return; - } - } else { - // Failed during rollback - const auto& initialIssues = *(event->Get()->MaybeInitialStatus); + void ClearIssues() { + Issues.Clear(); + FirstStatus.Clear(); + } - auto originalIssue = - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't execute SQL script"); - for (const auto& subIssue : initialIssues.GetIssues()) { - originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - auto rollbackIssue = - MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, - "Couldn't execute rollback SQL script"); - for (const auto& subIssue : event->Get()->Result.GetIssues()) { - originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); - } - SendError(TStatus{initialIssues.GetStatus(), - NYql::TIssues{std::move(originalIssue), - std::move(rollbackIssue)}}); - return; + void InitiateSchemaQueryExecution(const TString& schemeQuery) { + CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: " + << TBase::SelfId() << " SchemeQuery: " << schemeQuery); + Request->Get() + ->YDBClient + ->RetryOperation([query = schemeQuery](TSession session) { + return session.ExecuteSchemeQuery(query); + }) + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId()](const TAsyncStatus& future) { + actorSystem->Send(self, + new typename TEvPrivate::TEvQueryExecutionResponse{ + std::move(future.GetValueSync()), + }); + }); + } + +private: + TTasks Tasks; + TErrorMessageFactoryMethod ErrorMessageFactoryMethod; + i32 CurrentTaskIndex = 0; + TMaybe<EStatus> FirstStatus; + NYql::TIssues Issues; +}; + +class TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor : + public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor> { +public: + using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>; + + TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor( + NActors::TActorId sender, + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, + TPermissions permissions, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters) + : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>( + sender, sender, requestTimeout, counters) + , Request(request) + , Permissions(std::move(permissions)) { } + + void BootstrapImpl() { CheckConnectionExistenceInCPS(); } + + IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) { + return new TEvPrivate::TEvRecoveryResponse( + Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}}); + }; + + void CheckConnectionExistenceInCPS() { + FederatedQuery::ListConnectionsRequest result; + auto connectionName = Request->Get()->Request.content().name(); + result.mutable_filter()->set_name(connectionName); + result.set_limit(2); + + auto event = + new TEvControlPlaneStorage::TEvListConnectionsRequest(Request->Get()->Scope, + result, + Request->Get()->User, + Request->Get()->Token, + Request->Get()->CloudId, + Permissions, + Request->Get()->Quotas, + Request->Get()->TenantInfo, + {}); + + event->IsExactNameMatch = true; + + TBase::Send(NFq::ControlPlaneStorageServiceActorId(), event); + } + + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvControlPlaneStorage::TEvListConnectionsResponse, Handle);) + + void Handle(const TEvControlPlaneStorage::TEvListConnectionsResponse::TPtr& event) { + auto connectionSize = event->Get()->Result.connection_size(); + if (connectionSize != 0) { + // Already exist in CPS + TBase::SendErrorMessageToSender( + new TEvPrivate::TEvRecoveryResponse(Nothing(), + TStatus{EStatus::TIMEOUT, {}})); } + TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse( + MakeDeleteExternalDataSourceQuery(Request->Get()->Request.content().name()), + TStatus{EStatus::SUCCESS, {}})); } private: - bool IsAlreadyExistSuccessStatus(const TStatus& status) const { - return SuccessOnAlreadyExists && - (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || - status.GetIssues().ToOneLineString().Contains("error: path exist")); + NActors::TActorId Sender; + const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& Request; + TPermissions Permissions; +}; + +class TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor : + public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor> { +public: + using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>; + + TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor( + NActors::TActorId sender, + const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, + TPermissions permissions, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters) + : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>( + sender, sender, requestTimeout, counters) + , Request(request) + , Permissions(std::move(permissions)) { } + + void BootstrapImpl() { CheckBindingExistenceInCPS(); } + + IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) { + return new TEvPrivate::TEvRecoveryResponse( + Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}}); + }; + + void CheckBindingExistenceInCPS() { + FederatedQuery::ListBindingsRequest result; + auto bindingName = Request->Get()->Request.content().name(); + result.mutable_filter()->set_name(bindingName); + result.set_limit(2); + + auto event = + new TEvControlPlaneStorage::TEvListBindingsRequest(Request->Get()->Scope, + result, + Request->Get()->User, + Request->Get()->Token, + Request->Get()->CloudId, + Permissions, + Request->Get()->Quotas, + Request->Get()->TenantInfo, + {}); + + event->IsExactNameMatch = true; + + TBase::Send(NFq::ControlPlaneStorageServiceActorId(), event); + } + + STRICT_STFUNC(StateFunc, cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvControlPlaneStorage::TEvListBindingsResponse, Handle);) + + void Handle(const TEvControlPlaneStorage::TEvListBindingsResponse::TPtr& event) { + auto bindingSize = event->Get()->Result.binding_size(); + if (bindingSize != 0) { + // Already exist in CPS + TBase::SendErrorMessageToSender( + new TEvPrivate::TEvRecoveryResponse(Nothing(), + TStatus{EStatus::TIMEOUT, {}})); + } + TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse( + MakeDeleteExternalDataTableQuery(Request->Get()->Request.content().name()), + TStatus{EStatus::SUCCESS, {}})); } private: - TTasks Tasks; - TErrorMessageFactoryMethod ErrorMessageFactoryMethod; - bool SuccessOnAlreadyExists = false; + NActors::TActorId Sender; + const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& Request; + TPermissions Permissions; }; /// Connection actors @@ -260,12 +459,12 @@ NActors::IActor* MakeCreateConnectionActor( TDuration requestTimeout, TCounters& counters, const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer, - bool successOnAlreadyExists) { + TSigner::TPtr signer) { auto queryFactoryMethod = [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), - signer = std::move(signer)]( - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) + signer = std::move(signer), + requestTimeout, + &counters](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { auto& connectionContent = request->Get()->Request.content(); @@ -283,14 +482,37 @@ NActors::IActor* MakeCreateConnectionActor( connectionContent.name(), signer)}); } + + TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = + [&request, requestTimeout, &counters](NActors::TActorId sender, + const TStatus& status) { + Cerr << "Status 1 " << status.GetIssues().ToOneLineString() << Endl; + if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || + status.GetIssues().ToOneLineString().Contains("error: path exist")) { + TActivationContext::ActorSystem()->Register( + new TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor( + sender, + request, + TPermissions{}, + requestTimeout, + counters.GetCommonCounters( + RTC_CREATE_CONNECTION_IN_YDB))); // change counter + return true; + } + return false; + }; statements.push_back( TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( - connectionContent, objectStorageEndpoint, signer)}}); + connectionContent, objectStorageEndpoint, signer)}, + .ScheduleErrorRecoverySQLGeneration = + alreadyExistRecoveryActorFactoryMethod}); return statements; }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + if (status == NYdb::EStatus::ALREADY_EXISTS) { return "External data source with such name already exists"; } else { return "Couldn't create external data source in YDB"; @@ -304,8 +526,7 @@ NActors::IActor* MakeCreateConnectionActor( requestTimeout, counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), queryFactoryMethod, - errorMessageFactoryMethod, - successOnAlreadyExists); + errorMessageFactoryMethod); } NActors::IActor* MakeModifyConnectionActor( @@ -405,8 +626,10 @@ NActors::IActor* MakeModifyConnectionActor( return statements; }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - Y_UNUSED(queryStatus); + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(status); + Y_UNUSED(issues); return "Couldn't modify external data source in YDB"; }; @@ -455,8 +678,10 @@ NActors::IActor* MakeDeleteConnectionActor( return statements; }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - Y_UNUSED(queryStatus); + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(status); + Y_UNUSED(issues); return "Couldn't delete external data source in YDB"; }; @@ -470,55 +695,49 @@ NActors::IActor* MakeDeleteConnectionActor( errorMessageFactoryMethod); } -NActors::IActor* MakeDropCreateConnectionActor( - const TActorId& proxyActorId, - TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, - TDuration requestTimeout, - TCounters& counters, - const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer) { - auto queryFactoryMethod = - [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), - signer = std::move(signer)]( - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) - -> std::vector<TSchemaQueryTask> { - auto& connectionContent = request->Get()->Request.content(); - return {TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery( - connectionContent.name())}}, - TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( - connectionContent, objectStorageEndpoint, signer)}}}; - }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - Y_UNUSED(queryStatus); - return "Couldn't recreate external source in YDB"; - }; - - return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest, - TEvControlPlaneProxy::TEvCreateConnectionResponse>( - proxyActorId, - std::move(request), - requestTimeout, - counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB), - queryFactoryMethod, - errorMessageFactoryMethod); -} - /// Bindings actors NActors::IActor* MakeCreateBindingActor( const TActorId& proxyActorId, TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, TDuration requestTimeout, - TCounters& counters, - bool successOnAlreadyExists) { + TCounters& counters) { auto queryFactoryMethod = - [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString { - auto externalSourceName = *request->Get()->ConnectionName; - return MakeCreateExternalDataTableQuery(request->Get()->Request.content(), - externalSourceName); + [requestTimeout, + &counters](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { + auto& bindingContent = request->Get()->Request.content(); + auto& externalSourceName = *request->Get()->ConnectionName; + std::vector<TSchemaQueryTask> statements; + + TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = + [&request, requestTimeout, &counters](NActors::TActorId sender, + const TStatus& status) { + Cerr << "Status 1 " << status.GetIssues().ToOneLineString() << Endl; + if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || + status.GetIssues().ToOneLineString().Contains("error: path exist")) { + TActivationContext::ActorSystem()->Register( + new TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor( + sender, + request, + TPermissions{}, + requestTimeout, + counters.GetCommonCounters( + RTC_CREATE_CONNECTION_IN_YDB))); // change counter + return true; + } + return false; + }; + statements.push_back(TSchemaQueryTask{ + .SQL = TString{MakeCreateExternalDataTableQuery(bindingContent, + externalSourceName)}, + .ScheduleErrorRecoverySQLGeneration = alreadyExistRecoveryActorFactoryMethod}); + return statements; }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - if (queryStatus.GetStatus() == NYdb::EStatus::ALREADY_EXISTS) { + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + if (status == NYdb::EStatus::ALREADY_EXISTS) { return "External data table with such name already exists"; } else { return "Couldn't create external data table in YDB"; @@ -532,8 +751,7 @@ NActors::IActor* MakeCreateBindingActor( requestTimeout, counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB), queryFactoryMethod, - errorMessageFactoryMethod, - successOnAlreadyExists); + errorMessageFactoryMethod); } NActors::IActor* MakeModifyBindingActor( @@ -542,13 +760,15 @@ NActors::IActor* MakeModifyBindingActor( TDuration requestTimeout, TCounters& counters) { auto queryFactoryMethod = - [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { auto sourceName = *request->Get()->ConnectionName; auto oldTableName = request->Get()->OldBindingContent->name(); auto deleteOldEntities = MakeDeleteExternalDataTableQuery(oldTableName); auto createOldEntities = - MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent, sourceName); + MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent, + sourceName); auto createNewEntities = MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName); @@ -556,8 +776,10 @@ NActors::IActor* MakeModifyBindingActor( TSchemaQueryTask{.SQL = createNewEntities}}; }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - Y_UNUSED(queryStatus); + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(status); + Y_UNUSED(issues); return "Couldn't modify external data table in YDB"; }; @@ -581,8 +803,10 @@ NActors::IActor* MakeDeleteBindingActor( return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName); }; - auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { - Y_UNUSED(queryStatus); + auto errorMessageFactoryMethod = [](const EStatus status, + const NYql::TIssues& issues) -> TString { + Y_UNUSED(status); + Y_UNUSED(issues); return "Couldn't delete external data source in YDB"; }; diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h index 0df17eca66d..c8ef2449e16 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -17,8 +17,7 @@ NActors::IActor* MakeCreateConnectionActor( TDuration requestTimeout, TCounters& counters, const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer, - bool successOnAlreadyExists = false); + TSigner::TPtr signer); NActors::IActor* MakeModifyConnectionActor( const NActors::TActorId& proxyActorId, @@ -36,21 +35,12 @@ NActors::IActor* MakeDeleteConnectionActor( const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer); -NActors::IActor* MakeDropCreateConnectionActor( - const NActors::TActorId& proxyActorId, - TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, - TDuration requestTimeout, - TCounters& counters, - const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer); - /// Binding manipulation actors NActors::IActor* MakeCreateBindingActor( const NActors::TActorId& proxyActorId, TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, TDuration requestTimeout, - TCounters& counters, - bool successOnAlreadyExists = false); + TCounters& counters); NActors::IActor* MakeModifyBindingActor( const NActors::TActorId& proxyActorId, diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index 3022997a06a..05f5cf39a4d 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -1350,53 +1350,13 @@ private: CredentialsProviderFactory); } - if (!ev->Get()->ComputeYDBOperationWasPerformed && !ev->Get()->ComputeYDBIsAlreadyExistFlag) { + if (!ev->Get()->ComputeYDBOperationWasPerformed) { Register(NPrivate::MakeCreateConnectionActor(ControlPlaneProxyActorId(), std::move(ev), Config.RequestTimeout, Counters, Config.CommonConfig, - Signer, - true)); - return; - } - - if (ev->Get()->ComputeYDBIsAlreadyExistFlag && !ev->Get()->CPSListingFinished) { - ev->Get()->ComputeYDBOperationWasPerformed = false; - Register(MakeListConnectionActor(ControlPlaneProxyActorId(), - std::move(ev), - Counters, - Config.RequestTimeout, - availablePermissions)); - return; - } - - if (ev->Get()->ComputeYDBIsAlreadyExistFlag && - !ev->Get()->ComputeYDBOperationWasPerformed) { - if (ev->Get()->CPSConnectionCount != 0) { - CPS_LOG_E("CreateConnectionRequest, Connection with such name already exists: " - << scope << " " << user << " " << NKikimr::MaskTicket(token) - << " " << request.DebugString()); - Send(ev->Sender, - new TEvControlPlaneProxy::TEvCreateConnectionResponse( - NYql::TIssues{ - NYql::TIssue{"Connection with such name already exists"}}, - subjectType), - 0, - ev->Cookie); - requestCounters.IncError(); - TDuration delta = TInstant::Now() - startTime; - requestCounters.Common->LatencyMs->Collect(delta.MilliSeconds()); - probe(delta, false, false); - return; - } - - Register(NPrivate::MakeDropCreateConnectionActor(ControlPlaneProxyActorId(), - std::move(ev), - Config.RequestTimeout, - Counters, - Config.CommonConfig, - Signer)); + Signer)); return; } } diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index ebd103ae226..54f49d06d07 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -312,11 +312,6 @@ struct TEvControlPlaneProxy { EvCreateConnectionRequest>::TBaseControlPlaneRequest; bool ComputeYDBIsAlreadyExistFlag = false; - // Check that connection does not exist in CPS - bool CPSListingFinished = false; - size_t CPSConnectionCount = 0; - // ?? - bool IsInDeleteCreateState = false; }; template<> |