aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorhcpp <hcpp@ydb.tech>2023-09-27 16:41:57 +0300
committerhcpp <hcpp@ydb.tech>2023-09-27 17:32:59 +0300
commitc437fa9f244a185589a18f4b97cb5ed8bf432876 (patch)
tree025855782be6d9051051157a75848abc6e1f89df
parent93d5296e2a4188d8e1aeffcfd6e79a7df7335b94 (diff)
downloadydb-c437fa9f244a185589a18f4b97cb5ed8bf432876.tar.gz
rollback for sync has been disabled
-rw-r--r--ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp6
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp36
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h6
3 files changed, 34 insertions, 14 deletions
diff --git a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
index a14f621440e..2fa86fcb419 100644
--- a/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
+++ b/ydb/core/fq/libs/compute/ydb/synchronization_service/synchronization_service.cpp
@@ -403,7 +403,8 @@ private:
Counters,
TPermissions{},
CommonConfig,
- Signer
+ Signer,
+ true
));
}
if (Connections.empty()) {
@@ -432,7 +433,8 @@ private:
request,
TDuration::Seconds(30),
Counters,
- TPermissions{}
+ TPermissions{},
+ true
));
}
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index 70e53bf6649..20cd96eec77 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -21,15 +21,21 @@ using namespace NThreading;
using namespace NYdb;
using namespace NYdb::NTable;
+namespace {
+
template<class TEventRequest, class TEventResponse>
class TSchemaQueryYDBActor;
+}
+
template<class TEventRequest, class TEventResponse>
struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
using TRequest = TEventRequest;
using TResponse = TEventResponse;
};
+namespace {
+
using TScheduleErrorRecoverySQLGeneration =
std::function<bool(NActors::TActorId sender, const TStatus& issues)>;
@@ -496,6 +502,12 @@ bool IsPathDoesNotExistIssue(const TStatus& status) {
return status.GetIssues().ToOneLineString().Contains("Path does not exist");
}
+bool IsPathExistsIssue(const TStatus& status) {
+ return status.GetIssues().ToOneLineString().Contains("error: path exist");
+}
+
+}
+
/// Connection actors
NActors::IActor* MakeCreateConnectionActor(
const TActorId& proxyActorId,
@@ -504,12 +516,13 @@ NActors::IActor* MakeCreateConnectionActor(
TCounters& counters,
TPermissions permissions,
const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer) {
+ TSigner::TPtr signer,
+ bool withoutRollback) {
auto queryFactoryMethod =
[objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
signer = std::move(signer),
requestTimeout,
- &counters, permissions](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
+ &counters, permissions, withoutRollback](const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto& connectionContent = request->Get()->Request.content();
@@ -522,12 +535,13 @@ NActors::IActor* MakeCreateConnectionActor(
if (createSecretStatement) {
statements.push_back(
TSchemaQueryTask{.SQL = *createSecretStatement,
- .RollbackSQL = DropSecretObjectQuery(connectionContent.name())});
+ .RollbackSQL = withoutRollback ? TMaybe<TString>{} : DropSecretObjectQuery(connectionContent.name()),
+ .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}});
}
TScheduleErrorRecoverySQLGeneration alreadyExistRecoveryActorFactoryMethod =
[&request, requestTimeout, &counters, permissions](NActors::TActorId sender,
- const TStatus& status) {
+ const TStatus& status) {
if (status.GetStatus() == NYdb::EStatus::ALREADY_EXISTS ||
status.GetIssues().ToOneLineString().Contains("error: path exist")) {
TActivationContext::ActorSystem()->Register(
@@ -544,9 +558,9 @@ NActors::IActor* MakeCreateConnectionActor(
};
statements.push_back(
TSchemaQueryTask{.SQL = TString{MakeCreateExternalDataSourceQuery(
- connectionContent, objectStorageEndpoint, signer)},
- .ScheduleErrorRecoverySQLGeneration =
- alreadyExistRecoveryActorFactoryMethod});
+ connectionContent, objectStorageEndpoint, signer)},
+ .ScheduleErrorRecoverySQLGeneration = withoutRollback ? TScheduleErrorRecoverySQLGeneration{} : alreadyExistRecoveryActorFactoryMethod,
+ .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}});
return statements;
};
@@ -739,10 +753,11 @@ NActors::IActor* MakeCreateBindingActor(
TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- TPermissions permissions) {
+ TPermissions permissions,
+ bool withoutRollback) {
auto queryFactoryMethod =
[requestTimeout,
- &counters, permissions](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
+ &counters, permissions, withoutRollback](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request)
-> std::vector<TSchemaQueryTask> {
auto& bindingContent = request->Get()->Request.content();
auto& externalSourceName = request->Get()->ConnectionContent->name();
@@ -768,7 +783,8 @@ NActors::IActor* MakeCreateBindingActor(
statements.push_back(TSchemaQueryTask{
.SQL = TString{MakeCreateExternalDataTableQuery(bindingContent,
externalSourceName)},
- .ScheduleErrorRecoverySQLGeneration = alreadyExistRecoveryActorFactoryMethod});
+ .ScheduleErrorRecoverySQLGeneration = withoutRollback ? TScheduleErrorRecoverySQLGeneration{} :alreadyExistRecoveryActorFactoryMethod,
+ .ShouldSkipStepOnError = withoutRollback ? IsPathExistsIssue : TShouldSkipStepOnError{}});
return statements;
};
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
index 0ddb989e586..1454db48a6c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -26,7 +26,8 @@ NActors::IActor* MakeCreateConnectionActor(
TCounters& counters,
TPermissions permissions,
const NConfig::TCommonConfig& commonConfig,
- TSigner::TPtr signer);
+ TSigner::TPtr signer,
+ bool withoutRollback = false);
NActors::IActor* MakeModifyConnectionActor(
const NActors::TActorId& proxyActorId,
@@ -50,7 +51,8 @@ NActors::IActor* MakeCreateBindingActor(
TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- TPermissions permissions);
+ TPermissions permissions,
+ bool withoutRollback = false);
NActors::IActor* MakeModifyBindingActor(
const NActors::TActorId& proxyActorId,