diff options
author | hcpp <hcpp@ydb.tech> | 2023-09-27 16:41:57 +0300 |
---|---|---|
committer | hcpp <hcpp@ydb.tech> | 2023-09-27 17:32:59 +0300 |
commit | c437fa9f244a185589a18f4b97cb5ed8bf432876 (patch) | |
tree | 025855782be6d9051051157a75848abc6e1f89df | |
parent | 93d5296e2a4188d8e1aeffcfd6e79a7df7335b94 (diff) | |
download | ydb-c437fa9f244a185589a18f4b97cb5ed8bf432876.tar.gz |
rollback for sync has been disabled
3 files changed, 34 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp index a14f621440e..2fa86fcb419 100644 --- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp +++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp @@ -403,7 +403,8 @@ private: Counters, TPermissions{}, CommonConfig, - Signer + Signer, + true )); } if (Connections.empty()) { @@ -432,7 +433,8 @@ private: request, TDuration::Seconds(30), Counters, - TPermissions{} + TPermissions{}, + true )); } diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index 70e53bf6649..20cd96eec77 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -21,15 +21,21 @@ using namespace NThreading; using namespace NYdb; using namespace NYdb::NTable; +namespace { + template<class TEventRequest, class TEventResponse> class TSchemaQueryYDBActor; +} + template<class TEventRequest, class TEventResponse> struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { using TRequest = TEventRequest; using TResponse = TEventResponse; }; +namespace { + using TScheduleErrorRecoverySQLGeneration = std::function<bool(NActors::TActorId sender, const TStatus& issues)>; @@ -496,6 +502,12 @@ bool IsPathDoesNotExistIssue(const TStatus& status) { return status.GetIssues().ToOneLineString().Contains("Path does not exist"); } +bool IsPathExistsIssue(const TStatus& status) { + return status.GetIssues().ToOneLineString().Contains("error: path exist"); +} + +} + /// Connection actors NActors::IActor* MakeCreateConnectionActor( const TActorId& proxyActorId, @@ -504,12 +516,13 @@ NActors::IActor* MakeCreateConnectionActor( TCounters& counters, TPermissions permissions, const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer) { + TSigner::TPtr signer, + bool withoutRollback) { auto queryFactoryMethod = [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), signer = std::move(signer), requestTimeout, - &counters, permissions](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) + &counters, permissions, withoutRollback](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { auto& connectionContent = request->Get()->Request.content(); @@ -522,12 +535,13 @@ NActors::IActor* MakeCreateConnectionActor( if (createSecretStatement) { statements.push_back( TSchemaQueryTask{.SQL = *createSecretStatement, - .RollbackSQL = DropSecretObjectQuery(connectionContent.name())}); + .RollbackSQL = withoutRollback ? TMaybe<TString>{} : DropSecretObjectQuery(connectionContent.name()), + .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}}); } TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod = [&request, requestTimeout, &counters, permissions](NActors::TActorId sender, - const TStatus& status) { + const TStatus& status) { if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS || status.GetIssues().ToOneLineString().Contains("error: path exist")) { TActivationContext::ActorSystem()->Register( @@ -544,9 +558,9 @@ NActors::IActor* MakeCreateConnectionActor( }; statements.push_back( TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery( - connectionContent, objectStorageEndpoint, signer)}, - .ScheduleErrorRecoverySQLGeneration = - alreadyExistRecoveryActorFactoryMethod}); + connectionContent, objectStorageEndpoint, signer)}, + .ScheduleErrorRecoverySQLGeneration = withoutRollback ? TScheduleErrorRecoverySQLGeneration{} : alreadyExistRecoveryActorFactoryMethod, + .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}}); return statements; }; @@ -739,10 +753,11 @@ NActors::IActor* MakeCreateBindingActor( TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - TPermissions permissions) { + TPermissions permissions, + bool withoutRollback) { auto queryFactoryMethod = [requestTimeout, - &counters, permissions](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) + &counters, permissions, withoutRollback](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { auto& bindingContent = request->Get()->Request.content(); auto& externalSourceName = request->Get()->ConnectionContent->name(); @@ -768,7 +783,8 @@ NActors::IActor* MakeCreateBindingActor( statements.push_back(TSchemaQueryTask{ .SQL = TString{MakeCreateExternalDataTableQuery(bindingContent, externalSourceName)}, - .ScheduleErrorRecoverySQLGeneration = alreadyExistRecoveryActorFactoryMethod}); + .ScheduleErrorRecoverySQLGeneration = withoutRollback ? TScheduleErrorRecoverySQLGeneration{} :alreadyExistRecoveryActorFactoryMethod, + .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}}); return statements; }; diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h index 0ddb989e586..1454db48a6c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -26,7 +26,8 @@ NActors::IActor* MakeCreateConnectionActor( TCounters& counters, TPermissions permissions, const NConfig::TCommonConfig& commonConfig, - TSigner::TPtr signer); + TSigner::TPtr signer, + bool withoutRollback = false); NActors::IActor* MakeModifyConnectionActor( const NActors::TActorId& proxyActorId, @@ -50,7 +51,8 @@ NActors::IActor* MakeCreateBindingActor( TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - TPermissions permissions); + TPermissions permissions, + bool withoutRollback = false); NActors::IActor* MakeModifyBindingActor( const NActors::TActorId& proxyActorId, |