aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-09-26 15:51:00 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-09-26 18:01:37 +0300
commit15644505fe64a4c7894f605ae6dd5dc7a9d49435 (patch)
treeb9f412c633f54a0f44311cf008c301750778cb54
parent337d481541c1a68f1c609751a1d0769380ae95f1 (diff)
downloadydb-15644505fe64a4c7894f605ae6dd5dc7a9d49435.tar.gz
Added skip task functionality for YDB schema actor
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt12
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt12
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ya.make2
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp104
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h8
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h8
8 files changed, 133 insertions, 37 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
index 9cf705e7cda..aec61de9c64 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(libs-control_plane_proxy-actors)
target_compile_options(libs-control_plane_proxy-actors PRIVATE
@@ -22,9 +28,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
fq-libs-result_formatter
core-kqp-provider
library-db_pool-protos
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(libs-control_plane_proxy-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
)
+generate_enum_serilization(libs-control_plane_proxy-actors
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
index d6df134d3a9..71934ac1970 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(libs-control_plane_proxy-actors)
target_compile_options(libs-control_plane_proxy-actors PRIVATE
@@ -23,9 +29,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
fq-libs-result_formatter
core-kqp-provider
library-db_pool-protos
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(libs-control_plane_proxy-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
)
+generate_enum_serilization(libs-control_plane_proxy-actors
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
index d6df134d3a9..71934ac1970 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(libs-control_plane_proxy-actors)
target_compile_options(libs-control_plane_proxy-actors PRIVATE
@@ -23,9 +29,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
fq-libs-result_formatter
core-kqp-provider
library-db_pool-protos
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(libs-control_plane_proxy-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
)
+generate_enum_serilization(libs-control_plane_proxy-actors
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
index 9cf705e7cda..aec61de9c64 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
@@ -6,6 +6,12 @@
# original buildsystem will not be accepted.
+get_built_tool_path(
+ TOOL_enum_parser_bin
+ TOOL_enum_parser_dependency
+ tools/enum_parser/enum_parser
+ enum_parser
+)
add_library(libs-control_plane_proxy-actors)
target_compile_options(libs-control_plane_proxy-actors PRIVATE
@@ -22,9 +28,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
fq-libs-result_formatter
core-kqp-provider
library-db_pool-protos
+ tools-enum_parser-enum_serialization_runtime
)
target_sources(libs-control_plane_proxy-actors PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
)
+generate_enum_serilization(libs-control_plane_proxy-actors
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+ INCLUDE_HEADERS
+ ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
index b08611e3de0..8f316072b7c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
@@ -17,6 +17,8 @@ PEERDIR(
ydb/library/db_pool/protos
)
+GENERATE_ENUM_SERIALIZATION(ydb_schema_query_actor.h)
+
YQL_LAST_ABI_VERSION()
END()
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index 9844877619f..d372afe7a17 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -1,3 +1,4 @@
+#include "ydb_schema_query_actor.h"
#include "base_actor.h"
#include "query_utils.h"
@@ -32,10 +33,14 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
using TScheduleErrorRecoverySQLGeneration =
std::function<bool(NActors::TActorId sender, const TStatus& issues)>;
+using TShouldSkipStepOnError =
+ std::function<bool(const TStatus& issues)>;
+
struct TSchemaQueryTask {
TString SQL;
TMaybe<TString> RollbackSQL;
TScheduleErrorRecoverySQLGeneration ScheduleErrorRecoverySQLGeneration;
+ TShouldSkipStepOnError ShouldSkipStepOnError;
};
struct TEvPrivate {
@@ -109,6 +114,7 @@ public:
: TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
proxyActorId, std::move(request), requestTimeout, counters)
, Tasks(tasksFactoryMethod(Request))
+ , CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE)
, ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR";
@@ -136,23 +142,35 @@ public:
return Nothing();
}
+ void NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus status) {
+ CompletionStatuses[CurrentTaskIndex] = status;
+ CurrentTaskIndex++;
+ ScheduleNextTask();
+ }
+
void NormalHandleExecutionResponse(
typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
const auto& executeSchemeQueryStatus = event->Get()->Result;
if (executeSchemeQueryStatus.IsSuccess()) {
- CurrentTaskIndex++;
- ScheduleNextTask();
+ NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus::SUCCESS);
} else {
- SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus);
-
auto& task = Tasks[CurrentTaskIndex];
if (task.ScheduleErrorRecoverySQLGeneration &&
task.ScheduleErrorRecoverySQLGeneration(SelfId(),
executeSchemeQueryStatus)) {
+ SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus);
TransitionToRecoveryState();
return;
}
+
+ if (task.ShouldSkipStepOnError &&
+ task.ShouldSkipStepOnError(executeSchemeQueryStatus)) {
+ NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus::SKIPPED);
+ return;
+ }
+
+ SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus);
TransitionToRollbackState();
}
}
@@ -173,7 +191,8 @@ public:
TMaybe<TString> RollbackSelectTask() {
while (CurrentTaskIndex >= 0) {
const auto& maybeRollback = Tasks[CurrentTaskIndex].RollbackSQL;
- if (maybeRollback) {
+ if (maybeRollback &&
+ CompletionStatuses[CurrentTaskIndex] == ETaskCompletionStatus::SUCCESS) {
return maybeRollback;
}
CurrentTaskIndex--;
@@ -186,6 +205,7 @@ public:
const auto& executeSchemeQueryStatus = event->Get()->Result;
if (executeSchemeQueryStatus.IsSuccess()) {
+ CompletionStatuses[CurrentTaskIndex] = ETaskCompletionStatus::ROLL_BACKED;
CurrentTaskIndex--;
ScheduleNextTask();
return;
@@ -243,6 +263,7 @@ public:
void TransitionToRollbackState() {
CPP_LOG_I("TSchemaQueryYDBActor TransitionToRollbackState. Actor id: "
<< TBase::SelfId());
+ CompletionStatuses[CurrentTaskIndex] = ETaskCompletionStatus::ERROR;
CurrentTaskIndex--;
Become(&TSchemaQueryYDBActor::RollbackStateFunc);
ScheduleNextTask();
@@ -262,17 +283,15 @@ public:
}
void FinishSuccessfully() {
- CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: "
- << TBase::SelfId());
+ LogCurrentState("Query finished successfully");
Request->Get()->ComputeYDBOperationWasPerformed = true;
TBase::SendRequestToSender();
}
void SendError() {
- CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: "
- << TBase::SelfId());
+ Y_ENSURE(FirstStatus, "Status of first issue was not recorded");
+ LogCurrentState("Query finished with issues");
TString errorMessage = ErrorMessageFactoryMethod(*FirstStatus, Issues);
-
TBase::HandleError(errorMessage, *FirstStatus, std::move(Issues));
}
@@ -318,8 +337,21 @@ public:
});
}
+ void LogCurrentState(const TString& message) {
+ using TEnumToString = TString(const ETaskCompletionStatus&);
+ CPP_LOG_I("TSchemaQueryYDBActor Logging current state. Message: '"
+ << message << "', Actor id: " << TBase::SelfId()
+ << ". CompletionStatuses: ["
+ << JoinMapRange(", ",
+ CompletionStatuses.cbegin(),
+ CompletionStatuses.cend(),
+ (TEnumToString*)ToString<ETaskCompletionStatus>)
+ << "], CurrentTaskIndex: " << CurrentTaskIndex);
+ }
+
private:
TTasks Tasks;
+ std::vector<ETaskCompletionStatus> CompletionStatuses;
TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
i32 CurrentTaskIndex = 0;
TMaybe<EStatus> FirstStatus;
@@ -460,6 +492,10 @@ private:
TPermissions Permissions;
};
+bool IsPathDoesNotExistIssue(const TStatus& status) {
+ return status.GetIssues().ToOneLineString().Contains("Path does not exist");
+}
+
/// Connection actors
NActors::IActor* MakeCreateConnectionActor(
const TActorId& proxyActorId,
@@ -576,21 +612,23 @@ NActors::IActor* MakeModifyConnectionActor(
[&oldConnectionContent](const FederatedQuery::BindingContent& binding) {
return MakeCreateExternalDataTableQuery(binding,
oldConnectionContent.name());
- })});
+ }),
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue});
};
statements.push_back(TSchemaQueryTask{
.SQL = TString{MakeDeleteExternalDataSourceQuery(oldConnectionContent.name())},
- .RollbackSQL = TString{MakeCreateExternalDataSourceQuery(
- oldConnectionContent, objectStorageEndpoint, signer)}});
+ .RollbackSQL = TString{MakeCreateExternalDataSourceQuery(
+ oldConnectionContent, objectStorageEndpoint, signer)},
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue});
if (dropOldSecret) {
- statements.push_back(
- TSchemaQueryTask{.SQL = *dropOldSecret,
- .RollbackSQL = CreateSecretObjectQuery(
- oldConnectionContent.setting(),
- oldConnectionContent.name(),
- signer)});
+ statements.push_back(TSchemaQueryTask{
+ .SQL = *dropOldSecret,
+ .RollbackSQL = CreateSecretObjectQuery(oldConnectionContent.setting(),
+ oldConnectionContent.name(),
+ signer),
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue});
}
if (createNewSecret) {
statements.push_back(
@@ -621,7 +659,7 @@ NActors::IActor* MakeModifyConnectionActor(
[](const FederatedQuery::BindingContent& binding) {
return MakeDeleteExternalDataTableQuery(binding.name());
})});
- };
+ }
return statements;
};
@@ -662,16 +700,18 @@ NActors::IActor* MakeDeleteConnectionActor(
std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{
.SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())},
- .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent,
+ .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent,
objectStorageEndpoint,
- signer)}};
+ signer),
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue}};
if (dropSecret) {
statements.push_back(
- TSchemaQueryTask{.SQL = *dropSecret,
- .RollbackSQL = CreateSecretObjectQuery(
- connectionContent.setting(),
- connectionContent.name(),
- signer)});
+ TSchemaQueryTask{.SQL = *dropSecret,
+ .RollbackSQL =
+ CreateSecretObjectQuery(connectionContent.setting(),
+ connectionContent.name(),
+ signer),
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue});
}
return statements;
};
@@ -770,7 +810,9 @@ NActors::IActor* MakeModifyBindingActor(
auto createNewEntities =
MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName);
- return {TSchemaQueryTask{.SQL = deleteOldEntities, .RollbackSQL = createOldEntities},
+ return {TSchemaQueryTask{.SQL = deleteOldEntities,
+ .RollbackSQL = createOldEntities,
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue},
TSchemaQueryTask{.SQL = createNewEntities}};
};
@@ -797,8 +839,10 @@ NActors::IActor* MakeDeleteBindingActor(
TDuration requestTimeout,
TCounters& counters) {
auto queryFactoryMethod =
- [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString {
- return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName);
+ [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
+ return {{.SQL = MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName),
+ .ShouldSkipStepOnError = IsPathDoesNotExistIssue}};
};
auto errorMessageFactoryMethod = [](const EStatus status,
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
index 2c992abcf39..0ddb989e586 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -10,6 +10,14 @@
namespace NFq {
namespace NPrivate {
+enum class ETaskCompletionStatus {
+ NONE,
+ SUCCESS,
+ SKIPPED,
+ ROLL_BACKED,
+ ERROR
+};
+
/// Connection manipulation actors
NActors::IActor* MakeCreateConnectionActor(
const NActors::TActorId& proxyActorId,
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index 54f49d06d07..184700f6563 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -310,8 +310,7 @@ struct TEvControlPlaneProxy {
TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>,
FederatedQuery::CreateConnectionRequest,
EvCreateConnectionRequest>::TBaseControlPlaneRequest;
-
- bool ComputeYDBIsAlreadyExistFlag = false;
+
};
template<>
@@ -324,7 +323,6 @@ struct TEvControlPlaneProxy {
FederatedQuery::ModifyConnectionRequest,
EvModifyConnectionRequest>::TBaseControlPlaneRequest;
- bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent;
// ListBindings
bool OldBindingNamesDiscoveryFinished = false;
@@ -344,7 +342,6 @@ struct TEvControlPlaneProxy {
FederatedQuery::DeleteConnectionRequest,
EvDeleteConnectionRequest>::TBaseControlPlaneRequest;
- bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::ConnectionContent> ConnectionContent;
};
@@ -358,7 +355,6 @@ struct TEvControlPlaneProxy {
FederatedQuery::CreateBindingRequest,
EvCreateBindingRequest>::TBaseControlPlaneRequest;
- bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<TString> ConnectionName;
};
@@ -372,7 +368,6 @@ struct TEvControlPlaneProxy {
FederatedQuery::ModifyBindingRequest,
EvModifyBindingRequest>::TBaseControlPlaneRequest;
- bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<FederatedQuery::BindingContent> OldBindingContent;
TMaybe<TString> ConnectionName;
};
@@ -387,7 +382,6 @@ struct TEvControlPlaneProxy {
FederatedQuery::DeleteBindingRequest,
EvDeleteBindingRequest>::TBaseControlPlaneRequest;
- bool ComputeYDBIsAlreadyExistFlag = false;
TMaybe<TString> OldBindingName;
};
};