diff options
author | auzhegov <auzhegov@yandex-team.com> | 2023-07-31 18:02:17 +0300 |
---|---|---|
committer | auzhegov <auzhegov@yandex-team.com> | 2023-07-31 18:02:17 +0300 |
commit | 3ca8b54c96e09eb2b65be7f09675623438d559c7 (patch) | |
tree | f1c54a03907031b42e2cb03d18bb7caebf3eaa25 | |
parent | 18c340161d3f523e1c899b606b8b370b2d7a1264 (diff) | |
download | ydb-3ca8b54c96e09eb2b65be7f09675623438d559c7.tar.gz |
Improved modify operations
Initial version
23 files changed, 513 insertions, 388 deletions
diff --git a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt index a6a0765c5d..af28035aad 100644 --- a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt @@ -26,5 +26,6 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt index e88b073786..eba7bc83ba 100644 --- a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt @@ -27,5 +27,6 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt index e88b073786..eba7bc83ba 100644 --- a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt @@ -27,5 +27,6 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) diff --git a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt index a6a0765c5d..af28035aad 100644 --- a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt @@ -26,5 +26,6 @@ target_sources(fq-libs-common PRIVATE ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp + ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp ) diff --git a/ydb/core/fq/libs/common/util.cpp b/ydb/core/fq/libs/common/util.cpp new file mode 100644 index 0000000000..053de08889 --- /dev/null +++ b/ydb/core/fq/libs/common/util.cpp @@ -0,0 +1,38 @@ +#include "util.h" + +#include <util/generic/string.h> +#include <util/string/builder.h> +#include <util/string/subst.h> + +namespace NFq { + +TString EscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith) { + auto escapedValue = value; + SubstGlobal(escapedValue, enclosingSeq, replaceWith); + return escapedValue; +} + +TString EscapeString(const TString& value, char enclosingChar) { + auto escapedValue = value; + SubstGlobal(escapedValue, + TString{enclosingChar}, + TStringBuilder{} << '\\' << enclosingChar); + return escapedValue; +} + +TString EncloseAndEscapeString(const TString& value, char enclosingChar) { + return TStringBuilder{} << enclosingChar << EscapeString(value, enclosingChar) + << enclosingChar; +} + +TString EncloseAndEscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith) { + return TStringBuilder{} << enclosingSeq + << EscapeString(value, enclosingSeq, replaceWith) + << enclosingSeq; +} + +} // namespace NFq diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h index e61a7c86b3..b7aedf6e6c 100644 --- a/ydb/core/fq/libs/common/util.h +++ b/ydb/core/fq/libs/common/util.h @@ -5,7 +5,10 @@ #include <google/protobuf/repeated_field.h> +#include <library/cpp/iterator/mapped.h> +#include <util/generic/string.h> #include <util/generic/vector.h> +#include <util/string/join.h> namespace NFq { @@ -22,4 +25,25 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl return { field.begin(), field.end() }; } +template <typename TIter, typename TFunc> +TString JoinMapRange(TString delim, const TIter beg, const TIter end, const TFunc func) { + auto mappedBegin = + MakeMappedIterator(beg, func); + auto mappedEnd = + MakeMappedIterator(end, func); + return JoinRange(delim, mappedBegin, mappedEnd); +} + +TString EscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith); + +TString EscapeString(const TString& value, char enclosingChar); + +TString EncloseAndEscapeString(const TString& value, char enclosingChar); + +TString EncloseAndEscapeString(const TString& value, + const TString& enclosingSeq, + const TString& replaceWith); + } // namespace NFq diff --git a/ydb/core/fq/libs/common/ya.make b/ydb/core/fq/libs/common/ya.make index f30242480e..3246b6455e 100644 --- a/ydb/core/fq/libs/common/ya.make +++ b/ydb/core/fq/libs/common/ya.make @@ -7,6 +7,7 @@ SRCS( debug_info.cpp entity_id.cpp entity_id.h + util.cpp rows_proto_splitter.cpp rows_proto_splitter.h ) diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt index 870e205dcc..9cf705e7cd 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC yutil contrib-libs-fmt library-cpp-iterator + fq-libs-common libs-control_plane_proxy-events libs-control_plane_storage-events fq-libs-result_formatter diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt index 5429166e90..d6df134d3a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt @@ -17,6 +17,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC yutil contrib-libs-fmt library-cpp-iterator + fq-libs-common libs-control_plane_proxy-events libs-control_plane_storage-events fq-libs-result_formatter diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt index 5429166e90..d6df134d3a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt @@ -17,6 +17,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC yutil contrib-libs-fmt library-cpp-iterator + fq-libs-common libs-control_plane_proxy-events libs-control_plane_storage-events fq-libs-result_formatter diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt index 870e205dcc..9cf705e7cd 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt +++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt @@ -16,6 +16,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC yutil contrib-libs-fmt library-cpp-iterator + fq-libs-common libs-control_plane_proxy-events libs-control_plane_storage-events fq-libs-result_formatter diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp index 168e4f6fa2..d2e7e8bfa9 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp @@ -1,5 +1,6 @@ #include "control_plane_storage_requester_actor.h" #include "base_actor.h" +#include "util/generic/maybe.h" #include <contrib/libs/fmt/include/fmt/format.h> #include <library/cpp/actors/core/event.h> @@ -227,7 +228,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName( auto cpsRequestFactory = [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) { FederatedQuery::DescribeConnectionRequest result; - auto connectionId = *event->Get()->ConnectionId; + auto connectionId = event->Get()->OldBindingContent->connection_id(); result.set_connection_id(connectionId); return result; }; @@ -273,8 +274,7 @@ NActors::IActor* MakeDiscoverYDBBindingName( auto entityNameExtractorFactoryMethod = [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event, const FederatedQuery::DescribeBindingResult& result) { - event->Get()->OldBindingName = result.binding().content().name(); - event->Get()->ConnectionId = result.binding().content().connection_id(); + event->Get()->OldBindingContent = result.binding().content(); }; return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest, @@ -327,5 +327,99 @@ NActors::IActor* MakeDiscoverYDBBindingName( errorMessageFactoryMethod, entityNameExtractorFactoryMethod); } + +NActors::IActor* MakeListBindingIds( + const TActorId sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) { + FederatedQuery::ListBindingsRequest result; + auto connectionId = event->Get()->Request.connection_id(); + result.mutable_filter()->set_connection_id(connectionId); + result.set_limit(100); + if (event->Get()->NextListingBindingsToken) { + result.set_page_token(*event->Get()->NextListingBindingsToken); + } + return result; + }; + + auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + return "Couldn't resolve binding id(s)"; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event, + const FederatedQuery::ListBindingsResult& result) { + for (auto& binding: result.binding()) { + event->Get()->OldBindingIds.emplace_back(binding.meta().id()); + } + + TString nextPageToken = result.next_page_token(); + if (nextPageToken == "") { + event->Get()->NextListingBindingsToken = Nothing(); + event->Get()->OldBindingNamesDiscoveryFinished = true; + } else { + event->Get()->NextListingBindingsToken = nextPageToken; + } + }; + + return new TControlPlaneStorageRequesterActor< + TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse, + TEvControlPlaneStorage::TEvListBindingsRequest, + TEvControlPlaneStorage::TEvListBindingsResponse>(sender, + request, + requestTimeout, + counters.GetCommonCounters( + RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + errorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + +NActors::IActor* MakeDescribeListedBinding( + const TActorId sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions) { + auto cpsRequestFactory = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) { + auto bindingId = event->Get()->OldBindingIds[event->Get()->OldBindingContents.size()]; + + FederatedQuery::DescribeBindingRequest result; + result.set_binding_id(bindingId); + return result; + }; + + auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString { + Y_UNUSED(issues); + return "Couldn't resolve binding content"; + }; + auto entityNameExtractorFactoryMethod = + [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event, + const FederatedQuery::DescribeBindingResult& result) { + event->Get()->OldBindingContents.push_back(result.binding().content()); + }; + + return new TControlPlaneStorageRequesterActor< + TEvControlPlaneProxy::TEvModifyConnectionRequest, + TEvControlPlaneProxy::TEvModifyConnectionResponse, + TEvControlPlaneStorage::TEvDescribeBindingRequest, + TEvControlPlaneStorage::TEvDescribeBindingResponse>( + sender, + request, + requestTimeout, + counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY), + permissions, + cpsRequestFactory, + errorMessageFactoryMethod, + entityNameExtractorFactoryMethod); +} + } // namespace NPrivate } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h index 24995807ea..d9801a1d07 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h @@ -54,5 +54,22 @@ NActors::IActor* MakeDiscoverYDBBindingName( TCounters& counters, TDuration requestTimeout, TPermissions permissions); + +/// ModifyConnection + +NActors::IActor* MakeListBindingIds( + const TActorId sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + +NActors::IActor* MakeDescribeListedBinding( + const TActorId sender, + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request, + TCounters& counters, + TDuration requestTimeout, + TPermissions permissions); + } // namespace NPrivate } // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp index 9859dac2f3..494e44034a 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp @@ -1,47 +1,16 @@ #include "query_utils.h" -#include "ydb/public/api/protos/draft/fq.pb.h" #include <contrib/libs/fmt/include/fmt/format.h> -#include <library/cpp/iterator/mapped.h> #include <util/generic/maybe.h> -#include <util/string/join.h> +#include <ydb/core/fq/libs/common/util.h> #include <ydb/core/fq/libs/result_formatter/result_formatter.h> #include <ydb/core/kqp/provider/yql_kikimr_results.h> +#include <ydb/public/api/protos/draft/fq.pb.h> namespace NFq { namespace NPrivate { -TString EscapeString(const TString& value, - const TString& enclosingSeq, - const TString& replaceWith) { - auto escapedValue = value; - SubstGlobal(escapedValue, enclosingSeq, replaceWith); - return escapedValue; -} -TString EscapeString(const TString& value, char enclosingChar) { - auto escapedValue = value; - SubstGlobal(escapedValue, - TString{enclosingChar}, - TStringBuilder{} << '\\' << enclosingChar); - return escapedValue; -} - -TString EncloseAndEscapeString(const TString& value, char enclosingChar) { - return TStringBuilder{} << enclosingChar - << EscapeString(value, - enclosingChar) - << enclosingChar; -} - -TString EncloseAndEscapeString(const TString& value, - const TString& enclosingSeq, - const TString& replaceWith) { - return TStringBuilder{} << enclosingSeq - << EscapeString(value, enclosingSeq, replaceWith) - << enclosingSeq; -} - TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, const TString& connectionName) { using namespace fmt::literals; @@ -52,7 +21,7 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c // Schema NYql::TExprContext context; - auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString { + auto columnsTransformFunction = [&context](const Ydb::Column& column) -> TString { NYdb::TTypeParser typeParser(column.type()); auto node = MakeType(typeParser, context); auto typeName = NYql::FormatType(node); @@ -63,10 +32,6 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c "columnType"_a = typeName, "notNull"_a = notNull); }; - auto columnsBegin = - MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction); - auto columnsEnd = - MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction); // WithOptions auto withOptions = std::unordered_map<TString, TString>{}; @@ -85,17 +50,14 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c } if (!subset.partitioned_by().empty()) { - auto stringEscapeMapper = [](const TString& value) { - return EscapeString(value, '"'); - }; - auto partitionBy = TStringBuilder{} << "\"[" - << JoinRange(", ", - MakeMappedIterator(subset.partitioned_by().begin(), - stringEscapeMapper), - MakeMappedIterator(subset.partitioned_by().end(), - stringEscapeMapper)) + << JoinMapRange(", ", + subset.partitioned_by().begin(), + subset.partitioned_by().end(), + [](const TString& value) { + return EscapeString(value, '"'); + }) << "]\""; withOptions.insert({"PARTITIONED_BY", partitionBy}); } @@ -105,15 +67,6 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c EncloseAndEscapeString(kv.second, '"')}); } - auto concatEscapedKeyValueMapper = [](const std::pair<TString, TString>& kv) -> TString { - return TStringBuilder{} << " " << kv.first << " = " << kv.second; - }; - - auto withOptionsBegin = - MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper); - auto withOptionsEnd = - MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper); - return fmt::format( R"( CREATE EXTERNAL TABLE {externalTableName} ( @@ -122,8 +75,17 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c {withOptions} );)", "externalTableName"_a = EncloseAndEscapeString(bindingName, '`'), - "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd), - "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd)); + "columns"_a = JoinMapRange(",\n", + subset.schema().column().begin(), + subset.schema().column().end(), + columnsTransformFunction), + "withOptions"_a = JoinMapRange(",\n", + withOptions.begin(), + withOptions.end(), + [](const std::pair<TString, TString>& kv) -> TString { + return TStringBuilder{} << " " << kv.first + << " = " << kv.second; + })); } TString SignAccountId(const TString& id, const TSigner::TPtr& signer) { diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h index b5824ce685..480ee99ee7 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h @@ -7,10 +7,6 @@ namespace NFq { namespace NPrivate { -TString EscapeString(const TString& value, char enclosingChar); - -TString EncloseAndEscapeString(const TString& value, char enclosingChar); - TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content, const TString& connectionName); diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/utils.h new file mode 100644 index 0000000000..42701c3311 --- /dev/null +++ b/ydb/core/fq/libs/control_plane_proxy/actors/utils.h @@ -0,0 +1,30 @@ +#pragma once + +#include <ydb/core/fq/libs/compute/common/config.h> +#include <ydb/core/fq/libs/shared_resources/shared_resources.h> +#include <ydb/core/fq/libs/ydb/ydb.h> +#include <ydb/public/sdk/cpp/client/ydb_table/table.h> + +namespace NFq { + +template<typename T> +std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient( + const T& ev, + const NFq::TComputeConfig& computeConfig, + const TYqSharedResources::TPtr& yqSharedResources, + const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { + auto scope = "yandexcloud://" + ev->Get()->FolderId; + NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(scope); + + computeConnection.set_endpoint(ev->Get()->ComputeDatabase->connection().endpoint()); + computeConnection.set_database(ev->Get()->ComputeDatabase->connection().database()); + computeConnection.set_usessl(ev->Get()->ComputeDatabase->connection().usessl()); + + auto tableSettings = + GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection, + credentialsProviderFactory); + return std::make_shared<NYdb::NTable::TTableClient>(yqSharedResources->UserSpaceYdbDriver, + tableSettings); +} + +} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make index 212a8d0f3c..b08611e3de 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make @@ -9,6 +9,7 @@ SRCS( PEERDIR( contrib/libs/fmt library/cpp/iterator + ydb/core/fq/libs/common ydb/core/fq/libs/control_plane_proxy/events ydb/core/fq/libs/control_plane_storage/events ydb/core/fq/libs/result_formatter diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h deleted file mode 100644 index 3345263e6d..0000000000 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h +++ /dev/null @@ -1,181 +0,0 @@ -#pragma once - -#include <ydb/core/fq/libs/actors/logging/log.h> -#include <ydb/core/fq/libs/compute/common/config.h> -#include <ydb/core/fq/libs/config/yq_issue.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/counters.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h> -#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h> -#include <ydb/core/fq/libs/control_plane_proxy/events/events.h> -#include <ydb/core/fq/libs/control_plane_storage/events/events.h> -#include <ydb/core/fq/libs/ydb/ydb.h> -#include <ydb/library/yql/public/issue/yql_issue.h> -#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h> - -#include <contrib/libs/fmt/include/fmt/format.h> -#include <library/cpp/actors/core/actor.h> -#include <library/cpp/actors/core/actor_bootstrapped.h> -#include <library/cpp/lwtrace/mon/mon_lwtrace.h> - -#include <util/generic/maybe.h> -#include <util/generic/ptr.h> -#include <util/string/join.h> -#include <util/system/types.h> - -namespace NFq { -namespace NPrivate { - -using namespace NActors; -using namespace NFq::NConfig; -using namespace NKikimr; -using namespace NThreading; -using namespace NYdb; -using namespace NYdb::NTable; - -using TTableClientPtr = std::shared_ptr<NYdb::NTable::TTableClient>; - -struct TEvPrivate { - enum EEv { - EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE), - EvEnd - }; - - struct TEvCreateSessionResponse : - NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> { - TCreateSessionResult Result; - - TEvCreateSessionResponse(TCreateSessionResult result) - : Result(std::move(result)) { } - }; - - static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE), - "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)"); -}; - -template<typename TEventRequest, typename TEventResponse> -class TCreateYdbComputeSessionActor : - public TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> { -private: - using TBase = TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>; - using TBase::SelfId; - using TBase::SendRequestToSender; - using TBase::Request; - using TEventRequestPtr = typename TEventRequest::TPtr; - -public: - TCreateYdbComputeSessionActor( - const TActorId& sender, - const TEventRequestPtr request, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters, - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) - : TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>( - sender, std::move(request), requestTimeout, counters) - , ComputeConfig(computeConfig) - , YqSharedResources(yqSharedResources) - , CredentialsProviderFactory(credentialsProviderFactory) { } - - static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB"; - - void BootstrapImpl() override { - InitiateConnectionCreation(); - } - - STRICT_STFUNC(StateFunc, - cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); - hFunc(TEvPrivate::TEvCreateSessionResponse, Handle); - ) - - void InitiateConnectionCreation() { - CPP_LOG_D("TCreateYdbComputeSessionActor InitiateConnectionCreation called. Actor id: " - << SelfId()); - auto tableClient = CreateNewTableClient(ComputeConfig, - YqSharedResources, - CredentialsProviderFactory); - tableClient->CreateSession().Subscribe( - [actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncCreateSessionResult& future) { - actorSystem->Send(self, - new TEvPrivate::TEvCreateSessionResponse( - future.GetValueSync())); - }); - } - - void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) { - CPP_LOG_D("TCreateYdbComputeSessionActor " - "Handle for TEvCreateSessionResponse called. Actor id: " - << SelfId()); - const auto& createSessionResult = event->Get()->Result; - if (!createSessionResult.IsSuccess()) { - TBase::HandleError("Couldn't create YDB session", - createSessionResult.GetStatus(), - createSessionResult.GetIssues()); - return; - } - - CPP_LOG_D("TCreateYdbComputeSessionActor Session was successfully acquired. Actor id: " - << SelfId()); - Request->Get()->YDBSession = createSessionResult.GetSession(); - SendRequestToSender(); - } - -private: - const NFq::TComputeConfig ComputeConfig; - const TYqSharedResources::TPtr YqSharedResources; - const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory; - - TTableClientPtr CreateNewTableClient( - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - - auto scope = "yandexcloud://" + Request->Get()->FolderId; - NFq::NConfig::TYdbStorageConfig computeConnection = - computeConfig.GetConnection(scope); - computeConnection.set_endpoint( - Request->Get()->ComputeDatabase->connection().endpoint()); - computeConnection.set_database( - Request->Get()->ComputeDatabase->connection().database()); - computeConnection.set_usessl( - Request->Get()->ComputeDatabase->connection().usessl()); - - auto tableSettings = - GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection, - credentialsProviderFactory); - return std::make_shared<NYdb::NTable::TTableClient>( - yqSharedResources->UserSpaceYdbDriver, tableSettings); - } -}; - -template<typename TEventRequest, typename TEventResponse> -struct TBaseActorTypeTag<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> { - using TRequest = TEventRequest; - using TResponse = TEventResponse; -}; - -template<typename TEventRequest, typename TEventResponse> -NActors::IActor* MakeComputeYDBSessionActor( - const NActors::TActorId sender, - const typename TEventRequest::TPtr request, - TDuration requestTimeout, - const NPrivate::TRequestCommonCountersPtr& counters, - const NFq::TComputeConfig& computeConfig, - const TYqSharedResources::TPtr& yqSharedResources, - const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) { - return new NPrivate::TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>( - sender, - std::move(request), - requestTimeout, - counters, - computeConfig, - yqSharedResources, - credentialsProviderFactory); -} - -} // namespace NPrivate -} // namespace NFq diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp index dd5e2a634b..fbfeba1bbe 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp @@ -2,7 +2,11 @@ #include "query_utils.h" #include <contrib/libs/fmt/include/fmt/format.h> +#include <util/string/join.h> +#include <ydb/core/fq/libs/common/util.h> +#include <ydb/core/fq/libs/config/yq_issue.h> #include <ydb/core/fq/libs/control_plane_proxy/events/events.h> +#include <ydb/public/api/protos/draft/fq.pb.h> #include <ydb/public/sdk/cpp/client/ydb_table/table.h> namespace NFq { @@ -24,6 +28,11 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { using TResponse = TEventResponse; }; +struct TSchemaQueryTask { + TString SQL; + TMaybe<TString> RollbackSQL; +}; + template<class TEventRequest, class TEventResponse> class TSchemaQueryYDBActor : public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> { @@ -40,9 +49,18 @@ private: struct TEvQueryExecutionResponse : NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> { TStatus Result; - - TEvQueryExecutionResponse(TStatus result) - : Result(std::move(result)) { } + size_t TaskIndex = 0u; + bool Rollback = false; + TMaybe<TStatus> MaybeInitialStatus; + + TEvQueryExecutionResponse(TStatus result, + size_t taskIndex, + bool rollback, + TMaybe<TStatus> MaybeInitialStatus) + : Result(std::move(result)) + , TaskIndex(taskIndex) + , Rollback(rollback) + , MaybeInitialStatus(std::move(MaybeInitialStatus)) { } }; }; @@ -53,7 +71,9 @@ private: using TEventRequestPtr = typename TEventRequest::TPtr; public: - using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; + using TTasks = std::vector<TSchemaQueryTask>; + using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>; + using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>; using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>; TSchemaQueryYDBActor(const TActorId& sender, @@ -64,56 +84,152 @@ public: TErrorMessageFactoryMethod errorMessageFactoryMethod) : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( sender, std::move(request), requestTimeout, counters) - , QueryFactoryMethod(queryFactoryMethod) + , Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}} , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } - static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB"; + TSchemaQueryYDBActor(const TActorId& sender, + const TEventRequestPtr request, + TDuration requestTimeout, + const NPrivate::TRequestCommonCountersPtr& counters, + TTasksFactoryMethod tasksFactoryMethod, + TErrorMessageFactoryMethod errorMessageFactoryMethod) + : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>( + sender, std::move(request), requestTimeout, counters) + , Tasks(tasksFactoryMethod(Request)) + , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { } + + static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR"; void BootstrapImpl() override { CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId()); - InitiateSchemaQueryExecution(); + InitiateSchemaQueryExecution(0, false, Nothing()); } - void InitiateSchemaQueryExecution() { - CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId()); - - const auto& request = Request; - TString schemeQuery = QueryFactoryMethod(request); - request->Get() - ->YDBSession->ExecuteSchemeQuery(schemeQuery) - .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), - self = SelfId()](const TAsyncStatus& future) { - actorSystem->Send(self, - new typename TEvPrivate::TEvQueryExecutionResponse( - std::move(future.GetValueSync()))); - }); + TMaybe<TString> SelectTask(size_t taskIndex, bool rollback) { + if (!rollback) { + if (taskIndex < Tasks.size()) { + return Tasks[taskIndex].SQL; + } + return Nothing(); + } + + while (true) { + const auto& maybeRollback = Tasks[taskIndex].RollbackSQL; + if (maybeRollback) { + return maybeRollback; + } + if (taskIndex == 0u) { + return Nothing(); + } + taskIndex--; + } + } + + bool InitiateSchemaQueryExecution(size_t taskIndex, + bool rollback, + const TMaybe<TStatus>& maybeInitialStatus) { + CPP_LOG_I( + "TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId()); + auto schemeQuery = SelectTask(taskIndex, rollback); + if (schemeQuery) { + CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. schemeQuery: " + << schemeQuery); + Request->Get() + ->YDBClient + ->RetryOperation([query = *schemeQuery](TSession session) { + return session.ExecuteSchemeQuery(query); + }) + .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(), + self = SelfId(), + taskIndex, + rollback, + maybeInitialStatus](const TAsyncStatus& future) { + actorSystem->Send(self, + new typename TEvPrivate::TEvQueryExecutionResponse{ + std::move(future.GetValueSync()), + taskIndex, + rollback, + std::move(maybeInitialStatus)}); + }); + } + return schemeQuery.Defined(); } STRICT_STFUNC(StateFunc, - cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); - hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle); + cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout); + hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle); ) + void FinishSuccessfully() { + CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " + << TBase::SelfId()); + Request->Get()->ComputeYDBOperationWasPerformed = true; + TBase::SendRequestToSender(); + } + void SendError(const TStatus& executeSchemeQueryStatus) { + CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " + << TBase::SelfId()); + TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus); + + TBase::HandleError(errorMessage, + executeSchemeQueryStatus.GetStatus(), + executeSchemeQueryStatus.GetIssues()); + } + void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) { - CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Actor id: " << TBase::SelfId()); const auto& executeSchemeQueryStatus = event->Get()->Result; - if (!executeSchemeQueryStatus.IsSuccess()) { - CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " << TBase::SelfId()); - TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus); - - TBase::HandleError(errorMessage, - executeSchemeQueryStatus.GetStatus(), - executeSchemeQueryStatus.GetIssues()); + auto isRollback = event->Get()->Rollback; + + auto successExecutionRunMode = executeSchemeQueryStatus.IsSuccess() && !isRollback; + auto successExecutionRollbackMode = + executeSchemeQueryStatus.IsSuccess() && isRollback; + auto failedExecutionRunMode = !executeSchemeQueryStatus.IsSuccess() && !isRollback; + + if (successExecutionRunMode) { + if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) { + FinishSuccessfully(); + return; + } + } else if (successExecutionRollbackMode) { + if (event->Get()->TaskIndex == 0 || + !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1, + true, + std::move(event->Get()->MaybeInitialStatus))) { + SendError(*event->Get()->MaybeInitialStatus); + return; + } + } else if (failedExecutionRunMode) { + if (event->Get()->TaskIndex == 0 || + !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1, + true, + std::move(event->Get()->Result))) { + SendError(event->Get()->Result); + return; + } + } else { + // Failed during rollback + const auto& initialIssues = *(event->Get()->MaybeInitialStatus); + + auto originalIssue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't execute SQL script"); + for (const auto& subIssue : initialIssues.GetIssues()) { + originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + auto rollbackIssue = + MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, + "Couldn't execute rollback SQL script"); + for (const auto& subIssue : event->Get()->Result.GetIssues()) { + originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue)); + } + SendError(TStatus{initialIssues.GetStatus(), + NYql::TIssues{std::move(originalIssue), + std::move(rollbackIssue)}}); return; } - - CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId()); - Request->Get()->ComputeYDBOperationWasPerformed = true; - TBase::SendRequestToSender(); } private: - TQueryFactoryMethod QueryFactoryMethod; + TTasks Tasks; TErrorMessageFactoryMethod ErrorMessageFactoryMethod; }; @@ -123,13 +239,12 @@ NActors::IActor* MakeCreateConnectionActor( TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const TString& objectStorageEndpoint, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer) { auto queryFactoryMethod = - [objectStorageEndpoint, signer = std::move(signer)]( + [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), + signer = std::move(signer)]( const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString { - using namespace fmt::literals; - return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(), objectStorageEndpoint, signer); @@ -158,24 +273,68 @@ NActors::IActor* MakeModifyConnectionActor( TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const TString& objectStorageEndpoint, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer) { auto queryFactoryMethod = - [objectStorageEndpoint, signer = std::move(signer)]( - const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request) -> TString { + [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(), + signer = std::move(signer)]( + const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request) + -> std::vector<TSchemaQueryTask> { using namespace fmt::literals; auto& oldConnectionContent = (*request->Get()->OldConnectionContent); - auto& newConnectionContent = request->Get()->Request.content(); - return fmt::format( + auto& newConnectionContent = request->Get()->Request.content(); + + auto deleteOldEntities = fmt::format( R"( + {delete_external_data_tables}; {delete_external_data_source}; - {create_external_data_source}; )", + "delete_external_data_tables"_a = + JoinMapRange("\n", + request->Get()->OldBindingContents.begin(), + request->Get()->OldBindingContents.end(), + [](const FederatedQuery::BindingContent& binding) { + return MakeDeleteExternalDataTableQuery(binding.name()); + }), "delete_external_data_source"_a = - MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer), + MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer)); + + auto createOldEntities = fmt::format( + R"( + {create_external_data_source}; + {create_external_data_tables}; + )", "create_external_data_source"_a = MakeCreateExternalDataSourceQuery( - newConnectionContent, objectStorageEndpoint, signer)); + oldConnectionContent, objectStorageEndpoint, signer), + "create_external_data_tables"_a = JoinMapRange( + "\n", + request->Get()->OldBindingContents.begin(), + request->Get()->OldBindingContents.end(), + [&oldConnectionContent](const FederatedQuery::BindingContent& binding) { + return MakeCreateExternalDataTableQuery(binding, + oldConnectionContent.name()); + })); + + auto createNewEntities = fmt::format( + R"( + {create_external_data_source}; + {create_external_data_tables}; + )", + "create_external_data_source"_a = MakeCreateExternalDataSourceQuery( + newConnectionContent, objectStorageEndpoint, signer), + "create_external_data_tables"_a = JoinMapRange( + "\n", + request->Get()->OldBindingContents.begin(), + request->Get()->OldBindingContents.end(), + [&newConnectionContent](const FederatedQuery::BindingContent& binding) { + return MakeCreateExternalDataTableQuery(binding, + newConnectionContent.name()); + })); + + return {TSchemaQueryTask{.SQL = TString{deleteOldEntities}, + .RollbackSQL = TString{createOldEntities}}, + TSchemaQueryTask{.SQL = TString{createNewEntities}}}; }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { @@ -202,7 +361,6 @@ NActors::IActor* MakeDeleteConnectionActor( auto queryFactoryMethod = [signer = std::move(signer)]( const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString { - using namespace fmt::literals; return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent, signer); }; @@ -233,8 +391,6 @@ NActors::IActor* MakeCreateBindingActor( TCounters& counters) { auto queryFactoryMethod = [](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString { - using namespace fmt::literals; - auto externalSourceName = *request->Get()->ConnectionName; return MakeCreateExternalDataTableQuery(request->Get()->Request.content(), externalSourceName); @@ -264,20 +420,18 @@ NActors::IActor* MakeModifyBindingActor( TDuration requestTimeout, TCounters& counters) { auto queryFactoryMethod = - [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> TString { - using namespace fmt::literals; - + [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> { auto sourceName = *request->Get()->ConnectionName; - auto oldTableName = *request->Get()->OldBindingName; - return fmt::format( - R"( - {delete_external_data_table}; - {create_external_data_table}; - )", - "delete_external_data_table"_a = MakeDeleteExternalDataTableQuery(oldTableName), - "create_external_data_table"_a = - MakeCreateExternalDataTableQuery(request->Get()->Request.content(), - sourceName)); + auto oldTableName = request->Get()->OldBindingContent->name(); + + auto deleteOldEntities = MakeDeleteExternalDataTableQuery(oldTableName); + auto createOldEntities = + MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent, sourceName); + auto createNewEntities = + MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName); + + return {TSchemaQueryTask{.SQL = deleteOldEntities, .RollbackSQL = createOldEntities}, + TSchemaQueryTask{.SQL = createNewEntities}}; }; auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString { @@ -302,7 +456,6 @@ NActors::IActor* MakeDeleteBindingActor( TCounters& counters) { auto queryFactoryMethod = [](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString { - using namespace fmt::literals; return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName); }; diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h index 7ba6a06038..4a86d8921b 100644 --- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h +++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h @@ -17,7 +17,7 @@ NActors::IActor* MakeCreateConnectionActor( TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const TString& objectStorageEndpoint, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer); NActors::IActor* MakeModifyConnectionActor( @@ -25,7 +25,7 @@ NActors::IActor* MakeModifyConnectionActor( TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request, TDuration requestTimeout, TCounters& counters, - const TString& objectStorageEndpoint, + const NConfig::TCommonConfig& commonConfig, TSigner::TPtr signer); NActors::IActor* MakeDeleteConnectionActor( diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp index c81e34bee9..2ee716c84c 100644 --- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp +++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp @@ -18,7 +18,7 @@ #include <ydb/core/fq/libs/config/yq_issue.h> #include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h> -#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h> +#include <ydb/core/fq/libs/control_plane_proxy/actors/utils.h> #include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h> #include <ydb/core/fq/libs/control_plane_proxy/events/events.h> @@ -43,6 +43,7 @@ #include <ydb/library/folder_service/folder_service.h> #include <ydb/library/folder_service/events.h> +#include <memory> #include <contrib/libs/fmt/include/fmt/format.h> @@ -832,6 +833,8 @@ private: return issues; } + + void Handle(TEvControlPlaneProxy::TEvCreateQueryRequest::TPtr& ev) { TInstant startTime = TInstant::Now(); FederatedQuery::CreateQueryRequest request = ev->Get()->Request; @@ -1687,18 +1690,11 @@ private: }; if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) { - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvCreateConnectionRequest, - TEvControlPlaneProxy::TEvCreateConnectionResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } Register(NPrivate::MakeCreateConnectionActor( @@ -1706,7 +1702,7 @@ private: std::move(ev), Config.RequestTimeout, Counters, - Config.CommonConfig.GetObjectStorageEndpoint(), + Config.CommonConfig, Signer)); return; } @@ -1963,7 +1959,18 @@ private: sender, ev, Counters, Config.RequestTimeout, permissions)); return; } - + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->OldBindingNamesDiscoveryFinished) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeListBindingIds( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } + if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) { + auto permissions = ExtractPermissions(ev, availablePermissions); + Register(MakeDescribeListedBinding( + sender, ev, Counters, Config.RequestTimeout, permissions)); + return; + } const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed; if (!controlPlaneYDBOperationWasPerformed) { auto sender = ev->Sender; @@ -1992,18 +1999,11 @@ private: } if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvModifyConnectionRequest, - TEvControlPlaneProxy::TEvModifyConnectionResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } if (!ev->Get()->ComputeYDBOperationWasPerformed) { @@ -2012,7 +2012,7 @@ private: ev, Config.RequestTimeout, Counters, - Config.CommonConfig.GetObjectStorageEndpoint(), + Config.CommonConfig, Signer)); return; } @@ -2116,16 +2116,11 @@ private: } if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvDeleteConnectionRequest, - TEvControlPlaneProxy::TEvDeleteConnectionResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } if (!ev->Get()->ComputeYDBOperationWasPerformed) { @@ -2283,16 +2278,11 @@ private: return; } - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvCreateBindingRequest, - TEvControlPlaneProxy::TEvCreateBindingResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } Register(MakeCreateBindingActor( @@ -2528,7 +2518,7 @@ private: }; if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { - if (!ev->Get()->OldBindingName) { + if (!ev->Get()->OldBindingContent) { auto permissions = ExtractPermissions(ev, availablePermissions); Register(MakeDiscoverYDBBindingName( sender, ev, Counters, Config.RequestTimeout, permissions)); @@ -2564,18 +2554,11 @@ private: } if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvModifyBindingRequest, - TEvControlPlaneProxy::TEvModifyBindingResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } if (!ev->Get()->ComputeYDBOperationWasPerformed) { @@ -2686,18 +2669,11 @@ private: } if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) { - if (!ev->Get()->YDBSession) { - Register(MakeComputeYDBSessionActor< - TEvControlPlaneProxy::TEvDeleteBindingRequest, - TEvControlPlaneProxy::TEvDeleteBindingResponse>( - sender, - std::move(ev), - Config.RequestTimeout, - Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION), - Config.ComputeConfig, - YqSharedResources, - CredentialsProviderFactory)); - return; + if (!ev->Get()->YDBClient) { + ev->Get()->YDBClient = CreateNewTableClient(ev, + Config.ComputeConfig, + YqSharedResources, + CredentialsProviderFactory); } if (!ev->Get()->ComputeYDBOperationWasPerformed) { diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h index bde54c6a8a..7db87b0d15 100644 --- a/ydb/core/fq/libs/control_plane_proxy/events/events.h +++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h @@ -104,7 +104,7 @@ struct TEvControlPlaneProxy { bool ComputeYDBOperationWasPerformed; bool ControlPlaneYDBOperationWasPerformed; std::unique_ptr<TProxyResponse> Response; - TMaybe<NYdb::NTable::TSession> YDBSession; + std::shared_ptr<NYdb::NTable::TTableClient> YDBClient; TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase; }; @@ -310,6 +310,12 @@ struct TEvControlPlaneProxy { EvModifyConnectionRequest>::TBaseControlPlaneRequest; TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent; + // ListBindings + bool OldBindingNamesDiscoveryFinished = false; + TMaybe<TString> NextListingBindingsToken; + std::vector<TString> OldBindingIds; + // DescribeEachBinding + std::vector<FederatedQuery::BindingContent> OldBindingContents; }; template<> @@ -348,8 +354,7 @@ struct TEvControlPlaneProxy { FederatedQuery::ModifyBindingRequest, EvModifyBindingRequest>::TBaseControlPlaneRequest; - TMaybe<TString> OldBindingName; - TMaybe<TString> ConnectionId; + TMaybe<FederatedQuery::BindingContent> OldBindingContent; TMaybe<TString> ConnectionName; }; diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp index 1e6c59ea33..bf4e70a67c 100644 --- a/ydb/core/grpc_services/rpc_fq.cpp +++ b/ydb/core/grpc_services/rpc_fq.cpp @@ -546,6 +546,7 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyConnectionReques TVector<NPerms::TPermission> basePermissions{ NPerms::Required("yq.connections.update"), NPerms::Required("yq.connections.get"), + NPerms::Required("yq.bindings.get"), NPerms::Optional("yq.resources.managePrivate"), NPerms::Optional("yq.resources.viewPublic"), NPerms::Optional("yq.resources.viewPrivate") |