aboutsummaryrefslogtreecommitdiffstats
diff options
context:
space:
mode:
authorauzhegov <auzhegov@yandex-team.com>2023-07-31 18:02:17 +0300
committerauzhegov <auzhegov@yandex-team.com>2023-07-31 18:02:17 +0300
commit3ca8b54c96e09eb2b65be7f09675623438d559c7 (patch)
treef1c54a03907031b42e2cb03d18bb7caebf3eaa25
parent18c340161d3f523e1c899b606b8b370b2d7a1264 (diff)
downloadydb-3ca8b54c96e09eb2b65be7f09675623438d559c7.tar.gz
Improved modify operations
Initial version
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/common/util.cpp38
-rw-r--r--ydb/core/fq/libs/common/util.h24
-rw-r--r--ydb/core/fq/libs/common/ya.make1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp100
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h17
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp78
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/utils.h30
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ya.make1
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h181
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp281
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h4
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp122
-rw-r--r--ydb/core/fq/libs/control_plane_proxy/events/events.h11
-rw-r--r--ydb/core/grpc_services/rpc_fq.cpp1
23 files changed, 513 insertions, 388 deletions
diff --git a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
index a6a0765c5d..af28035aad 100644
--- a/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.darwin-x86_64.txt
@@ -26,5 +26,6 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
index e88b073786..eba7bc83ba 100644
--- a/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.linux-aarch64.txt
@@ -27,5 +27,6 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
diff --git a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
index e88b073786..eba7bc83ba 100644
--- a/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.linux-x86_64.txt
@@ -27,5 +27,6 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
diff --git a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
index a6a0765c5d..af28035aad 100644
--- a/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/common/CMakeLists.windows-x86_64.txt
@@ -26,5 +26,6 @@ target_sources(fq-libs-common PRIVATE
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/compression.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/debug_info.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/entity_id.cpp
+ ${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/util.cpp
${CMAKE_SOURCE_DIR}/ydb/core/fq/libs/common/rows_proto_splitter.cpp
)
diff --git a/ydb/core/fq/libs/common/util.cpp b/ydb/core/fq/libs/common/util.cpp
new file mode 100644
index 0000000000..053de08889
--- /dev/null
+++ b/ydb/core/fq/libs/common/util.cpp
@@ -0,0 +1,38 @@
+#include "util.h"
+
+#include <util/generic/string.h>
+#include <util/string/builder.h>
+#include <util/string/subst.h>
+
+namespace NFq {
+
+TString EscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith) {
+ auto escapedValue = value;
+ SubstGlobal(escapedValue, enclosingSeq, replaceWith);
+ return escapedValue;
+}
+
+TString EscapeString(const TString& value, char enclosingChar) {
+ auto escapedValue = value;
+ SubstGlobal(escapedValue,
+ TString{enclosingChar},
+ TStringBuilder{} << '\\' << enclosingChar);
+ return escapedValue;
+}
+
+TString EncloseAndEscapeString(const TString& value, char enclosingChar) {
+ return TStringBuilder{} << enclosingChar << EscapeString(value, enclosingChar)
+ << enclosingChar;
+}
+
+TString EncloseAndEscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith) {
+ return TStringBuilder{} << enclosingSeq
+ << EscapeString(value, enclosingSeq, replaceWith)
+ << enclosingSeq;
+}
+
+} // namespace NFq
diff --git a/ydb/core/fq/libs/common/util.h b/ydb/core/fq/libs/common/util.h
index e61a7c86b3..b7aedf6e6c 100644
--- a/ydb/core/fq/libs/common/util.h
+++ b/ydb/core/fq/libs/common/util.h
@@ -5,7 +5,10 @@
#include <google/protobuf/repeated_field.h>
+#include <library/cpp/iterator/mapped.h>
+#include <util/generic/string.h>
#include <util/generic/vector.h>
+#include <util/string/join.h>
namespace NFq {
@@ -22,4 +25,25 @@ TVector<TElement> VectorFromProto(const ::google::protobuf::RepeatedPtrField<TEl
return { field.begin(), field.end() };
}
+template <typename TIter, typename TFunc>
+TString JoinMapRange(TString delim, const TIter beg, const TIter end, const TFunc func) {
+ auto mappedBegin =
+ MakeMappedIterator(beg, func);
+ auto mappedEnd =
+ MakeMappedIterator(end, func);
+ return JoinRange(delim, mappedBegin, mappedEnd);
+}
+
+TString EscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith);
+
+TString EscapeString(const TString& value, char enclosingChar);
+
+TString EncloseAndEscapeString(const TString& value, char enclosingChar);
+
+TString EncloseAndEscapeString(const TString& value,
+ const TString& enclosingSeq,
+ const TString& replaceWith);
+
} // namespace NFq
diff --git a/ydb/core/fq/libs/common/ya.make b/ydb/core/fq/libs/common/ya.make
index f30242480e..3246b6455e 100644
--- a/ydb/core/fq/libs/common/ya.make
+++ b/ydb/core/fq/libs/common/ya.make
@@ -7,6 +7,7 @@ SRCS(
debug_info.cpp
entity_id.cpp
entity_id.h
+ util.cpp
rows_proto_splitter.cpp
rows_proto_splitter.h
)
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
index 870e205dcc..9cf705e7cd 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.darwin-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
yutil
contrib-libs-fmt
library-cpp-iterator
+ fq-libs-common
libs-control_plane_proxy-events
libs-control_plane_storage-events
fq-libs-result_formatter
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
index 5429166e90..d6df134d3a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-aarch64.txt
@@ -17,6 +17,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
yutil
contrib-libs-fmt
library-cpp-iterator
+ fq-libs-common
libs-control_plane_proxy-events
libs-control_plane_storage-events
fq-libs-result_formatter
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
index 5429166e90..d6df134d3a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.linux-x86_64.txt
@@ -17,6 +17,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
yutil
contrib-libs-fmt
library-cpp-iterator
+ fq-libs-common
libs-control_plane_proxy-events
libs-control_plane_storage-events
fq-libs-result_formatter
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
index 870e205dcc..9cf705e7cd 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/CMakeLists.windows-x86_64.txt
@@ -16,6 +16,7 @@ target_link_libraries(libs-control_plane_proxy-actors PUBLIC
yutil
contrib-libs-fmt
library-cpp-iterator
+ fq-libs-common
libs-control_plane_proxy-events
libs-control_plane_storage-events
fq-libs-result_formatter
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
index 168e4f6fa2..d2e7e8bfa9 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.cpp
@@ -1,5 +1,6 @@
#include "control_plane_storage_requester_actor.h"
#include "base_actor.h"
+#include "util/generic/maybe.h"
#include <contrib/libs/fmt/include/fmt/format.h>
#include <library/cpp/actors/core/event.h>
@@ -227,7 +228,7 @@ NActors::IActor* MakeDiscoverYDBConnectionName(
auto cpsRequestFactory =
[](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event) {
FederatedQuery::DescribeConnectionRequest result;
- auto connectionId = *event->Get()->ConnectionId;
+ auto connectionId = event->Get()->OldBindingContent->connection_id();
result.set_connection_id(connectionId);
return result;
};
@@ -273,8 +274,7 @@ NActors::IActor* MakeDiscoverYDBBindingName(
auto entityNameExtractorFactoryMethod =
[](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& event,
const FederatedQuery::DescribeBindingResult& result) {
- event->Get()->OldBindingName = result.binding().content().name();
- event->Get()->ConnectionId = result.binding().content().connection_id();
+ event->Get()->OldBindingContent = result.binding().content();
};
return new TControlPlaneStorageRequesterActor<TEvControlPlaneProxy::TEvModifyBindingRequest,
@@ -327,5 +327,99 @@ NActors::IActor* MakeDiscoverYDBBindingName(
errorMessageFactoryMethod,
entityNameExtractorFactoryMethod);
}
+
+NActors::IActor* MakeListBindingIds(
+ const TActorId sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) {
+ FederatedQuery::ListBindingsRequest result;
+ auto connectionId = event->Get()->Request.connection_id();
+ result.mutable_filter()->set_connection_id(connectionId);
+ result.set_limit(100);
+ if (event->Get()->NextListingBindingsToken) {
+ result.set_page_token(*event->Get()->NextListingBindingsToken);
+ }
+ return result;
+ };
+
+ auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ return "Couldn't resolve binding id(s)";
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event,
+ const FederatedQuery::ListBindingsResult& result) {
+ for (auto& binding: result.binding()) {
+ event->Get()->OldBindingIds.emplace_back(binding.meta().id());
+ }
+
+ TString nextPageToken = result.next_page_token();
+ if (nextPageToken == "") {
+ event->Get()->NextListingBindingsToken = Nothing();
+ event->Get()->OldBindingNamesDiscoveryFinished = true;
+ } else {
+ event->Get()->NextListingBindingsToken = nextPageToken;
+ }
+ };
+
+ return new TControlPlaneStorageRequesterActor<
+ TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse,
+ TEvControlPlaneStorage::TEvListBindingsRequest,
+ TEvControlPlaneStorage::TEvListBindingsResponse>(sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(
+ RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ errorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
+NActors::IActor* MakeDescribeListedBinding(
+ const TActorId sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions) {
+ auto cpsRequestFactory =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event) {
+ auto bindingId = event->Get()->OldBindingIds[event->Get()->OldBindingContents.size()];
+
+ FederatedQuery::DescribeBindingRequest result;
+ result.set_binding_id(bindingId);
+ return result;
+ };
+
+ auto errorMessageFactoryMethod = [](const NYql::TIssues& issues) -> TString {
+ Y_UNUSED(issues);
+ return "Couldn't resolve binding content";
+ };
+ auto entityNameExtractorFactoryMethod =
+ [](const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& event,
+ const FederatedQuery::DescribeBindingResult& result) {
+ event->Get()->OldBindingContents.push_back(result.binding().content());
+ };
+
+ return new TControlPlaneStorageRequesterActor<
+ TEvControlPlaneProxy::TEvModifyConnectionRequest,
+ TEvControlPlaneProxy::TEvModifyConnectionResponse,
+ TEvControlPlaneStorage::TEvDescribeBindingRequest,
+ TEvControlPlaneStorage::TEvDescribeBindingResponse>(
+ sender,
+ request,
+ requestTimeout,
+ counters.GetCommonCounters(RTC_DESCRIBE_CPS_ENTITY),
+ permissions,
+ cpsRequestFactory,
+ errorMessageFactoryMethod,
+ entityNameExtractorFactoryMethod);
+}
+
} // namespace NPrivate
} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
index 24995807ea..d9801a1d07 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h
@@ -54,5 +54,22 @@ NActors::IActor* MakeDiscoverYDBBindingName(
TCounters& counters,
TDuration requestTimeout,
TPermissions permissions);
+
+/// ModifyConnection
+
+NActors::IActor* MakeListBindingIds(
+ const TActorId sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
+NActors::IActor* MakeDescribeListedBinding(
+ const TActorId sender,
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request,
+ TCounters& counters,
+ TDuration requestTimeout,
+ TPermissions permissions);
+
} // namespace NPrivate
} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
index 9859dac2f3..494e44034a 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.cpp
@@ -1,47 +1,16 @@
#include "query_utils.h"
-#include "ydb/public/api/protos/draft/fq.pb.h"
#include <contrib/libs/fmt/include/fmt/format.h>
-#include <library/cpp/iterator/mapped.h>
#include <util/generic/maybe.h>
-#include <util/string/join.h>
+#include <ydb/core/fq/libs/common/util.h>
#include <ydb/core/fq/libs/result_formatter/result_formatter.h>
#include <ydb/core/kqp/provider/yql_kikimr_results.h>
+#include <ydb/public/api/protos/draft/fq.pb.h>
namespace NFq {
namespace NPrivate {
-TString EscapeString(const TString& value,
- const TString& enclosingSeq,
- const TString& replaceWith) {
- auto escapedValue = value;
- SubstGlobal(escapedValue, enclosingSeq, replaceWith);
- return escapedValue;
-}
-TString EscapeString(const TString& value, char enclosingChar) {
- auto escapedValue = value;
- SubstGlobal(escapedValue,
- TString{enclosingChar},
- TStringBuilder{} << '\\' << enclosingChar);
- return escapedValue;
-}
-
-TString EncloseAndEscapeString(const TString& value, char enclosingChar) {
- return TStringBuilder{} << enclosingChar
- << EscapeString(value,
- enclosingChar)
- << enclosingChar;
-}
-
-TString EncloseAndEscapeString(const TString& value,
- const TString& enclosingSeq,
- const TString& replaceWith) {
- return TStringBuilder{} << enclosingSeq
- << EscapeString(value, enclosingSeq, replaceWith)
- << enclosingSeq;
-}
-
TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
const TString& connectionName) {
using namespace fmt::literals;
@@ -52,7 +21,7 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c
// Schema
NYql::TExprContext context;
- auto columnsTransformFunction = [&](const Ydb::Column& column) -> TString {
+ auto columnsTransformFunction = [&context](const Ydb::Column& column) -> TString {
NYdb::TTypeParser typeParser(column.type());
auto node = MakeType(typeParser, context);
auto typeName = NYql::FormatType(node);
@@ -63,10 +32,6 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c
"columnType"_a = typeName,
"notNull"_a = notNull);
};
- auto columnsBegin =
- MakeMappedIterator(subset.schema().column().begin(), columnsTransformFunction);
- auto columnsEnd =
- MakeMappedIterator(subset.schema().column().end(), columnsTransformFunction);
// WithOptions
auto withOptions = std::unordered_map<TString, TString>{};
@@ -85,17 +50,14 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c
}
if (!subset.partitioned_by().empty()) {
- auto stringEscapeMapper = [](const TString& value) {
- return EscapeString(value, '"');
- };
-
auto partitionBy = TStringBuilder{}
<< "\"["
- << JoinRange(", ",
- MakeMappedIterator(subset.partitioned_by().begin(),
- stringEscapeMapper),
- MakeMappedIterator(subset.partitioned_by().end(),
- stringEscapeMapper))
+ << JoinMapRange(", ",
+ subset.partitioned_by().begin(),
+ subset.partitioned_by().end(),
+ [](const TString& value) {
+ return EscapeString(value, '"');
+ })
<< "]\"";
withOptions.insert({"PARTITIONED_BY", partitionBy});
}
@@ -105,15 +67,6 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c
EncloseAndEscapeString(kv.second, '"')});
}
- auto concatEscapedKeyValueMapper = [](const std::pair<TString, TString>& kv) -> TString {
- return TStringBuilder{} << " " << kv.first << " = " << kv.second;
- };
-
- auto withOptionsBegin =
- MakeMappedIterator(withOptions.begin(), concatEscapedKeyValueMapper);
- auto withOptionsEnd =
- MakeMappedIterator(withOptions.end(), concatEscapedKeyValueMapper);
-
return fmt::format(
R"(
CREATE EXTERNAL TABLE {externalTableName} (
@@ -122,8 +75,17 @@ TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& c
{withOptions}
);)",
"externalTableName"_a = EncloseAndEscapeString(bindingName, '`'),
- "columns"_a = JoinRange(",\n", columnsBegin, columnsEnd),
- "withOptions"_a = JoinRange(",\n", withOptionsBegin, withOptionsEnd));
+ "columns"_a = JoinMapRange(",\n",
+ subset.schema().column().begin(),
+ subset.schema().column().end(),
+ columnsTransformFunction),
+ "withOptions"_a = JoinMapRange(",\n",
+ withOptions.begin(),
+ withOptions.end(),
+ [](const std::pair<TString, TString>& kv) -> TString {
+ return TStringBuilder{} << " " << kv.first
+ << " = " << kv.second;
+ }));
}
TString SignAccountId(const TString& id, const TSigner::TPtr& signer) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
index b5824ce685..480ee99ee7 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h
@@ -7,10 +7,6 @@
namespace NFq {
namespace NPrivate {
-TString EscapeString(const TString& value, char enclosingChar);
-
-TString EncloseAndEscapeString(const TString& value, char enclosingChar);
-
TString MakeCreateExternalDataTableQuery(const FederatedQuery::BindingContent& content,
const TString& connectionName);
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/utils.h b/ydb/core/fq/libs/control_plane_proxy/actors/utils.h
new file mode 100644
index 0000000000..42701c3311
--- /dev/null
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/utils.h
@@ -0,0 +1,30 @@
+#pragma once
+
+#include <ydb/core/fq/libs/compute/common/config.h>
+#include <ydb/core/fq/libs/shared_resources/shared_resources.h>
+#include <ydb/core/fq/libs/ydb/ydb.h>
+#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
+
+namespace NFq {
+
+template<typename T>
+std::shared_ptr<NYdb::NTable::TTableClient> CreateNewTableClient(
+ const T& ev,
+ const NFq::TComputeConfig& computeConfig,
+ const TYqSharedResources::TPtr& yqSharedResources,
+ const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
+ auto scope = "yandexcloud://" + ev->Get()->FolderId;
+ NFq::NConfig::TYdbStorageConfig computeConnection = computeConfig.GetConnection(scope);
+
+ computeConnection.set_endpoint(ev->Get()->ComputeDatabase->connection().endpoint());
+ computeConnection.set_database(ev->Get()->ComputeDatabase->connection().database());
+ computeConnection.set_usessl(ev->Get()->ComputeDatabase->connection().usessl());
+
+ auto tableSettings =
+ GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection,
+ credentialsProviderFactory);
+ return std::make_shared<NYdb::NTable::TTableClient>(yqSharedResources->UserSpaceYdbDriver,
+ tableSettings);
+}
+
+} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
index 212a8d0f3c..b08611e3de 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ya.make
@@ -9,6 +9,7 @@ SRCS(
PEERDIR(
contrib/libs/fmt
library/cpp/iterator
+ ydb/core/fq/libs/common
ydb/core/fq/libs/control_plane_proxy/events
ydb/core/fq/libs/control_plane_storage/events
ydb/core/fq/libs/result_formatter
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h
deleted file mode 100644
index 3345263e6d..0000000000
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h
+++ /dev/null
@@ -1,181 +0,0 @@
-#pragma once
-
-#include <ydb/core/fq/libs/actors/logging/log.h>
-#include <ydb/core/fq/libs/compute/common/config.h>
-#include <ydb/core/fq/libs/config/yq_issue.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/base_actor.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/counters.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/query_utils.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h>
-#include <ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.h>
-#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
-#include <ydb/core/fq/libs/control_plane_storage/events/events.h>
-#include <ydb/core/fq/libs/ydb/ydb.h>
-#include <ydb/library/yql/public/issue/yql_issue.h>
-#include <ydb/public/sdk/cpp/client/ydb_scheme/scheme.h>
-
-#include <contrib/libs/fmt/include/fmt/format.h>
-#include <library/cpp/actors/core/actor.h>
-#include <library/cpp/actors/core/actor_bootstrapped.h>
-#include <library/cpp/lwtrace/mon/mon_lwtrace.h>
-
-#include <util/generic/maybe.h>
-#include <util/generic/ptr.h>
-#include <util/string/join.h>
-#include <util/system/types.h>
-
-namespace NFq {
-namespace NPrivate {
-
-using namespace NActors;
-using namespace NFq::NConfig;
-using namespace NKikimr;
-using namespace NThreading;
-using namespace NYdb;
-using namespace NYdb::NTable;
-
-using TTableClientPtr = std::shared_ptr<NYdb::NTable::TTableClient>;
-
-struct TEvPrivate {
- enum EEv {
- EvCreateSessionResponse = EventSpaceBegin(NActors::TEvents::ES_PRIVATE),
- EvEnd
- };
-
- struct TEvCreateSessionResponse :
- NActors::TEventLocal<TEvCreateSessionResponse, EvCreateSessionResponse> {
- TCreateSessionResult Result;
-
- TEvCreateSessionResponse(TCreateSessionResult result)
- : Result(std::move(result)) { }
- };
-
- static_assert(EvEnd < EventSpaceEnd(NActors::TEvents::ES_PRIVATE),
- "expect EvEnd < EventSpaceEnd(TEvents::ES_PRIVATE)");
-};
-
-template<typename TEventRequest, typename TEventResponse>
-class TCreateYdbComputeSessionActor :
- public TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> {
-private:
- using TBase = TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>;
- using TBase::SelfId;
- using TBase::SendRequestToSender;
- using TBase::Request;
- using TEventRequestPtr = typename TEventRequest::TPtr;
-
-public:
- TCreateYdbComputeSessionActor(
- const TActorId& sender,
- const TEventRequestPtr request,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters,
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory)
- : TBaseActor<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>>(
- sender, std::move(request), requestTimeout, counters)
- , ComputeConfig(computeConfig)
- , YqSharedResources(yqSharedResources)
- , CredentialsProviderFactory(credentialsProviderFactory) { }
-
- static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB";
-
- void BootstrapImpl() override {
- InitiateConnectionCreation();
- }
-
- STRICT_STFUNC(StateFunc,
- cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
- hFunc(TEvPrivate::TEvCreateSessionResponse, Handle);
- )
-
- void InitiateConnectionCreation() {
- CPP_LOG_D("TCreateYdbComputeSessionActor InitiateConnectionCreation called. Actor id: "
- << SelfId());
- auto tableClient = CreateNewTableClient(ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory);
- tableClient->CreateSession().Subscribe(
- [actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncCreateSessionResult& future) {
- actorSystem->Send(self,
- new TEvPrivate::TEvCreateSessionResponse(
- future.GetValueSync()));
- });
- }
-
- void Handle(TEvPrivate::TEvCreateSessionResponse::TPtr& event) {
- CPP_LOG_D("TCreateYdbComputeSessionActor "
- "Handle for TEvCreateSessionResponse called. Actor id: "
- << SelfId());
- const auto& createSessionResult = event->Get()->Result;
- if (!createSessionResult.IsSuccess()) {
- TBase::HandleError("Couldn't create YDB session",
- createSessionResult.GetStatus(),
- createSessionResult.GetIssues());
- return;
- }
-
- CPP_LOG_D("TCreateYdbComputeSessionActor Session was successfully acquired. Actor id: "
- << SelfId());
- Request->Get()->YDBSession = createSessionResult.GetSession();
- SendRequestToSender();
- }
-
-private:
- const NFq::TComputeConfig ComputeConfig;
- const TYqSharedResources::TPtr YqSharedResources;
- const NKikimr::TYdbCredentialsProviderFactory CredentialsProviderFactory;
-
- TTableClientPtr CreateNewTableClient(
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
-
- auto scope = "yandexcloud://" + Request->Get()->FolderId;
- NFq::NConfig::TYdbStorageConfig computeConnection =
- computeConfig.GetConnection(scope);
- computeConnection.set_endpoint(
- Request->Get()->ComputeDatabase->connection().endpoint());
- computeConnection.set_database(
- Request->Get()->ComputeDatabase->connection().database());
- computeConnection.set_usessl(
- Request->Get()->ComputeDatabase->connection().usessl());
-
- auto tableSettings =
- GetClientSettings<NYdb::NTable::TClientSettings>(computeConnection,
- credentialsProviderFactory);
- return std::make_shared<NYdb::NTable::TTableClient>(
- yqSharedResources->UserSpaceYdbDriver, tableSettings);
- }
-};
-
-template<typename TEventRequest, typename TEventResponse>
-struct TBaseActorTypeTag<TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>> {
- using TRequest = TEventRequest;
- using TResponse = TEventResponse;
-};
-
-template<typename TEventRequest, typename TEventResponse>
-NActors::IActor* MakeComputeYDBSessionActor(
- const NActors::TActorId sender,
- const typename TEventRequest::TPtr request,
- TDuration requestTimeout,
- const NPrivate::TRequestCommonCountersPtr& counters,
- const NFq::TComputeConfig& computeConfig,
- const TYqSharedResources::TPtr& yqSharedResources,
- const NKikimr::TYdbCredentialsProviderFactory& credentialsProviderFactory) {
- return new NPrivate::TCreateYdbComputeSessionActor<TEventRequest, TEventResponse>(
- sender,
- std::move(request),
- requestTimeout,
- counters,
- computeConfig,
- yqSharedResources,
- credentialsProviderFactory);
-}
-
-} // namespace NPrivate
-} // namespace NFq
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
index dd5e2a634b..fbfeba1bbe 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.cpp
@@ -2,7 +2,11 @@
#include "query_utils.h"
#include <contrib/libs/fmt/include/fmt/format.h>
+#include <util/string/join.h>
+#include <ydb/core/fq/libs/common/util.h>
+#include <ydb/core/fq/libs/config/yq_issue.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
+#include <ydb/public/api/protos/draft/fq.pb.h>
#include <ydb/public/sdk/cpp/client/ydb_table/table.h>
namespace NFq {
@@ -24,6 +28,11 @@ struct TBaseActorTypeTag<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
using TResponse = TEventResponse;
};
+struct TSchemaQueryTask {
+ TString SQL;
+ TMaybe<TString> RollbackSQL;
+};
+
template<class TEventRequest, class TEventResponse>
class TSchemaQueryYDBActor :
public TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>> {
@@ -40,9 +49,18 @@ private:
struct TEvQueryExecutionResponse :
NActors::TEventLocal<TEvQueryExecutionResponse, EvQueryExecutionResponse> {
TStatus Result;
-
- TEvQueryExecutionResponse(TStatus result)
- : Result(std::move(result)) { }
+ size_t TaskIndex = 0u;
+ bool Rollback = false;
+ TMaybe<TStatus> MaybeInitialStatus;
+
+ TEvQueryExecutionResponse(TStatus result,
+ size_t taskIndex,
+ bool rollback,
+ TMaybe<TStatus> MaybeInitialStatus)
+ : Result(std::move(result))
+ , TaskIndex(taskIndex)
+ , Rollback(rollback)
+ , MaybeInitialStatus(std::move(MaybeInitialStatus)) { }
};
};
@@ -53,7 +71,9 @@ private:
using TEventRequestPtr = typename TEventRequest::TPtr;
public:
- using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
+ using TTasks = std::vector<TSchemaQueryTask>;
+ using TTasksFactoryMethod = std::function<TTasks(const TEventRequestPtr& request)>;
+ using TQueryFactoryMethod = std::function<TString(const TEventRequestPtr& request)>;
using TErrorMessageFactoryMethod = std::function<TString(const TStatus& status)>;
TSchemaQueryYDBActor(const TActorId& sender,
@@ -64,56 +84,152 @@ public:
TErrorMessageFactoryMethod errorMessageFactoryMethod)
: TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
sender, std::move(request), requestTimeout, counters)
- , QueryFactoryMethod(queryFactoryMethod)
+ , Tasks{TSchemaQueryTask{.SQL = queryFactoryMethod(Request)}}
, ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
- static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_DELETE_CONNECTION_IN_YDB";
+ TSchemaQueryYDBActor(const TActorId& sender,
+ const TEventRequestPtr request,
+ TDuration requestTimeout,
+ const NPrivate::TRequestCommonCountersPtr& counters,
+ TTasksFactoryMethod tasksFactoryMethod,
+ TErrorMessageFactoryMethod errorMessageFactoryMethod)
+ : TBaseActor<TSchemaQueryYDBActor<TEventRequest, TEventResponse>>(
+ sender, std::move(request), requestTimeout, counters)
+ , Tasks(tasksFactoryMethod(Request))
+ , ErrorMessageFactoryMethod(errorMessageFactoryMethod) { }
+
+ static constexpr char ActorName[] = "YQ_CONTROL_PLANE_PROXY_YDB_SCHEMA_QUERY_ACTOR";
void BootstrapImpl() override {
CPP_LOG_I("TSchemaQueryYDBActor BootstrapImpl. Actor id: " << TBase::SelfId());
- InitiateSchemaQueryExecution();
+ InitiateSchemaQueryExecution(0, false, Nothing());
}
- void InitiateSchemaQueryExecution() {
- CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId());
-
- const auto& request = Request;
- TString schemeQuery = QueryFactoryMethod(request);
- request->Get()
- ->YDBSession->ExecuteSchemeQuery(schemeQuery)
- .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
- self = SelfId()](const TAsyncStatus& future) {
- actorSystem->Send(self,
- new typename TEvPrivate::TEvQueryExecutionResponse(
- std::move(future.GetValueSync())));
- });
+ TMaybe<TString> SelectTask(size_t taskIndex, bool rollback) {
+ if (!rollback) {
+ if (taskIndex < Tasks.size()) {
+ return Tasks[taskIndex].SQL;
+ }
+ return Nothing();
+ }
+
+ while (true) {
+ const auto& maybeRollback = Tasks[taskIndex].RollbackSQL;
+ if (maybeRollback) {
+ return maybeRollback;
+ }
+ if (taskIndex == 0u) {
+ return Nothing();
+ }
+ taskIndex--;
+ }
+ }
+
+ bool InitiateSchemaQueryExecution(size_t taskIndex,
+ bool rollback,
+ const TMaybe<TStatus>& maybeInitialStatus) {
+ CPP_LOG_I(
+ "TSchemaQueryYDBActor Executing schema query. Actor id: " << TBase::SelfId());
+ auto schemeQuery = SelectTask(taskIndex, rollback);
+ if (schemeQuery) {
+ CPP_LOG_I("TSchemaQueryYDBActor Executing schema query. schemeQuery: "
+ << schemeQuery);
+ Request->Get()
+ ->YDBClient
+ ->RetryOperation([query = *schemeQuery](TSession session) {
+ return session.ExecuteSchemeQuery(query);
+ })
+ .Subscribe([actorSystem = NActors::TActivationContext::ActorSystem(),
+ self = SelfId(),
+ taskIndex,
+ rollback,
+ maybeInitialStatus](const TAsyncStatus& future) {
+ actorSystem->Send(self,
+ new typename TEvPrivate::TEvQueryExecutionResponse{
+ std::move(future.GetValueSync()),
+ taskIndex,
+ rollback,
+ std::move(maybeInitialStatus)});
+ });
+ }
+ return schemeQuery.Defined();
}
STRICT_STFUNC(StateFunc,
- cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
- hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle);
+ cFunc(NActors::TEvents::TSystem::Wakeup, TBase::HandleTimeout);
+ hFunc(TEvPrivate::TEvQueryExecutionResponse, Handle);
)
+ void FinishSuccessfully() {
+ CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: "
+ << TBase::SelfId());
+ Request->Get()->ComputeYDBOperationWasPerformed = true;
+ TBase::SendRequestToSender();
+ }
+ void SendError(const TStatus& executeSchemeQueryStatus) {
+ CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: "
+ << TBase::SelfId());
+ TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus);
+
+ TBase::HandleError(errorMessage,
+ executeSchemeQueryStatus.GetStatus(),
+ executeSchemeQueryStatus.GetIssues());
+ }
+
void Handle(typename TEvPrivate::TEvQueryExecutionResponse::TPtr& event) {
- CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Actor id: " << TBase::SelfId());
const auto& executeSchemeQueryStatus = event->Get()->Result;
- if (!executeSchemeQueryStatus.IsSuccess()) {
- CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished with issues. Actor id: " << TBase::SelfId());
- TString errorMessage = ErrorMessageFactoryMethod(executeSchemeQueryStatus);
-
- TBase::HandleError(errorMessage,
- executeSchemeQueryStatus.GetStatus(),
- executeSchemeQueryStatus.GetIssues());
+ auto isRollback = event->Get()->Rollback;
+
+ auto successExecutionRunMode = executeSchemeQueryStatus.IsSuccess() && !isRollback;
+ auto successExecutionRollbackMode =
+ executeSchemeQueryStatus.IsSuccess() && isRollback;
+ auto failedExecutionRunMode = !executeSchemeQueryStatus.IsSuccess() && !isRollback;
+
+ if (successExecutionRunMode) {
+ if (!InitiateSchemaQueryExecution(event->Get()->TaskIndex + 1, false, Nothing())) {
+ FinishSuccessfully();
+ return;
+ }
+ } else if (successExecutionRollbackMode) {
+ if (event->Get()->TaskIndex == 0 ||
+ !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1,
+ true,
+ std::move(event->Get()->MaybeInitialStatus))) {
+ SendError(*event->Get()->MaybeInitialStatus);
+ return;
+ }
+ } else if (failedExecutionRunMode) {
+ if (event->Get()->TaskIndex == 0 ||
+ !InitiateSchemaQueryExecution(event->Get()->TaskIndex - 1,
+ true,
+ std::move(event->Get()->Result))) {
+ SendError(event->Get()->Result);
+ return;
+ }
+ } else {
+ // Failed during rollback
+ const auto& initialIssues = *(event->Get()->MaybeInitialStatus);
+
+ auto originalIssue =
+ MakeErrorIssue(TIssuesIds::INTERNAL_ERROR, "Couldn't execute SQL script");
+ for (const auto& subIssue : initialIssues.GetIssues()) {
+ originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ auto rollbackIssue =
+ MakeErrorIssue(TIssuesIds::INTERNAL_ERROR,
+ "Couldn't execute rollback SQL script");
+ for (const auto& subIssue : event->Get()->Result.GetIssues()) {
+ originalIssue.AddSubIssue(MakeIntrusive<NYql::TIssue>(subIssue));
+ }
+ SendError(TStatus{initialIssues.GetStatus(),
+ NYql::TIssues{std::move(originalIssue),
+ std::move(rollbackIssue)}});
return;
}
-
- CPP_LOG_I("TSchemaQueryYDBActor Handling query execution response. Query finished successfully. Actor id: " << TBase::SelfId());
- Request->Get()->ComputeYDBOperationWasPerformed = true;
- TBase::SendRequestToSender();
}
private:
- TQueryFactoryMethod QueryFactoryMethod;
+ TTasks Tasks;
TErrorMessageFactoryMethod ErrorMessageFactoryMethod;
};
@@ -123,13 +239,12 @@ NActors::IActor* MakeCreateConnectionActor(
TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const TString& objectStorageEndpoint,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
- [objectStorageEndpoint, signer = std::move(signer)](
+ [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
+ signer = std::move(signer)](
const TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr& request) -> TString {
- using namespace fmt::literals;
-
return MakeCreateExternalDataSourceQuery(request->Get()->Request.content(),
objectStorageEndpoint,
signer);
@@ -158,24 +273,68 @@ NActors::IActor* MakeModifyConnectionActor(
TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const TString& objectStorageEndpoint,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer) {
auto queryFactoryMethod =
- [objectStorageEndpoint, signer = std::move(signer)](
- const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request) -> TString {
+ [objectStorageEndpoint = commonConfig.GetObjectStorageEndpoint(),
+ signer = std::move(signer)](
+ const TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr& request)
+ -> std::vector<TSchemaQueryTask> {
using namespace fmt::literals;
auto& oldConnectionContent = (*request->Get()->OldConnectionContent);
- auto& newConnectionContent = request->Get()->Request.content();
- return fmt::format(
+ auto& newConnectionContent = request->Get()->Request.content();
+
+ auto deleteOldEntities = fmt::format(
R"(
+ {delete_external_data_tables};
{delete_external_data_source};
- {create_external_data_source};
)",
+ "delete_external_data_tables"_a =
+ JoinMapRange("\n",
+ request->Get()->OldBindingContents.begin(),
+ request->Get()->OldBindingContents.end(),
+ [](const FederatedQuery::BindingContent& binding) {
+ return MakeDeleteExternalDataTableQuery(binding.name());
+ }),
"delete_external_data_source"_a =
- MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer),
+ MakeDeleteExternalDataSourceQuery(oldConnectionContent, signer));
+
+ auto createOldEntities = fmt::format(
+ R"(
+ {create_external_data_source};
+ {create_external_data_tables};
+ )",
"create_external_data_source"_a = MakeCreateExternalDataSourceQuery(
- newConnectionContent, objectStorageEndpoint, signer));
+ oldConnectionContent, objectStorageEndpoint, signer),
+ "create_external_data_tables"_a = JoinMapRange(
+ "\n",
+ request->Get()->OldBindingContents.begin(),
+ request->Get()->OldBindingContents.end(),
+ [&oldConnectionContent](const FederatedQuery::BindingContent& binding) {
+ return MakeCreateExternalDataTableQuery(binding,
+ oldConnectionContent.name());
+ }));
+
+ auto createNewEntities = fmt::format(
+ R"(
+ {create_external_data_source};
+ {create_external_data_tables};
+ )",
+ "create_external_data_source"_a = MakeCreateExternalDataSourceQuery(
+ newConnectionContent, objectStorageEndpoint, signer),
+ "create_external_data_tables"_a = JoinMapRange(
+ "\n",
+ request->Get()->OldBindingContents.begin(),
+ request->Get()->OldBindingContents.end(),
+ [&newConnectionContent](const FederatedQuery::BindingContent& binding) {
+ return MakeCreateExternalDataTableQuery(binding,
+ newConnectionContent.name());
+ }));
+
+ return {TSchemaQueryTask{.SQL = TString{deleteOldEntities},
+ .RollbackSQL = TString{createOldEntities}},
+ TSchemaQueryTask{.SQL = TString{createNewEntities}}};
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
@@ -202,7 +361,6 @@ NActors::IActor* MakeDeleteConnectionActor(
auto queryFactoryMethod =
[signer = std::move(signer)](
const TEvControlPlaneProxy::TEvDeleteConnectionRequest::TPtr& request) -> TString {
- using namespace fmt::literals;
return MakeDeleteExternalDataSourceQuery(*request->Get()->ConnectionContent,
signer);
};
@@ -233,8 +391,6 @@ NActors::IActor* MakeCreateBindingActor(
TCounters& counters) {
auto queryFactoryMethod =
[](const TEvControlPlaneProxy::TEvCreateBindingRequest::TPtr& request) -> TString {
- using namespace fmt::literals;
-
auto externalSourceName = *request->Get()->ConnectionName;
return MakeCreateExternalDataTableQuery(request->Get()->Request.content(),
externalSourceName);
@@ -264,20 +420,18 @@ NActors::IActor* MakeModifyBindingActor(
TDuration requestTimeout,
TCounters& counters) {
auto queryFactoryMethod =
- [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> TString {
- using namespace fmt::literals;
-
+ [](const TEvControlPlaneProxy::TEvModifyBindingRequest::TPtr& request) -> std::vector<TSchemaQueryTask> {
auto sourceName = *request->Get()->ConnectionName;
- auto oldTableName = *request->Get()->OldBindingName;
- return fmt::format(
- R"(
- {delete_external_data_table};
- {create_external_data_table};
- )",
- "delete_external_data_table"_a = MakeDeleteExternalDataTableQuery(oldTableName),
- "create_external_data_table"_a =
- MakeCreateExternalDataTableQuery(request->Get()->Request.content(),
- sourceName));
+ auto oldTableName = request->Get()->OldBindingContent->name();
+
+ auto deleteOldEntities = MakeDeleteExternalDataTableQuery(oldTableName);
+ auto createOldEntities =
+ MakeCreateExternalDataTableQuery(*request->Get()->OldBindingContent, sourceName);
+ auto createNewEntities =
+ MakeCreateExternalDataTableQuery(request->Get()->Request.content(), sourceName);
+
+ return {TSchemaQueryTask{.SQL = deleteOldEntities, .RollbackSQL = createOldEntities},
+ TSchemaQueryTask{.SQL = createNewEntities}};
};
auto errorMessageFactoryMethod = [](const TStatus& queryStatus) -> TString {
@@ -302,7 +456,6 @@ NActors::IActor* MakeDeleteBindingActor(
TCounters& counters) {
auto queryFactoryMethod =
[](const TEvControlPlaneProxy::TEvDeleteBindingRequest::TPtr& request) -> TString {
- using namespace fmt::literals;
return MakeDeleteExternalDataTableQuery(*request->Get()->OldBindingName);
};
diff --git a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
index 7ba6a06038..4a86d8921b 100644
--- a/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
+++ b/ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h
@@ -17,7 +17,7 @@ NActors::IActor* MakeCreateConnectionActor(
TEvControlPlaneProxy::TEvCreateConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const TString& objectStorageEndpoint,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer);
NActors::IActor* MakeModifyConnectionActor(
@@ -25,7 +25,7 @@ NActors::IActor* MakeModifyConnectionActor(
TEvControlPlaneProxy::TEvModifyConnectionRequest::TPtr request,
TDuration requestTimeout,
TCounters& counters,
- const TString& objectStorageEndpoint,
+ const NConfig::TCommonConfig& commonConfig,
TSigner::TPtr signer);
NActors::IActor* MakeDeleteConnectionActor(
diff --git a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
index c81e34bee9..2ee716c84c 100644
--- a/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
+++ b/ydb/core/fq/libs/control_plane_proxy/control_plane_proxy.cpp
@@ -18,7 +18,7 @@
#include <ydb/core/fq/libs/config/yq_issue.h>
#include <ydb/core/fq/libs/control_plane_proxy/actors/control_plane_storage_requester_actor.h>
-#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_create_compute_session_actor.h>
+#include <ydb/core/fq/libs/control_plane_proxy/actors/utils.h>
#include <ydb/core/fq/libs/control_plane_proxy/actors/ydb_schema_query_actor.h>
#include <ydb/core/fq/libs/control_plane_proxy/events/events.h>
@@ -43,6 +43,7 @@
#include <ydb/library/folder_service/folder_service.h>
#include <ydb/library/folder_service/events.h>
+#include <memory>
#include <contrib/libs/fmt/include/fmt/format.h>
@@ -832,6 +833,8 @@ private:
return issues;
}
+
+
void Handle(TEvControlPlaneProxy::TEvCreateQueryRequest::TPtr& ev) {
TInstant startTime = TInstant::Now();
FederatedQuery::CreateQueryRequest request = ev->Get()->Request;
@@ -1687,18 +1690,11 @@ private:
};
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ydbOperationWasPerformed) {
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvCreateConnectionRequest,
- TEvControlPlaneProxy::TEvCreateConnectionResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
Register(NPrivate::MakeCreateConnectionActor(
@@ -1706,7 +1702,7 @@ private:
std::move(ev),
Config.RequestTimeout,
Counters,
- Config.CommonConfig.GetObjectStorageEndpoint(),
+ Config.CommonConfig,
Signer));
return;
}
@@ -1963,7 +1959,18 @@ private:
sender, ev, Counters, Config.RequestTimeout, permissions));
return;
}
-
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && !ev->Get()->OldBindingNamesDiscoveryFinished) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeListBindingIds(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
+ if (Config.ComputeConfig.YdbComputeControlPlaneEnabled() && ev->Get()->OldBindingIds.size() != ev->Get()->OldBindingContents.size()) {
+ auto permissions = ExtractPermissions(ev, availablePermissions);
+ Register(MakeDescribeListedBinding(
+ sender, ev, Counters, Config.RequestTimeout, permissions));
+ return;
+ }
const bool controlPlaneYDBOperationWasPerformed = ev->Get()->ControlPlaneYDBOperationWasPerformed;
if (!controlPlaneYDBOperationWasPerformed) {
auto sender = ev->Sender;
@@ -1992,18 +1999,11 @@ private:
}
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvModifyConnectionRequest,
- TEvControlPlaneProxy::TEvModifyConnectionResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
if (!ev->Get()->ComputeYDBOperationWasPerformed) {
@@ -2012,7 +2012,7 @@ private:
ev,
Config.RequestTimeout,
Counters,
- Config.CommonConfig.GetObjectStorageEndpoint(),
+ Config.CommonConfig,
Signer));
return;
}
@@ -2116,16 +2116,11 @@ private:
}
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvDeleteConnectionRequest,
- TEvControlPlaneProxy::TEvDeleteConnectionResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
if (!ev->Get()->ComputeYDBOperationWasPerformed) {
@@ -2283,16 +2278,11 @@ private:
return;
}
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvCreateBindingRequest,
- TEvControlPlaneProxy::TEvCreateBindingResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig, YqSharedResources, CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
Register(MakeCreateBindingActor(
@@ -2528,7 +2518,7 @@ private:
};
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
- if (!ev->Get()->OldBindingName) {
+ if (!ev->Get()->OldBindingContent) {
auto permissions = ExtractPermissions(ev, availablePermissions);
Register(MakeDiscoverYDBBindingName(
sender, ev, Counters, Config.RequestTimeout, permissions));
@@ -2564,18 +2554,11 @@ private:
}
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvModifyBindingRequest,
- TEvControlPlaneProxy::TEvModifyBindingResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
if (!ev->Get()->ComputeYDBOperationWasPerformed) {
@@ -2686,18 +2669,11 @@ private:
}
if (Config.ComputeConfig.YdbComputeControlPlaneEnabled()) {
- if (!ev->Get()->YDBSession) {
- Register(MakeComputeYDBSessionActor<
- TEvControlPlaneProxy::TEvDeleteBindingRequest,
- TEvControlPlaneProxy::TEvDeleteBindingResponse>(
- sender,
- std::move(ev),
- Config.RequestTimeout,
- Counters.GetCommonCounters(RTC_CREATE_YDB_SESSION),
- Config.ComputeConfig,
- YqSharedResources,
- CredentialsProviderFactory));
- return;
+ if (!ev->Get()->YDBClient) {
+ ev->Get()->YDBClient = CreateNewTableClient(ev,
+ Config.ComputeConfig,
+ YqSharedResources,
+ CredentialsProviderFactory);
}
if (!ev->Get()->ComputeYDBOperationWasPerformed) {
diff --git a/ydb/core/fq/libs/control_plane_proxy/events/events.h b/ydb/core/fq/libs/control_plane_proxy/events/events.h
index bde54c6a8a..7db87b0d15 100644
--- a/ydb/core/fq/libs/control_plane_proxy/events/events.h
+++ b/ydb/core/fq/libs/control_plane_proxy/events/events.h
@@ -104,7 +104,7 @@ struct TEvControlPlaneProxy {
bool ComputeYDBOperationWasPerformed;
bool ControlPlaneYDBOperationWasPerformed;
std::unique_ptr<TProxyResponse> Response;
- TMaybe<NYdb::NTable::TSession> YDBSession;
+ std::shared_ptr<NYdb::NTable::TTableClient> YDBClient;
TMaybe<FederatedQuery::Internal::ComputeDatabaseInternal> ComputeDatabase;
};
@@ -310,6 +310,12 @@ struct TEvControlPlaneProxy {
EvModifyConnectionRequest>::TBaseControlPlaneRequest;
TMaybe<FederatedQuery::ConnectionContent> OldConnectionContent;
+ // ListBindings
+ bool OldBindingNamesDiscoveryFinished = false;
+ TMaybe<TString> NextListingBindingsToken;
+ std::vector<TString> OldBindingIds;
+ // DescribeEachBinding
+ std::vector<FederatedQuery::BindingContent> OldBindingContents;
};
template<>
@@ -348,8 +354,7 @@ struct TEvControlPlaneProxy {
FederatedQuery::ModifyBindingRequest,
EvModifyBindingRequest>::TBaseControlPlaneRequest;
- TMaybe<TString> OldBindingName;
- TMaybe<TString> ConnectionId;
+ TMaybe<FederatedQuery::BindingContent> OldBindingContent;
TMaybe<TString> ConnectionName;
};
diff --git a/ydb/core/grpc_services/rpc_fq.cpp b/ydb/core/grpc_services/rpc_fq.cpp
index 1e6c59ea33..bf4e70a67c 100644
--- a/ydb/core/grpc_services/rpc_fq.cpp
+++ b/ydb/core/grpc_services/rpc_fq.cpp
@@ -546,6 +546,7 @@ std::unique_ptr<TEvProxyRuntimeEvent> CreateFederatedQueryModifyConnectionReques
TVector<NPerms::TPermission> basePermissions{
NPerms::Required("yq.connections.update"),
NPerms::Required("yq.connections.get"),
+ NPerms::Required("yq.bindings.get"),
NPerms::Optional("yq.resources.managePrivate"),
NPerms::Optional("yq.resources.viewPublic"),
NPerms::Optional("yq.resources.viewPrivate")