diff options
author | Alexey Uzhegov <auzhegov@ydb.tech> | 2024-02-16 18:36:51 +0300 |
---|---|---|
committer | GitHub <noreply@github.com> | 2024-02-16 18:36:51 +0300 |
commit | 5e1e046209a6c2b6e63ed498b1f82e61079e227b (patch) | |
tree | 325d59000c1bb56c4fb376808731733f3325546e | |
parent | 72c965e55695d8ea52923a97f4946fb8353d9538 (diff) | |
download | ydb-5e1e046209a6c2b6e63ed498b1f82e61079e227b.tar.gz |
[YQ-2728] spurious empty TMaybe fix (#1984)
4 files changed, 223 insertions, 186 deletions
diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h index 86664504d5..658d8e5693 100644 --- a/ydb/core/fq/libs/common/util.h +++ b/ydb/core/fq/libs/common/util.h @@ -76,6 +76,6 @@ FederatedQuery::IamAuth GetAuth(const FederatedQuery::Connection& connection); TString RemoveDatabaseFromStr(TString str, const TString& substr); -NYql::TIssues RemoveDatabaseFromIssues(const NYql::TIssues& issues, const TString& str); +NYql::TIssues RemoveDatabaseFromIssues(const NYql::TIssues& issues, const TString& databasePath); } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h index 871f54ed7d..e7d68a5e51 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h @@ -12,30 +12,29 @@ #include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> #include <ydb/library/yql/public/issue/yql_issue.h> -namespace NFq { -namespace NPrivate { +namespace NFq::NPrivate { using namespace NActors; using namespace NThreading; using namespace NYdb; template<typename TDerived> -class TPlainBaseActor : public NActors::TActorBootstrapped<TDerived> { +class TPlainBaseActor : public TActorBootstrapped<TDerived> { public: - using TBase = NActors::TActorBootstrapped<TDerived>; + using TBase = TActorBootstrapped<TDerived>; using TBase::Become; using TBase::SelfId; using TBase::Send; public: TPlainBaseActor(const TActorId& successActorId, - const TActorId& errorActorId, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters) + const TActorId& errorActorId, + TDuration requestTimeout, + const TRequestCommonCountersPtr& counters) : Counters(counters) , SuccessActorId(successActorId) , ErrorActorId(errorActorId) - , RequestTimeout(requestTimeout) { } + , RequestTimeout(std::move(requestTimeout)) { } void Bootstrap() { CPP_LOG_T("TBaseActor Bootstrap started. Actor id: " << SelfId()); @@ -84,7 +83,7 @@ protected: virtual IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) = 0; protected: - const NPrivate::TRequestCommonCountersPtr Counters; + const TRequestCommonCountersPtr Counters; const TActorId SuccessActorId; const TActorId ErrorActorId; const TDuration RequestTimeout; @@ -112,7 +111,7 @@ public: TBaseActor(const TActorId& proxyActorId, const TEventRequestPtr request, TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters) + const TRequestCommonCountersPtr& counters) : TPlainBaseActor<TDerived>(proxyActorId, request->Sender, std::move(requestTimeout), @@ -152,5 +151,4 @@ protected: const TEventRequestPtr Request; }; -} // namespace NPrivate -} // namespace NFq +} // namespace NFq::NPrivate diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h index 7be05ece0d..05db07fc50 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/request_actor.h @@ -12,16 +12,14 @@ #include <ydb/core/fq/libs/control_plane_storage/events/events.h> #include <ydb/library/yql/public/issue/yql_issue.h> -namespace NFq { -namespace NPrivate { +namespace NFq::NPrivate { template<class TRequestProto, class TRequest, class TResponse, class TRequestProxy, class TResponseProxy> class TRequestActor : - public NActors::TActorBootstrapped< + public TActorBootstrapped< TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>> { protected: - using TBase = NActors::TActorBootstrapped< - TRequestActor<TRequestProto, TRequest, TResponse, TRequestProxy, TResponseProxy>>; + using TBase = TActorBootstrapped<TRequestActor>; using TBase::SelfId; using TBase::Send; using TBase::PassAway; @@ -29,7 +27,7 @@ protected: using TBase::Schedule; typename TRequestProxy::TPtr RequestProxy; - ::NFq::TControlPlaneProxyConfig Config; + TControlPlaneProxyConfig Config; TActorId ServiceId; TRequestCounters Counters; TInstant StartTime; @@ -42,7 +40,7 @@ public: static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_REQUEST_ACTOR"; explicit TRequestActor(typename TRequestProxy::TPtr requestProxy, - const ::NFq::TControlPlaneProxyConfig& config, + const TControlPlaneProxyConfig& config, const TActorId& serviceId, const TRequestCounters& counters, const std::function<void(const TDuration&, bool, bool)>& probe, @@ -255,5 +253,4 @@ public: } }; -} -} +} // namespace NFq::NPrivate diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index bde41a058b..2854598fab 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -11,8 +11,7 @@ #include <ydb/public/api/protos/draft/fq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> -namespace NFq { -namespace NPrivate { +namespace NFq::NPrivate { using namespace NActors; using namespace ::NFq::NConfig; @@ -37,16 +36,16 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { namespace { using TScheduleErrorRecoverySQLGeneration = - std::function<bool(NActors::TActorId sender, const TStatus& issues)>; + std::function<bool(TActorId sender, const TStatus& issues)>; using TShouldSkipStepOnError = std::function<bool(const TStatus& issues)>; -inline TScheduleErrorRecoverySQLGeneration NoRecoverySQLGeneration() { +TScheduleErrorRecoverySQLGeneration NoRecoverySQLGeneration() { return TScheduleErrorRecoverySQLGeneration{}; } -inline TShouldSkipStepOnError NoSkipOnError() { +TShouldSkipStepOnError NoSkipOnError() { return TShouldSkipStepOnError{}; } @@ -69,20 +68,19 @@ struct TEvPrivate { "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); struct TEvProcessNextTaskRequest : - NActors::TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { }; + TEventLocal<TEvProcessNextTaskRequest, EvProcessNextTaskRequest> { }; struct TEvQueryExecutionResponse : - NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { - TEvQueryExecutionResponse(TStatus result) + TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { + explicit TEvQueryExecutionResponse(TStatus result) : Result(std::move(result)) { } TStatus Result; }; - struct TEvRecoveryResponse : - NActors::TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> { + struct TEvRecoveryResponse : TEventLocal<TEvRecoveryResponse, EvRecoveryResponse> { TEvRecoveryResponse(TMaybe<TString> recoverySQL, TStatus result) - : RecoverySQL(recoverySQL) + : RecoverySQL(std::move(recoverySQL)) , Result(std::move(result)) { } TMaybe<TString> RecoverySQL; @@ -93,8 +91,8 @@ struct TEvPrivate { template<class TEventRequest, class TEventResponse> class TSchemaQueryYDBActor : public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { -private: - using TBase = TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>; + + using TBase = TBaseActor<TSchemaQueryYDBActor>; using TBase::Become; using TBase::Request; using TBase::SelfId; @@ -106,30 +104,33 @@ public: using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>; using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; using TErrorMessageFactoryMethod = - std::function<TString(const EStatus status, const NYql::TIssues& issues)>; + std::function<TString(EStatus status, const NYql::TIssues& issues)>; TSchemaQueryYDBActor(const TActorId& proxyActorId, const TEventRequestPtr request, TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters, + const TRequestCommonCountersPtr& counters, TQueryFactoryMethod queryFactoryMethod, TErrorMessageFactoryMethod errorMessageFactoryMethod) - : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( + : TBaseActor<TSchemaQueryYDBActor>( proxyActorId, std::move(request), requestTimeout, counters) , Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}} - , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } + , CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE) + , ErrorMessageFactoryMethod(std::move(errorMessageFactoryMethod)) + , DBPath(Request->Get()->ComputeDatabase->connection().database()) { } TSchemaQueryYDBActor(const TActorId& proxyActorId, const TEventRequestPtr request, TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters, + const TRequestCommonCountersPtr& counters, TTasksFactoryMethod tasksFactoryMethod, TErrorMessageFactoryMethod errorMessageFactoryMethod) - : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( + : TBaseActor<TSchemaQueryYDBActor>( proxyActorId, std::move(request), requestTimeout, counters) - , Tasks(tasksFactoryMethod(Request)) + , Tasks{tasksFactoryMethod(Request)} , CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE) - , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } + , ErrorMessageFactoryMethod(std::move(errorMessageFactoryMethod)) + , DBPath(Request->Get()->ComputeDatabase->connection().database()) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR"; @@ -271,7 +272,7 @@ public: RecoveryHandleExecutionResponse);) void ScheduleNextTask() { - TBase::Send(SelfId(), new typename TEvPrivate::TEvProcessNextTaskRequest{}); + TBase::Send(SelfId(), new TEvPrivate::TEvProcessNextTaskRequest{}); } void TransitionToRollbackState() { @@ -311,8 +312,7 @@ public: void SaveIssues(const TString& message, const TStatus& status) { auto issue = MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, message); - auto path = Request->Get()->ComputeDatabase->connection().database(); - for (const auto& subIssue : RemoveDatabaseFromIssues(status.GetIssues(), path)) { + for (const auto& subIssue : RemoveDatabaseFromIssues(status.GetIssues(), DBPath)) { issue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); } @@ -327,11 +327,11 @@ public: FirstStatus.Clear(); } - static NYdb::TStatus ExtractStatus(const TAsyncStatus& future) { + static TStatus ExtractStatus(const TAsyncStatus& future) { try { return std::move(future.GetValueSync()); // can throw an exception } catch (...) { - return NYdb::TStatus{EStatus::BAD_REQUEST, NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}}; + return TStatus{EStatus::BAD_REQUEST, NYql::TIssues{NYql::TIssue{CurrentExceptionMessage()}}}; } } @@ -343,10 +343,10 @@ public: ->RetryOperation([query = schemeQuery](TSession session) { return session.ExecuteSchemeQuery(query); }) - .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), + .Subscribe([actorSystem = TActivationContext::ActorSystem(), self = SelfId()](const TAsyncStatus& future) { actorSystem->Send(self, - new typename TEvPrivate::TEvQueryExecutionResponse{ + new TEvPrivate::TEvQueryExecutionResponse{ ExtractStatus(future), }); }); @@ -371,47 +371,50 @@ private: i32 CurrentTaskIndex = 0; TMaybe<EStatus> FirstStatus; NYql::TIssues Issues; + TString DBPath; }; class TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor : public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor> { public: - using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>; + using TBase = TPlainBaseActor; TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor( - NActors::TActorId sender, - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request, - TPermissions permissions, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters) - : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor>( - sender, sender, requestTimeout, counters) - , Request(request) - , Permissions(std::move(permissions)) { } - - void BootstrapImpl() { CheckConnectionExistenceInCPS(); } - - IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) { + const TActorId sender, + const TString& scope, + const TString& user, + const TString& token, + const TString& cloudId, + const TMaybe<TQuotaMap>& quotas, + const TTenantInfo::TPtr& tenantInfo, + const TString& connectionName, + const TPermissions& permissions, + const TDuration& requestTimeout, + const TRequestCommonCountersPtr& counters) + : TPlainBaseActor(sender, sender, requestTimeout, counters) + , Scope(scope) + , User(user) + , Token(token) + , CloudId(cloudId) + , Quotas(quotas) + , TenantInfo(tenantInfo) + , ConnectionName(connectionName) + , Permissions(permissions) { } + + void BootstrapImpl() override { CheckConnectionExistenceInCPS(); } + + IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) override { return new TEvPrivate::TEvRecoveryResponse( Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}}); }; void CheckConnectionExistenceInCPS() { FederatedQuery::ListConnectionsRequest result; - auto connectionName = Request->Get()->Request.content().name(); - result.mutable_filter()->set_name(connectionName); + result.mutable_filter()->set_name(ConnectionName); result.set_limit(2); - auto event = - new TEvControlPlaneStorage::TEvListConnectionsRequest(Request->Get()->Scope, - result, - Request->Get()->User, - Request->Get()->Token, - Request->Get()->CloudId, - Permissions, - Request->Get()->Quotas, - Request->Get()->TenantInfo, - {}); + auto event = new TEvControlPlaneStorage::TEvListConnectionsRequest( + Scope, result, User, Token, CloudId, Permissions, Quotas, TenantInfo, {}); event->IsExactNameMatch = true; @@ -431,55 +434,62 @@ public: return; } TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse( - MakeDeleteExternalDataSourceQuery(Request->Get()->Request.content().name()), + MakeDeleteExternalDataSourceQuery(ConnectionName), TStatus{EStatus::SUCCESS, {}})); } private: - NActors::TActorId Sender; - const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& Request; - TPermissions Permissions; + const TString Scope; + const TString User; + const TString Token; + const TString CloudId; + const TMaybe<TQuotaMap> Quotas; + const TTenantInfo::TPtr TenantInfo; + const TString ConnectionName; + const TPermissions Permissions; }; class TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor : public TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor> { public: - using TBase = TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>; + using TBase = TPlainBaseActor; TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor( - NActors::TActorId sender, - const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request, - TPermissions permissions, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters) - : TPlainBaseActor<TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor>( - sender, sender, requestTimeout, counters) - , Request(request) - , Permissions(std::move(permissions)) { } - - void BootstrapImpl() { CheckBindingExistenceInCPS(); } - - IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) { + const TActorId& sender, + const TString& scope, + const TString& user, + const TString& token, + const TString& cloudId, + const TMaybe<TQuotaMap>& quotas, + const TTenantInfo::TPtr& tenantInfo, + const TString& bindingName, + const TPermissions& permissions, + const TDuration& requestTimeout, + const TRequestCommonCountersPtr& counters) + : TPlainBaseActor(sender, sender, requestTimeout, counters) + , Scope(scope) + , User(user) + , Token(token) + , CloudId(cloudId) + , Quotas(quotas) + , TenantInfo(tenantInfo) + , BindingName(bindingName) + , Permissions(permissions) { } + + void BootstrapImpl() override { CheckBindingExistenceInCPS(); } + + IEventBase* MakeTimeoutEventImpl(NYql::TIssue issue) override { return new TEvPrivate::TEvRecoveryResponse( Nothing(), TStatus{EStatus::TIMEOUT, NYql::TIssues{std::move(issue)}}); - }; + } void CheckBindingExistenceInCPS() { FederatedQuery::ListBindingsRequest result; - auto bindingName = Request->Get()->Request.content().name(); - result.mutable_filter()->set_name(bindingName); + result.mutable_filter()->set_name(BindingName); result.set_limit(2); - auto event = - new TEvControlPlaneStorage::TEvListBindingsRequest(Request->Get()->Scope, - result, - Request->Get()->User, - Request->Get()->Token, - Request->Get()->CloudId, - Permissions, - Request->Get()->Quotas, - Request->Get()->TenantInfo, - {}); + auto event = new TEvControlPlaneStorage::TEvListBindingsRequest( + Scope, result, User, Token, CloudId, Permissions, Quotas, TenantInfo, {}); event->IsExactNameMatch = true; @@ -499,14 +509,19 @@ public: return; } TBase::SendRequestToSender(new TEvPrivate::TEvRecoveryResponse( - MakeDeleteExternalDataTableQuery(Request->Get()->Request.content().name()), + MakeDeleteExternalDataTableQuery(BindingName), TStatus{EStatus::SUCCESS, {}})); } private: - NActors::TActorId Sender; - const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& Request; - TPermissions Permissions; + const TString Scope; + const TString User; + const TString Token; + const TString CloudId; + const TMaybe<TQuotaMap> Quotas; + const TTenantInfo::TPtr TenantInfo; + const TString BindingName; + const TPermissions Permissions; }; bool IsPathDoesNotExistIssue(const TStatus& status) { @@ -518,7 +533,7 @@ bool IsPathExistsIssue(const TStatus& status) { return status.GetIssues().ToOneLineString().Contains("error: path exist"); } -} +} // namespace /// Connection actors IActor* MakeCreateConnectionActor( @@ -539,35 +554,48 @@ IActor* MakeCreateConnectionActor( permissions, withoutRollback, commonConfig, - computeConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) + computeConfig](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& req) -> std::vector<TSchemaQueryTask> { - auto& connectionContent = request->Get()->Request.content(); + auto& connectionContent = req->Get()->Request.content(); - auto createSecretStatement = - CreateSecretObjectQuery(connectionContent.setting(), - connectionContent.name(), - signer); + auto createSecretStatement = CreateSecretObjectQuery(connectionContent.setting(), + connectionContent.name(), + signer); std::vector<TSchemaQueryTask> statements; if (createSecretStatement) { - statements.push_back( - TSchemaQueryTask{.SQL = *createSecretStatement, - .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : NoSkipOnError()}); + statements.push_back(TSchemaQueryTask{.SQL = *createSecretStatement, + .ShouldSkipStepOnError = + withoutRollback ? IsPathExistsIssue + : NoSkipOnError()}); } TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = - [&request, requestTimeout, &counters, permissions](TActorId sender, - const TStatus& status) { - if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || + [scope = req->Get()->Scope, + user = req->Get()->User, + token = req->Get()->Token, + cloudId = req->Get()->CloudId, + quotas = req->Get()->Quotas, + tenantInfo = req->Get()->TenantInfo, + connectionName = req->Get()->Request.content().name(), + requestTimeout, + &counters, + permissions](TActorId sender, const TStatus& status) { + if (status.GetStatus() == EStatus::ALREADY_EXISTS || status.GetIssues().ToOneLineString().Contains("error: path exist")) { TActivationContext::ActorSystem()->Register( new TGenerateRecoverySQLIfExternalDataSourceAlreadyExistsActor( sender, - request, + scope, + user, + token, + cloudId, + quotas, + tenantInfo, + connectionName, permissions, requestTimeout, - counters.GetCommonCounters( - RTC_CREATE_CONNECTION_IN_YDB))); // change counter + counters.GetCommonCounters(RTC_CREATE_CONNECTION_IN_YDB))); return true; } return false; @@ -576,28 +604,31 @@ IActor* MakeCreateConnectionActor( .SQL = MakeCreateExternalDataSourceQuery( connectionContent, signer, commonConfig, computeConfig.IsReplaceIfExistsSyntaxSupported()), - .ScheduleErrorRecoverySQLGeneration = - withoutRollback ? NoRecoverySQLGeneration() - : alreadyExistRecoveryActorFactoryMethod, + .ScheduleErrorRecoverySQLGeneration = + withoutRollback + ? NoRecoverySQLGeneration() + : std::move(alreadyExistRecoveryActorFactoryMethod), .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : NoSkipOnError()}); return statements; }; - auto& connectionName = request->Get()->Request.content().name(); - auto errorMessageFactoryMethod = [connectionId, connectionName](const EStatus status, - const NYql::TIssues& issues) -> TString { + auto errorMessageFactoryMethod = + [connectionId, connectionName = request->Get()->Request.content().name()]( + const EStatus status, const NYql::TIssues& issues) -> TString { Y_UNUSED(issues); - TStringBuilder message = TStringBuilder {} << "Synchronization of connection"; + TStringBuilder message = TStringBuilder{} << "Synchronization of connection"; if (connectionId.Defined()) { message << " with id '" << connectionId << "'"; } if (status == NYdb::EStatus::ALREADY_EXISTS) { - message << " failed, because external data source with name '" << connectionName << "' already exists"; + message << " failed, because external data source with name '" + << connectionName << "' already exists"; } else { - message << " failed, because creation of external data source with name '" << connectionName << "' wasn't successful"; + message << " failed, because creation of external data source with name '" + << connectionName << "' wasn't successful"; } - return message; + return TString{message}; }; return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateConnectionRequest, @@ -610,12 +641,12 @@ IActor* MakeCreateConnectionActor( errorMessageFactoryMethod); } -NActors::IActor* MakeModifyConnectionActor( +IActor* MakeModifyConnectionActor( const TActorId& proxyActorId, TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const NConfig::TCommonConfig& commonConfig, + const TCommonConfig& commonConfig, const NFq::TComputeConfig& computeConfig, TSigner::TPtr signer) { auto queryFactoryMethod = @@ -693,9 +724,9 @@ NActors::IActor* MakeModifyConnectionActor( .ShouldSkipStepOnError = IsPathDoesNotExistIssue}); } if (createNewSecret) { - statements.push_back( - TSchemaQueryTask{.SQL = *createNewSecret, - .RollbackSQL = DropSecretObjectQuery(newConnectionContent.name())}); + statements.push_back(TSchemaQueryTask{.SQL = *createNewSecret, + .RollbackSQL = DropSecretObjectQuery( + newConnectionContent.name())}); } statements.push_back( @@ -743,12 +774,12 @@ NActors::IActor* MakeModifyConnectionActor( errorMessageFactoryMethod); } -NActors::IActor* MakeDeleteConnectionActor( +IActor* MakeDeleteConnectionActor( const TActorId& proxyActorId, TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const NConfig::TCommonConfig& commonConfig, + const TCommonConfig& commonConfig, TSigner::TPtr signer) { auto queryFactoryMethod = [signer = std::move(signer), @@ -760,7 +791,7 @@ NActors::IActor* MakeDeleteConnectionActor( auto dropSecret = DropSecretObjectQuery(connectionContent.name()); - std::vector<TSchemaQueryTask> statements = { + std::vector statements = { TSchemaQueryTask{.SQL = TString{MakeDeleteExternalDataSourceQuery( connectionContent.name())}, .RollbackSQL = MakeCreateExternalDataSourceQuery( @@ -796,36 +827,47 @@ NActors::IActor* MakeDeleteConnectionActor( } /// Bindings actors -NActors::IActor* MakeCreateBindingActor( - const TActorId& proxyActorId, - TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, - TDuration requestTimeout, - TCounters& counters, - TPermissions permissions, - const NFq::TComputeConfig& computeConfig, - bool withoutRollback, - TMaybe<TString> bindingId) { +IActor* MakeCreateBindingActor(const TActorId& proxyActorId, + TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, + TPermissions permissions, + const NFq::TComputeConfig& computeConfig,bool withoutRollback, + TMaybe<TString> bindingId) { auto queryFactoryMethod = - [requestTimeout, - &counters, permissions, withoutRollback, computeConfig](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) + [requestTimeout, &counters, permissions, withoutRollback, computeConfig]( + const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& req) -> std::vector<TSchemaQueryTask> { - auto& bindingContent = request->Get()->Request.content(); - auto& externalSourceName = request->Get()->ConnectionContent->name(); + auto& bindingContent = req->Get()->Request.content(); + auto& externalSourceName = req->Get()->ConnectionContent->name(); std::vector<TSchemaQueryTask> statements; TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = - [&request, requestTimeout, &counters, permissions](NActors::TActorId sender, - const TStatus& status) { + [scope = req->Get()->Scope, + user = req->Get()->User, + token = req->Get()->Token, + cloudId = req->Get()->CloudId, + quotas = req->Get()->Quotas, + tenantInfo = req->Get()->TenantInfo, + bindingName = req->Get()->Request.content().name(), + requestTimeout, + &counters, + permissions](TActorId sender, const TStatus& status) { if (status.GetStatus() == EStatus::ALREADY_EXISTS || status.GetIssues().ToOneLineString().Contains("error: path exist")) { TActivationContext::ActorSystem()->Register( new TGenerateRecoverySQLIfExternalDataTableAlreadyExistsActor( sender, - request, + scope, + user, + token, + cloudId, + quotas, + tenantInfo, + bindingName, permissions, requestTimeout, - counters.GetCommonCounters( - RTC_CREATE_BINDING_IN_YDB))); // change counter + counters.GetCommonCounters(RTC_CREATE_BINDING_IN_YDB))); return true; } return false; @@ -836,27 +878,28 @@ NActors::IActor* MakeCreateBindingActor( computeConfig.IsReplaceIfExistsSyntaxSupported())}, .ScheduleErrorRecoverySQLGeneration = withoutRollback ? NoRecoverySQLGeneration() - : alreadyExistRecoveryActorFactoryMethod, + : std::move(alreadyExistRecoveryActorFactoryMethod), .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : NoSkipOnError()}); return statements; }; - auto content = request->Get()->Request.content(); - auto bindingName = content.name(); - auto errorMessageFactoryMethod = [bindingId, bindingName](const EStatus status, - const NYql::TIssues& issues) -> TString { + auto errorMessageFactoryMethod = + [bindingId, bindingName = request->Get()->Request.content().name()]( + const EStatus status, const NYql::TIssues& issues) -> TString { Y_UNUSED(issues); - TStringBuilder message = TStringBuilder {} << "Synchronization of binding"; + TStringBuilder message = TStringBuilder{} << "Synchronization of binding"; if (bindingId.Defined()) { message << " with id '" << bindingId << "'"; } - if (status == NYdb::EStatus::ALREADY_EXISTS) { - message << " failed, because external data table with name '" << bindingName << "' already exists"; + if (status == EStatus::ALREADY_EXISTS) { + message << " failed, because external data table with name '" << bindingName + << "' already exists"; } else { - message << " failed, because creation of external data table with name '" << bindingName << "' wasn't successful"; + message << " failed, because creation of external data table with name '" + << bindingName << "' wasn't successful"; } - return message; + return TString{message}; }; return new TSchemaQueryYDBActor<TEvControlPlaneProxy::TEvCreateBindingRequest, @@ -869,11 +912,10 @@ NActors::IActor* MakeCreateBindingActor( errorMessageFactoryMethod); } -NActors::IActor* MakeModifyBindingActor( - const TActorId& proxyActorId, - TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request, - TDuration requestTimeout, - TCounters& counters, +IActor* MakeModifyBindingActor(const TActorId& proxyActorId, + TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters, const NFq::TComputeConfig& computeConfig) { auto queryFactoryMethod = [computeConfig](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) @@ -921,15 +963,15 @@ NActors::IActor* MakeModifyBindingActor( errorMessageFactoryMethod); } -NActors::IActor* MakeDeleteBindingActor( - const TActorId& proxyActorId, - TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request, - TDuration requestTimeout, - TCounters& counters) { +IActor* MakeDeleteBindingActor(const TActorId& proxyActorId, + TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr request, + TDuration requestTimeout, + TCounters& counters) { auto queryFactoryMethod = [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { - return {{.SQL = MakeDeleteExternalDataTableQuery(request->Get()->OldBindingContent->name()), + return {{.SQL = MakeDeleteExternalDataTableQuery( + request->Get()->OldBindingContent->name()), .ShouldSkipStepOnError = IsPathDoesNotExistIssue}}; }; @@ -950,5 +992,5 @@ NActors::IActor* MakeDeleteBindingActor( errorMessageFactoryMethod); } -} // namespace NPrivate -} // namespace NFq +} // namespace NFq::NPrivate + |