diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-09-26 15:51:00 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-09-26 18:01:37 +0300 |
commit | 15644505fe64a4c7894f605ae6dd5dc7a9d49435 (patch) | |
tree | b9f412c633f54a0f44311cf008c301750778cb54 | |
parent | 337d481541c1a68f1c609751a1d0769380ae95f1 (diff) | |
download | ydb-15644505fe64a4c7894f605ae6dd5dc7a9d49435.tar.gz |
Added skip task functionality for YDB schema actor
8 files changed, 133 insertions, 37 deletions
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt index 9cf705e7cda..aec61de9c64 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(libs-control_plane_proxy-actors) target_compile_options(libs-control_plane_proxy-actors PRIVATE @@ -22,9 +28,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC fq-libs-result_formatter core-kqp-provider library-db_pool-protos + tools-enum_parser-enum_serialization_runtime ) target_sources(libs-control_plane_proxy-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp ) +generate_enum_serilization(libs-control_plane_proxy-actors + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h + INCLUDE_HEADERS + ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt index d6df134d3a9..71934ac1970 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(libs-control_plane_proxy-actors) target_compile_options(libs-control_plane_proxy-actors PRIVATE @@ -23,9 +29,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC fq-libs-result_formatter core-kqp-provider library-db_pool-protos + tools-enum_parser-enum_serialization_runtime ) target_sources(libs-control_plane_proxy-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp ) +generate_enum_serilization(libs-control_plane_proxy-actors + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h + INCLUDE_HEADERS + ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt index d6df134d3a9..71934ac1970 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(libs-control_plane_proxy-actors) target_compile_options(libs-control_plane_proxy-actors PRIVATE @@ -23,9 +29,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC fq-libs-result_formatter core-kqp-provider library-db_pool-protos + tools-enum_parser-enum_serialization_runtime ) target_sources(libs-control_plane_proxy-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp ) +generate_enum_serilization(libs-control_plane_proxy-actors + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h + INCLUDE_HEADERS + ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt index 9cf705e7cda..aec61de9c64 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt @@ -6,6 +6,12 @@ # original buildsystem will not be accepted. +get_built_tool_path( + TOOL_enum_parser_bin + TOOL_enum_parser_dependency + tools/enum_parser/enum_parser + enum_parser +) add_library(libs-control_plane_proxy-actors) target_compile_options(libs-control_plane_proxy-actors PRIVATE @@ -22,9 +28,15 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC fq-libs-result_formatter core-kqp-provider library-db_pool-protos + tools-enum_parser-enum_serialization_runtime ) target_sources(libs-control_plane_proxy-actors PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp ) +generate_enum_serilization(libs-control_plane_proxy-actors + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h + INCLUDE_HEADERS + ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make index b08611e3de0..8f316072b7c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make @@ -17,6 +17,8 @@ PEERDIR( ydb/library/db_pool/protos ) +GENERATE_ENUM_SERIALIZATION(ydb_schema_query_actor.h) + YQL_LAST_ABI_VERSION() END() diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index 9844877619f..d372afe7a17 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -1,3 +1,4 @@ +#include "ydb_schema_query_actor.h" #include "base_actor.h" #include "query_utils.h" @@ -32,10 +33,14 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { using TScheduleErrorRecoverySQLGeneration = std::function<bool(NActors::TActorId sender, const TStatus& issues)>; +using TShouldSkipStepOnError = + std::function<bool(const TStatus& issues)>; + struct TSchemaQueryTask { TString SQL; TMaybe<TString> RollbackSQL; TScheduleErrorRecoverySQLGeneration ScheduleErrorRecoverySQLGeneration; + TShouldSkipStepOnError ShouldSkipStepOnError; }; struct TEvPrivate { @@ -109,6 +114,7 @@ public: : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( proxyActorId, std::move(request), requestTimeout, counters) , Tasks(tasksFactoryMethod(Request)) + , CompletionStatuses(Tasks.size(), ETaskCompletionStatus::NONE) , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR"; @@ -136,23 +142,35 @@ public: return Nothing(); } + void NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus status) { + CompletionStatuses[CurrentTaskIndex] = status; + CurrentTaskIndex++; + ScheduleNextTask(); + } + void NormalHandleExecutionResponse( typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { const auto& executeSchemeQueryStatus = event->Get()->Result; if (executeSchemeQueryStatus.IsSuccess()) { - CurrentTaskIndex++; - ScheduleNextTask(); + NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus::SUCCESS); } else { - SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus); - auto& task = Tasks[CurrentTaskIndex]; if (task.ScheduleErrorRecoverySQLGeneration && task.ScheduleErrorRecoverySQLGeneration(SelfId(), executeSchemeQueryStatus)) { + SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus); TransitionToRecoveryState(); return; } + + if (task.ShouldSkipStepOnError && + task.ShouldSkipStepOnError(executeSchemeQueryStatus)) { + NormalRecordCurrentProgressAndScheduleNextTask(ETaskCompletionStatus::SKIPPED); + return; + } + + SaveIssues("Couldn't execute SQL script", executeSchemeQueryStatus); TransitionToRollbackState(); } } @@ -173,7 +191,8 @@ public: TMaybe<TString> RollbackSelectTask() { while (CurrentTaskIndex >= 0) { const auto& maybeRollback = Tasks[CurrentTaskIndex].RollbackSQL; - if (maybeRollback) { + if (maybeRollback && + CompletionStatuses[CurrentTaskIndex] == ETaskCompletionStatus::SUCCESS) { return maybeRollback; } CurrentTaskIndex--; @@ -186,6 +205,7 @@ public: const auto& executeSchemeQueryStatus = event->Get()->Result; if (executeSchemeQueryStatus.IsSuccess()) { + CompletionStatuses[CurrentTaskIndex] = ETaskCompletionStatus::ROLL_BACKED; CurrentTaskIndex--; ScheduleNextTask(); return; @@ -243,6 +263,7 @@ public: void TransitionToRollbackState() { CPP_LOG_I("TSchemaQueryYDBActor TransitionToRollbackState. Actor id: " << TBase::SelfId()); + CompletionStatuses[CurrentTaskIndex] = ETaskCompletionStatus::ERROR; CurrentTaskIndex--; Become(&TSchemaQueryYDBActor::RollbackStateFunc); ScheduleNextTask(); @@ -262,17 +283,15 @@ public: } void FinishSuccessfully() { - CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " - << TBase::SelfId()); + LogCurrentState("Query finished successfully"); Request->Get()->ComputeYDBOperationWasPerformed = true; TBase::SendRequestToSender(); } void SendError() { - CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " - << TBase::SelfId()); + Y_ENSURE(FirstStatus, "Status of first issue was not recorded"); + LogCurrentState("Query finished with issues"); TString errorMessage = ErrorMessageFactoryMethod(*FirstStatus, Issues); - TBase::HandleError(errorMessage, *FirstStatus, std::move(Issues)); } @@ -318,8 +337,21 @@ public: }); } + void LogCurrentState(const TString& message) { + using TEnumToString = TString(const ETaskCompletionStatus&); + CPP_LOG_I("TSchemaQueryYDBActor Logging current state. Message: '" + << message << "', Actor id: " << TBase::SelfId() + << ". CompletionStatuses: [" + << JoinMapRange(", ", + CompletionStatuses.cbegin(), + CompletionStatuses.cend(), + (TEnumToString*)ToString<ETaskCompletionStatus>) + << "], CurrentTaskIndex: " << CurrentTaskIndex); + } + private: TTasks Tasks; + std::vector<ETaskCompletionStatus> CompletionStatuses; TErrorMessageFactoryMethod ErrorMessageFactoryMethod; i32 CurrentTaskIndex = 0; TMaybe<EStatus> FirstStatus; @@ -460,6 +492,10 @@ private: TPermissions Permissions; }; +bool IsPathDoesNotExistIssue(const TStatus& status) { + return status.GetIssues().ToOneLineString().Contains("Path does not exist"); +} + /// Connection actors NActors::IActor* MakeCreateConnectionActor( const TActorId& proxyActorId, @@ -576,21 +612,23 @@ NActors::IActor* MakeModifyConnectionActor( [&oldConnectionContent](const FederatedQuery::BindingContent& binding) { return MakeCreateExternalDataTableQuery(binding, oldConnectionContent.name()); - })}); + }), + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}); }; statements.push_back(TSchemaQueryTask{ .SQL = TString{MakeDeleteExternalDataSourceQuery(oldConnectionContent.name())}, - .RollbackSQL = TString{MakeCreateExternalDataSourceQuery( - oldConnectionContent, objectStorageEndpoint, signer)}}); + .RollbackSQL = TString{MakeCreateExternalDataSourceQuery( + oldConnectionContent, objectStorageEndpoint, signer)}, + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}); if (dropOldSecret) { - statements.push_back( - TSchemaQueryTask{.SQL = *dropOldSecret, - .RollbackSQL = CreateSecretObjectQuery( - oldConnectionContent.setting(), - oldConnectionContent.name(), - signer)}); + statements.push_back(TSchemaQueryTask{ + .SQL = *dropOldSecret, + .RollbackSQL = CreateSecretObjectQuery(oldConnectionContent.setting(), + oldConnectionContent.name(), + signer), + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}); } if (createNewSecret) { statements.push_back( @@ -621,7 +659,7 @@ NActors::IActor* MakeModifyConnectionActor( [](const FederatedQuery::BindingContent& binding) { return MakeDeleteExternalDataTableQuery(binding.name()); })}); - }; + } return statements; }; @@ -662,16 +700,18 @@ NActors::IActor* MakeDeleteConnectionActor( std::vector<TSchemaQueryTask> statements = {TSchemaQueryTask{ .SQL = TString{MakeDeleteExternalDataSourceQuery(connectionContent.name())}, - .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent, + .RollbackSQL = MakeCreateExternalDataSourceQuery(connectionContent, objectStorageEndpoint, - signer)}}; + signer), + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}}; if (dropSecret) { statements.push_back( - TSchemaQueryTask{.SQL = *dropSecret, - .RollbackSQL = CreateSecretObjectQuery( - connectionContent.setting(), - connectionContent.name(), - signer)}); + TSchemaQueryTask{.SQL = *dropSecret, + .RollbackSQL = + CreateSecretObjectQuery(connectionContent.setting(), + connectionContent.name(), + signer), + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}); } return statements; }; @@ -770,7 +810,9 @@ NActors::IActor* MakeModifyBindingActor( auto createNewEntities = MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName); - return {TSchemaQueryTask{.SQL = deleteOldEntities, .RollbackSQL = createOldEntities}, + return {TSchemaQueryTask{.SQL = deleteOldEntities, + .RollbackSQL = createOldEntities, + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}, TSchemaQueryTask{.SQL = createNewEntities}}; }; @@ -797,8 +839,10 @@ NActors::IActor* MakeDeleteBindingActor( TDuration requestTimeout, TCounters& counters) { auto queryFactoryMethod = - [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString { - return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName); + [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { + return {{.SQL = MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName), + .ShouldSkipStepOnError = IsPathDoesNotExistIssue}}; }; auto errorMessageFactoryMethod = [](const EStatus status, diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h index 2c992abcf39..0ddb989e586 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -10,6 +10,14 @@ namespace NFq { namespace NPrivate { +enum class ETaskCompletionStatus { + NONE, + SUCCESS, + SKIPPED, + ROLL_BACKED, + ERROR +}; + /// Connection manipulation actors NActors::IActor* MakeCreateConnectionActor( const NActors::TActorId& proxyActorId, diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index 54f49d06d07..184700f6563 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -310,8 +310,7 @@ struct TEvControlPlaneProxy { TControlPlaneRequest<FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>, FederatedQuery::CreateConnectionRequest, EvCreateConnectionRequest>::TBaseControlPlaneRequest; - - bool ComputeYDBIsAlreadyExistFlag = false; + }; template<> @@ -324,7 +323,6 @@ struct TEvControlPlaneProxy { FederatedQuery::ModifyConnectionRequest, EvModifyConnectionRequest>::TBaseControlPlaneRequest; - bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent; // ListBindings bool OldBindingNamesDiscoveryFinished = false; @@ -344,7 +342,6 @@ struct TEvControlPlaneProxy { FederatedQuery::DeleteConnectionRequest, EvDeleteConnectionRequest>::TBaseControlPlaneRequest; - bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::ConnectionContent> ConnectionContent; }; @@ -358,7 +355,6 @@ struct TEvControlPlaneProxy { FederatedQuery::CreateBindingRequest, EvCreateBindingRequest>::TBaseControlPlaneRequest; - bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<TString> ConnectionName; }; @@ -372,7 +368,6 @@ struct TEvControlPlaneProxy { FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest>::TBaseControlPlaneRequest; - bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<FederatedQuery::BindingContent> OldBindingContent; TMaybe<TString> ConnectionName; }; @@ -387,7 +382,6 @@ struct TEvControlPlaneProxy { FederatedQuery::DeleteBindingRequest, EvDeleteBindingRequest>::TBaseControlPlaneRequest; - bool ComputeYDBIsAlreadyExistFlag = false; TMaybe<TString> OldBindingName; }; }; |